admin管理员组文章数量:1335887
I need to handle different types of events in a strict one by one manner, but in a background thread.
According to the documentation, the next code, Schedulers.from(executor, false, true);
, should cover my requirements, but in reality it doesn't.
The code:
ExecutorService executor = Executors.newSingleThreadExecutor();
Scheduler scheduler = Schedulers.from(executor, false, true);
PublishSubject<String> subject1 = PublishSubject.create();
PublishSubject<String> subject2 = PublishSubject.create();
subject1.observeOn(scheduler).subscribe(log::info);
subject2.observeOn(scheduler).subscribe(log::info);
subject1.onNext("Hello11");
subject2.onNext("Hello21");
subject1.onNext("Hello12");
subject2.onNext("Hello22");
subject1.onNext("Hello13");
subject2.onNext("Hello23");
log.info("Test activity");
Has the following output:
22:19:05.313 [ForkJoinPool-1-worker-1] INFO c.c.t.junit5example.RxJavaTest -- Test activity
22:19:05.313 [pool-1-thread-1] INFO c.c.t.junit5example.RxJavaTest -- Hello11
22:19:05.316 [pool-1-thread-1] INFO c.c.t.junit5example.RxJavaTest -- Hello12
22:19:05.316 [pool-1-thread-1] INFO c.c.t.junit5example.RxJavaTest -- Hello13
22:19:05.316 [pool-1-thread-1] INFO c.c.t.junit5example.RxJavaTest -- Hello21
22:19:05.316 [pool-1-thread-1] INFO c.c.t.junit5example.RxJavaTest -- Hello22
22:19:05.316 [pool-1-thread-1] INFO c.c.t.junit5example.RxJavaTest -- Hello23
Which shows, that events handling is executed in a greedy manner, when every Observer gives all events before freeing the scheduler
, which contradicts with the documentation.
If .observeOn(scheduler)
is replaced with the .subscribeOn(scheduler)
the output is the next:
22:23:56.162 [ForkJoinPool-1-worker-1] INFO c.c.t.junit5example.RxJavaTest -- Hello11
22:23:56.164 [ForkJoinPool-1-worker-1] INFO c.c.t.junit5example.RxJavaTest -- Hello21
22:23:56.164 [ForkJoinPool-1-worker-1] INFO c.c.t.junit5example.RxJavaTest -- Hello12
22:23:56.164 [ForkJoinPool-1-worker-1] INFO c.c.t.junit5example.RxJavaTest -- Hello22
22:23:56.164 [ForkJoinPool-1-worker-1] INFO c.c.t.junit5example.RxJavaTest -- Hello13
22:23:56.164 [ForkJoinPool-1-worker-1] INFO c.c.t.junit5example.RxJavaTest -- Hello23
22:23:56.164 [ForkJoinPool-1-worker-1] INFO c.c.t.junit5example.RxJavaTest -- Test activity
Which executes all events in the same thread, which contradicts with the whole idea of .subscribeOn
.
Is this a bug or there is a way to make it work as expected in the documentation? The version is io.reactivex.rxjava3:rxjava:3.1.9
.
I need to handle different types of events in a strict one by one manner, but in a background thread.
According to the documentation, the next code, Schedulers.from(executor, false, true);
, should cover my requirements, but in reality it doesn't.
The code:
ExecutorService executor = Executors.newSingleThreadExecutor();
Scheduler scheduler = Schedulers.from(executor, false, true);
PublishSubject<String> subject1 = PublishSubject.create();
PublishSubject<String> subject2 = PublishSubject.create();
subject1.observeOn(scheduler).subscribe(log::info);
subject2.observeOn(scheduler).subscribe(log::info);
subject1.onNext("Hello11");
subject2.onNext("Hello21");
subject1.onNext("Hello12");
subject2.onNext("Hello22");
subject1.onNext("Hello13");
subject2.onNext("Hello23");
log.info("Test activity");
Has the following output:
22:19:05.313 [ForkJoinPool-1-worker-1] INFO c.c.t.junit5example.RxJavaTest -- Test activity
22:19:05.313 [pool-1-thread-1] INFO c.c.t.junit5example.RxJavaTest -- Hello11
22:19:05.316 [pool-1-thread-1] INFO c.c.t.junit5example.RxJavaTest -- Hello12
22:19:05.316 [pool-1-thread-1] INFO c.c.t.junit5example.RxJavaTest -- Hello13
22:19:05.316 [pool-1-thread-1] INFO c.c.t.junit5example.RxJavaTest -- Hello21
22:19:05.316 [pool-1-thread-1] INFO c.c.t.junit5example.RxJavaTest -- Hello22
22:19:05.316 [pool-1-thread-1] INFO c.c.t.junit5example.RxJavaTest -- Hello23
Which shows, that events handling is executed in a greedy manner, when every Observer gives all events before freeing the scheduler
, which contradicts with the documentation.
If .observeOn(scheduler)
is replaced with the .subscribeOn(scheduler)
the output is the next:
22:23:56.162 [ForkJoinPool-1-worker-1] INFO c.c.t.junit5example.RxJavaTest -- Hello11
22:23:56.164 [ForkJoinPool-1-worker-1] INFO c.c.t.junit5example.RxJavaTest -- Hello21
22:23:56.164 [ForkJoinPool-1-worker-1] INFO c.c.t.junit5example.RxJavaTest -- Hello12
22:23:56.164 [ForkJoinPool-1-worker-1] INFO c.c.t.junit5example.RxJavaTest -- Hello22
22:23:56.164 [ForkJoinPool-1-worker-1] INFO c.c.t.junit5example.RxJavaTest -- Hello13
22:23:56.164 [ForkJoinPool-1-worker-1] INFO c.c.t.junit5example.RxJavaTest -- Hello23
22:23:56.164 [ForkJoinPool-1-worker-1] INFO c.c.t.junit5example.RxJavaTest -- Test activity
Which executes all events in the same thread, which contradicts with the whole idea of .subscribeOn
.
Is this a bug or there is a way to make it work as expected in the documentation? The version is io.reactivex.rxjava3:rxjava:3.1.9
.
1 Answer
Reset to default 1It is not a bug with the Scheduler
but the consequence of the observeOn
which always operates in a greedy manner. In the first case, because all items to the first sequence was available practically immediately to the observeOn, it emits those on the same thread in one executor run.
You can use another operator which creates one task per item such as delay
with zero delay to get a better interleaving:
subject1.delay(0, TimeUnit.MILLISECONDS, scheduler).subscribe(log::info);
subject2.delay(0, TimeUnit.MILLISECONDS, scheduler).subscribe(log::info);
The second case is working as intended because using subscribeOn
on a Subject
has no effect on the items it delivers. In your case, the items were emitted and thus processed on the same thread as it would happen without subscribeOn
.
本文标签: javaHow to handle multiple subscriptions in a fair mannerStack Overflow
版权声明:本文标题:java - How to handle multiple subscriptions in a fair manner? - Stack Overflow 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/web/1742397178a2467165.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论