diff --git a/pom.xml b/pom.xml index 025a88e0c7d941c57b7f2096a2ea898cd6d6dea3..10cfc273c8f0b72026fd02121e84901646e311f0 100644 --- a/pom.xml +++ b/pom.xml @@ -171,6 +171,11 @@ 3.5 test + + io.projectreactor + reactor-core + 3.2.10.RELEASE + diff --git a/pom4ide.xml b/pom4ide.xml index 3edd1b91cfc12d32383d16b5c5c326f3b7f24b9b..55ca0dcc4ca46d4e44cf8b76a68cefe1fcad3bff 100644 --- a/pom4ide.xml +++ b/pom4ide.xml @@ -171,6 +171,11 @@ 3.5 test + + io.projectreactor + reactor-core + 3.2.10.RELEASE + diff --git a/src/test/java/com/alibaba/integration/ReactorIntegrationDemo.kt b/src/test/java/com/alibaba/integration/ReactorIntegrationDemo.kt new file mode 100644 index 0000000000000000000000000000000000000000..9b11798c894e6c2199de82a0b962081cb9c46157 --- /dev/null +++ b/src/test/java/com/alibaba/integration/ReactorIntegrationDemo.kt @@ -0,0 +1,48 @@ +@file:JvmName("ReactorIntegrationDemo") + +package com.alibaba.integration + +import com.alibaba.ttl.TransmittableThreadLocal +import com.alibaba.ttl.threadpool.TtlExecutors +import reactor.core.publisher.Flux +import reactor.core.scheduler.Schedulers + +fun main() { + // TTL integration for Reactor + Schedulers.addExecutorServiceDecorator("TransmittableThreadLocal") { _, scheduledExecutorService -> + TtlExecutors.getTtlScheduledExecutorService(scheduledExecutorService) + } + + val ttl = TransmittableThreadLocal() + ttl.set("init") + // expand thread pool + Flux.range(1, 20) + .flatMap { + Flux.just(it) + .subscribeOn(Schedulers.parallel()) + .doOnNext { + Thread.sleep(2) + println("expand thread pool: [${Thread.currentThread().name}] $it ${ttl.get()}") + } + } + .collectList() + .block() + + ttl.set("jerry") + Flux.just("Hello") + .subscribeOn(Schedulers.parallel()) + .doOnNext { + println("[${Thread.currentThread().name}] $it ${ttl.get()}") + } + .collectList() + .block() + + ttl.set("tom") + Flux.just("Hello") + .subscribeOn(Schedulers.parallel()) + .doOnNext { + println("[${Thread.currentThread().name}] $it ${ttl.get()}") + } + .collectList() + .block() +} diff --git a/src/test/java/com/alibaba/integration/RxJavaIntegrationDemo.kt b/src/test/java/com/alibaba/integration/RxJavaIntegrationDemo.kt new file mode 100644 index 0000000000000000000000000000000000000000..21b4d21b632bcf10fb81502a6bb77f862a796808 --- /dev/null +++ b/src/test/java/com/alibaba/integration/RxJavaIntegrationDemo.kt @@ -0,0 +1,47 @@ +@file:JvmName("RxJavaIntegrationDemo") + +package com.alibaba.integration + +import com.alibaba.ttl.TransmittableThreadLocal +import com.alibaba.ttl.TtlRunnable +import io.reactivex.Flowable +import io.reactivex.plugins.RxJavaPlugins +import io.reactivex.schedulers.Schedulers + +fun main() { + val ttl = TransmittableThreadLocal() + ttl.set("init") + // expand thread pool + Flowable.range(1, 20) + .flatMap { + Flowable.just(it) + .observeOn(Schedulers.computation()) + .doOnNext { + Thread.sleep(2) + println("expand thread pool: [${Thread.currentThread().name}] $it ${ttl.get()}") + } + } + .toList() + .blockingGet() + + // TTL integration for RxJava + RxJavaPlugins.setScheduleHandler(TtlRunnable::get) + + ttl.set("jerry") + Flowable.just("Hello") + .observeOn(Schedulers.computation()) + .doOnNext { + println("[${Thread.currentThread().name}] $it ${ttl.get()}") + } + .toList() + .blockingGet() + + ttl.set("tom") + Flowable.just("Hello") + .subscribeOn(Schedulers.computation()) + .doOnNext { + println("[${Thread.currentThread().name}] $it ${ttl.get()}") + } + .toList() + .blockingGet() +}