admin管理员组文章数量:1122846
In my use case, I need to use the RSocket protocol to create a reactive stream over the network, and on the client side return a pekko Source, and on the server side return a pekko Sink. I have no problems on the client side because creating the connection returns a Flux, which can be trivially converted to a pekko Source with Source.fromPublisher.
On the server side, RSocket expects a Flux as input, but I have other code that is expecting a pekko Sink as a result (it can't take a ManyWithUpstream unless I can convert that to a Sink).
Here's a minimal code example that works flawlessly for my use case but relies on the deprecated EmitterProcessor.
def serverSink : Sink[Payload, NotUsed] = {
val processor : Processor[Payload, Payload] = EmitterProcessor.create(1)
RSocketServer.create(
SocketAcceptor.forRequestStream(payload =>
Flux.from(processor)
)).bindNow(TcpServerTransport.create("localhost", 3141))
Sink.fromSubscriber(processor)
}
I have tried the following but when I try to run it nothing fed to the Sink will be actually fed into RSocket for reasons I don't completely understand.
def serverSink : Sink[Payload, NotUsed] = {
val processor : Processor[Payload, Payload] = Flow[Payload].toProcessor.run()
RSocketServer.create(
SocketAcceptor.forRequestStream(payload =>
Flux.from(processor)
)).bindNow(TcpServerTransport.create("localhost", 3141))
Sink.fromSubscriber(processor)
}
In my use case, I need to use the RSocket protocol to create a reactive stream over the network, and on the client side return a pekko Source, and on the server side return a pekko Sink. I have no problems on the client side because creating the connection returns a Flux, which can be trivially converted to a pekko Source with Source.fromPublisher.
On the server side, RSocket expects a Flux as input, but I have other code that is expecting a pekko Sink as a result (it can't take a ManyWithUpstream unless I can convert that to a Sink).
Here's a minimal code example that works flawlessly for my use case but relies on the deprecated EmitterProcessor.
def serverSink : Sink[Payload, NotUsed] = {
val processor : Processor[Payload, Payload] = EmitterProcessor.create(1)
RSocketServer.create(
SocketAcceptor.forRequestStream(payload =>
Flux.from(processor)
)).bindNow(TcpServerTransport.create("localhost", 3141))
Sink.fromSubscriber(processor)
}
I have tried the following but when I try to run it nothing fed to the Sink will be actually fed into RSocket for reasons I don't completely understand.
def serverSink : Sink[Payload, NotUsed] = {
val processor : Processor[Payload, Payload] = Flow[Payload].toProcessor.run()
RSocketServer.create(
SocketAcceptor.forRequestStream(payload =>
Flux.from(processor)
)).bindNow(TcpServerTransport.create("localhost", 3141))
Sink.fromSubscriber(processor)
}
Share
Improve this question
asked Nov 22, 2024 at 15:59
David MastersDavid Masters
12 bronze badges
1 Answer
Reset to default 0So, I have a solution, but it's a little hacky. For reasons I don't understand, the publisher from pekko needs a subscriber other than the ones over the ones from RSocket. This works, but with the caveat that the sink will consume everything it can if there are no connected clients (which is actually preferred in my use case)
def serverSink : Sink[Payload, NotUsed] = {
val sink : Sink[Payload, Publisher[Payload]] = Sink.asPublisher(true)
sink.mapMaterializedValue { pub =>
pub.subscribe(new Subscriber[Payload] {
override def onComplete(): Unit = ()
override def onError(t: Throwable): Unit = ()
override def onNext(t: Payload): Unit = ()
override def onSubscribe(s: Subscription): Unit = s.request(Long.MaxValue)
})
RSocketServer.create(
SocketAcceptor.forRequestStream(payload =>
Flux.from(pub)
)).bindNow(TcpServerTransport.create("localhost", 3141))
NotUsed.notUsed()
}
}
本文标签:
版权声明:本文标题:scala - Is there a way to create a RSocket "forRequestStream" and return it as a PekkoAkka Sink without using 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/web/1736302514a1931563.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论