/*
* Copyright (c) 2020 Alibaba Group Holding Limited. All Rights Reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Alibaba designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*/
package com.alibaba.wisp.engine;
import sun.misc.SharedSecrets;
import sun.misc.UnsafeAccess;
import java.dyn.Coroutine;
import java.dyn.CoroutineExitException;
import java.nio.channels.SelectableChannel;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
/**
* {@link WispTask} provides high-level semantics of {link @Coroutine}
*
* Create {@link WispTask} via {@link WispEngine#dispatch(Runnable)} (Callable, String)} to make
* blocking IO operation in {@link WispTask}s to become concurrent.
*
* The creator and a newly created {@link WispTask} will automatically have parent-children relationship.
* When the child gets blocked on something, the {@link WispCarrier} will try to execute parent first.
*
* A {@link WispTask}'s exit will wake up the waiting parent.
*/
public class WispTask implements Comparable {
private final static AtomicInteger idGenerator = new AtomicInteger();
static final Map id2Task = new ConcurrentHashMap<>(360);
// global table used for all WispCarriers
static WispTask fromId(int id) {
WispCarrier carrier = WispCarrier.current();
boolean isInCritical0 = carrier.isInCritical;
carrier.isInCritical = true;
try {
return id2Task.get(id);
} finally {
carrier.isInCritical = isInCritical0;
}
}
static void cleanExitedTasks(List tasks) {
if (!tasks.isEmpty()) {
WispCarrier carrier = tasks.get(0).carrier;
boolean isInCritical0 = carrier.isInCritical;
carrier.isInCritical = true;
try {
for (WispTask t : tasks) {
id2Task.remove(t.id);
t.cleanup();
}
} finally {
carrier.isInCritical = isInCritical0;
}
}
}
static void cleanExitedTask(WispTask task) {
WispCarrier carrier = WispCarrier.current();
boolean isInCritical0 = carrier.isInCritical;
carrier.isInCritical = true;
try {
// cleanup shouldn't be executed for thread task
id2Task.remove(task.id);
} finally {
carrier.isInCritical = isInCritical0;
}
}
static void trackTask(WispTask task) {
WispCarrier carrier = WispCarrier.current();
boolean isInCritical0 = carrier.isInCritical;
carrier.isInCritical = true;
try {
id2Task.put(task.id, task);
} finally {
carrier.isInCritical = isInCritical0;
}
}
private final int id;
enum Status {
ALIVE, // ALIVE
ZOMBIE // exited
}
private Runnable runnable; // runnable for created task
/**
* Task is running in that carrier.
*/
volatile WispCarrier carrier;
private String name;
final Coroutine ctx; // the low-level coroutine implement
Status status = Status.ALIVE;
SelectableChannel ch; // the interesting channel
TimeOut timeOut; // related timer
ClassLoader ctxClassLoader;
private final boolean isThreadTask;
private boolean isThreadAsWisp;
private Thread threadWrapper; // thread returned by Thread::currentThread()
private volatile int interrupted; // 0 means not interrupted
private volatile int alreadyCheckNativeInterrupt;
private volatile int jdkParkStatus;
private volatile int jvmParkStatus;
volatile int stealLock;
private WispTask from;
/**
* WispTask execution wrapper for schduler should only be used in wakupTask
*/
final StealAwareRunnable resumeEntry;
// counter printed by jstack
private int activeCount;
int stealCount;
int stealFailureCount;
private int preemptCount;
// perf monitor
private long enqueueTime;
private long parkTime;
private long blockingTime;
private long registerEventTime;
// monolithic epoll support
private volatile long epollArray;
private volatile int epollEventNum;
int epollArraySize;
WispTask(WispCarrier carrier, Coroutine ctx, boolean isRealTask, boolean isThreadTask) {
this.isThreadTask = isThreadTask;
this.id = isRealTask ? idGenerator.addAndGet(1) : -1;
setCarrier(carrier);
if (isRealTask) {
this.ctx = ctx != null ? ctx : new CacheableCoroutine(WispConfiguration.STACK_SIZE);
this.ctx.setWispTask(id, this, carrier);
} else {
this.ctx = null;
}
resumeEntry = isThreadTask ? null : carrier.createResumeEntry(this);
}
void reset(Runnable runnable, String name, Thread thread, ClassLoader ctxLoader) {
assert ctx != null;
this.status = Status.ALIVE;
this.runnable = runnable;
this.name = name;
interrupted = 0;
ctxClassLoader = ctxLoader;
ch = null;
enqueueTime = 0;
parkTime = 0;
blockingTime = 0;
registerEventTime = 0;
activeCount = 0;
stealCount = 0;
stealFailureCount = 0;
preemptCount = 0;
// thread status
if (thread != null) { // calling from Thread.start()
NATIVE_INTERRUPTED_UPDATER.lazySet(this, 1);
isThreadAsWisp = true;
WispEngine.JLA.setWispTask(thread, this);
threadWrapper = thread;
} else {
// for WispThreadWrapper, skip native interrupt check
NATIVE_INTERRUPTED_UPDATER.lazySet(this, 0);
isThreadAsWisp = false;
if (threadWrapper == null) {
threadWrapper = new WispThreadWrapper(this);
}
WispEngine.JLA.setWispAlive(threadWrapper, true);
}
assert WispEngine.JLA.getWispTask(threadWrapper) == this;
if (!isThreadTask() && name != null && !threadWrapper.getName().equals(name)) {
threadWrapper.setName(name);
}
}
void setCarrier(WispCarrier carrier) {
CARRIER_UPDATER.lazySet(this, carrier);
}
private void cleanup() {
setCarrier(null);
threadWrapper = null;
ctxClassLoader = null;
}
class CacheableCoroutine extends Coroutine {
CacheableCoroutine(long stacksize) {
super(stacksize);
}
@Override
protected void run() {
while (true) {
assert WispCarrier.current() == carrier;
assert carrier.current == WispTask.this;
if (runnable != null) {
Throwable throwable = null;
try {
runOutsideWisp(runnable);
} catch (Throwable t) {
throwable = t;
} finally {
assert timeOut == null;
runnable = null;
WispEngine.JLA.setWispAlive(threadWrapper, false);
if (isThreadAsWisp) {
ThreadAsWisp.exit(threadWrapper);
}
if (throwable instanceof CoroutineExitException) {
throw (CoroutineExitException) throwable;
}
carrier.taskExit();
}
} else {
carrier.schedule();
}
}
}
}
/**
* Mark if wisp is running internal scheduling code or user code, this would
* be used in preempt to identify if it's okay to preempt
* Modify Coroutine::is_usermark_frame accordingly if you need to change this
* method, because it's name and sig are used
*/
private static void runOutsideWisp(Runnable runnable) {
runnable.run();
}
/**
* Switch task. we need the information of {@code from} task param
* to do classloader switch etc..
*
* {@link #stealLock} is used in {@link WispCarrier#steal(WispTask)} .
*/
static boolean switchTo(WispTask current, WispTask next) {
assert next.ctx != null;
assert WispCarrier.current() == current.carrier;
assert current.carrier == next.carrier;
next.activeCount++;
assert current.isThreadTask() || next.isThreadTask();
next.from = current;
STEAL_LOCK_UPDATER.lazySet(next, 1);
// store load barrier is not necessary
boolean res = current.carrier.thread.getCoroutineSupport().unsafeSymmetricYieldTo(next.ctx);
assert current.stealLock != 0;
STEAL_LOCK_UPDATER.lazySet(current.from, 0);
assert WispCarrier.current() == current.carrier;
assert current.carrier.current == current;
return res;
}
/**
* @return {@code false} if current {@link WispTask} is thread-emulated.
*/
boolean isThreadTask() {
return isThreadTask;
}
/**
* Let currently executing task sleep for specified number of milliseconds.
*
* May be wakened up early by an available IO.
*/
static void sleep(long ms) {
if (ms < 0) throw new IllegalArgumentException();
if (ms == 0) {
WispCarrier.current().yield();
} else {
WispCarrier.current().unregisterEvent();
jdkPark(TimeUnit.MILLISECONDS.toNanos(ms));
}
}
@Override
public String toString() {
return "WispTask" + id + "(" +
"name=" + name + ')' +
"{status=" + status + "/" +
jdkParkStatus + ", " +
'}';
}
public String getName() {
return name;
}
private static final int
WAITING = -1, // was blocked
FREE = 0, // the Initial Park status
PERMITTED = 1; // another task give a permit to make the task not block at next park()
static final String SHUTDOWN_TASK_NAME = "SHUTDOWN_TASK";
boolean isAlive() {
return status != Status.ZOMBIE;
}
/**
* If a permit is available, it will be consumed and this function returns
* immediately; otherwise
* current task will become blocked until {@link #unpark()} ()} happens.
*
* @param timeoutNano <= 0 park forever
* else park with given timeout
*/
private void parkInternal(long timeoutNano, boolean fromJvm) {
if (timeoutNano > 0 && timeoutNano < WispConfiguration.MIN_PARK_NANOS) {
carrier.yield();
return;
}
final AtomicIntegerFieldUpdater statusUpdater = fromJvm ? JVM_PARK_UPDATER : JDK_PARK_UPDATER;
final boolean isInCritical0 = carrier.isInCritical;
carrier.isInCritical = true;
try {
carrier.getCounter().incrementParkCount();
for (;;) {
int s = statusUpdater.get(this);
assert s != WAITING; // if parkStatus == WAITING, should already blocked
if (s == FREE && statusUpdater.compareAndSet(this, FREE, WAITING)) {
// may become PERMITTED here; need retry.
// another thread unpark here is ok:
// current task is put to unpark queue,
// and will wake up eventually
if (WispEngine.runningAsCoroutine(threadWrapper) && timeoutNano > 0) {
carrier.addTimer(timeoutNano + System.nanoTime(), fromJvm);
}
carrier.isInCritical = isInCritical0;
try {
if (WispEngine.runningAsCoroutine(threadWrapper)) {
setParkTime();
carrier.schedule();
} else {
UA.park0(false, timeoutNano < 0 ? 0 : timeoutNano);
}
} finally {
carrier.isInCritical = true;
if (timeoutNano > 0) {
carrier.cancelTimer();
}
// we'may direct wakeup by current carrier
// the statue may be still WAITING..
statusUpdater.lazySet(this, FREE);
}
break;
} else if (s == PERMITTED &&
(statusUpdater.compareAndSet(this, PERMITTED, FREE))) {
// consume the permit
break;
}
}
} finally {
carrier.isInCritical = isInCritical0;
}
}
/**
* If the thread was blocked on {@link #park(long)} then it will unblock.
* Otherwise, its next call to {@link #park(long)} is guaranteed not to block.
*/
private void unparkInternal(boolean fromJvm) {
AtomicIntegerFieldUpdater statusUpdater = fromJvm ? JVM_PARK_UPDATER : JDK_PARK_UPDATER;
for (;;) {
int s = statusUpdater.get(this);
if (s == WAITING && statusUpdater.compareAndSet(this, WAITING, FREE)) {
if (WispEngine.runningAsCoroutine(threadWrapper)) {
recordOnUnpark(fromJvm);
carrier.wakeupTask(this);
} else {
UA.unpark0(threadWrapper);
}
break;
} else if (s == PERMITTED ||
(s == FREE && statusUpdater.compareAndSet(this, FREE, PERMITTED))) {
// add a permit
break;
}
}
}
/**
* Park Invoked by jdk, include IO, JUC etc..
*/
static void jdkPark(long timeoutNano) {
WispCarrier.current().getCurrentTask().parkInternal(timeoutNano, false);
}
void jdkUnpark() {
unparkInternal(false);
}
/**
* Invoked by VM to support coroutine switch in object monitor case.
*/
private static void park(long timeoutNano) {
WispCarrier.current().getCurrentTask().parkInternal(timeoutNano, true);
}
void unpark() {
unparkInternal(true);
}
// direct called by jvm runtime if UseDirectUnpark
static void unparkById(int id) {
WispTask t = fromId(id);
if (t != null) {
t.unpark();
}
}
void interrupt() {
// For JSR166. Unpark even if interrupt status was already set.
interrupted = 1;
unpark();
jdkUnpark();
}
private static void interruptById(int id) {
WispTask t = fromId(id);
if (t != null) {
t.interrupt();
}
}
boolean isInterrupted() {
return interrupted != 0;
}
boolean testInterruptedAndClear(boolean clear) {
boolean nativeInterrupt = false;
if (alreadyCheckNativeInterrupt == 0 && // only do it once
NATIVE_INTERRUPTED_UPDATER.compareAndSet(this, 0, 1) &&
!isInterrupted()) {
nativeInterrupt = checkAndClearNativeInterruptForWisp(threadWrapper);
}
boolean res = interrupted != 0 || nativeInterrupt;
if (res && clear) {
INTERRUPTED_UPDATER.lazySet(this, 0);
}
// return old interrupt status.
return res;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
WispTask t = (WispTask) o;
return Objects.equals(id, t.id);
}
@Override
public int hashCode() {
return Objects.hash(id);
}
public Thread getThreadWrapper() {
return threadWrapper;
}
void setThreadWrapper(Thread thread) {
threadWrapper = thread;
WispEngine.JLA.setWispTask(thread, this);
}
void resetThreadWrapper() {
if (isThreadAsWisp) {
threadWrapper = null;
}
}
@Override
public int compareTo(WispTask o) {
return Integer.compare(this.id, o.id);
}
long getEpollArray() {
return epollArray;
}
void setEpollArray(long epollArray) {
EPOLL_ARRAY_UPDATER.lazySet(this, epollArray);
}
int getEpollEventNum() {
return epollEventNum;
}
void setEpollEventNum(int epollEventNum) {
EPOLL_EVENT_NUM_UPDATER.lazySet(this, epollEventNum);
}
void updateEnqueueTime() {
if (!WispConfiguration.WISP_PROFILE) {
return;
}
// in wisp2, if the task is stealed unsuccessfully, it will be put into queue again
if (enqueueTime != 0) {
return;
}
enqueueTime = System.nanoTime();
}
long getEnqueueTime() {
return enqueueTime;
}
void resetEnqueueTime() {
enqueueTime = 0;
}
void setRegisterEventTime() {
// only count the time which is spent on WispTask by service
registerEventTime = (!WispConfiguration.WISP_PROFILE || isThreadTask) ? 0 : System.nanoTime();
}
void resetRegisterEventTime() {
registerEventTime = 0;
}
void countWaitSocketIOTime() {
if (registerEventTime != 0) {
carrier.counter.incrementTotalWaitSocketIOTime(System.nanoTime() - registerEventTime);
registerEventTime = 0;
}
}
private void setParkTime() {
parkTime = (!WispConfiguration.WISP_PROFILE || isThreadTask) ? 0 : System.nanoTime();
}
/* When unpark is called, the time is set.
* Since the unpark may be called by non-worker thread, the count is delayed.
*/
private void recordOnUnpark(boolean fromJVM) {
if (!WispConfiguration.WISP_PROFILE) {
return;
}
if (parkTime != 0) {
blockingTime = System.nanoTime() - parkTime;
if (blockingTime < 0) {
blockingTime = 0;
}
parkTime = 0;
}
if (fromJVM) {
carrier.counter.incrementUnparkFromJvmCount();
}
}
void countExecutionTime(long beginTime) {
// TaskExit set beginTime to 0, and calls schedule,
// then beginTime is 0. It need to skip it.
if (!WispConfiguration.WISP_PROFILE || beginTime == 0) {
return;
}
carrier.counter.incrementTotalExecutionTime(System.nanoTime() - beginTime);
if (blockingTime != 0) {
carrier.counter.incrementTotalBlockingTime(blockingTime);
blockingTime = 0;
}
}
StackTraceElement[] getStackTrace() {
return this.ctx.getCoroutineStack();
}
private static final AtomicReferenceFieldUpdater CARRIER_UPDATER;
private static final AtomicIntegerFieldUpdater JVM_PARK_UPDATER;
private static final AtomicIntegerFieldUpdater JDK_PARK_UPDATER;
private static final AtomicIntegerFieldUpdater INTERRUPTED_UPDATER;
private static final AtomicIntegerFieldUpdater NATIVE_INTERRUPTED_UPDATER;
private static final AtomicIntegerFieldUpdater STEAL_LOCK_UPDATER;
private static final AtomicLongFieldUpdater EPOLL_ARRAY_UPDATER;
private static final AtomicIntegerFieldUpdater EPOLL_EVENT_NUM_UPDATER;
private static final UnsafeAccess UA = SharedSecrets.getUnsafeAccess();
private static native void registerNatives();
// only for wisp to clear the native interrupt, for parallel interrupt problem.
private static native boolean checkAndClearNativeInterruptForWisp(Thread cur);
static {
CARRIER_UPDATER = AtomicReferenceFieldUpdater.newUpdater(WispTask.class, WispCarrier.class, "carrier");
JVM_PARK_UPDATER = AtomicIntegerFieldUpdater.newUpdater(WispTask.class, "jvmParkStatus");
JDK_PARK_UPDATER = AtomicIntegerFieldUpdater.newUpdater(WispTask.class, "jdkParkStatus");
INTERRUPTED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(WispTask.class, "interrupted");
NATIVE_INTERRUPTED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(WispTask.class, "alreadyCheckNativeInterrupt");
STEAL_LOCK_UPDATER = AtomicIntegerFieldUpdater.newUpdater(WispTask.class, "stealLock");
EPOLL_ARRAY_UPDATER = AtomicLongFieldUpdater.newUpdater(WispTask.class, "epollArray");
EPOLL_EVENT_NUM_UPDATER = AtomicIntegerFieldUpdater.newUpdater(WispTask.class, "epollEventNum");
registerNatives();
}
}