From a19501d7a0c4221d2acb6ae03eb466b4829f6620 Mon Sep 17 00:00:00 2001 From: Jerry Lee Date: Mon, 29 Jul 2019 23:23:16 +0800 Subject: [PATCH] add reactive integration demo --- pom.xml | 5 ++ pom4ide.xml | 5 ++ .../integration/ReactorIntegrationDemo.kt | 48 +++++++++++++++++++ .../integration/RxJavaIntegrationDemo.kt | 47 ++++++++++++++++++ 4 files changed, 105 insertions(+) create mode 100644 src/test/java/com/alibaba/integration/ReactorIntegrationDemo.kt create mode 100644 src/test/java/com/alibaba/integration/RxJavaIntegrationDemo.kt diff --git a/pom.xml b/pom.xml index 025a88e0..10cfc273 100644 --- a/pom.xml +++ b/pom.xml @@ -171,6 +171,11 @@ 3.5 test + + io.projectreactor + reactor-core + 3.2.10.RELEASE + diff --git a/pom4ide.xml b/pom4ide.xml index 3edd1b91..55ca0dcc 100644 --- a/pom4ide.xml +++ b/pom4ide.xml @@ -171,6 +171,11 @@ 3.5 test + + io.projectreactor + reactor-core + 3.2.10.RELEASE + diff --git a/src/test/java/com/alibaba/integration/ReactorIntegrationDemo.kt b/src/test/java/com/alibaba/integration/ReactorIntegrationDemo.kt new file mode 100644 index 00000000..9b11798c --- /dev/null +++ b/src/test/java/com/alibaba/integration/ReactorIntegrationDemo.kt @@ -0,0 +1,48 @@ +@file:JvmName("ReactorIntegrationDemo") + +package com.alibaba.integration + +import com.alibaba.ttl.TransmittableThreadLocal +import com.alibaba.ttl.threadpool.TtlExecutors +import reactor.core.publisher.Flux +import reactor.core.scheduler.Schedulers + +fun main() { + // TTL integration for Reactor + Schedulers.addExecutorServiceDecorator("TransmittableThreadLocal") { _, scheduledExecutorService -> + TtlExecutors.getTtlScheduledExecutorService(scheduledExecutorService) + } + + val ttl = TransmittableThreadLocal() + ttl.set("init") + // expand thread pool + Flux.range(1, 20) + .flatMap { + Flux.just(it) + .subscribeOn(Schedulers.parallel()) + .doOnNext { + Thread.sleep(2) + println("expand thread pool: [${Thread.currentThread().name}] $it ${ttl.get()}") + } + } + .collectList() + .block() + + ttl.set("jerry") + Flux.just("Hello") + .subscribeOn(Schedulers.parallel()) + .doOnNext { + println("[${Thread.currentThread().name}] $it ${ttl.get()}") + } + .collectList() + .block() + + ttl.set("tom") + Flux.just("Hello") + .subscribeOn(Schedulers.parallel()) + .doOnNext { + println("[${Thread.currentThread().name}] $it ${ttl.get()}") + } + .collectList() + .block() +} diff --git a/src/test/java/com/alibaba/integration/RxJavaIntegrationDemo.kt b/src/test/java/com/alibaba/integration/RxJavaIntegrationDemo.kt new file mode 100644 index 00000000..21b4d21b --- /dev/null +++ b/src/test/java/com/alibaba/integration/RxJavaIntegrationDemo.kt @@ -0,0 +1,47 @@ +@file:JvmName("RxJavaIntegrationDemo") + +package com.alibaba.integration + +import com.alibaba.ttl.TransmittableThreadLocal +import com.alibaba.ttl.TtlRunnable +import io.reactivex.Flowable +import io.reactivex.plugins.RxJavaPlugins +import io.reactivex.schedulers.Schedulers + +fun main() { + val ttl = TransmittableThreadLocal() + ttl.set("init") + // expand thread pool + Flowable.range(1, 20) + .flatMap { + Flowable.just(it) + .observeOn(Schedulers.computation()) + .doOnNext { + Thread.sleep(2) + println("expand thread pool: [${Thread.currentThread().name}] $it ${ttl.get()}") + } + } + .toList() + .blockingGet() + + // TTL integration for RxJava + RxJavaPlugins.setScheduleHandler(TtlRunnable::get) + + ttl.set("jerry") + Flowable.just("Hello") + .observeOn(Schedulers.computation()) + .doOnNext { + println("[${Thread.currentThread().name}] $it ${ttl.get()}") + } + .toList() + .blockingGet() + + ttl.set("tom") + Flowable.just("Hello") + .subscribeOn(Schedulers.computation()) + .doOnNext { + println("[${Thread.currentThread().name}] $it ${ttl.get()}") + } + .toList() + .blockingGet() +} -- GitLab