/*
* Copyright (c) 2020 Alibaba Group Holding Limited. All Rights Reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Alibaba designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*/
package com.alibaba.wisp.engine;
import sun.misc.JavaLangAccess;
import sun.misc.SharedSecrets;
import sun.misc.WispEngineAccess;
import java.dyn.Coroutine;
import java.dyn.CoroutineExitException;
import java.dyn.CoroutineSupport;
import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Supplier;
/**
* Coroutine Runtime Engine. It's a "wisp" thing, as we want our asynchronous transformation to be transparent,
* requiring no modification to user code.
*
* WispEngine represents a group of {@link WispCarrier}, which can steal
* tasks from each other to achieve work-stealing.
*
* {@code WispEngine#WISP_ROOT_ENGINE} is created by the system.
* Calling {@code WispEngine.current().execute(Runnable)} either in a non-worker thread or in one of
* WISP_ROOT_ENGINE's worker threads will dispatch the task in this carrier.
*
* User code can also create a {@link WispEngine} by calling
* {@link WispEngine#createEngine(int, ThreadFactory)}.
* Calling {@link WispEngine#execute(Runnable)} will dispatch
* the WispTask inside the created carrier.
* Calling {@code WispEngine.current().execute(Runnable)} in a user-created carrier will also
* dispatch the task in the current carrier.
*/
public class WispEngine extends AbstractExecutorService {
// Class-initialization bootstrap. The three steps run in this order:
//   1. registerNatives()     - declared outside this excerpt; presumably binds this class's native methods.
//   2. setWispEngineAccess() - installs a WispEngineAccess implementation into SharedSecrets.
//   3. createTimerScheduler() - result stored in `timer` (null unless WISP_HIGH_PRECISION_TIMER is set).
// This block appears textually before the static field initializers further down
// (JLA, DAEMON_THREAD_GROUP, ...), so it executes before they are assigned.
// createTimerScheduler() only reads DAEMON_THREAD_GROUP inside its ThreadFactory.newThread(),
// i.e. when the executor actually asks for a thread, not while this block is running.
static {
registerNatives();
setWispEngineAccess();
timer = createTimerScheduler();
}
/**
 * Reports the transparent-wisp configuration flag.
 *
 * @return the value of {@code WispConfiguration.TRANSPARENT_WISP_SWITCH}
 */
public static boolean transparentWispSwitch() {
return WispConfiguration.TRANSPARENT_WISP_SWITCH;
}
/**
 * Reports whether the thread-as-wisp model is currently enabled.
 * The backing field {@code shiftThreadModel} is mutable, so it is read on every call
 * rather than cached.
 *
 * @return the current value of {@code shiftThreadModel}
 */
public static boolean enableThreadAsWisp() {
return shiftThreadModel;
}
/**
 * Legacy alias that simply delegates to {@link #transparentWispSwitch()}.
 *
 * @return the same value as {@link #transparentWispSwitch()}
 * @deprecated use {@link #transparentWispSwitch()} instead
 */
@Deprecated
public static boolean isTransparentAsync() {
return transparentWispSwitch();
}
/** Name passed to the {@code WispEngine} constructor when the root engine is created. */
private static final String WISP_ROOT_ENGINE_NAME = "Root";
/**
 * Atomic updater for the {@code hasBeenShutdown} instance field of {@code WispEngine}.
 * The field itself is declared outside this part of the file; per the contract of
 * {@code AtomicReferenceFieldUpdater.newUpdater} it must be a {@code volatile} field
 * of type {@code Boolean}.
 * Declared with explicit type arguments (not as a raw type) so that every
 * compare-and-set / get through it is type-checked by the compiler.
 */
private static final AtomicReferenceFieldUpdater<WispEngine, Boolean> SHUTDOWN_UPDATER
        = AtomicReferenceFieldUpdater.newUpdater(WispEngine.class, Boolean.class, "hasBeenShutdown");
/*
Some of our users change this field by reflection
at runtime to disable wisp temporarily.
We should move shiftThreadModel to WispConfiguration
after we provide an API to control this behavior and
notify the users to modify their code.
TODO refactor to com.alibaba.wisp.enableThreadAsWisp later
*/
// Intentionally mutable and non-private; the comment preceding this field explains why.
static boolean shiftThreadModel = WispConfiguration.ENABLE_THREAD_AS_WISP;
/** Accessor obtained from {@code SharedSecrets}; used here for currentThread0() and wispBooted(). */
static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
/*
 * Wisp-specific thread group.
 * All the daemon threads in wisp should be created with this thread group.
 * In Thread.start(), if the thread should not be converted to a WispTask,
 * check whether the thread's carrier is daemonThreadGroup.
 */
static final ThreadGroup DAEMON_THREAD_GROUP =
        new ThreadGroup(JLA.currentThread0().getThreadGroup(), "Daemon Thread Group");
/**
 * Executor for the high-precision timer; {@code null} unless
 * {@code WispConfiguration.WISP_HIGH_PRECISION_TIMER} is set.
 * Must stay without an initializer: the static block earlier in the class has already
 * assigned it by the time the field declarations are processed, and an initializer here
 * would overwrite that value.
 */
static ScheduledExecutorService timer;
/**
 * Thread set ordered by thread id; created in {@code initializeWispClass()}.
 * Presumably holds the carrier (worker) threads - it is populated outside this excerpt.
 */
static Set<Thread> carrierThreads;
/** The "Wisp-Unpark-Dispatcher" daemon thread; created and started in {@code startWispDaemons()}. */
static Thread unparkDispatcher;
/** The root engine; created in {@code initializeWispClass()}. */
static WispEngine WISP_ROOT_ENGINE;
/**
 * Builds the executor that backs the high-precision timer.
 *
 * @return a scheduled thread pool with a core size of one whose threads are daemon threads
 *         named "Wisp-Timer" in {@code DAEMON_THREAD_GROUP}, or {@code null} when
 *         {@code WispConfiguration.WISP_HIGH_PRECISION_TIMER} is off
 */
private static ScheduledExecutorService createTimerScheduler() {
    if (!WispConfiguration.WISP_HIGH_PRECISION_TIMER) {
        return null;
    }
    // DAEMON_THREAD_GROUP is only read when the pool requests a thread, not at factory creation.
    ThreadFactory timerThreadFactory = new ThreadFactory() {
        @Override
        public Thread newThread(Runnable task) {
            Thread timerThread = new Thread(WispEngine.DAEMON_THREAD_GROUP, task);
            timerThread.setName("Wisp-Timer");
            timerThread.setDaemon(true);
            return timerThread;
        }
    };
    return Executors.newScheduledThreadPool(1, timerThreadFactory);
}
/**
 * One-time bootstrap of the static wisp state: re-reads the thread-as-wisp flag,
 * creates the id-ordered {@code carrierThreads} set and the root engine, and, when the
 * transparent wisp switch is on, runs the class warm-up and notifies {@code JLA}.
 * <p>
 * No caller is visible in this part of the file; the assertions below require that it
 * runs on the thread named "main" after {@code JLA} is available.
 */
private static void initializeWispClass() {
    assert JLA != null : "WispCarrier should be initialized after System";
    assert JLA.currentThread0().getName().equals("main") : "Wisp need to be loaded by main thread";
    shiftThreadModel = WispConfiguration.ENABLE_THREAD_AS_WISP;
    // The comparator must be Comparator<Thread>: a raw Comparator declares
    // compare(Object, Object), which compare(Thread, Thread) would not override.
    // NOTE(review): kept as an anonymous class instead of a lambda / Comparator.comparingLong;
    // this method appears to run during early startup - confirm lambdas are usable here
    // before modernizing.
    carrierThreads = new ConcurrentSkipListSet<>(new Comparator<Thread>() {
        @Override
        public int compare(Thread o1, Thread o2) {
            return Long.compare(o1.getId(), o2.getId());
        }
    });
    WISP_ROOT_ENGINE = new WispEngine(WISP_ROOT_ENGINE_NAME);
    if (transparentWispSwitch()) {
        // Warm up first, then tell JLA that wisp has booted.
        WispEngine.initializeClasses();
        JLA.wispBooted();
    }
}
/**
 * Warm-up step invoked from initializeWispClass() when the transparent wisp switch is on.
 * It force-loads a fixed list of classes and exercises a few code paths once, discarding
 * the results - presumably so that none of this has to be loaded lazily later, while
 * running inside a coroutine (TODO confirm the exact motivation).
 * The statement order is kept as-is on purpose; it determines class initialization order.
 *
 * @throws ExceptionInInitializerError wrapping any Exception raised by the steps below
 */
private static void initializeClasses() {
try {
// Class.forName(String) loads AND initializes the named class.
Class.forName(CoroutineExitException.class.getName());
Class.forName(WispThreadWrapper.class.getName());
Class.forName(TaskDispatcher.class.getName());
Class.forName(StartShutdown.class.getName());
Class.forName(NotifyAndWaitTasksForShutdown.class.getName());
Class.forName(Coroutine.StealResult.class.getName());
Class.forName(WispCounterMXBeanImpl.class.getName());
Class.forName(ThreadAsWisp.class.getName());
Class.forName(WispEventPump.class.getName());
// The perf-counter monitor is only preloaded when profiling is configured.
if (WispConfiguration.WISP_PROFILE) {
Class.forName(WispPerfCounterMonitor.class.getName());
}
// `timer` is created under this same flag in createTimerScheduler().
// The submitted task is a no-op; presumably it only serves to make the executor
// spin up its worker thread and load the related classes now.
if (WispConfiguration.WISP_HIGH_PRECISION_TIMER) {
timer.submit(new Runnable() {
@Override
public void run() {
}
});
}
// Throwaway collections: results are discarded, so these calls only matter for their
// class-loading side effect (queue/map, key-set and iterator implementations).
new ConcurrentLinkedQueue<>().iterator();
new ConcurrentSkipListMap<>().keySet().iterator();
// Exercise the current carrier once: add then cancel a timer, build a resume entry for a
// throwaway WispTask, and register then de-register its perf counter.
WispCarrier carrier = WispCarrier.current();
carrier.addTimer(System.nanoTime() + Integer.MAX_VALUE, false);
carrier.cancelTimer();
carrier.createResumeEntry(new WispTask(carrier, null, false, false));
registerPerfCounter(carrier);
deRegisterPerfCounter(carrier);
} catch (Exception e) {
// Any failure during warm-up is surfaced as an initialization error, with the cause preserved.
throw new ExceptionInInitializerError(e);
}
}
/**
 * Starts the wisp background threads; does nothing unless the transparent wisp switch is on.
 * <p>
 * In order: the "Wisp-Unpark-Dispatcher" daemon thread, the {@code WispSysmon} daemon,
 * the root engine's worker threads, the poller threads (skipped when
 * {@code CARRIER_AS_POLLER} is set) and, if profile logging is enabled, the
 * perf-counter monitor daemon.
 */
private static void startWispDaemons() {
    if (!transparentWispSwitch()) {
        return;
    }
    // Loop body of the dispatcher thread: fetch a batch of task ids, unpark each, repeat forever.
    Runnable dispatchLoop = new Runnable() {
        @Override
        public void run() {
            int[] taskIds = new int[12];
            CoroutineSupport.setWispBooted();
            for (;;) {
                // getProxyUnpark fills taskIds and returns how many entries are valid.
                int pending = WispEngine.getProxyUnpark(taskIds);
                int idx = 0;
                while (idx < pending) {
                    WispTask.unparkById(taskIds[idx]);
                    idx++;
                }
            }
        }
    };
    // Publish the thread in the static field before configuring and starting it.
    unparkDispatcher = new Thread(DAEMON_THREAD_GROUP, dispatchLoop, "Wisp-Unpark-Dispatcher");
    unparkDispatcher.setDaemon(true);
    unparkDispatcher.start();

    WispSysmon.INSTANCE.startDaemon();
    WISP_ROOT_ENGINE.scheduler.startWorkerThreads();
    if (!WispConfiguration.CARRIER_AS_POLLER) {
        WispEventPump.Pool.INSTANCE.startPollerThreads();
    }
    if (WispConfiguration.WISP_PROFILE_LOG_ENABLED) {
        WispPerfCounterMonitor.INSTANCE.startDaemon();
    }
}
private static void setWispEngineAccess() {
SharedSecrets.setWispEngineAccess(new WispEngineAccess() {
@Override
public WispTask getCurrentTask() {
return WispCarrier.current().getCurrentTask();
}
@Override
public void registerEvent(SelectableChannel ch, int events) throws IOException {
WispCarrier.current().registerEvent(ch, events);
}
@Override
public void unregisterEvent() {
WispCarrier.current().unregisterEvent();
}
@Override
public int epollWait(int epfd, long pollArray, int arraySize, long timeout,
AtomicReference