There is a very common pattern when dealing with a remote service: a sequential series of calls to the service, each of which invokes a callback, asynchronously, when complete. I believe, for instance, that nginx is, essentially, based on it.

Here is an outline of the pattern: doFirst invokes the callback onFirst and so on.

public class Demo implements Service.Listener {
    private Service service;

    public void start() {
        service = new Service(this);
        service.runTask(service::doFirst);
    }

    @Override
    public void onFirst(Map<String, Object> data) {
        processData1(data);
        service.runTask(service::doSecond);
    }

    @Override
    public void onSecond(Map<String, Object> data) {
        processData2(data);
        service.runTask(service::doThird);
    }

    @Override
    public void onThird(Map<String, Object> data) {
        processData3(data);
    }
    // ...
}

Here are some constraints, again, fairly common, I believe:

  • The callback listener is registered with the service on creation (new Service(this)). It can’t be changed.
  • The processing in the Demo class (processData) can happen in any available context. Extra points if it is cancellable.
  • The calls to the service must be executed serially across all service subscribers. The runTask method highlights this constraint: assume it executes tasks on a global single-threaded executor, thus forcing the execution of only one task at a time.

The question is: what is the idiomatic way to write this in Kotlin, with coroutines and Flows (Channels)? Is there a more elegant way to manage all those callbacks and still force one-at-a-time execution of the service tasks?

asked 13 hours ago by G. Blake Meike

1 Answer

With an appropriately designed coroutine API, this is 100% trivial, and just looks like:

val data1 = service.doFirst()
processData1(data1)
val data2 = service.doSecond()
processData2(data2)
...

Coroutines only do things concurrently if you explicitly ask for it. No other synchronization logic is required.
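
To make that concrete, here is a minimal sketch using kotlinx.coroutines. The step function and its log are hypothetical, made up for illustration; they are not part of the service in the question. Plain suspending calls run one after the other, and overlap only appears once async is asked for.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical suspending step: records when it starts and when it ends.
suspend fun step(name: String, log: MutableList<String>) {
    log += "start $name"
    delay(10) // stands in for waiting on the remote service
    log += "end $name"
}

// Plain calls: the second step cannot start until the first has returned.
fun runSequentially(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    step("first", log)
    step("second", log)
    log
}

// Concurrency has to be requested explicitly, here with async.
fun runConcurrently(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val jobs = listOf("first", "second").map { name -> async { step(name, log) } }
    jobs.awaitAll()
    log
}

fun main() {
    println(runSequentially()) // [start first, end first, start second, end second]
    println(runConcurrently()) // both steps start before either one ends
}
```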

You will need appropriate coroutine APIs for doFirst() and the rest, but what they look like depends on details of how the service works that you haven't shown us. Coroutine code should not involve any callbacks; instead, the callbacks should be wrapped in suspending APIs, generally with suspendCancellableCoroutine, whose documentation shows how to do that.
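
As a rough sketch of what such a wrapper might look like, under assumptions: the Service class below is a hypothetical stand-in written in Kotlin (the real one may behave differently), and SuspendingService, call and complete are names invented here. It needs the kotlinx-coroutines-core dependency. The wrapper is the one listener registered at construction, a Mutex keeps a single call in flight, and each callback resumes whichever caller is waiting.

```kotlin
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.concurrent.Executors
import kotlin.coroutines.resume

// Hypothetical stand-in for the question's Service: every task runs on one
// single-threaded executor and reports its result through the listener.
class Service(private val listener: Listener) {
    interface Listener {
        fun onFirst(data: Map<String, Any>)
        fun onSecond(data: Map<String, Any>)
    }

    private val executor = Executors.newSingleThreadExecutor()

    fun runTask(task: () -> Unit) = executor.execute(task)
    fun doFirst() = listener.onFirst(mapOf("step" to 1))
    fun doSecond() = listener.onSecond(mapOf("step" to 2))
    fun shutdown() = executor.shutdown()
}

// Hypothetical adapter: exposes the callback API as suspending functions.
class SuspendingService : Service.Listener {
    private val service = Service(this) // listener fixed at creation
    private val callLock = Mutex()      // at most one call in flight

    @Volatile
    private var pending: CancellableContinuation<Map<String, Any>>? = null

    private suspend fun call(task: () -> Unit): Map<String, Any> =
        callLock.withLock {
            suspendCancellableCoroutine<Map<String, Any>> { cont ->
                pending = cont
                // If the caller is cancelled, stop waiting for the callback.
                cont.invokeOnCancellation { pending = null }
                service.runTask(task)
            }
        }

    suspend fun doFirst(): Map<String, Any> = call(service::doFirst)
    suspend fun doSecond(): Map<String, Any> = call(service::doSecond)

    override fun onFirst(data: Map<String, Any>) = complete(data)
    override fun onSecond(data: Map<String, Any>) = complete(data)

    // Hand the callback's data to whichever caller is currently suspended.
    private fun complete(data: Map<String, Any>) {
        val cont = pending
        pending = null
        cont?.resume(data)
    }

    fun close() = service.shutdown()
}

fun main(): Unit = runBlocking {
    val service = SuspendingService()
    try {
        val data1 = service.doFirst()
        println("first: $data1")
        val data2 = service.doSecond()
        println("second: $data2")
    } finally {
        service.close()
    }
}
```

One caveat with this sketch: if a caller is cancelled after its task has been submitted, the late callback could be delivered to the next caller. A real wrapper would need some way to correlate callbacks with calls, or a cancel operation on the service itself, and whether either exists depends on the actual Service.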

Tags: Managing sequential callbacks with Kotlin, Stack Overflow