提交 5b210cf7 编写于 作者: Y yunyao.zxl 提交者: zhengxiaolinX

[Wisp] Add Wisp

Summary: Add JDK code of the Wisp implementation

Test Plan: all Wisp tests

Reviewed-by: yuleil, shiyuexw, sanhong

Issue: https://github.com/alibaba/dragonwell8/issues/113
上级 44c71bc8
......@@ -264,6 +264,11 @@ ifndef OPENJDK
$(wildcard $(JDK_TOPDIR)/src/closed/$(OPENJDK_TARGET_OS_API_DIR)/classes)
endif
LINUX_SRC_DIRS :=
ifeq ($(OPENJDK_TARGET_OS), linux)
LINUX_SRC_DIRS += $(JDK_TOPDIR)/src/linux/classes
endif
MACOSX_SRC_DIRS :=
ifeq ($(OPENJDK_TARGET_OS), macosx)
MACOSX_SRC_DIRS += $(JDK_TOPDIR)/src/macosx/classes
......@@ -328,6 +333,7 @@ $(eval $(call SetupJavaCompilation,BUILD_JDK,\
SRC:=$(JDK_TOPDIR)/src/share/classes \
$(JDK_TOPDIR)/src/$(OPENJDK_TARGET_OS_API_DIR)/classes \
$(MACOSX_SRC_DIRS) \
$(LINUX_SRC_DIRS) \
$(AIX_SRC_DIRS) \
$(JDK_OUTPUTDIR)/gensrc \
$(JDK_OUTPUTDIR)/gensrc_no_srczip \
......
......@@ -562,7 +562,8 @@ EXPORTED_PRIVATE_PKGS = com.oracle.net \
com.alibaba.management \
com.alibaba.jvm.gc \
com.alibaba.rcm \
com.alibaba.tenant
com.alibaba.tenant \
com.alibaba.wisp.engine \
$(IMAGES_OUTPUTDIR)/symbols/_the.symbols: $(IMAGES_OUTPUTDIR)/lib/rt.jar
$(RM) -r $(IMAGES_OUTPUTDIR)/symbols/META-INF/sym
......
......@@ -2565,4 +2565,7 @@ com/alibaba/tenant/TenantException
com/alibaba/tenant/TenantState
com/alibaba/tenant/TenantGlobals
com/alibaba/tenant/TenantContainerFactory
com/dyn/Coroutine
com/alibaba/wisp/engine/WispEngine
com/alibaba/wisp/engine/WispSysmon
# eea35d9d56e0006e
\ No newline at end of file
......@@ -144,6 +144,7 @@ LIBJAVA_SRC_DIRS += $(JDK_TOPDIR)/src/$(OPENJDK_TARGET_OS_API_DIR)/native/java/l
$(JDK_TOPDIR)/src/share/native/com/alibaba/jwarmup \
$(JDK_TOPDIR)/src/share/native/com/alibaba/jvm/gc \
$(JDK_TOPDIR)/src/share/native/com/alibaba/tenant \
$(JDK_TOPDIR)/src/share/native/java/dyn \
$(JDK_TOPDIR)/src/share/native/sun/misc \
$(JDK_TOPDIR)/src/share/native/sun/reflect \
$(JDK_TOPDIR)/src/share/native/java/util \
......@@ -151,6 +152,11 @@ LIBJAVA_SRC_DIRS += $(JDK_TOPDIR)/src/$(OPENJDK_TARGET_OS_API_DIR)/native/java/l
$(JDK_TOPDIR)/src/$(OPENJDK_TARGET_OS_API_DIR)/native/common \
$(JDK_TOPDIR)/src/$(OPENJDK_TARGET_OS_API_DIR)/native/java/util
# Wisp only supports Linux
ifeq ($(OPENJDK_TARGET_OS), linux)
LIBJAVA_SRC_DIRS += $(JDK_TOPDIR)/src/linux/native/com/alibaba/wisp/engine
endif
ifeq ($(OPENJDK_TARGET_OS), windows)
LIBJAVA_SRC_DIRS += $(JDK_TOPDIR)/src/$(OPENJDK_TARGET_OS_API_DIR)/native/sun/util/locale/provider
else ifeq ($(OPENJDK_TARGET_OS), macosx)
......
......@@ -221,6 +221,10 @@ SUNWprivate_1.1 {
Java_com_alibaba_jwarmup_JWarmUp_registerNatives;
Java_com_alibaba_tenant_TenantGlobals_getTenantFlags;
Java_com_alibaba_jvm_gc_ElasticHeapMXBeanImpl_registerNatives;
Java_com_alibaba_wisp_engine_WispEngine_registerNatives;
Java_com_alibaba_wisp_engine_WispSysmon_registerNatives;
Java_com_alibaba_wisp_engine_WispTask_registerNatives;
Java_java_dyn_Coroutine_registerNatives;
Java_java_lang_Throwable_fillInStackTrace;
Java_java_lang_Throwable_getStackTraceDepth;
Java_java_lang_Throwable_getStackTraceElement;
......
......@@ -13,6 +13,10 @@ text: .text%Java_java_lang_Thread_registerNatives;
text: .text%Java_com_alibaba_tenant_TenantGlobals_getTenantFlags;
text: .text%Java_com_alibaba_jwarmup_JWarmUp_registerNatives;
text: .text%Java_com_alibaba_jvm_gc_ElasticHeapMXBeanImpl_registerNatives;
text: .text%Java_com_alibaba_wisp_engine_WispEngine_registerNatives;
text: .text%Java_com_alibaba_wisp_engine_WispSysmon_registerNatives;
text: .text%Java_com_alibaba_wisp_engine_WispTask_registerNatives;
text: .text%Java_java_dyn_Coroutine_registerNatives;
text: .text%Java_java_security_AccessController_getStackAccessControlContext;
text: .text%Java_java_security_AccessController_getInheritedAccessControlContext;
text: .text%Java_java_lang_ClassLoader_registerNatives;
......
......@@ -49,6 +49,7 @@ SUNWprivate_1.1 {
Java_sun_nio_ch_EPoll_epollCreate;
Java_sun_nio_ch_EPoll_epollCtl;
Java_sun_nio_ch_EPoll_epollWait;
Java_sun_nio_ch_EPoll_errnoENOENT;
Java_sun_nio_ch_EPollPort_close0;
Java_sun_nio_ch_EPollPort_drain1;
Java_sun_nio_ch_EPollPort_interrupt;
......
/*
* Copyright (c) 2020 Alibaba Group Holding Limited. All Rights Reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Alibaba designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*/
package com.alibaba.wisp.engine;
/**
* An runnable that is aware of work stealing.
*/
public interface StealAwareRunnable extends Runnable {
/**
* @return if that runnable could be stolen
*/
default boolean isStealEnable() {
return true;
}
/**
* Set this runnable's {@link #isStealEnable} to given value
*/
default void setStealEnable(boolean b) {
}
}
/*
* Copyright (c) 2020 Alibaba Group Holding Limited. All Rights Reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Alibaba designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*/
package com.alibaba.wisp.engine;
class TaskDispatcher implements StealAwareRunnable {
private final ClassLoader ctxClassLoader;
private final long enqueueTime;
private final Runnable target;
private final String name;
private final Thread thread;
TaskDispatcher(ClassLoader ctxClassLoader, Runnable target, String name, Thread thread) {
this.ctxClassLoader = ctxClassLoader;
this.enqueueTime = WispEngine.getNanoTime();
this.target = target;
this.name = name;
this.thread = thread;
}
@Override
public void run() {
WispCarrier current = WispCarrier.current();
current.countEnqueueTime(enqueueTime);
current.runTaskInternal(target, name, thread,
ctxClassLoader == null ? current.current.ctxClassLoader : ctxClassLoader);
}
}
/*
* Copyright (c) 2020 Alibaba Group Holding Limited. All Rights Reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Alibaba designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*/
package com.alibaba.wisp.engine;
import java.util.stream.Stream;
/**
* Convert Thread.start() to Wisp without changing application's code.
* <p/>
* Note that ThreadAsWisp is not wisp core logic, just a wrapper of wisp
* coroutine create API which is used by jdk threading library, so we can
* use objectMonitor here.
*/
class ThreadAsWisp {
private final static String JAVA_LANG_PKG = "package:java.lang";
private static int nonDaemonCount;
private static Thread preventShutdownThread;
/**
* Try to "start thread" as wisp if all listed condition is satisfied:
* <p>
* 1. not in java.lang or blacklisted package/class
* 2. not a WispEngine internal Thread
* 3. allThreadAsWisp is true and not match the blacklist
*
* @param thread the thread
* @param target thread's target field
* @return if condition is satisfied and thread is started as wisp
*/
static boolean tryStart(Thread thread, Runnable target) {
if (!WispConfiguration.ALL_THREAD_AS_WISP
|| WispEngine.isEngineThread(thread)
|| matchBlackList(thread, target)) {
return false;
}
if (!thread.isDaemon()) {
synchronized (ThreadAsWisp.class) {
if (nonDaemonCount++ == 0) { // start a non-daemon thread to prevent jvm exit
assert preventShutdownThread == null;
preventShutdownThread = new PreventShutdownThread();
preventShutdownThread.start();
}
}
}
// pthread_create always return before new thread started, so we should not wait here
WispEngine.JLA.setWispAlive(thread, true); // thread.isAlive() should be true
WispEngine.current().startAsThread(thread, thread.getName(), thread);
return true;
}
/**
* Refer to hotspot JavaThread::exit():
* <p>
* 1. Call Thread.exit() to clean up threadGroup
* 2. Notify threads wait on Thread.join()
* 3. exit jvm if all non-daemon thread is exited
*
* @param thread exited thread
*/
static void exit(Thread thread) {
WispEngine.JLA.threadExit(thread);
synchronized (thread) {
thread.notifyAll();
}
if (!thread.isDaemon()) {
synchronized (ThreadAsWisp.class) {
if (--nonDaemonCount == 0) {
assert preventShutdownThread != null && !preventShutdownThread.isInterrupted();
preventShutdownThread.interrupt();
preventShutdownThread = null;
}
}
}
}
private static boolean matchBlackList(Thread thread, Runnable target) {
return Stream.concat(Stream.of(JAVA_LANG_PKG), WispConfiguration.getThreadAsWispBlacklist().stream())
.anyMatch(s -> {
Class<?> clazz = (target == null ? thread : target).getClass();
// Java code could start a thread by passing a `target` Runnable argument
// or override Thread.run() method;
// if `target` is null, we're processing the override situation, so we need
// to check the thread object's class.
String[] sp = s.split(":");
if (sp.length != 2) {
return false;
}
switch (sp[0]) {
case "class":
return sp[1].equals(clazz.getName());
case "package":
Package pkg = clazz.getPackage();
String pkgName = pkg == null ? "" : pkg.getName();
return sp[1].equals(pkgName);
case "name":
return wildCardMatch(thread.getName(), sp[1]);
default:
return false;
}
});
}
private static boolean wildCardMatch(String s, String p) {
return s.matches(p.replace("?", ".?").replace("*", ".*"));
}
private static class PreventShutdownThread extends Thread {
// help us monitor if PreventShutdownThread start/exit too much times.
// startCount should very close to 1 for most applications
private static int startCount;
private PreventShutdownThread() {
super(WispEngine.DAEMON_THREAD_GROUP, "Wisp-Prevent-Shutdown-" + startCount++);
setDaemon(false); // the daemon attribute is inherited from parent, set to false explicitly
}
@Override
public synchronized void run() {
while (true) {
try {
wait();
} catch (InterruptedException e) {
break;
}
}
}
} // class PreventShutdownThread
}
/*
* Copyright (c) 2020 Alibaba Group Holding Limited. All Rights Reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Alibaba designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*/
package com.alibaba.wisp.engine;
import java.util.Arrays;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
/**
* Represents {@link WispTask} related time out
*/
public class TimeOut {
final WispTask task;
long deadlineNano;
boolean canceled = false;
/**
* its position in the heapArray, -1 indicates that TimeOut has been deleted
*/
private final boolean fromJvm;
private int queueIdx;
private TimerManager manager;
/**
* @param task related {@link WispTask}
* @param deadlineNano wake up related {@link WispTask} at {@code deadline} if not canceled
*/
public TimeOut(WispTask task, long deadlineNano, boolean fromJvm) {
this.task = task;
this.deadlineNano = deadlineNano;
this.fromJvm = fromJvm;
}
/**
* @return {@code true} if and only if associated timer is expired.
*/
public boolean expired() {
return !canceled && System.nanoTime() >= deadlineNano;
}
/**
* unpark the blocked task
*/
void doUnpark() {
if (fromJvm) {
task.unpark();
} else {
task.jdkUnpark();
}
}
/**
* We use minimum heap algorithm to get the minimum deadline of all timers,
* also keep every TimeOut's queueIdx(its position in heap) so that we can easily
* remove it.
*/
static class TimerManager {
Queue queue = new Queue();
ConcurrentLinkedQueue<TimeOut> rmQ = new ConcurrentLinkedQueue<>();
void copyTimer(Queue copiedQueue) {
copiedQueue.size = 0;
for (TimeOut timeOut : copiedQueue.queue) {
if (timeOut != null) {
addTimer(timeOut);
}
}
}
void addTimer(TimeOut timeOut) {
timeOut.deadlineNano = overflowFree(timeOut.deadlineNano, queue.peek());
timeOut.manager = this;
queue.offer(timeOut);
}
void cancelTimer(TimeOut timeOut) {
if (timeOut.queueIdx != -1) {
if (timeOut.manager == this) {
queue.remove(timeOut);
} else {
timeOut.manager.rmQ.add(timeOut);
}
}
}
/**
* Dispatch timeout events and return the timeout deadline for next
* first timeout task
*
* @return -1: there's no timeout task, > 0 deadline nanos
*/
long processTimeoutEventsAndGetWaitDeadline(final long now) {
TimeOut timeOut;
while ((timeOut = rmQ.poll()) != null) {
assert timeOut.manager == this;
queue.remove(timeOut);
}
long deadline = -1;
if (queue.size != 0) {
while ((timeOut = queue.peek()) != null) {
if (timeOut.canceled) {
queue.poll();
} else if (timeOut.deadlineNano <= now) {
queue.poll();
timeOut.doUnpark();
} else {
deadline = timeOut.deadlineNano;
break;
}
}
}
return deadline;
}
static class Queue {
private static final int INITIAL_CAPACITY = 16;
private TimeOut[] queue = new TimeOut[INITIAL_CAPACITY];
private int size = 0;
/**
* Inserts TimeOut x at position k, maintaining heap invariant by
* promoting x up the tree until it is greater than or equal to
* its parent, or is the root.
*
* @param k the position to fill
* @param timeOut the TimeOut to insert
*/
private void siftUp(int k, TimeOut timeOut) {
while (k > 0) {
int parent = (k - 1) >>> 1;
TimeOut e = queue[parent];
if (timeOut.deadlineNano >= e.deadlineNano) {
break;
}
queue[k] = e;
e.queueIdx = k;
k = parent;
}
queue[k] = timeOut;
timeOut.queueIdx = k;
}
/**
* Inserts item x at position k, maintaining heap invariant by
* demoting x down the tree repeatedly until it is less than or
* equal to its children or is a leaf.
*
* @param k the position to fill
* @param timeOut the item to insert
*/
private void siftDown(int k, TimeOut timeOut) {
int half = size >>> 1;
while (k < half) {
int child = (k << 1) + 1;
TimeOut c = queue[child];
int right = child + 1;
if (right < size && c.deadlineNano > queue[right].deadlineNano) {
c = queue[child = right];
}
if (timeOut.deadlineNano <= c.deadlineNano) {
break;
}
queue[k] = c;
c.queueIdx = k;
k = child;
}
queue[k] = timeOut;
timeOut.queueIdx = k;
}
public boolean remove(TimeOut timeOut) {
int i = timeOut.queueIdx;
if (i == -1) {
//this timeOut has been deleted.
return false;
}
queue[i].queueIdx = -1;
int s = --size;
TimeOut replacement = queue[s];
queue[s] = null;
if (s != i) {
siftDown(i, replacement);
if (queue[i] == replacement) {
siftUp(i, replacement);
}
}
return true;
}
public int size() {
return size;
}
public boolean offer(TimeOut timeOut) {
int i = size++;
if (i >= queue.length) {
queue = Arrays.copyOf(queue, queue.length * 2);
}
if (i == 0) {
queue[0] = timeOut;
timeOut.queueIdx = 0;
} else {
siftUp(i, timeOut);
}
return true;
}
public TimeOut poll() {
if (size == 0) {
return null;
}
TimeOut f = queue[0];
int s = --size;
TimeOut x = queue[s];
queue[s] = null;
if (s != 0) {
siftDown(0, x);
}
f.queueIdx = -1;
return f;
}
public TimeOut peek() {
return queue[0];
}
}
}
private static long overflowFree(long deadlineNano, TimeOut head) {
if (deadlineNano < Long.MIN_VALUE / 2) { // deadlineNano regarded as negative overflow
deadlineNano = Long.MAX_VALUE;
}
if (head != null && head.deadlineNano < 0 && deadlineNano > 0 && deadlineNano - head.deadlineNano < 0) {
deadlineNano = Long.MAX_VALUE + head.deadlineNano;
// then deadlineNano - head.deadlineNano = Long.MAX_VALUE > 0
// i.e. deadlineNano > head.deadlineNano
}
return deadlineNano;
}
static long nanos2Millis(long nanos) {
long ms = TimeUnit.NANOSECONDS.toMillis(nanos + TimeUnit.MILLISECONDS.toNanos(1) / 2);
if (ms < 0) {
ms = 0;
}
if (WispConfiguration.PARK_ONE_MS_AT_LEAST && ms == 0) {
ms = 1;
}
return ms;
}
}
/*
* Copyright (c) 2020 Alibaba Group Holding Limited. All Rights Reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Alibaba designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*/
package com.alibaba.wisp.engine;
import sun.security.action.GetPropertyAction;
import java.io.*;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
class WispConfiguration {
private static final String DELIMITER = ";";
static final boolean TRANSPARENT_WISP_SWITCH;
static final boolean ENABLE_THREAD_AS_WISP;
static final boolean ALL_THREAD_AS_WISP;
static final int STACK_SIZE;
static final boolean PARK_ONE_MS_AT_LEAST;
static final int WORKER_COUNT;
static final boolean ENABLE_HANDOFF;
static final WispSysmon.Policy HANDOFF_POLICY;
static final int SYSMON_TICK_US;
static final int MIN_PARK_NANOS;
static final int POLLER_SHARDING_SIZE;
static final int SYSMON_CARRIER_GROW_TICK_US;
// monitor
static final boolean WISP_PROFILE;
static final boolean WISP_PROFILE_LOG_ENABLED;
static final int WISP_PROFILE_LOG_INTERVAL_MS;
static final String WISP_PROFILE_LOG_PATH;
static final boolean WISP_HIGH_PRECISION_TIMER;
static final int WISP_ENGINE_TASK_CACHE_SIZE;
static final int WISP_SCHEDULE_STEAL_RETRY;
static final int WISP_SCHEDULE_PUSH_RETRY;
static final int WISP_SCHEDULE_HELP_STEAL_RETRY;
static final WispScheduler.SchedulingPolicy SCHEDULING_POLICY;
static final boolean USE_DIRECT_SELECTOR_WAKEUP;
static final boolean CARRIER_AS_POLLER;
static final boolean MONOLITHIC_POLL;
static final boolean CARRIER_GROW;
// io
static final boolean WISP_ENABLE_SOCKET_LOCK;
private static List<String> THREAD_AS_WISP_BLACKLIST;
static {
Properties p = java.security.AccessController.doPrivileged(
new java.security.PrivilegedAction<Properties>() {
public Properties run() {
return System.getProperties();
}
}
);
TRANSPARENT_WISP_SWITCH = p.containsKey("com.alibaba.wisp.transparentWispSwitch") ?
parseBooleanParameter(p, "com.alibaba.wisp.transparentWispSwitch", false) :
parseBooleanParameter(p, "com.alibaba.transparentAsync", false);
ENABLE_THREAD_AS_WISP = p.containsKey("com.alibaba.wisp.enableThreadAsWisp") ?
parseBooleanParameter(p, "com.alibaba.wisp.enableThreadAsWisp", false) :
parseBooleanParameter(p, "com.alibaba.shiftThreadModel", false);
ALL_THREAD_AS_WISP = parseBooleanParameter(p, "com.alibaba.wisp.allThreadAsWisp", false);
STACK_SIZE = parsePositiveIntegerParameter(p, "com.alibaba.wisp.stacksize", 512 * 1024);
PARK_ONE_MS_AT_LEAST = parseBooleanParameter(p, "com.alibaba.wisp.parkOneMs", true);
WORKER_COUNT = parsePositiveIntegerParameter(p, "com.alibaba.wisp.carrierEngines",
Runtime.getRuntime().availableProcessors());
POLLER_SHARDING_SIZE = parsePositiveIntegerParameter(p, "com.alibaba.pollerShardingSize", 8);
ENABLE_HANDOFF = parseBooleanParameter(p, "com.alibaba.wisp.enableHandOff",
TRANSPARENT_WISP_SWITCH);
// handoff worker thread implementation is not stable enough,
// use preempt by default, and we'll move to ADAPTIVE in the future
HANDOFF_POLICY = WispSysmon.Policy.valueOf(
p.getProperty("com.alibaba.wisp.handoffPolicy", WispSysmon.Policy.PREEMPT.name()));
SYSMON_TICK_US = parsePositiveIntegerParameter(p, "com.alibaba.wisp.sysmonTickUs",
(int) TimeUnit.MILLISECONDS.toMicros(100));
MIN_PARK_NANOS = parsePositiveIntegerParameter(p, "com.alibaba.wisp.minParkNanos", 100);
WISP_PROFILE_LOG_ENABLED = parseBooleanParameter(p, "com.alibaba.wisp.enableProfileLog", false);
WISP_PROFILE_LOG_INTERVAL_MS = parsePositiveIntegerParameter(p, "com.alibaba.wisp.logTimeInternalMillis", 15000);
if (WISP_PROFILE_LOG_ENABLED) {
WISP_PROFILE = true;
WISP_PROFILE_LOG_PATH = p.getProperty("com.alibaba.wisp.logPath");
} else {
WISP_PROFILE = parseBooleanParameter(p, "com.alibaba.wisp.profile", false);
WISP_PROFILE_LOG_PATH = "";
}
CARRIER_AS_POLLER = parseBooleanParameter(p, "com.alibaba.wisp.useCarrierAsPoller", ALL_THREAD_AS_WISP);
MONOLITHIC_POLL = parseBooleanParameter(p, "com.alibaba.wisp.monolithicPoll", true);
WISP_HIGH_PRECISION_TIMER = parseBooleanParameter(p, "com.alibaba.wisp.highPrecisionTimer", false);
WISP_ENGINE_TASK_CACHE_SIZE = parsePositiveIntegerParameter(p, "com.alibaba.wisp.engineTaskCache", 20);
WISP_SCHEDULE_STEAL_RETRY = parsePositiveIntegerParameter(p, "com.alibaba.wisp.schedule.stealRetry", Math.max(1, WORKER_COUNT / 2));
WISP_SCHEDULE_PUSH_RETRY = parsePositiveIntegerParameter(p, "com.alibaba.wisp.schedule.pushRetry", WORKER_COUNT);
WISP_SCHEDULE_HELP_STEAL_RETRY = parsePositiveIntegerParameter(p, "com.alibaba.wisp.schedule.helpStealRetry", Math.max(1, WORKER_COUNT / 4));
SCHEDULING_POLICY = WispScheduler.SchedulingPolicy.valueOf(p.getProperty("com.alibaba.wisp.schedule.policy",
WORKER_COUNT > 16 ? WispScheduler.SchedulingPolicy.PUSH.name() : WispScheduler.SchedulingPolicy.PULL.name()));
USE_DIRECT_SELECTOR_WAKEUP = parseBooleanParameter(p, "com.alibaba.wisp.directSelectorWakeup", true);
WISP_ENABLE_SOCKET_LOCK = parseBooleanParameter(p, "com.alibaba.wisp.useSocketLock", true);
CARRIER_GROW = parseBooleanParameter(p, "com.alibaba.wisp.growCarrier", false);
SYSMON_CARRIER_GROW_TICK_US = parsePositiveIntegerParameter(p, "com.alibaba.wisp.growCarrierTickUs", (int) TimeUnit.SECONDS.toMicros(5));
checkCompatibility();
}
private static void checkCompatibility() {
checkDependency(ENABLE_THREAD_AS_WISP, "-Dcom.alibaba.wisp.enableThreadAsWisp=true",
TRANSPARENT_WISP_SWITCH, "-Dcom.alibaba.wisp.transparentWispSwitch=true");
checkDependency(ENABLE_HANDOFF, "-Dcom.alibaba.wisp.enableHandOff=true",
TRANSPARENT_WISP_SWITCH, "-Dcom.alibaba.wisp.enableThreadAsWisp=true");
checkDependency(ALL_THREAD_AS_WISP, "-Dcom.alibaba.wisp.allThreadAsWisp=true",
ENABLE_THREAD_AS_WISP, "-Dcom.alibaba.wisp.enableThreadAsWisp=true");
checkDependency(CARRIER_AS_POLLER, "-Dcom.alibaba.wisp.useCarrierAsPoller=true",
ALL_THREAD_AS_WISP, "-Dcom.alibaba.wisp.allThreadAsWisp=true");
if (ENABLE_THREAD_AS_WISP && !ALL_THREAD_AS_WISP) {
throw new IllegalArgumentException("shift thread model by stack configuration is no longer supported," +
" use -XX:+UseWisp2 instead");
}
}
private static void checkDependency(boolean cond, String condStr, boolean preRequire, String preRequireStr) {
if (cond && !preRequire) {
throw new IllegalArgumentException("\"" + condStr + "\" depends on \"" + preRequireStr + "\"");
}
}
private static int parsePositiveIntegerParameter(Properties p, String key, int defaultVal) {
String value;
if (p == null || (value = p.getProperty(key)) == null) {
return defaultVal;
}
int res = defaultVal;
try {
res = Integer.valueOf(value);
} catch (NumberFormatException e) {
return defaultVal;
}
return res <= 0 ? defaultVal : res;
}
private static boolean parseBooleanParameter(Properties p, String key, boolean defaultVal) {
String value;
if (p == null || (value = p.getProperty(key)) == null) {
return defaultVal;
}
return Boolean.valueOf(value);
}
private static List<String> parseListParameter(Properties p, Properties confProp, String key) {
String value = p.getProperty(key);
if (value == null) {
value = confProp.getProperty(key);
}
return value == null ? Collections.emptyList() :
Arrays.asList(value.trim().split(DELIMITER));
}
/**
* Loading config from system property "com.alibaba.wisp.config" specified
* file or jre/lib/wisp.properties.
*/
private static void loadBizConfig() {
Properties p = java.security.AccessController.doPrivileged(
new java.security.PrivilegedAction<Properties>() {
public Properties run() {
return System.getProperties();
}
}
);
String path = p.getProperty("com.alibaba.wisp.config");
Properties confProp = new Properties();
if (path != null) {
File f = new File(path);
if (f.exists()) {
try (InputStream is = new BufferedInputStream(new FileInputStream(f.getPath()))) {
confProp.load(is);
} catch (IOException e) {
// ignore, all STACK_LIST are empty
}
}
}
THREAD_AS_WISP_BLACKLIST = parseListParameter(p, confProp, "com.alibaba.wisp.threadAsWisp.black");
}
private static final int UNLOADED = 0, LOADING = 1, LOADED = 2;
private static final AtomicInteger bizLoadStatus = new AtomicInteger(UNLOADED);
private static void ensureBizConfigLoaded() {
if (bizLoadStatus.get() == LOADED) {
return;
}
if (bizLoadStatus.get() == UNLOADED && bizLoadStatus.compareAndSet(UNLOADED, LOADING)) {
try {
loadBizConfig();
} finally {
bizLoadStatus.set(LOADED);
}
}
while (bizLoadStatus.get() != LOADED) {/* wait */}
}
static List<String> getThreadAsWispBlacklist() {
ensureBizConfigLoaded();
assert THREAD_AS_WISP_BLACKLIST != null;
return THREAD_AS_WISP_BLACKLIST;
}
}
/*
* Copyright (c) 2020 Alibaba Group Holding Limited. All Rights Reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Alibaba designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*/
package com.alibaba.wisp.engine;
import java.beans.ConstructorProperties;
final public class WispCounter {
private long switchCount = 0;
private long waitTimeTotal = 0;
private long runningTimeTotal = 0;
private long completedTaskCount = 0;
private long createTaskCount = 0;
private long parkCount = 0;
private long unparkCount = 0;
private long unparkInterruptSelectorCount = 0;
private long selectableIOCount = 0;
private long timeOutCount = 0;
private long eventLoopCount = 0;
private long totalEnqueueTime = 0;
private long maxEnqueueTime = 0;
private long enqueueCount = 0;
private long totalExecutionTime = 0;
private long maxExecutionTime = 0;
private long executionCount = 0;
private long totalWaitSocketIOTime = 0;
private long maxWaitSocketIOTime = 0;
private long waitSocketIOCount = 0;
private long totalBlockingTime = 0;
private long maxBlockingTime = 0;
private long unparkFromJvmCount = 0;
private long runningTaskCount = 0;
private long taskQueueLength = 0;
WispCarrier carrier;
private WispCounter(WispCarrier carrier) {
this.carrier = carrier;
}
boolean getRunningState() {
WispCarrier e = carrier;
return e != null && e.isRunning();
}
void incrementSwitchCount() {
switchCount++;
}
long getSwitchCount() {
return switchCount;
}
void incrementCompleteTaskCount() {
completedTaskCount++;
}
void incrementRunningTimeTotal(long value) {
runningTimeTotal += value;
}
long getRunningTimeTotal() {
return runningTimeTotal;
}
void incrementWaitTime(long value) {
waitTimeTotal += value;
}
long getWaitTimeTotal() {
return waitTimeTotal;
}
public long getCompletedTaskCount() {
return completedTaskCount;
}
void incrementCreateTaskCount() {
createTaskCount++;
}
long getCreateTaskCount() {
return createTaskCount;
}
void incrementParkCount() {
parkCount++;
}
long getParkCount() {
return parkCount;
}
void incrementUnparkInterruptSelectorCount() {
unparkInterruptSelectorCount++;
}
long getUnparkInterruptSelectorCount() {
return unparkInterruptSelectorCount;
}
void incrementSelectableIOCount() {
selectableIOCount++;
}
long getSelectableIOCount() {
return selectableIOCount;
}
void incrementTimeOutCount() {
timeOutCount++;
}
long getTimeOutCount() {
return timeOutCount;
}
void incrementEventLoopCount() {
eventLoopCount++;
}
long getEventLoopCount() {
return eventLoopCount;
}
void incrementTotalEnqueueTime(long value) {
totalEnqueueTime += value;
enqueueCount++;
if (value > maxEnqueueTime) {
maxEnqueueTime = value;
}
}
public long getTotalEnqueueTime() {
return totalEnqueueTime;
}
public long getEnqueueCount() {
return enqueueCount;
}
void incrementTotalExecutionTime(long value) {
totalExecutionTime += value;
executionCount++;
if (value > maxExecutionTime) {
maxExecutionTime = value;
}
}
public long getTotalExecutionTime() {
return totalExecutionTime;
}
public long getExecutionCount() {
return executionCount;
}
void incrementTotalWaitSocketIOTime(long value) {
totalWaitSocketIOTime += value;
waitSocketIOCount++;
if (value > maxWaitSocketIOTime) {
maxWaitSocketIOTime = value;
}
}
public long getTotalWaitSocketIOTime() {
return totalWaitSocketIOTime;
}
public long getWaitSocketIOCount() {
return waitSocketIOCount;
}
void incrementTotalBlockingTime(long value) {
totalBlockingTime += value;
unparkCount++;
if (value > maxBlockingTime) {
maxBlockingTime = value;
}
}
public long getTotalBlockingTime() {
return totalBlockingTime;
}
public long getUnparkCount() {
return unparkCount;
}
long getCurrentTaskQueueLength() {
WispCarrier e = carrier;
return e != null ? e.getTaskQueueLength() : 0;
}
long getCurrentRunningTaskCount() {
WispCarrier e = carrier;
return e != null ? e.getRunningTaskCount() : 0;
}
void incrementUnparkFromJvmCount() {
unparkFromJvmCount++;
}
long getUnparkFromJvmCount() {
return unparkFromJvmCount;
}
public long getMaxEnqueueTime() {
return maxEnqueueTime;
}
public long getMaxExecutionTime() {
return maxExecutionTime;
}
public long getMaxWaitSocketIOTime() {
return maxWaitSocketIOTime;
}
public long getMaxBlockingTime() {
return maxBlockingTime;
}
public long getTaskQueueLength() {
return taskQueueLength;
}
public long getRunningTaskCount() {
return runningTaskCount;
}
WispCounter() {
}
@ConstructorProperties({"completedTaskCount", "totalEnqueueTime", "maxEnqueueTime", "enqueueCount",
"totalExecutionTime", "maxExecutionTime", "executionCount",
"totalWaitSocketIOTime", "maxWaitSocketIOTime", "waitSocketIOCount",
"totalBlockingTime", "maxBlockingTime", "unparkCount",
"runningTaskCount", "taskQueueLength"})
public WispCounter(long completedTaskCount, long totalEnqueueTime, long maxEnqueueTime, long enqueueCount,
long totalExecutionTime, long maxExecutionTime, long executionCount,
long totalWaitSocketIOTime, long maxWaitSocketIOTime, long waitSocketIOCount,
long totalBlockingTime, long maxBlockingTime, long unparkCount,
long runningTaskCount, long taskQueueLength) {
this.completedTaskCount = completedTaskCount;
this.totalEnqueueTime = totalEnqueueTime;
this.maxEnqueueTime = maxEnqueueTime;
this.enqueueCount = enqueueCount;
this.totalExecutionTime = totalExecutionTime;
this.maxExecutionTime = maxExecutionTime;
this.executionCount = executionCount;
this.totalWaitSocketIOTime = totalWaitSocketIOTime;
this.maxWaitSocketIOTime = maxWaitSocketIOTime;
this.waitSocketIOCount = waitSocketIOCount;
this.totalBlockingTime = totalBlockingTime;
this.maxBlockingTime = maxBlockingTime;
this.unparkCount = unparkCount;
this.runningTaskCount = runningTaskCount;
this.taskQueueLength = taskQueueLength;
}
void assign(WispCounter counter) {
createTaskCount = counter.createTaskCount;
completedTaskCount = counter.completedTaskCount;
totalEnqueueTime = counter.totalEnqueueTime;
enqueueCount = counter.enqueueCount;
maxEnqueueTime = counter.maxEnqueueTime;
totalExecutionTime = counter.totalExecutionTime;
executionCount = counter.executionCount;
maxExecutionTime = counter.maxExecutionTime;
totalBlockingTime = counter.totalBlockingTime;
unparkCount = counter.unparkCount;
maxBlockingTime = counter.maxBlockingTime;
totalWaitSocketIOTime = counter.totalWaitSocketIOTime;
waitSocketIOCount = counter.waitSocketIOCount;
maxWaitSocketIOTime = counter.maxWaitSocketIOTime;
switchCount = counter.switchCount;
unparkFromJvmCount = counter.unparkFromJvmCount;
runningTaskCount = counter.getCurrentRunningTaskCount();
taskQueueLength = counter.getCurrentTaskQueueLength();
}
void resetMaxValue() {
maxEnqueueTime = 0;
maxExecutionTime = 0;
maxWaitSocketIOTime = 0;
maxBlockingTime = 0;
}
void cleanup() {
carrier = null;
}
static WispCounter create(WispCarrier carrier) {
return new WispCounter(carrier);
}
}
/*
* Copyright (c) 2020 Alibaba Group Holding Limited. All Rights Reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Alibaba designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*/
package com.alibaba.wisp.engine;
import com.alibaba.management.WispCounterMXBean;
import sun.management.Util;
import javax.management.ObjectName;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
/**
* Implementation class for WispCounterMXBean.
*/
public class WispCounterMXBeanImpl implements WispCounterMXBean {
private final static String WISP_COUNTER_MXBEAN_NAME = "com.alibaba.management:type=WispCounter";
private final static Map<Long, WispCounter> managedEngineCounters;
static {
if (!WispEngine.transparentWispSwitch()) {
managedEngineCounters = new HashMap<>();
} else {
managedEngineCounters = new ConcurrentHashMap<>(100);
}
}
@Override
public List<Boolean> getRunningStates() {
return aggregate(WispCounter::getRunningState);
}
@Override
public List<Long> getSwitchCount() {
return aggregate(WispCounter::getSwitchCount);
}
@Override
public List<Long> getWaitTimeTotal() {
return aggregate(WispCounter::getWaitTimeTotal);
}
@Override
public List<Long> getRunningTimeTotal() {
return aggregate(WispCounter::getRunningTimeTotal);
}
@Override
public List<Long> getCompleteTaskCount() {
return aggregate(WispCounter::getCompletedTaskCount);
}
@Override
public List<Long> getCreateTaskCount() {
return aggregate(WispCounter::getCreateTaskCount);
}
@Override
public List<Long> getParkCount() {
return aggregate(WispCounter::getParkCount);
}
@Override
public List<Long> getUnparkCount() {
return aggregate(WispCounter::getUnparkCount);
}
@Override
public List<Long> getLazyUnparkCount() {
return aggregate(w -> 0L);
}
@Override
public List<Long> getUnparkInterruptSelectorCount() {
return aggregate(WispCounter::getUnparkInterruptSelectorCount);
}
@Override
public List<Long> getSelectableIOCount() {
return aggregate(WispCounter::getSelectableIOCount);
}
@Override
public List<Long> getTimeOutCount() {
return aggregate(WispCounter::getTimeOutCount);
}
@Override
public List<Long> getEventLoopCount() {
return aggregate(WispCounter::getEventLoopCount);
}
@Override
public List<Long> getQueueLength() {
return aggregate(WispCounter::getCurrentTaskQueueLength);
}
@Override
public List<Long> getNumberOfRunningTasks() {
return aggregate(WispCounter::getCurrentRunningTaskCount);
}
@Override
public List<Long> getTotalEnqueueTime() {
return aggregate(WispCounter::getTotalEnqueueTime);
}
@Override
public List<Long> getEnqueueCount() {
return aggregate(WispCounter::getEnqueueCount);
}
@Override
public List<Long> getTotalExecutionTime() {
return aggregate(WispCounter::getTotalExecutionTime);
}
@Override
public List<Long> getExecutionCount() {
return aggregate(WispCounter::getExecutionCount);
}
@Override
public List<Long> getTotalWaitSocketIOTime() {
return aggregate(WispCounter::getTotalWaitSocketIOTime);
}
@Override
public List<Long> getWaitSocketIOCount() {
return aggregate(WispCounter::getWaitSocketIOCount);
}
@Override
public List<Long> getTotalBlockingTime() {
return aggregate(WispCounter::getTotalBlockingTime);
}
/**
* @param id WispCarrier id
* @return WispCounter
*/
@Override
public WispCounter getWispCounter(long id) {
return WispEngine.getWispCounter(id);
}
private <T> List<T> aggregate(Function<WispCounter, T> getter) {
List<T> result = new ArrayList<>(managedEngineCounters.size());
for (Entry<Long, WispCounter> entry : managedEngineCounters.entrySet()) {
result.add(getter.apply(entry.getValue()));
}
return result;
}
@Override
public ObjectName getObjectName() {
return Util.newObjectName(WISP_COUNTER_MXBEAN_NAME);
}
static void register(WispCounter counter) {
managedEngineCounters.put(counter.carrier.getId(), counter);
}
static void deRegister(WispCounter counter) {
managedEngineCounters.remove(counter.carrier.getId());
counter.cleanup();
}
}
/*
* Copyright (c) 2020 Alibaba Group Holding Limited. All Rights Reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Alibaba designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*/
package com.alibaba.wisp.engine;
import sun.misc.SharedSecrets;
import sun.nio.ch.IOEventAccess;
import sun.nio.ch.Net;
import sun.nio.ch.SelChImpl;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
class WispEventPump {
private static final int LOW_FD_BOUND = 1024 * 10;
private static final int MAX_EVENTS_TO_POLL = 512;
private static final IOEventAccess IOEA;
private final int epfd;
private final int pipe0;
private final int pipe1;
private final long epollArray;
static {
sun.nio.ch.IOUtil.load();
IOEA = SharedSecrets.getIOEventAccess();
}
private WispEventPump() {
try {
epfd = IOEA.eventCreate();
int[] a = new int[2];
IOEA.socketpair(a);
pipe0 = a[0];
pipe1 = a[1];
if (IOEA.eventCtl(epfd, IOEA.eventCtlAdd(), pipe0, Net.POLLIN) != 0) {
throw new IOException("epoll_ctl fail");
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
epollArray = IOEA.allocatePollArray(MAX_EVENTS_TO_POLL);
}
enum Pool {
INSTANCE;
private final int mask;
private final WispEventPump[] pumps;
Pool() {
int n = Math.max(1, WispConfiguration.WORKER_COUNT / WispConfiguration.POLLER_SHARDING_SIZE);
n = (n & (n - 1)) == 0 ? n : Integer.highestOneBit(n) * 2; // next power of 2
mask = n - 1;
pumps = new WispEventPump[n];
for (int i = 0; i < pumps.length; i++) {
pumps[i] = new WispEventPump();
}
}
void startPollerThreads() {
int i = 0;
for (WispEventPump pump : pumps) {
Thread t = new Thread(WispEngine.DAEMON_THREAD_GROUP, new Runnable() {
@Override
public void run() {
while (true) {
pump.pollAndDispatchEvents(-1);
}
}
}, "Wisp-Poller-" + i++);
t.setDaemon(true);
t.start();
}
}
private static int hash(int x) {
// implementation of Knuth multiplicative algorithm.
return x * (int) 2654435761L;
}
void registerEvent(WispTask task, SelectableChannel ch, int event) throws IOException {
if (ch != null && ch.isOpen()) {
int fd = ((SelChImpl) ch).getFDVal();
pumps[hash(fd) & mask].registerEvent(task, fd, event);
}
}
int epollWaitForWisp(int epfd, long pollArray, int arraySize, long timeout, AtomicReference<Object> status,
final Object INTERRUPTED) throws IOException {
return pumps[hash(epfd) & mask].epollWaitForWisp(epfd, pollArray, arraySize, timeout, status, INTERRUPTED);
}
void interruptEpoll(AtomicReference<Object> status, Object INTERRUPTED, int interruptFd) {
WispEventPump.interruptEpoll(status, INTERRUPTED, interruptFd);
}
WispEventPump getPump(int ord) {
return pumps[ord & mask];
}
}
/**
* fd2ReadTask handles all incoming io events like reading and accepting
*/
private final WispTask[] fd2ReadTaskLow = new WispTask[LOW_FD_BOUND];
private final ConcurrentHashMap<Integer, WispTask> fd2ReadTaskHigh = new ConcurrentHashMap<>();
/**
* fd2WriteTask handles all outing io events like connecting and writing
*/
private final WispTask[] fd2WriteTaskLow = new WispTask[LOW_FD_BOUND];
private final ConcurrentHashMap<Integer, WispTask> fd2WriteTaskHigh = new ConcurrentHashMap<>();
/**
* whether event is a reading event or an accepting event
*/
private boolean isReadEvent(int events) throws IllegalArgumentException {
int event = (events & (Net.POLLCONN | Net.POLLIN | Net.POLLOUT));
assert Integer.bitCount(event) == 1;
return (events & Net.POLLIN) != 0;
}
private WispTask[] getFd2TaskLow(int events) {
return isReadEvent(events) ? fd2ReadTaskLow : fd2WriteTaskLow;
}
private ConcurrentHashMap<Integer, WispTask> getFd2TaskHigh(int events) {
return isReadEvent(events) ? fd2ReadTaskHigh : fd2WriteTaskHigh;
}
private boolean sanityCheck(int fd, WispTask newTask, int events) {
WispTask oldTask = fd < LOW_FD_BOUND ? getFd2TaskLow(events)[fd] : getFd2TaskHigh(events).get(fd);
// If timeout happened, when oldTask finished,
// the oldTask.ch would be nullified(we didn't get chance to remove it)
return (oldTask == null || oldTask == newTask || oldTask.ch == null
|| ((SelChImpl) oldTask.ch).getFDVal() != fd);
}
/**
* All events are guaranteed to be interested in only one direction since
* all registrations are from WispSocket
*/
private void recordTaskByFD(int fd, WispTask task, int events) {
assert sanityCheck(fd, task, events);
if (fd < LOW_FD_BOUND) {
getFd2TaskLow(events)[fd] = task;
} else {
getFd2TaskHigh(events).put(fd, task);
}
}
private WispTask removeTaskByFD(int fd, int events) {
WispTask task;
if (fd < LOW_FD_BOUND) {
WispTask[] fd2TaskLow = getFd2TaskLow(events);
task = fd2TaskLow[fd];
fd2TaskLow[fd] = null;
} else {
task = getFd2TaskHigh(events).remove(fd);
}
return task;
}
private void registerEvent(WispTask task, int fd, int event) throws IOException {
int ev = 0;
// Translates an interest operation set into a native poll event set
if ((event & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0) ev |= Net.POLLIN;
if ((event & SelectionKey.OP_WRITE) != 0) ev |= Net.POLLOUT;
if ((event & SelectionKey.OP_CONNECT) != 0) ev |= Net.POLLCONN;
// When the socket is closed, the poll event will be triggered
ev |= Net.POLLHUP;
// specify the EPOLLONESHOT flag, to tell epoll to disable the associated
// file descriptor after the receipt of an event with epoll_wait
ev |= IOEA.eventOneShot();
recordTaskByFD(fd, task, ev);
task.setRegisterEventTime();
// we can do it multi-thread, because epoll is protected by spin lock in kernel
// When the EPOLLONESHOT flag is specified, it is the caller's responsibility to
// rearm the file descriptor using epoll_ctl with EPOLL_CTL_MOD
int res = IOEA.eventCtl(epfd, IOEA.eventCtlMod(), fd, ev); // rearm
if (res != 0 && !(res == IOEA.noEvent() && (res = IOEA.eventCtl(epfd, IOEA.eventCtlAdd(), fd, ev)) == 0)) {
removeTaskByFD(fd, ev);
task.resetRegisterEventTime();
throw new IOException("epoll_ctl " + res);
}
}
/**
* API for coroutine do epoll_wait
*
* @param epfd epoll fd
* @param pollArray epoll array address
* @param arraySize epoll array size
* @param timeout timeout ms
* @param status interrupt status;
* @param INTERRUPTED const indicate for interrupted
* @return selected event num
*/
private int epollWaitForWisp(int epfd, long pollArray, int arraySize, long timeout,
AtomicReference<Object> status, final Object INTERRUPTED) throws IOException {
assert pollArray != 0;
WispTask me = WispCarrier.current().current;
if (!WispEngine.runningAsCoroutine(me.getThreadWrapper())) {
return IOEA.eventWait(epfd, pollArray, arraySize, (int) timeout);
}
if (WispConfiguration.MONOLITHIC_POLL) {
if (timeout == 0) {
// return 0 for selectNow(), prevent calling epoll_wait in non-poller thread
// and the application will retry with timeout
return 0;
}
} else {
int updated = IOEA.eventWait(epfd, pollArray, arraySize, 0);
if (timeout == 0 || updated > 0) {
return updated;
}
}
if (WispConfiguration.USE_DIRECT_SELECTOR_WAKEUP &&
(status.get() == INTERRUPTED || !(status.get() == null && status.compareAndSet(null, me)))) {
assert status.get() == INTERRUPTED;
return 0; // already epoll_wait(0), no retry needed.
}
if (WispConfiguration.MONOLITHIC_POLL) {
assert timeout != 0;
me.epollArraySize = arraySize;
me.setEpollEventNum(0);
me.setEpollArray(pollArray);
}
if (timeout != 0) {
registerEvent(me, epfd, SelectionKey.OP_READ);
WispTask.jdkPark(TimeUnit.MILLISECONDS.toNanos(timeout));
}
if (WispConfiguration.USE_DIRECT_SELECTOR_WAKEUP &&
!(status.get() == me && status.compareAndSet(me, null))) {
assert status.get() == INTERRUPTED;
}
if (WispConfiguration.MONOLITHIC_POLL) {
// already polled by poller, see doMonolithicPoll()
me.setEpollArray(0);
return me.getEpollEventNum();
} else {
return IOEA.eventWait(epfd, pollArray, arraySize, 0);
}
}
private void doMonolithicPoll(int fd, WispTask task, long epollArray) throws IOException {
assert WispConfiguration.MONOLITHIC_POLL;
task.setEpollEventNum(IOEA.eventWait(fd, epollArray, task.epollArraySize, 0));
}
private static void interruptEpoll(AtomicReference<Object> status, Object INTERRUPTED, int interruptFd) {
assert WispConfiguration.USE_DIRECT_SELECTOR_WAKEUP;
while (true) {
final Object st = status.get();
if (st == INTERRUPTED || st == null && status.compareAndSet(null, INTERRUPTED)) {
if (!WispConfiguration.ALL_THREAD_AS_WISP) {
try {
IOEA.interrupt(interruptFd);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
break;
} else if (st != null) { // waiting
assert st instanceof WispTask;
if (status.compareAndSet(st, INTERRUPTED)) {
((WispTask) st).jdkUnpark();
break;
}
}
}
}
private volatile int wakeupCount;
boolean pollAndDispatchEvents(long timeout) {
boolean wakened = false;
try {
int n = IOEA.eventWait(epfd, epollArray, MAX_EVENTS_TO_POLL, (int) timeout);
while (n-- > 0) {
long eventAddress = IOEA.getEvent(epollArray, n);
int fd = IOEA.getDescriptor(eventAddress);
if (fd == pipe0) {
wakened = true;
// Conservative strategy, wakeup() can never lost
if (WAKEUP_UPDATER.decrementAndGet(this) == 0) {
IOEA.drain(pipe0);
}
continue;
}
int events = IOEA.getEvents(eventAddress);
if ((events & Net.POLLIN) != 0) {
processEvent(fd, true);
}
if ((events & Net.POLLCONN) != 0 || (events & Net.POLLOUT) != 0) {
processEvent(fd, false);
}
}
} catch (Throwable t) {
t.printStackTrace();
}
return wakened;
}
private void processEvent(int fd, boolean isRead) throws IOException {
WispTask task = removeTaskByFD(fd, isRead ? Net.POLLIN : Net.POLLOUT);
if (task != null) {
long epollArray = task.getEpollArray();
if (isRead && epollArray != 0) {
doMonolithicPoll(fd, task, epollArray);
} else {
task.countWaitSocketIOTime();
}
task.jdkUnpark();
}
}
void wakeup() {
if (WAKEUP_UPDATER.getAndIncrement(this) == 0) {
try {
IOEA.interrupt(pipe1);
} catch (IOException x) {
throw new UncheckedIOException(x);
}
}
}
volatile WispScheduler.Worker owner;
boolean tryAcquire(WispScheduler.Worker worker) {
assert WispConfiguration.CARRIER_AS_POLLER;
return owner == null && OWNER_UPDATER.compareAndSet(this, null, worker);
}
void release(WispScheduler.Worker worker) {
assert owner == worker;
OWNER_UPDATER.lazySet(this, null);
}
private final static AtomicReferenceFieldUpdater<WispEventPump, WispScheduler.Worker> OWNER_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(WispEventPump.class, WispScheduler.Worker.class, "owner");
private final static AtomicIntegerFieldUpdater<WispEventPump> WAKEUP_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(WispEventPump.class, "wakeupCount");
}
/*
* Copyright (c) 2020 Alibaba Group Holding Limited. All Rights Reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Alibaba designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*/
package com.alibaba.wisp.engine;
import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.logging.*;
enum WispPerfCounterMonitor {
INSTANCE;
private boolean fileHandleEnable = false;
private Map<Long, WispPerfCounter> managedEngineCounters;
private Logger wispLog;
private final SimpleDateFormat localDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
WispPerfCounterMonitor() {
if (WispConfiguration.WISP_PROFILE) {
managedEngineCounters = new ConcurrentHashMap<>(100);
}
if (WispConfiguration.WISP_PROFILE_LOG_ENABLED) {
String logPath = WispConfiguration.WISP_PROFILE_LOG_PATH;
wispLog = Logger.getLogger(WispPerfCounterMonitor.class.getName());
FileHandler fileHandler;
try {
// In a log file, record about 24 hours of data
fileHandler = new FileHandler(
logPath == null ? "wisplog%g.log" : logPath + File.separator + "wisplog%g.log",
12800000, 4, true);
fileHandler.setLevel(Level.INFO);
fileHandler.setFormatter(new java.util.logging.Formatter() {
@Override
public String format(LogRecord record) {
return record.getMessage() + "\n";
}
});
wispLog.setUseParentHandlers(false);
wispLog.addHandler(fileHandler);
fileHandleEnable = true;
} catch (IOException e) {
e.printStackTrace();
}
}
}
void startDaemon() {
Thread thread = new Thread(WispEngine.DAEMON_THREAD_GROUP, this::perfCounterLoop, "Wisp-Monitor");
thread.setDaemon(true);
thread.start();
}
void register(WispCounter counter) {
if (WispConfiguration.WISP_PROFILE && sun.misc.VM.isBooted()) {
WispPerfCounter wispPerfCounter = new WispPerfCounter(counter);
managedEngineCounters.put(counter.carrier.getId(), wispPerfCounter);
}
}
void deRegister(WispCounter counter) {
if (WispConfiguration.WISP_PROFILE) {
managedEngineCounters.remove(counter.carrier.getId());
}
}
WispCounter getWispCounter(long id) {
WispPerfCounter perfCounter = WispEngine.runInCritical(
() -> managedEngineCounters.get(id));
if (perfCounter == null) {
return null;
}
perfCounter.storeCurrentWispCounter();
return perfCounter.prevCounterValue;
}
private void perfCounterLoop() {
while (true) {
try {
Thread.sleep((long) WispConfiguration.WISP_PROFILE_LOG_INTERVAL_MS);
} catch (InterruptedException e) {
// pass
}
dumpCounter();
}
}
private void appendLogString(StringBuilder strb, String dateTime, String item, int workerID, long data) {
strb.append(dateTime)
.append("\t").append(item).append("\t\t")
.append("worker").append(workerID).append("\t\t")
.append(data).append("\n");
}
private void dumpCounter() {
if (!fileHandleEnable) {
return;
}
StringBuilder strb = new StringBuilder();
long currentTime = System.currentTimeMillis();
String dateTime = localDateFormat.format(new Date(currentTime));
int worker = 0;
/* dump Wisp monitor information */
WispPerfCounter perfCounter;
for (Entry<Long, WispPerfCounter> entry : managedEngineCounters.entrySet()) {
perfCounter = entry.getValue();
appendLogString(strb, dateTime, "completedTaskCount", worker, perfCounter.getCompletedTaskCount());
appendLogString(strb, dateTime, "unparkFromJvmCount", worker, perfCounter.getUnparkFromJvmCount());
appendLogString(strb, dateTime, "averageEnqueueTime", worker, perfCounter.getAverageEnqueueTime());
appendLogString(strb, dateTime, "averageExecutionTime", worker, perfCounter.getAverageExecutionTime());
appendLogString(strb, dateTime, "averageWaitSocketIOTime", worker, perfCounter.getAverageWaitSocketIOTime());
appendLogString(strb, dateTime, "averageBlockingTime", worker, perfCounter.getAverageBlockingTime());
perfCounter.storeCurrentWispCounter();
worker++;
}
wispLog.info(strb.toString());
}
private class WispPerfCounter {
WispCounter counter;
WispCounter prevCounterValue;
long getCompletedTaskCount() {
return counter.getCompletedTaskCount() - prevCounterValue.getCompletedTaskCount();
}
long getUnparkFromJvmCount() {
return counter.getUnparkFromJvmCount() - prevCounterValue.getUnparkFromJvmCount();
}
long getAverageTime(Function<WispCounter, Long> timeFunc, Function<WispCounter, Long> countFunc) {
long count = countFunc.apply(counter) - countFunc.apply(prevCounterValue);
if (count == 0) {
return 0;
}
long totalNanos = timeFunc.apply(counter) - timeFunc.apply(prevCounterValue);
return totalNanos / count;
}
long getAverageEnqueueTime() {
return getAverageTime(WispCounter::getTotalEnqueueTime, WispCounter::getEnqueueCount);
}
long getAverageExecutionTime() {
return getAverageTime(WispCounter::getTotalExecutionTime, WispCounter::getExecutionCount);
}
long getAverageWaitSocketIOTime() {
return getAverageTime(WispCounter::getTotalWaitSocketIOTime, WispCounter::getWaitSocketIOCount);
}
long getAverageBlockingTime() {
return getAverageTime(WispCounter::getTotalBlockingTime, WispCounter::getUnparkCount);
}
void storeCurrentWispCounter() {
if (counter == null) {
return;
}
prevCounterValue.assign(counter);
counter.resetMaxValue();
}
WispPerfCounter(WispCounter counter) {
this.counter = counter;
this.prevCounterValue = new WispCounter();
this.prevCounterValue.assign(counter);
}
}
}
/*
* Copyright (c) 2020 Alibaba Group Holding Limited. All Rights Reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Alibaba designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*/
package com.alibaba.wisp.engine;
import sun.misc.JavaLangAccess;
import sun.misc.SharedSecrets;
import sun.misc.UnsafeAccess;
import java.util.*;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
enum WispSysmon {
INSTANCE;
static {
registerNatives();
}
private final Set<WispCarrier> carriers = new ConcurrentSkipListSet<>();
final static String WISP_SYSMON_NAME = "Wisp-Sysmon";
void startDaemon() {
if (WispConfiguration.ENABLE_HANDOFF) {
assert WispConfiguration.HANDOFF_POLICY != null;
Thread thread = new Thread(WispEngine.DAEMON_THREAD_GROUP,
WispSysmon.INSTANCE::sysmonLoop, WISP_SYSMON_NAME);
thread.setDaemon(true);
thread.start();
}
}
void register(WispCarrier carrier) {
if (WispConfiguration.ENABLE_HANDOFF) {
carriers.add(carrier);
}
}
private void sysmonLoop() {
final long interval = TimeUnit.MICROSECONDS.toNanos(WispConfiguration.SYSMON_TICK_US);
final long carrierCheckRate = TimeUnit.MICROSECONDS.toNanos(WispConfiguration.SYSMON_CARRIER_GROW_TICK_US);
final long checkCarrierOnNthTick = carrierCheckRate / interval;
final boolean checkCarrier = WispConfiguration.CARRIER_GROW && checkCarrierOnNthTick > 0
// Detach carrier's worker cnt is not specified by configuration
&& WispConfiguration.WORKER_COUNT == Runtime.getRuntime().availableProcessors();
long nextTick = System.nanoTime() + interval;
int tick = 0;
while (true) {
long timeout = nextTick - System.nanoTime();
if (timeout > 0) {
do {
UA.park0(false, timeout);
} while ((timeout = nextTick - System.nanoTime()) > 0);
handleLongOccupation();
if (checkCarrier && tick++ == checkCarrierOnNthTick) {
WispEngine.WISP_ROOT_ENGINE.scheduler.checkAndGrowWorkers(Runtime.getRuntime().availableProcessors());
tick = 0;
}
} // else: we're too slow, skip a tick
nextTick += interval;
}
}
private final List<WispCarrier> longOccupationEngines = new ArrayList<>();
/**
* Handle a WispTask occupied a worker thread for long time.
*/
private void handleLongOccupation() {
for (WispCarrier carrier : carriers) {
if (carrier.terminated) {
// remove in iteration is OK for ConcurrentSkipListSet
carriers.remove(carrier);
continue;
}
if (carrier.isRunning() && carrier.schedTick == carrier.lastSchedTick) {
longOccupationEngines.add(carrier);
}
carrier.lastSchedTick = carrier.schedTick;
}
if (!longOccupationEngines.isEmpty()) {
Iterator<WispCarrier> itr = longOccupationEngines.iterator();
while (itr.hasNext()) {
WispCarrier carrier = itr.next();
WispConfiguration.HANDOFF_POLICY.handle(carrier, !itr.hasNext());
itr.remove();
}
}
assert longOccupationEngines.isEmpty();
}
enum Policy {
HAND_OFF { // handOff the worker
@Override
void handle(WispCarrier carrier, boolean isLast) {
if (JLA.isInSameNative(carrier.thread)) {
carrier.handOff();
INSTANCE.carriers.remove(carrier);
}
}
},
PREEMPT { // insert a yield() after next safepoint
@Override
void handle(WispCarrier carrier, boolean isLast) {
markPreempted(carrier.thread, isLast);
}
},
ADAPTIVE { // depends on thread status
@Override
void handle(WispCarrier carrier, boolean isLast) {
if (JLA.isInSameNative(carrier.thread)) {
carrier.handOff();
INSTANCE.carriers.remove(carrier);
} else {
markPreempted(carrier.thread, isLast);
}
}
};
abstract void handle(WispCarrier carrier, boolean isLast);
}
private static native void registerNatives();
/**
* Mark the thread as running single wispTask in java too much time.
* And the Thread.yield() invocation will be emitted after next safepoint.
*
* @param thread the thread to mark
* @param force fire a force_safepoint immediately
*/
private static native void markPreempted(Thread thread, boolean force);
private static final UnsafeAccess UA = SharedSecrets.getUnsafeAccess();
private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
}
此差异已折叠。
/*
* Copyright (c) 2020 Alibaba Group Holding Limited. All Rights Reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Alibaba designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*/
package com.alibaba.wisp.engine;
import sun.misc.JavaLangAccess;
import sun.misc.SharedSecrets;
import sun.reflect.CallerSensitive;
import java.dyn.CoroutineSupport;
/**
* An wrapper of {@link Thread} let every {@link WispTask} get different thread
* object from {@link Thread#currentThread()}.
* In this way, we make the listed class (not only but include) behave expected without
* changing their code.
* <p>
* 1. {@link ThreadLocal}
* 2. {@link java.util.concurrent.locks.AbstractQueuedSynchronizer} based synchronizer
* 3. Netty's judgment of weather we are running in it's worker thread.
*/
class WispThreadWrapper extends Thread {
WispThreadWrapper(WispTask task) {
JLA.setWispTask(this, task);
setName(task.getName());
}
@Override
public CoroutineSupport getCoroutineSupport() {
return JLA.getWispTask(this).carrier.thread.getCoroutineSupport();
}
@Override
public void start() {
throw new UnsupportedOperationException();
}
@Override
@Deprecated
public void destroy() {
}
@Override
@Deprecated
public int countStackFrames() {
return JLA.getWispTask(this).carrier.thread.countStackFrames();
}
@Override
@CallerSensitive
public ClassLoader getContextClassLoader() {
return JLA.getWispTask(this).ctxClassLoader;
}
@Override
public void setContextClassLoader(ClassLoader cl) {
JLA.getWispTask(this).ctxClassLoader = cl;
}
@Override
public StackTraceElement[] getStackTrace() {
return super.getStackTrace();
}
@Override
public long getId() {
return super.getId();
}
@Override
public State getState() {
return JLA.getWispTask(this).carrier.thread.getState();
}
@Override
public UncaughtExceptionHandler getUncaughtExceptionHandler() {
return JLA.getWispTask(this).carrier.thread.getUncaughtExceptionHandler();
}
@Override
public void setUncaughtExceptionHandler(UncaughtExceptionHandler eh) {
JLA.getWispTask(this).carrier.thread.setUncaughtExceptionHandler(eh);
}
@Override
public String toString() {
return "WispThreadWrapper{" +
"wispTask=" + JLA.getWispTask(this) +
'}';
}
private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
}
package sun.nio.ch;
public class IOEvent {
public static Class<?> eventClass() {
return EPollPort.class;
}
}
\ No newline at end of file
/*
* Copyright (c) 2020 Alibaba Group Holding Limited. All Rights Reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Alibaba designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*/
package sun.nio.ch;
import sun.misc.SharedSecrets;
import sun.misc.WispEngineAccess;
import java.io.IOException;
import java.net.*;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.TimeUnit;
// Make a server socket channel be like a socket and yield on block
public class WispServerSocketImpl
{
private static WispEngineAccess WEA = SharedSecrets.getWispEngineAccess();
private WispSocketLockSupport wispSocketLockSupport = new WispSocketLockSupport();
// The channel being adapted
private ServerSocketChannelImpl ssc = null;
// Timeout "option" value for accepts
private volatile int timeout = 0;
public WispServerSocketImpl() {
}
public void bind(SocketAddress local) throws IOException {
bind(local, 50);
}
public void bind(SocketAddress local, int backlog) throws IOException {
if (local == null)
local = new InetSocketAddress(0);
try {
getChannelImpl().bind(local, backlog);
} catch (Exception x) {
Net.translateException(x);
}
}
public InetAddress getInetAddress() {
if (ssc == null || !ssc.isBound())
return null;
return Net.getRevealedLocalAddress(ssc.localAddress()).getAddress();
}
public int getLocalPort() {
if (ssc == null || !ssc.isBound())
return -1;
return Net.asInetSocketAddress(ssc.localAddress()).getPort();
}
public Socket accept() throws IOException {
try {
wispSocketLockSupport.beginRead();
return accept0();
} finally {
wispSocketLockSupport.endRead();
}
}
private Socket accept0() throws IOException {
final ServerSocketChannel ch = getChannelImpl();
try {
SocketChannel res;
if (getSoTimeout() > 0) {
WEA.addTimer(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(getSoTimeout()));
}
while ((res = ch.accept()) == null) {
WEA.registerEvent(ch, SelectionKey.OP_ACCEPT);
WEA.park(-1);
if (getSoTimeout() > 0 && WEA.isTimeout()) {
throw new SocketTimeoutException("time out");
}
}
res.configureBlocking(false);
return new Socket(res);
} catch (Exception x) {
Net.translateException(x, true);
return null;
} finally {
if (getSoTimeout() > 0) {
WEA.cancelTimer();
}
WEA.unregisterEvent();
}
}
public void close() throws IOException {
if (ssc != null) {
ssc.close();
wispSocketLockSupport.unparkBlockedWispTask();
}
}
public ServerSocketChannel getChannel() {
return ssc;
}
public boolean isBound() {
return ssc != null && ssc.isBound();
}
public boolean isClosed() {
return ssc != null && !ssc.isOpen();
}
public void setSoTimeout(int timeout) throws SocketException {
this.timeout = timeout;
}
public int getSoTimeout() throws IOException {
return timeout;
}
public void setReuseAddress(boolean on) throws SocketException {
try {
getChannelImpl().setOption(StandardSocketOptions.SO_REUSEADDR, on);
} catch (IOException x) {
Net.translateToSocketException(x);
}
}
public boolean getReuseAddress() throws SocketException {
try {
return getChannelImpl().getOption(StandardSocketOptions.SO_REUSEADDR);
} catch (IOException x) {
Net.translateToSocketException(x);
return false; // Never happens
}
}
public String toString() {
if (!isBound())
return "ServerSocket[unbound]";
return "ServerSocket[addr=" + getInetAddress() +
",localport=" + getLocalPort() + "]";
}
public void setReceiveBufferSize(int size) throws SocketException {
if (size <= 0)
throw new IllegalArgumentException("size can not be 0 or negative");
try {
getChannelImpl().setOption(StandardSocketOptions.SO_RCVBUF, size);
} catch (IOException x) {
Net.translateToSocketException(x);
}
}
public int getReceiveBufferSize() throws SocketException {
try {
return getChannelImpl().getOption(StandardSocketOptions.SO_RCVBUF);
} catch (IOException x) {
Net.translateToSocketException(x);
return -1; // Never happens
}
}
private ServerSocketChannelImpl getChannelImpl() throws SocketException {
if (ssc == null) {
try {
ssc = (ServerSocketChannelImpl) ServerSocketChannel.open();
ssc.configureBlocking(false);
} catch (IOException e) {
throw new SocketException(e.getMessage());
}
}
return ssc;
}
}
此差异已折叠。
/*
* Copyright (c) 2020 Alibaba Group Holding Limited. All Rights Reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Alibaba designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*/
package sun.nio.ch;
import com.alibaba.wisp.engine.WispEngine;
import com.alibaba.wisp.engine.WispTask;
import sun.misc.SharedSecrets;
import sun.misc.WispEngineAccess;
import java.util.concurrent.locks.ReentrantLock;
/**
* This class supports fd use across {@link WispTask} by adding read/write reentrantLocks for
* {@link WispSocketImpl} {@link WispServerSocketImpl} and {@link WispUdpSocketImpl},
* {@link WispTask}s will park once they encountered a contended socket.
*/
class WispSocketLockSupport {
private static WispEngineAccess WEA = SharedSecrets.getWispEngineAccess();
/** Lock held when reading and accepting
*/
private final ReentrantLock readLock = WEA.enableSocketLock()? new ReentrantLock() : null;
/** Lock held when writing and connecting
*/
private final ReentrantLock writeLock = WEA.enableSocketLock()? new ReentrantLock() : null;
/** Lock held when tracking current blocked WispTask and closing
*/
private final ReentrantLock stateLock = WEA.enableSocketLock()? new ReentrantLock() : null;
WispTask blockedReadWispTask = null;
WispTask blockedWriteWispTask = null;
/**
* This is not a ReadWriteLock, they are separate locks here, readLock protect
* reading to fd while writeLock protect writing to fd.
*/
private void lockRead() {
readLock.lock();
}
private void lockWrite() {
writeLock.lock();
}
private void unLockRead() {
readLock.unlock();
}
private void unLockWrite() {
writeLock.unlock();
}
void beginRead() {
if (!WEA.enableSocketLock()) {
return;
}
lockRead();
stateLock.lock();
try {
blockedReadWispTask = WEA.getCurrentTask();
} finally {
stateLock.unlock();
}
}
void endRead() {
if (!WEA.enableSocketLock()) {
return;
}
stateLock.lock();
try {
blockedReadWispTask = null;
} finally {
stateLock.unlock();
}
unLockRead();
}
void beginWrite() {
if (!WEA.enableSocketLock()) {
return;
}
lockWrite();
stateLock.lock();
try {
blockedWriteWispTask = WEA.getCurrentTask();
} finally {
stateLock.unlock();
}
}
void endWrite() {
if (!WEA.enableSocketLock()) {
return;
}
stateLock.lock();
try {
blockedWriteWispTask = null;
} finally {
stateLock.unlock();
}
unLockWrite();
}
void unparkBlockedWispTask() {
stateLock.lock();
try {
if (blockedReadWispTask != null) {
WEA.unpark(blockedReadWispTask);
blockedReadWispTask = null;
}
if (blockedWriteWispTask != null) {
WEA.unpark(blockedWriteWispTask);
blockedWriteWispTask = null;
}
} finally {
stateLock.unlock();
}
}
}
此差异已折叠。
/*
* Copyright (c) 2020 Alibaba Group Holding Limited. All Rights Reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Alibaba designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "jni.h"
#include "jvm.h"
#include "com_alibaba_wisp_engine_WispEngine.h"
#define ARRAY_LENGTH(a) (sizeof(a)/sizeof(a[0]))
static JNINativeMethod methods[] = {
{"getProxyUnpark", "([I)I", (void *)&JVM_GetProxyUnpark},
};
JNIEXPORT void JNICALL
Java_com_alibaba_wisp_engine_WispEngine_registerNatives(JNIEnv *env, jclass cls)
{
(*env)->RegisterNatives(env, cls, methods, ARRAY_LENGTH(methods));
}
此差异已折叠。
此差异已折叠。
/*
* Copyright (c) 2020 Alibaba Group Holding Limited. All Rights Reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Alibaba designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*/
package com.alibaba.wisp.engine;
public class WispTask {
public Thread getThreadWrapper() {
throw new UnsupportedOperationException();
}
}
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册