admin管理员组

文章数量:1122846

In my use case, I need to use the RSocket protocol to create a reactive stream over the network, and on the client side return a pekko Source, and on the server side return a pekko Sink. I have no problems on the client side because creating the connection returns a Flux, which can be trivially converted to a pekko Source with Source.fromPublisher.

On the server side, RSocket expects a Flux as input, but I have other code that is expecting a pekko Sink as a result (it can't take a ManyWithUpstream unless I can convert that to a Sink).

Here's a minimal code example that works flawlessly for my use case but relies on the deprecated EmitterProcessor.

def serverSink : Sink[Payload, NotUsed] = {
    val processor : Processor[Payload, Payload] = EmitterProcessor.create(1)
    RSocketServer.create(
      SocketAcceptor.forRequestStream(payload =>
        Flux.from(processor)
    )).bindNow(TcpServerTransport.create("localhost", 3141))
    Sink.fromSubscriber(processor)
  }

I have tried the following but when I try to run it nothing fed to the Sink will be actually fed into RSocket for reasons I don't completely understand.

def serverSink : Sink[Payload, NotUsed] = {
    val processor : Processor[Payload, Payload] = Flow[Payload].toProcessor.run()
    RSocketServer.create(
      SocketAcceptor.forRequestStream(payload =>
        Flux.from(processor)
    )).bindNow(TcpServerTransport.create("localhost", 3141))
    Sink.fromSubscriber(processor)
  }

In my use case, I need to use the RSocket protocol to create a reactive stream over the network, and on the client side return a pekko Source, and on the server side return a pekko Sink. I have no problems on the client side because creating the connection returns a Flux, which can be trivially converted to a pekko Source with Source.fromPublisher.

On the server side, RSocket expects a Flux as input, but I have other code that is expecting a pekko Sink as a result (it can't take a ManyWithUpstream unless I can convert that to a Sink).

Here's a minimal code example that works flawlessly for my use case but relies on the deprecated EmitterProcessor.

def serverSink : Sink[Payload, NotUsed] = {
    val processor : Processor[Payload, Payload] = EmitterProcessor.create(1)
    RSocketServer.create(
      SocketAcceptor.forRequestStream(payload =>
        Flux.from(processor)
    )).bindNow(TcpServerTransport.create("localhost", 3141))
    Sink.fromSubscriber(processor)
  }

I have tried the following but when I try to run it nothing fed to the Sink will be actually fed into RSocket for reasons I don't completely understand.

def serverSink : Sink[Payload, NotUsed] = {
    val processor : Processor[Payload, Payload] = Flow[Payload].toProcessor.run()
    RSocketServer.create(
      SocketAcceptor.forRequestStream(payload =>
        Flux.from(processor)
    )).bindNow(TcpServerTransport.create("localhost", 3141))
    Sink.fromSubscriber(processor)
  }
Share Improve this question asked Nov 22, 2024 at 15:59 David MastersDavid Masters 12 bronze badges
Add a comment  | 

1 Answer 1

Reset to default 0

So, I have a solution, but it's a little hacky. For reasons I don't understand, the publisher from pekko needs a subscriber other than the ones over the ones from RSocket. This works, but with the caveat that the sink will consume everything it can if there are no connected clients (which is actually preferred in my use case)

def serverSink : Sink[Payload, NotUsed] = {
    val sink : Sink[Payload, Publisher[Payload]] = Sink.asPublisher(true)
    sink.mapMaterializedValue { pub =>
      pub.subscribe(new Subscriber[Payload] {
        override def onComplete(): Unit = ()
        override def onError(t: Throwable): Unit = ()
        override def onNext(t: Payload): Unit = ()
        override def onSubscribe(s: Subscription): Unit = s.request(Long.MaxValue)
      })
      RSocketServer.create(
        SocketAcceptor.forRequestStream(payload =>
          Flux.from(pub)
      )).bindNow(TcpServerTransport.create("localhost", 3141))
      NotUsed.notUsed()
    }
  }

本文标签: