提交 b9145395 编写于 作者: oldratlee's avatar oldratlee 🔥

add TTL koroutine intergration demo

上级 bd3a4c2d
package com.alibaba.demo.coroutine.ttl_intergration
import com.alibaba.ttl.TransmittableThreadLocal.Transmitter.*
import com.alibaba.ttl.threadpool.agent.TtlAgent
import kotlinx.coroutines.ThreadContextElement
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
/**
* @see [kotlinx.coroutines.asContextElement]
*/
fun ttlContext(): CoroutineContext =
// if (TtlAgent.isTtlAgentLoaded()) // FIXME Open the if when implement TtlAgent for koroutine
// EmptyCoroutineContext
// else
TtlElement()
/**
* @see [kotlinx.coroutines.internal.ThreadLocalElement]
*/
internal class TtlElement : ThreadContextElement<Any> {
companion object Key : CoroutineContext.Key<TtlElement>
override val key: CoroutineContext.Key<*> get() = Key
private var captured: Any =
capture()
override fun updateThreadContext(context: CoroutineContext): Any =
replay(captured)
override fun restoreThreadContext(context: CoroutineContext, oldState: Any) {
captured = capture() // FIXME This capture operation is a MUST, WHY? This operation is too expensive?!
restore(oldState)
}
// this method is overridden to perform value comparison (==) on key
override fun minusKey(key: CoroutineContext.Key<*>): CoroutineContext =
if (Key == key) EmptyCoroutineContext else this
// this method is overridden to perform value comparison (==) on key
override operator fun <E : CoroutineContext.Element> get(key: CoroutineContext.Key<E>): E? =
@Suppress("UNCHECKED_CAST")
if (Key == key) this as E else null
}
package com.alibaba.demo.coroutine.ttl_intergration.usage
import com.alibaba.demo.coroutine.ttl_intergration.ttlContext
import com.alibaba.ttl.TransmittableThreadLocal
import kotlinx.coroutines.*
private val threadLocal = TransmittableThreadLocal<String?>() // declare thread-local variable
/**
* [Thread-local data - Coroutine Context and Dispatchers - Kotlin Programming Language](https://kotlinlang.org/docs/reference/coroutines/coroutine-context-and-dispatchers.html#thread-local-data)
*/
fun main(): Unit = runBlocking {
val block: suspend CoroutineScope.() -> Unit = {
println("Launch start, current thread: ${Thread.currentThread()}, thread local value: ${threadLocal.get()}")
threadLocal.set("!reset!")
println("After reset, current thread: ${Thread.currentThread()}, thread local value: ${threadLocal.get()}")
delay(5)
println("After yield, current thread: ${Thread.currentThread()}, thread local value: ${threadLocal.get()}")
}
threadLocal.set("main")
println("======================\nEmpty Coroutine Context\n======================")
println("Pre-main, current thread: ${Thread.currentThread()}, thread local value: ${threadLocal.get()}")
launch(block = block).join()
println("Post-main, current thread: ${Thread.currentThread()}, thread local value: ${threadLocal.get()}")
threadLocal.set("main")
println()
println("======================\nTTL Coroutine Context\n======================")
println("Pre-main, current thread: ${Thread.currentThread()}, thread local value: ${threadLocal.get()}")
launch(ttlContext(), block = block).join()
println("Post-main, current thread: ${Thread.currentThread()}, thread local value: ${threadLocal.get()}")
threadLocal.set("main")
println()
println("======================\nDispatchers.Default Coroutine Context\n======================")
println("Pre-main, current thread: ${Thread.currentThread()}, thread local value: ${threadLocal.get()}")
launch(Dispatchers.Default, block = block).join()
println("Post-main, current thread: ${Thread.currentThread()}, thread local value: ${threadLocal.get()}")
threadLocal.set("main")
println()
println("======================\nDispatchers.Default + TTL Coroutine Context\n======================")
println("Pre-main, current thread: ${Thread.currentThread()}, thread local value: ${threadLocal.get()}")
launch(Dispatchers.Default + ttlContext(), block = block).join()
println("Post-main, current thread: ${Thread.currentThread()}, thread local value: ${threadLocal.get()}")
}
package com.alibaba.demo.coroutine.ttl_intergration.usage
import com.alibaba.demo.coroutine.ttl_intergration.ttlContext
import com.alibaba.ttl.TransmittableThreadLocal
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.Assert.assertEquals
import org.junit.Assert.assertNotEquals
import org.junit.Test
class TtlCoroutineContextTest {
@Test
fun threadContextElement_passByValue(): Unit = runBlocking {
val mainValue = "main-${System.currentTimeMillis()}"
val testThread = Thread.currentThread()
// String ThreadLocal, String is immutable value, can only be passed by value
val threadLocal = TransmittableThreadLocal<String?>()
threadLocal.set(mainValue)
println("test thread: ${Thread.currentThread()}, thread local value: ${threadLocal.get()}")
val job = launch(Dispatchers.Default + ttlContext()) {
println("Launch start, current thread: ${Thread.currentThread()}, thread local value: ${threadLocal.get()}")
assertEquals(mainValue, threadLocal.get())
assertNotEquals(testThread, Thread.currentThread())
delay(5)
println("After delay, current thread: ${Thread.currentThread()}, thread local value: ${threadLocal.get()}")
assertEquals(mainValue, threadLocal.get())
assertNotEquals(testThread, Thread.currentThread())
val reset = "job-reset-${threadLocal.get()}"
threadLocal.set(reset)
assertEquals(reset, threadLocal.get())
delay(5)
println("After delay set reset, current thread: ${Thread.currentThread()}, thread local value: ${threadLocal.get()}")
assertEquals(reset, threadLocal.get())
assertNotEquals(testThread, Thread.currentThread())
}
job.join()
println("after launch, test thread: ${Thread.currentThread()}, thread local value: ${threadLocal.get()}")
assertEquals(mainValue, threadLocal.get())
}
@Test
fun threadContextElement_passByReference(): Unit = runBlocking {
data class Reference(var data: Int = 42)
val mainValue = Reference()
val testThread = Thread.currentThread()
// Reference ThreadLocal, mutable value, pass by reference
val threadLocal = TransmittableThreadLocal<Reference>() // declare thread-local variable
threadLocal.set(mainValue)
println("test thread: ${Thread.currentThread()}, thread local value: ${threadLocal.get()}")
val job = launch(Dispatchers.Default + ttlContext()) {
println("Launch start, current thread: ${Thread.currentThread()}, thread local value: ${threadLocal.get()}")
assertEquals(mainValue, threadLocal.get())
assertNotEquals(testThread, Thread.currentThread())
delay(5)
println("After delay, current thread: ${Thread.currentThread()}, thread local value: ${threadLocal.get()}")
assertEquals(mainValue, threadLocal.get())
assertNotEquals(testThread, Thread.currentThread())
val reset = -42
threadLocal.get().data = reset
delay(5)
println("After delay set reset, current thread: ${Thread.currentThread()}, thread local value: ${threadLocal.get()}")
assertEquals(Reference(reset), threadLocal.get())
assertNotEquals(testThread, Thread.currentThread())
}
job.join()
println("after launch, test thread: ${Thread.currentThread()}, thread local value: ${threadLocal.get()}")
assertEquals(mainValue, threadLocal.get())
}
@Test
fun twoThreadContextElement(): Unit = runBlocking {
val mainValue = "main-a-${System.currentTimeMillis()}"
val anotherMainValue = "main-another-${System.currentTimeMillis()}"
val testThread = Thread.currentThread()
val threadLocal = TransmittableThreadLocal<String?>() // declare thread-local variable
val anotherThreadLocal = TransmittableThreadLocal<String?>() // declare thread-local variable
threadLocal.set(mainValue)
anotherThreadLocal.set(anotherMainValue)
println("test thread: ${Thread.currentThread()}, thread local value: ${threadLocal.get()} | ${anotherThreadLocal.get()}")
println()
launch(Dispatchers.Default + ttlContext()) {
println("Launch start, current thread: ${Thread.currentThread()}, thread local value: ${threadLocal.get()} | ${anotherThreadLocal.get()}")
assertEquals(mainValue, threadLocal.get())
assertEquals(anotherMainValue, anotherThreadLocal.get())
assertNotEquals(testThread, Thread.currentThread())
delay(5)
println("After delay, current thread: ${Thread.currentThread()}, thread local value: ${threadLocal.get()} | ${anotherThreadLocal.get()}")
assertEquals(mainValue, threadLocal.get())
assertEquals(anotherMainValue, anotherThreadLocal.get())
assertNotEquals(testThread, Thread.currentThread())
val resetA = "job-reset-${threadLocal.get()}"
threadLocal.set(resetA)
val resetAnother = "job-reset-${anotherThreadLocal.get()}"
anotherThreadLocal.set(resetAnother)
println("Before delay set reset, current thread: ${Thread.currentThread()}, thread local value: ${threadLocal.get()} | ${anotherThreadLocal.get()}")
delay(5)
println("After delay set reset, current thread: ${Thread.currentThread()}, thread local value: ${threadLocal.get()} | ${anotherThreadLocal.get()}")
assertEquals(resetA, threadLocal.get())
assertEquals(resetAnother, anotherThreadLocal.get())
assertNotEquals(testThread, Thread.currentThread())
}.join()
println("after launch2, test thread: ${Thread.currentThread()}, thread local value: ${threadLocal.get()} | ${anotherThreadLocal.get()}")
assertEquals(mainValue, threadLocal.get())
assertEquals(anotherMainValue, anotherThreadLocal.get())
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册