提交 920331f9 编写于 作者: R rbackman

7127792: Add the ability to change an existing PeriodicTask's execution interval

Summary: Enables dynamic enrollment / disenrollment from the PeriodicTasks in WatcherThread.
Reviewed-by: dholmes, mgronlun
上级 4469db58
......@@ -140,6 +140,7 @@ Monitor* JfrQuery_lock = NULL;
Monitor* JfrMsg_lock = NULL;
Mutex* JfrBuffer_lock = NULL;
Mutex* JfrStream_lock = NULL;
Monitor* PeriodicTask_lock = NULL;
#define MAX_NUM_MUTEX 128
static Monitor * _mutex_array[MAX_NUM_MUTEX];
......@@ -285,6 +286,7 @@ void mutex_init() {
def(JfrMsg_lock , Monitor, nonleaf+2, true);
def(JfrBuffer_lock , Mutex, nonleaf+3, true);
def(JfrStream_lock , Mutex, nonleaf+4, true);
def(PeriodicTask_lock , Monitor, nonleaf+5, true);
}
GCMutexLocker::GCMutexLocker(Monitor * mutex) {
......
......@@ -142,6 +142,7 @@ extern Monitor* JfrQuery_lock; // protects JFR use
extern Monitor* JfrMsg_lock; // protects JFR messaging
extern Mutex* JfrBuffer_lock; // protects JFR buffer operations
extern Mutex* JfrStream_lock; // protects JFR stream access
extern Monitor* PeriodicTask_lock; // protects the periodic task structure
// A MutexLocker provides mutual exclusion with respect to a given mutex
// for the scope which contains the locker. The lock is an OS lock, not
......
......@@ -61,7 +61,7 @@ void PeriodicTask::print_intervals() {
}
#endif
void PeriodicTask::real_time_tick(size_t delay_time) {
void PeriodicTask::real_time_tick(int delay_time) {
#ifndef PRODUCT
if (ProfilerCheckIntervals) {
_ticks++;
......@@ -73,19 +73,39 @@ void PeriodicTask::real_time_tick(size_t delay_time) {
_intervalHistogram[ms]++;
}
#endif
int orig_num_tasks = _num_tasks;
for(int index = 0; index < _num_tasks; index++) {
_tasks[index]->execute_if_pending(delay_time);
if (_num_tasks < orig_num_tasks) { // task dis-enrolled itself
index--; // re-do current slot as it has changed
orig_num_tasks = _num_tasks;
{
MutexLockerEx ml(PeriodicTask_lock, Mutex::_no_safepoint_check_flag);
int orig_num_tasks = _num_tasks;
for(int index = 0; index < _num_tasks; index++) {
_tasks[index]->execute_if_pending(delay_time);
if (_num_tasks < orig_num_tasks) { // task dis-enrolled itself
index--; // re-do current slot as it has changed
orig_num_tasks = _num_tasks;
}
}
}
}
int PeriodicTask::time_to_wait() {
MutexLockerEx ml(PeriodicTask_lock->owned_by_self() ?
NULL : PeriodicTask_lock, Mutex::_no_safepoint_check_flag);
if (_num_tasks == 0) {
return 0; // sleep until shutdown or a task is enrolled
}
int delay = _tasks[0]->time_to_next_interval();
for (int index = 1; index < _num_tasks; index++) {
delay = MIN2(delay, _tasks[index]->time_to_next_interval());
}
return delay;
}
PeriodicTask::PeriodicTask(size_t interval_time) :
_counter(0), _interval(interval_time) {
_counter(0), _interval((int) interval_time) {
// Sanity check the interval time
assert(_interval >= PeriodicTask::min_interval &&
_interval <= PeriodicTask::max_interval &&
......@@ -94,33 +114,40 @@ PeriodicTask::PeriodicTask(size_t interval_time) :
}
PeriodicTask::~PeriodicTask() {
if (is_enrolled())
disenroll();
}
bool PeriodicTask::is_enrolled() const {
for(int index = 0; index < _num_tasks; index++)
if (_tasks[index] == this) return true;
return false;
disenroll();
}
void PeriodicTask::enroll() {
assert(WatcherThread::watcher_thread() == NULL, "dynamic enrollment of tasks not yet supported");
MutexLockerEx ml(PeriodicTask_lock->owned_by_self() ?
NULL : PeriodicTask_lock, Mutex::_no_safepoint_check_flag);
if (_num_tasks == PeriodicTask::max_tasks)
if (_num_tasks == PeriodicTask::max_tasks) {
fatal("Overflow in PeriodicTask table");
}
_tasks[_num_tasks++] = this;
WatcherThread* thread = WatcherThread::watcher_thread();
if (thread) {
thread->unpark();
} else {
WatcherThread::start();
}
}
void PeriodicTask::disenroll() {
assert(WatcherThread::watcher_thread() == NULL ||
Thread::current() == WatcherThread::watcher_thread(),
"dynamic disenrollment currently only handled from WatcherThread from within task() method");
MutexLockerEx ml(PeriodicTask_lock->owned_by_self() ?
NULL : PeriodicTask_lock, Mutex::_no_safepoint_check_flag);
int index;
for(index = 0; index < _num_tasks && _tasks[index] != this; index++);
if (index == _num_tasks) return;
for(index = 0; index < _num_tasks && _tasks[index] != this; index++)
;
if (index == _num_tasks) {
return;
}
_num_tasks--;
for (; index < _num_tasks; index++) {
_tasks[index] = _tasks[index+1];
}
......
......@@ -49,12 +49,12 @@ class PeriodicTask: public CHeapObj<mtInternal> {
static int num_tasks() { return _num_tasks; }
private:
size_t _counter;
const size_t _interval;
int _counter;
const int _interval;
static int _num_tasks;
static PeriodicTask* _tasks[PeriodicTask::max_tasks];
static void real_time_tick(size_t delay_time);
static void real_time_tick(int delay_time);
#ifndef PRODUCT
static elapsedTimer _timer; // measures time between ticks
......@@ -69,51 +69,36 @@ class PeriodicTask: public CHeapObj<mtInternal> {
PeriodicTask(size_t interval_time); // interval is in milliseconds of elapsed time
~PeriodicTask();
// Tells whether is enrolled
bool is_enrolled() const;
// Make the task active
// NOTE: this may only be called before the WatcherThread has been started
// For dynamic enrollment at the time T, the task will execute somewhere
// between T and T + interval_time.
void enroll();
// Make the task deactive
// NOTE: this may only be called either while the WatcherThread is
// inactive or by a task from within its task() method. One-shot or
// several-shot tasks may be implemented this way.
void disenroll();
void execute_if_pending(size_t delay_time) {
_counter += delay_time;
if (_counter >= _interval) {
void execute_if_pending(int delay_time) {
// make sure we don't overflow
jlong tmp = (jlong) _counter + (jlong) delay_time;
if (tmp >= (jlong) _interval) {
_counter = 0;
task();
} else {
_counter += delay_time;
}
}
// Returns how long (time in milliseconds) before the next time we should
// execute this task.
size_t time_to_next_interval() const {
int time_to_next_interval() const {
assert(_interval > _counter, "task counter greater than interval?");
return _interval - _counter;
}
// Calculate when the next periodic task will fire.
// Called by the WatcherThread's run method.
// This assumes that periodic tasks aren't entering the system
// dynamically, except for during startup.
static size_t time_to_wait() {
if (_num_tasks == 0) {
// Don't wait any more; shut down the thread since we don't
// currently support dynamic enrollment.
return 0;
}
size_t delay = _tasks[0]->time_to_next_interval();
for (int index = 1; index < _num_tasks; index++) {
delay = MIN2(delay, _tasks[index]->time_to_next_interval());
}
return delay;
}
static int time_to_wait();
// The task to perform at each period
virtual void task() = 0;
......
......@@ -1217,6 +1217,7 @@ void NamedThread::set_name(const char* format, ...) {
// timer interrupts exists on the platform.
WatcherThread* WatcherThread::_watcher_thread = NULL;
bool WatcherThread::_startable = false;
volatile bool WatcherThread::_should_terminate = false;
WatcherThread::WatcherThread() : Thread() {
......@@ -1237,6 +1238,55 @@ WatcherThread::WatcherThread() : Thread() {
}
}
int WatcherThread::sleep() const {
MutexLockerEx ml(PeriodicTask_lock, Mutex::_no_safepoint_check_flag);
// remaining will be zero if there are no tasks,
// causing the WatcherThread to sleep until a task is
// enrolled
int remaining = PeriodicTask::time_to_wait();
int time_slept = 0;
// we expect this to timeout - we only ever get unparked when
// we should terminate or when a new task has been enrolled
OSThreadWaitState osts(this->osthread(), false /* not Object.wait() */);
jlong time_before_loop = os::javaTimeNanos();
for (;;) {
bool timedout = PeriodicTask_lock->wait(Mutex::_no_safepoint_check_flag, remaining);
jlong now = os::javaTimeNanos();
if (remaining == 0) {
// if we didn't have any tasks we could have waited for a long time
// consider the time_slept zero and reset time_before_loop
time_slept = 0;
time_before_loop = now;
} else {
// need to recalulate since we might have new tasks in _tasks
time_slept = (int) ((now - time_before_loop) / 1000000);
}
// Change to task list or spurious wakeup of some kind
if (timedout || _should_terminate) {
break;
}
remaining = PeriodicTask::time_to_wait();
if (remaining == 0) {
// Last task was just disenrolled so loop around and wait until
// another task gets enrolled
continue;
}
remaining -= time_slept;
if (remaining <= 0)
break;
}
return time_slept;
}
void WatcherThread::run() {
assert(this == watcher_thread(), "just checking");
......@@ -1249,26 +1299,7 @@ void WatcherThread::run() {
// Calculate how long it'll be until the next PeriodicTask work
// should be done, and sleep that amount of time.
size_t time_to_wait = PeriodicTask::time_to_wait();
// we expect this to timeout - we only ever get unparked when
// we should terminate
{
OSThreadWaitState osts(this->osthread(), false /* not Object.wait() */);
jlong prev_time = os::javaTimeNanos();
for (;;) {
int res= _SleepEvent->park(time_to_wait);
if (res == OS_TIMEOUT || _should_terminate)
break;
// spurious wakeup of some kind
jlong now = os::javaTimeNanos();
time_to_wait -= (now - prev_time) / 1000000;
if (time_to_wait <= 0)
break;
prev_time = now;
}
}
int time_waited = sleep();
if (is_error_reported()) {
// A fatal error has happened, the error handler(VMError::report_and_die)
......@@ -1298,13 +1329,7 @@ void WatcherThread::run() {
}
}
PeriodicTask::real_time_tick(time_to_wait);
// If we have no more tasks left due to dynamic disenrollment,
// shut down the thread since we don't currently support dynamic enrollment
if (PeriodicTask::num_tasks() == 0) {
_should_terminate = true;
}
PeriodicTask::real_time_tick(time_waited);
}
// Signal that it is terminated
......@@ -1319,22 +1344,33 @@ void WatcherThread::run() {
}
void WatcherThread::start() {
if (watcher_thread() == NULL) {
assert(PeriodicTask_lock->owned_by_self(), "PeriodicTask_lock required");
if (watcher_thread() == NULL && _startable) {
_should_terminate = false;
// Create the single instance of WatcherThread
new WatcherThread();
}
}
void WatcherThread::make_startable() {
assert(PeriodicTask_lock->owned_by_self(), "PeriodicTask_lock required");
_startable = true;
}
void WatcherThread::stop() {
{
MutexLockerEx ml(PeriodicTask_lock, Mutex::_no_safepoint_check_flag);
_should_terminate = true;
OrderAccess::fence(); // ensure WatcherThread sees update in main loop
WatcherThread* watcher = watcher_thread();
if (watcher != NULL)
watcher->unpark();
}
// it is ok to take late safepoints here, if needed
MutexLocker mu(Terminator_lock);
_should_terminate = true;
OrderAccess::fence(); // ensure WatcherThread sees update in main loop
Thread* watcher = watcher_thread();
if (watcher != NULL)
watcher->_SleepEvent->unpark();
while(watcher_thread() != NULL) {
// This wait should make safepoint checks, wait without a timeout,
......@@ -1352,6 +1388,11 @@ void WatcherThread::stop() {
}
}
void WatcherThread::unpark() {
MutexLockerEx ml(PeriodicTask_lock->owned_by_self() ? NULL : PeriodicTask_lock, Mutex::_no_safepoint_check_flag);
PeriodicTask_lock->notify();
}
void WatcherThread::print_on(outputStream* st) const {
st->print("\"%s\" ", name());
Thread::print_on(st);
......@@ -3658,12 +3699,18 @@ jint Threads::create_vm(JavaVMInitArgs* args, bool* canTryAgain) {
}
}
// Start up the WatcherThread if there are any periodic tasks
// NOTE: All PeriodicTasks should be registered by now. If they
// aren't, late joiners might appear to start slowly (we might
// take a while to process their first tick).
if (PeriodicTask::num_tasks() > 0) {
WatcherThread::start();
{
MutexLockerEx ml(PeriodicTask_lock, Mutex::_no_safepoint_check_flag);
// Make sure the watcher thread can be started by WatcherThread::start()
// or by dynamic enrollment.
WatcherThread::make_startable();
// Start up the WatcherThread if there are any periodic tasks
// NOTE: All PeriodicTasks should be registered by now. If they
// aren't, late joiners might appear to start slowly (we might
// take a while to process their first tick).
if (PeriodicTask::num_tasks() > 0) {
WatcherThread::start();
}
}
// Give os specific code one last chance to start
......
......@@ -722,6 +722,7 @@ class WatcherThread: public Thread {
private:
static WatcherThread* _watcher_thread;
static bool _startable;
volatile static bool _should_terminate; // updated without holding lock
public:
enum SomeConstants {
......@@ -738,6 +739,7 @@ class WatcherThread: public Thread {
char* name() const { return (char*)"VM Periodic Task Thread"; }
void print_on(outputStream* st) const;
void print() const { print_on(tty); }
void unpark();
// Returns the single instance of WatcherThread
static WatcherThread* watcher_thread() { return _watcher_thread; }
......@@ -745,6 +747,12 @@ class WatcherThread: public Thread {
// Create and start the single instance of WatcherThread, or stop it on shutdown
static void start();
static void stop();
// Only allow start once the VM is sufficiently initialized
// Otherwise the first task to enroll will trigger the start
static void make_startable();
private:
int sleep() const;
};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册