Commit 7f92f7a2 authored by iveresov

6862387: tune concurrent refinement further

Summary: Reworked the concurrent refinement: thread activation, feedback-based threshold adjustment, other miscellaneous fixes.
Reviewed-by: apetrusenko, tonyp
Parent 186d1e6d
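The heart of this change is a three-zone scheme (green, yellow, red) over the length of the dirty-card update buffer queue: below green, buffers are left cached; between green and yellow, refinement workers are activated one by one; between yellow and red, all workers run; at red, mutator threads process their own buffers. The sketch below illustrates the ergonomic defaults the constructor picks. It is a standalone approximation, not HotSpot code; the struct and helper names are invented for illustration, only the arithmetic is taken from the patch.

    // Sketch of the ergonomic zone selection, mirroring the constructor below.
    #include <algorithm>
    #include <cstdio>

    struct Zones { int green, yellow, red, threshold_step; };

    static Zones select_zones(int parallel_gc_threads, int worker_threads) {
      Zones z;
      z.green  = std::max(parallel_gc_threads, 1); // G1ConcRefineGreenZone default
      z.yellow = z.green * 3;                      // G1ConcRefineYellowZone default
      z.red    = z.yellow * 2;                     // G1ConcRefineRedZone default
      // One activation step per worker between the green and yellow boundaries.
      z.threshold_step = (z.yellow - z.green) / (worker_threads + 1);
      return z;
    }

    int main() {
      Zones z = select_zones(4 /* ParallelGCThreads */, 4 /* refinement workers */);
      std::printf("green=%d yellow=%d red=%d step=%d\n",
                  z.green, z.yellow, z.red, z.threshold_step); // green=4 yellow=12 red=24 step=1
      return 0;
    }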
@@ -42,28 +42,49 @@ ConcurrentG1Refine::ConcurrentG1Refine() :
   _n_periods(0),
   _threads(NULL), _n_threads(0)
 {
-  if (G1ConcRefine) {
-    _n_threads = (int)thread_num();
-    if (_n_threads > 0) {
-      _threads = NEW_C_HEAP_ARRAY(ConcurrentG1RefineThread*, _n_threads);
-      int worker_id_offset = (int)DirtyCardQueueSet::num_par_ids();
-      ConcurrentG1RefineThread *next = NULL;
-      for (int i = _n_threads - 1; i >= 0; i--) {
-        ConcurrentG1RefineThread* t = new ConcurrentG1RefineThread(this, next, worker_id_offset, i);
-        assert(t != NULL, "Conc refine should have been created");
-        assert(t->cg1r() == this, "Conc refine thread should refer to this");
-        _threads[i] = t;
-        next = t;
-      }
-    }
-  }
+  // Ergonomically select initial concurrent refinement parameters
+  if (FLAG_IS_DEFAULT(G1ConcRefineGreenZone)) {
+    FLAG_SET_DEFAULT(G1ConcRefineGreenZone, MAX2<int>(ParallelGCThreads, 1));
+  }
+  set_green_zone(G1ConcRefineGreenZone);
+
+  if (FLAG_IS_DEFAULT(G1ConcRefineYellowZone)) {
+    FLAG_SET_DEFAULT(G1ConcRefineYellowZone, green_zone() * 3);
+  }
+  set_yellow_zone(MAX2<int>(G1ConcRefineYellowZone, green_zone()));
+
+  if (FLAG_IS_DEFAULT(G1ConcRefineRedZone)) {
+    FLAG_SET_DEFAULT(G1ConcRefineRedZone, yellow_zone() * 2);
+  }
+  set_red_zone(MAX2<int>(G1ConcRefineRedZone, yellow_zone()));
+  _n_worker_threads = thread_num();
+  // We need one extra thread to do the young gen rset size sampling.
+  _n_threads = _n_worker_threads + 1;
+  reset_threshold_step();
+
+  _threads = NEW_C_HEAP_ARRAY(ConcurrentG1RefineThread*, _n_threads);
+  int worker_id_offset = (int)DirtyCardQueueSet::num_par_ids();
+  ConcurrentG1RefineThread *next = NULL;
+  for (int i = _n_threads - 1; i >= 0; i--) {
+    ConcurrentG1RefineThread* t = new ConcurrentG1RefineThread(this, next, worker_id_offset, i);
+    assert(t != NULL, "Conc refine should have been created");
+    assert(t->cg1r() == this, "Conc refine thread should refer to this");
+    _threads[i] = t;
+    next = t;
+  }
 }
 
-size_t ConcurrentG1Refine::thread_num() {
-  if (G1ConcRefine) {
-    return (G1ParallelRSetThreads > 0) ? G1ParallelRSetThreads : ParallelGCThreads;
-  }
-  return 0;
+void ConcurrentG1Refine::reset_threshold_step() {
+  if (FLAG_IS_DEFAULT(G1ConcRefineThresholdStep)) {
+    _thread_threshold_step = (yellow_zone() - green_zone()) / (worker_thread_num() + 1);
+  } else {
+    _thread_threshold_step = G1ConcRefineThresholdStep;
+  }
+}
+
+int ConcurrentG1Refine::thread_num() {
+  return MAX2<int>((G1ParallelRSetThreads > 0) ? G1ParallelRSetThreads : ParallelGCThreads, 1);
 }
 
 void ConcurrentG1Refine::init() {
@@ -123,6 +144,15 @@ void ConcurrentG1Refine::stop() {
   }
 }
 
+void ConcurrentG1Refine::reinitialize_threads() {
+  reset_threshold_step();
+  if (_threads != NULL) {
+    for (int i = 0; i < _n_threads; i++) {
+      _threads[i]->initialize();
+    }
+  }
+}
+
 ConcurrentG1Refine::~ConcurrentG1Refine() {
   if (G1ConcRSLogCacheSize > 0) {
     assert(_card_counts != NULL, "Logic");
@@ -384,4 +414,3 @@ void ConcurrentG1Refine::print_worker_threads_on(outputStream* st) const {
     st->cr();
   }
 }
-
@@ -29,6 +29,31 @@ class G1RemSet;
 class ConcurrentG1Refine: public CHeapObj {
   ConcurrentG1RefineThread** _threads;
   int _n_threads;
+  int _n_worker_threads;
+ /*
+  * The value of the update buffer queue length falls into one of 3 zones:
+  * green, yellow, red. If the value is in [0, green) nothing is
+  * done, the buffers are left unprocessed to enable the caching effect of the
+  * dirtied cards. In the yellow zone [green, yellow) the concurrent refinement
+  * threads are gradually activated. In [yellow, red) all threads are
+  * running. If the length becomes red (max queue length) the mutators start
+  * processing the buffers.
+  *
+  * There are some interesting cases (with G1AdaptiveConcRefine turned off):
+  * 1) green = yellow = red = 0. In this case the mutator will process all
+  *    buffers. Except for those that are created by the deferred updates
+  *    machinery during a collection.
+  * 2) green = 0. Means no caching. Can be a good way to minimize the
+  *    amount of time spent updating rsets during a collection.
+  */
+  int _green_zone;
+  int _yellow_zone;
+  int _red_zone;
+
+  int _thread_threshold_step;
+
+  // Reset the threshold step value based on the current zone boundaries.
+  void reset_threshold_step();
+
   // The cache for card refinement.
   bool _use_cache;
@@ -147,6 +172,8 @@ class ConcurrentG1Refine: public CHeapObj {
   void init(); // Accomplish some initialization that has to wait.
   void stop();
 
+  void reinitialize_threads();
+
   // Iterate over the conc refine threads
   void threads_do(ThreadClosure *tc);
@@ -178,7 +205,20 @@ class ConcurrentG1Refine: public CHeapObj {
   void clear_and_record_card_counts();
 
-  static size_t thread_num();
+  static int thread_num();
   void print_worker_threads_on(outputStream* st) const;
+
+  void set_green_zone(int x)  { _green_zone = x;  }
+  void set_yellow_zone(int x) { _yellow_zone = x; }
+  void set_red_zone(int x)    { _red_zone = x;    }
+
+  int green_zone() const      { return _green_zone;  }
+  int yellow_zone() const     { return _yellow_zone; }
+  int red_zone() const        { return _red_zone;    }
+
+  int total_thread_num() const  { return _n_threads;        }
+  int worker_thread_num() const { return _n_worker_threads; }
+
+  int thread_threshold_step() const { return _thread_threshold_step; }
 };
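Given green, yellow, and the threshold step, each refinement worker gets its own activation point inside the yellow zone, one step apart, and deactivates one step below its activation point (never below green). A small standalone sketch of that arithmetic follows; it is illustrative only, the real computation is ConcurrentG1RefineThread::initialize() below.

    // Illustrative computation of per-worker activation/deactivation thresholds,
    // following ConcurrentG1RefineThread::initialize() in this change.
    #include <algorithm>
    #include <cstdio>

    int main() {
      const int green = 4, yellow = 12, step = 1, workers = 4;
      for (int worker_id = 0; worker_id < workers; worker_id++) {
        int threshold = std::min(step * (worker_id + 1) + green, yellow);
        int deactivation = std::max(threshold - step, green);
        // worker 0: on at 5, off at 4; worker 1: on at 6, off at 5; ...
        std::printf("worker %d: activate at %d, deactivate at %d\n",
                    worker_id, threshold, deactivation);
      }
      return 0;
    }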
@@ -25,10 +25,6 @@
 #include "incls/_precompiled.incl"
 #include "incls/_concurrentG1RefineThread.cpp.incl"
 
-// ======= Concurrent Mark Thread ========
-
-// The CM thread is created when the G1 garbage collector is used
-
 ConcurrentG1RefineThread::
 ConcurrentG1RefineThread(ConcurrentG1Refine* cg1r, ConcurrentG1RefineThread *next,
                          int worker_id_offset, int worker_id) :
@@ -37,19 +33,42 @@ ConcurrentG1RefineThread(ConcurrentG1Refine* cg1r, ConcurrentG1RefineThread *nex
   _worker_id(worker_id),
   _active(false),
   _next(next),
+  _monitor(NULL),
   _cg1r(cg1r),
-  _vtime_accum(0.0),
-  _interval_ms(5.0)
+  _vtime_accum(0.0)
 {
+  // Each thread has its own monitor. The i-th thread is responsible for signalling
+  // to thread i+1 if the number of buffers in the queue exceeds a threshold for this
+  // thread. Monitors are also used to wake up the threads during termination.
+  // The 0th worker is notified by mutator threads and has a special monitor.
+  // The last worker is used for young gen rset size sampling.
+  if (worker_id > 0) {
+    _monitor = new Monitor(Mutex::nonleaf, "Refinement monitor", true);
+  } else {
+    _monitor = DirtyCardQ_CBL_mon;
+  }
+  initialize();
   create_and_start();
 }
 
+void ConcurrentG1RefineThread::initialize() {
+  if (_worker_id < cg1r()->worker_thread_num()) {
+    // Current thread activation threshold
+    _threshold = MIN2<int>(cg1r()->thread_threshold_step() * (_worker_id + 1) + cg1r()->green_zone(),
+                           cg1r()->yellow_zone());
+    // A thread deactivates once the number of buffers reaches a deactivation threshold
+    _deactivation_threshold = MAX2<int>(_threshold - cg1r()->thread_threshold_step(), cg1r()->green_zone());
+  } else {
+    set_active(true);
+  }
+}
+
 void ConcurrentG1RefineThread::sample_young_list_rs_lengths() {
   G1CollectedHeap* g1h = G1CollectedHeap::heap();
   G1CollectorPolicy* g1p = g1h->g1_policy();
   if (g1p->adaptive_young_list_length()) {
     int regions_visited = 0;
     g1h->young_list_rs_length_sampling_init();
     while (g1h->young_list_rs_length_sampling_more()) {
       g1h->young_list_rs_length_sampling_next();
@@ -70,99 +89,121 @@ void ConcurrentG1RefineThread::sample_young_list_rs_lengths() {
   }
 }
 
+void ConcurrentG1RefineThread::run_young_rs_sampling() {
+  DirtyCardQueueSet& dcqs = JavaThread::dirty_card_queue_set();
+  _vtime_start = os::elapsedVTime();
+  while(!_should_terminate) {
+    _sts.join();
+    sample_young_list_rs_lengths();
+    _sts.leave();
+
+    if (os::supports_vtime()) {
+      _vtime_accum = (os::elapsedVTime() - _vtime_start);
+    } else {
+      _vtime_accum = 0.0;
+    }
+
+    MutexLockerEx x(_monitor, Mutex::_no_safepoint_check_flag);
+    if (_should_terminate) {
+      break;
+    }
+    _monitor->wait(Mutex::_no_safepoint_check_flag, G1ConcRefineServiceInterval);
+  }
+}
+
+void ConcurrentG1RefineThread::wait_for_completed_buffers() {
+  DirtyCardQueueSet& dcqs = JavaThread::dirty_card_queue_set();
+  MutexLockerEx x(_monitor, Mutex::_no_safepoint_check_flag);
+  while (!_should_terminate && !is_active()) {
+    _monitor->wait(Mutex::_no_safepoint_check_flag);
+  }
+}
+
+bool ConcurrentG1RefineThread::is_active() {
+  DirtyCardQueueSet& dcqs = JavaThread::dirty_card_queue_set();
+  return _worker_id > 0 ? _active : dcqs.process_completed_buffers();
+}
+
+void ConcurrentG1RefineThread::activate() {
+  MutexLockerEx x(_monitor, Mutex::_no_safepoint_check_flag);
+  if (_worker_id > 0) {
+    if (G1TraceConcurrentRefinement) {
+      DirtyCardQueueSet& dcqs = JavaThread::dirty_card_queue_set();
+      gclog_or_tty->print_cr("G1-Refine-activated worker %d, on threshold %d, current %d",
+                             _worker_id, _threshold, (int)dcqs.completed_buffers_num());
+    }
+    set_active(true);
+  } else {
+    DirtyCardQueueSet& dcqs = JavaThread::dirty_card_queue_set();
+    dcqs.set_process_completed(true);
+  }
+  _monitor->notify();
+}
+
+void ConcurrentG1RefineThread::deactivate() {
+  MutexLockerEx x(_monitor, Mutex::_no_safepoint_check_flag);
+  if (_worker_id > 0) {
+    if (G1TraceConcurrentRefinement) {
+      DirtyCardQueueSet& dcqs = JavaThread::dirty_card_queue_set();
+      gclog_or_tty->print_cr("G1-Refine-deactivated worker %d, off threshold %d, current %d",
+                             _worker_id, _deactivation_threshold, (int)dcqs.completed_buffers_num());
+    }
+    set_active(false);
+  } else {
+    DirtyCardQueueSet& dcqs = JavaThread::dirty_card_queue_set();
+    dcqs.set_process_completed(false);
+  }
+}
+
 void ConcurrentG1RefineThread::run() {
   initialize_in_thread();
-  _vtime_start = os::elapsedVTime();
   wait_for_universe_init();
 
+  if (_worker_id >= cg1r()->worker_thread_num()) {
+    run_young_rs_sampling();
+    terminate();
+  }
+
+  _vtime_start = os::elapsedVTime();
   while (!_should_terminate) {
     DirtyCardQueueSet& dcqs = JavaThread::dirty_card_queue_set();
-    // Wait for completed log buffers to exist.
-    {
-      MutexLockerEx x(DirtyCardQ_CBL_mon, Mutex::_no_safepoint_check_flag);
-      while (((_worker_id == 0 && !dcqs.process_completed_buffers()) ||
-              (_worker_id > 0 && !is_active())) &&
-             !_should_terminate) {
-        DirtyCardQ_CBL_mon->wait(Mutex::_no_safepoint_check_flag);
-      }
-    }
+
+    // Wait for work
+    wait_for_completed_buffers();
 
     if (_should_terminate) {
-      return;
+      break;
     }
 
-    // Now we take them off (this doesn't hold locks while it applies
-    // closures.)  (If we did a full collection, then we'll do a full
-    // traversal.
     _sts.join();
-    int n_logs = 0;
-    int lower_limit = 0;
-    double start_vtime_sec; // only used when G1SmoothConcRefine is on
-    int prev_buffer_num; // only used when G1SmoothConcRefine is on
-    // This thread activation threshold
-    int threshold = G1UpdateBufferQueueProcessingThreshold * _worker_id;
-    // Next thread activation threshold
-    int next_threshold = threshold + G1UpdateBufferQueueProcessingThreshold;
-    int deactivation_threshold = MAX2<int>(threshold - G1UpdateBufferQueueProcessingThreshold / 2, 0);
-
-    if (G1SmoothConcRefine) {
-      lower_limit = 0;
-      start_vtime_sec = os::elapsedVTime();
-      prev_buffer_num = (int) dcqs.completed_buffers_num();
-    } else {
-      lower_limit = G1UpdateBufferQueueProcessingThreshold / 4; // For now.
-    }
-    while (dcqs.apply_closure_to_completed_buffer(_worker_id + _worker_id_offset, lower_limit)) {
-      double end_vtime_sec;
-      double elapsed_vtime_sec;
-      int elapsed_vtime_ms;
-      int curr_buffer_num = (int) dcqs.completed_buffers_num();
-
-      if (G1SmoothConcRefine) {
-        end_vtime_sec = os::elapsedVTime();
-        elapsed_vtime_sec = end_vtime_sec - start_vtime_sec;
-        elapsed_vtime_ms = (int) (elapsed_vtime_sec * 1000.0);
-
-        if (curr_buffer_num > prev_buffer_num ||
-            curr_buffer_num > next_threshold) {
-          decreaseInterval(elapsed_vtime_ms);
-        } else if (curr_buffer_num < prev_buffer_num) {
-          increaseInterval(elapsed_vtime_ms);
-        }
+
+    do {
+      int curr_buffer_num = (int)dcqs.completed_buffers_num();
+      // If the number of the buffers falls into the yellow zone,
+      // that means that the transition period after the evacuation pause has ended.
+      if (dcqs.completed_queue_padding() > 0 && curr_buffer_num <= cg1r()->yellow_zone()) {
+        dcqs.set_completed_queue_padding(0);
       }
-      if (_worker_id == 0) {
-        sample_young_list_rs_lengths();
-      } else if (curr_buffer_num < deactivation_threshold) {
+
+      if (_worker_id > 0 && curr_buffer_num <= _deactivation_threshold) {
         // If the number of the buffer has fallen below our threshold
         // we should deactivate. The predecessor will reactivate this
         // thread should the number of the buffers cross the threshold again.
-        MutexLockerEx x(DirtyCardQ_CBL_mon, Mutex::_no_safepoint_check_flag);
         deactivate();
-        if (G1TraceConcurrentRefinement) {
-          gclog_or_tty->print_cr("G1-Refine-deactivated worker %d", _worker_id);
-        }
         break;
       }
 
       // Check if we need to activate the next thread.
-      if (curr_buffer_num > next_threshold && _next != NULL && !_next->is_active()) {
-        MutexLockerEx x(DirtyCardQ_CBL_mon, Mutex::_no_safepoint_check_flag);
+      if (_next != NULL && !_next->is_active() && curr_buffer_num > _next->_threshold) {
         _next->activate();
-        DirtyCardQ_CBL_mon->notify_all();
-        if (G1TraceConcurrentRefinement) {
-          gclog_or_tty->print_cr("G1-Refine-activated worker %d", _next->_worker_id);
-        }
       }
+    } while (dcqs.apply_closure_to_completed_buffer(_worker_id + _worker_id_offset, cg1r()->green_zone()));
 
-      if (G1SmoothConcRefine) {
-        prev_buffer_num = curr_buffer_num;
-        _sts.leave();
-        os::sleep(Thread::current(), (jlong) _interval_ms, false);
-        _sts.join();
-        start_vtime_sec = os::elapsedVTime();
-      }
-      n_logs++;
+    // We can exit the loop above while being active if there was a yield request.
+    if (is_active()) {
+      deactivate();
     }
+
     _sts.leave();
 
     if (os::supports_vtime()) {
@@ -172,7 +213,6 @@ void ConcurrentG1RefineThread::run() {
     }
   }
-
   assert(_should_terminate, "just checking");
 
   terminate();
 }
 
@@ -191,8 +231,8 @@ void ConcurrentG1RefineThread::stop() {
   }
   {
-    MutexLockerEx x(DirtyCardQ_CBL_mon, Mutex::_no_safepoint_check_flag);
-    DirtyCardQ_CBL_mon->notify_all();
+    MutexLockerEx x(_monitor, Mutex::_no_safepoint_check_flag);
+    _monitor->notify();
   }
   {
......
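The run() loop above forms a daisy chain: worker 0 is woken by mutators through DirtyCardQ_CBL_mon, each worker wakes its successor once the queue grows past the successor's threshold, and each worker parks itself once the queue drains below its own deactivation threshold. Below is a simplified model of one pass of that loop; it is not HotSpot code, and the queue, monitors, and safepoint protocol are deliberately reduced to plain ints.

    // Simplified model of the refinement worker daisy-chain in run() above.
    #include <cstdio>

    struct Worker {
      int id, threshold, deactivation_threshold;
      bool active;
    };

    // One pass of the per-worker loop body: deactivate when the queue drains
    // below our threshold, wake the successor when it grows past its threshold.
    static void step(Worker* w, Worker* next, int* queue_len, int green) {
      while (*queue_len > green) {       // apply_closure_to_completed_buffer(..., green)
        if (w->id > 0 && *queue_len <= w->deactivation_threshold) {
          w->active = false;             // predecessor will reactivate us later
          return;
        }
        if (next != NULL && !next->active && *queue_len > next->threshold) {
          next->active = true;           // _next->activate()
        }
        (*queue_len)--;                  // one buffer refined
      }
      w->active = false;                 // drained down to the green zone
    }

    int main() {
      Worker w0 = {0, 5, 4, true}, w1 = {1, 6, 5, false};
      int queue_len = 10;
      step(&w0, &w1, &queue_len, 4);
      std::printf("queue=%d w1.active=%d\n", queue_len, (int)w1.active); // queue=4 w1.active=1
      return 0;
    }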
@@ -40,42 +40,36 @@ class ConcurrentG1RefineThread: public ConcurrentGCThread {
   // when the number of the rset update buffer crosses a certain threshold. A successor
   // would self-deactivate when the number of the buffers falls below the threshold.
   bool _active;
-  ConcurrentG1RefineThread * _next;
- public:
-  virtual void run();
-
-  bool is_active()  { return _active;  }
-  void activate()   { _active = true;  }
-  void deactivate() { _active = false; }
-
- private:
-  ConcurrentG1Refine* _cg1r;
-
-  double _interval_ms;
-
-  void decreaseInterval(int processing_time_ms) {
-    double min_interval_ms = (double) processing_time_ms;
-    _interval_ms = 0.8 * _interval_ms;
-    if (_interval_ms < min_interval_ms)
-      _interval_ms = min_interval_ms;
-  }
-  void increaseInterval(int processing_time_ms) {
-    double max_interval_ms = 9.0 * (double) processing_time_ms;
-    _interval_ms = 1.1 * _interval_ms;
-    if (max_interval_ms > 0 && _interval_ms > max_interval_ms)
-      _interval_ms = max_interval_ms;
-  }
-
-  void sleepBeforeNextCycle();
+  ConcurrentG1RefineThread* _next;
+  Monitor* _monitor;
+  ConcurrentG1Refine* _cg1r;
+
+  int _thread_threshold_step;
+  // This thread activation threshold
+  int _threshold;
+  // This thread deactivation threshold
+  int _deactivation_threshold;
+
+  void sample_young_list_rs_lengths();
+  void run_young_rs_sampling();
+  void wait_for_completed_buffers();
+
+  void set_active(bool x) { _active = x; }
+  bool is_active();
+  void activate();
+  void deactivate();
 
   // For use by G1CollectedHeap, which is a friend.
   static SuspendibleThreadSet* sts() { return &_sts; }
 
  public:
+  virtual void run();
   // Constructor
   ConcurrentG1RefineThread(ConcurrentG1Refine* cg1r, ConcurrentG1RefineThread* next,
                            int worker_id_offset, int worker_id);
 
+  void initialize();
+
   // Printing
   void print() const;
   void print_on(outputStream* st) const;
@@ -83,13 +77,10 @@ class ConcurrentG1RefineThread: public ConcurrentGCThread {
   // Total virtual time so far.
   double vtime_accum() { return _vtime_accum; }
 
   ConcurrentG1Refine* cg1r() { return _cg1r; }
 
-  void sample_young_list_rs_lengths();
-
   // Yield for GC
   void yield();
-
   // shutdown
   void stop();
 };
@@ -760,7 +760,6 @@ void ConcurrentMark::checkpointRootsInitialPost() {
   rp->setup_policy(false); // snapshot the soft ref policy to be used in this cycle
 
   SATBMarkQueueSet& satb_mq_set = JavaThread::satb_mark_queue_set();
-  satb_mq_set.set_process_completed_threshold(G1SATBProcessCompletedThreshold);
   satb_mq_set.set_active_all_threads(true);
 
   // update_g1_committed() will be called at the end of an evac pause
......
@@ -61,8 +61,8 @@ bool DirtyCardQueue::apply_closure_to_buffer(CardTableEntryClosure* cl,
 #pragma warning( disable:4355 ) // 'this' : used in base member initializer list
 #endif // _MSC_VER
 
-DirtyCardQueueSet::DirtyCardQueueSet() :
-  PtrQueueSet(true /*notify_when_complete*/),
+DirtyCardQueueSet::DirtyCardQueueSet(bool notify_when_complete) :
+  PtrQueueSet(notify_when_complete),
   _closure(NULL),
   _shared_dirty_card_queue(this, true /*perm*/),
   _free_ids(NULL),
@@ -77,12 +77,12 @@ size_t DirtyCardQueueSet::num_par_ids() {
 }
 
 void DirtyCardQueueSet::initialize(Monitor* cbl_mon, Mutex* fl_lock,
+                                   int process_completed_threshold,
                                    int max_completed_queue,
                                    Mutex* lock, PtrQueueSet* fl_owner) {
-  PtrQueueSet::initialize(cbl_mon, fl_lock, max_completed_queue, fl_owner);
+  PtrQueueSet::initialize(cbl_mon, fl_lock, process_completed_threshold,
+                          max_completed_queue, fl_owner);
   set_buffer_size(G1UpdateBufferSize);
-  set_process_completed_threshold(G1UpdateBufferQueueProcessingThreshold);
   _shared_dirty_card_queue.set_lock(lock);
   _free_ids = new FreeIdSet((int) num_par_ids(), _cbl_mon);
 }
@@ -154,9 +154,10 @@ bool DirtyCardQueueSet::mut_process_buffer(void** buf) {
   return b;
 }
 
-DirtyCardQueueSet::CompletedBufferNode*
+BufferNode*
 DirtyCardQueueSet::get_completed_buffer(int stop_at) {
-  CompletedBufferNode* nd = NULL;
+  BufferNode* nd = NULL;
   MutexLockerEx x(_cbl_mon, Mutex::_no_safepoint_check_flag);
 
   if ((int)_n_completed_buffers <= stop_at) {
@@ -166,10 +167,11 @@ DirtyCardQueueSet::get_completed_buffer(int stop_at) {
 
   if (_completed_buffers_head != NULL) {
     nd = _completed_buffers_head;
-    _completed_buffers_head = nd->next;
+    _completed_buffers_head = nd->next();
     if (_completed_buffers_head == NULL)
       _completed_buffers_tail = NULL;
     _n_completed_buffers--;
+    assert(_n_completed_buffers >= 0, "Invariant");
   }
   debug_only(assert_completed_buffer_list_len_correct_locked());
   return nd;
@@ -177,20 +179,19 @@ DirtyCardQueueSet::get_completed_buffer(int stop_at) {
 
 bool DirtyCardQueueSet::
 apply_closure_to_completed_buffer_helper(int worker_i,
-                                         CompletedBufferNode* nd) {
+                                         BufferNode* nd) {
   if (nd != NULL) {
+    void **buf = BufferNode::make_buffer_from_node(nd);
+    size_t index = nd->index();
     bool b =
-      DirtyCardQueue::apply_closure_to_buffer(_closure, nd->buf,
-                                              nd->index, _sz,
+      DirtyCardQueue::apply_closure_to_buffer(_closure, buf,
+                                              index, _sz,
                                               true, worker_i);
-    void** buf = nd->buf;
-    size_t index = nd->index;
-    delete nd;
     if (b) {
       deallocate_buffer(buf);
       return true;  // In normal case, go on to next buffer.
     } else {
-      enqueue_complete_buffer(buf, index, true);
+      enqueue_complete_buffer(buf, index);
       return false;
     }
   } else {
@@ -203,32 +204,33 @@ bool DirtyCardQueueSet::apply_closure_to_completed_buffer(int worker_i,
                                                           bool during_pause)
 {
   assert(!during_pause || stop_at == 0, "Should not leave any completed buffers during a pause");
-  CompletedBufferNode* nd = get_completed_buffer(stop_at);
+  BufferNode* nd = get_completed_buffer(stop_at);
   bool res = apply_closure_to_completed_buffer_helper(worker_i, nd);
   if (res) Atomic::inc(&_processed_buffers_rs_thread);
   return res;
 }
 
 void DirtyCardQueueSet::apply_closure_to_all_completed_buffers() {
-  CompletedBufferNode* nd = _completed_buffers_head;
+  BufferNode* nd = _completed_buffers_head;
   while (nd != NULL) {
     bool b =
-      DirtyCardQueue::apply_closure_to_buffer(_closure, nd->buf, 0, _sz,
-                                              false);
+      DirtyCardQueue::apply_closure_to_buffer(_closure,
+                                              BufferNode::make_buffer_from_node(nd),
+                                              0, _sz, false);
     guarantee(b, "Should not stop early.");
-    nd = nd->next;
+    nd = nd->next();
   }
 }
 
 void DirtyCardQueueSet::abandon_logs() {
   assert(SafepointSynchronize::is_at_safepoint(), "Must be at safepoint.");
-  CompletedBufferNode* buffers_to_delete = NULL;
+  BufferNode* buffers_to_delete = NULL;
   {
     MutexLockerEx x(_cbl_mon, Mutex::_no_safepoint_check_flag);
     while (_completed_buffers_head != NULL) {
-      CompletedBufferNode* nd = _completed_buffers_head;
-      _completed_buffers_head = nd->next;
-      nd->next = buffers_to_delete;
+      BufferNode* nd = _completed_buffers_head;
+      _completed_buffers_head = nd->next();
+      nd->set_next(buffers_to_delete);
       buffers_to_delete = nd;
     }
     _n_completed_buffers = 0;
@@ -236,10 +238,9 @@ void DirtyCardQueueSet::abandon_logs() {
     debug_only(assert_completed_buffer_list_len_correct_locked());
   }
   while (buffers_to_delete != NULL) {
-    CompletedBufferNode* nd = buffers_to_delete;
-    buffers_to_delete = nd->next;
-    deallocate_buffer(nd->buf);
-    delete nd;
+    BufferNode* nd = buffers_to_delete;
+    buffers_to_delete = nd->next();
+    deallocate_buffer(BufferNode::make_buffer_from_node(nd));
   }
   // Since abandon is done only at safepoints, we can safely manipulate
   // these queues.
......
@@ -84,11 +84,12 @@ class DirtyCardQueueSet: public PtrQueueSet {
   jint _processed_buffers_rs_thread;
 
 public:
-  DirtyCardQueueSet();
+  DirtyCardQueueSet(bool notify_when_complete = true);
 
   void initialize(Monitor* cbl_mon, Mutex* fl_lock,
-                  int max_completed_queue = 0,
-                  Mutex* lock = NULL, PtrQueueSet* fl_owner = NULL);
+                  int process_completed_threshold,
+                  int max_completed_queue,
+                  Mutex* lock, PtrQueueSet* fl_owner = NULL);
 
   // The number of parallel ids that can be claimed to allow collector or
   // mutator threads to do card-processing work.
@@ -123,9 +124,9 @@ public:
                                          bool during_pause = false);
 
   bool apply_closure_to_completed_buffer_helper(int worker_i,
-                                                CompletedBufferNode* nd);
+                                                BufferNode* nd);
 
-  CompletedBufferNode* get_completed_buffer(int stop_at);
+  BufferNode* get_completed_buffer(int stop_at);
 
   // Applies the current closure to all completed buffers,
   // non-consumptively.
......
@@ -1375,6 +1375,7 @@ void G1CollectedHeap::shrink(size_t shrink_bytes) {
 G1CollectedHeap::G1CollectedHeap(G1CollectorPolicy* policy_) :
   SharedHeap(policy_),
   _g1_policy(policy_),
+  _dirty_card_queue_set(false),
   _ref_processor(NULL),
   _process_strong_tasks(new SubTasksDone(G1H_PS_NumElements)),
   _bot_shared(NULL),
@@ -1460,8 +1461,6 @@ jint G1CollectedHeap::initialize() {
   Universe::check_alignment(init_byte_size, HeapRegion::GrainBytes, "g1 heap");
   Universe::check_alignment(max_byte_size, HeapRegion::GrainBytes, "g1 heap");
 
-  // We allocate this in any case, but only do no work if the command line
-  // param is off.
   _cg1r = new ConcurrentG1Refine();
 
   // Reserve the maximum.
@@ -1594,18 +1593,20 @@ jint G1CollectedHeap::initialize() {
   JavaThread::satb_mark_queue_set().initialize(SATB_Q_CBL_mon,
                                                SATB_Q_FL_lock,
-                                               0,
+                                               G1SATBProcessCompletedThreshold,
                                                Shared_SATB_Q_lock);
 
   JavaThread::dirty_card_queue_set().initialize(DirtyCardQ_CBL_mon,
                                                 DirtyCardQ_FL_lock,
-                                                G1UpdateBufferQueueMaxLength,
+                                                concurrent_g1_refine()->yellow_zone(),
+                                                concurrent_g1_refine()->red_zone(),
                                                 Shared_DirtyCardQ_lock);
 
   if (G1DeferredRSUpdate) {
     dirty_card_queue_set().initialize(DirtyCardQ_CBL_mon,
                                       DirtyCardQ_FL_lock,
-                                      0,
+                                      -1, // never trigger processing
+                                      -1, // no limit on length
                                       Shared_DirtyCardQ_lock,
                                       &JavaThread::dirty_card_queue_set());
   }
@@ -4239,10 +4240,11 @@ void G1CollectedHeap::evacuate_collection_set() {
     RedirtyLoggedCardTableEntryFastClosure redirty;
     dirty_card_queue_set().set_closure(&redirty);
     dirty_card_queue_set().apply_closure_to_all_completed_buffers();
-    JavaThread::dirty_card_queue_set().merge_bufferlists(&dirty_card_queue_set());
+
+    DirtyCardQueueSet& dcq = JavaThread::dirty_card_queue_set();
+    dcq.merge_bufferlists(&dirty_card_queue_set());
     assert(dirty_card_queue_set().completed_buffers_num() == 0, "All should be consumed");
   }
   COMPILER2_PRESENT(DerivedPointerTable::update_pointers());
 }
......
@@ -1914,6 +1914,10 @@ void G1CollectorPolicy::record_collection_pause_end(bool abandoned) {
   calculate_young_list_min_length();
   calculate_young_list_target_config();
 
+  // Note that _mmu_tracker->max_gc_time() returns the time in seconds.
+  double update_rs_time_goal_ms = _mmu_tracker->max_gc_time() * MILLIUNITS * G1RSUpdatePauseFractionPercent / 100.0;
+  adjust_concurrent_refinement(update_rs_time, update_rs_processed_buffers, update_rs_time_goal_ms);
+
   // </NEW PREDICTION>
 
   _target_pause_time_ms = -1.0;
@@ -1921,6 +1925,47 @@ void G1CollectorPolicy::record_collection_pause_end(bool abandoned) {
 
 // <NEW PREDICTION>
 
+void G1CollectorPolicy::adjust_concurrent_refinement(double update_rs_time,
+                                                     double update_rs_processed_buffers,
+                                                     double goal_ms) {
+  DirtyCardQueueSet& dcqs = JavaThread::dirty_card_queue_set();
+  ConcurrentG1Refine *cg1r = G1CollectedHeap::heap()->concurrent_g1_refine();
+
+  if (G1AdaptiveConcRefine) {
+    const int k_gy = 3, k_gr = 6;
+    const double inc_k = 1.1, dec_k = 0.9;
+
+    int g = cg1r->green_zone();
+    if (update_rs_time > goal_ms) {
+      g = (int)(g * dec_k); // Can become 0, that's OK. That would mean a mutator-only processing.
+    } else {
+      if (update_rs_time < goal_ms && update_rs_processed_buffers > g) {
+        g = (int)MAX2(g * inc_k, g + 1.0);
+      }
+    }
+    // Change the refinement threads params
+    cg1r->set_green_zone(g);
+    cg1r->set_yellow_zone(g * k_gy);
+    cg1r->set_red_zone(g * k_gr);
+    cg1r->reinitialize_threads();
+
+    int processing_threshold_delta = MAX2((int)(cg1r->green_zone() * sigma()), 1);
+    int processing_threshold = MIN2(cg1r->green_zone() + processing_threshold_delta,
+                                    cg1r->yellow_zone());
+    // Change the barrier params
+    dcqs.set_process_completed_threshold(processing_threshold);
+    dcqs.set_max_completed_queue(cg1r->red_zone());
+  }
+
+  int curr_queue_size = dcqs.completed_buffers_num();
+  if (curr_queue_size >= cg1r->yellow_zone()) {
+    dcqs.set_completed_queue_padding(curr_queue_size);
+  } else {
+    dcqs.set_completed_queue_padding(0);
+  }
+  dcqs.notify_if_necessary();
+}
+
 double
 G1CollectorPolicy::
 predict_young_collection_elapsed_time_ms(size_t adjustment) {
......
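The feedback loop above shrinks the green zone by 10% when remembered-set update time overruns its goal (a G1RSUpdatePauseFractionPercent share of the MMU pause budget) and grows it by 10%, at least one buffer, when it underruns while more than a green zone's worth of buffers was processed; yellow and red are then rederived as 3x and 6x green. A worked example with the constants copied from adjust_concurrent_refinement(); the sigma()-based processing threshold is omitted for brevity.

    // Worked example of the G1AdaptiveConcRefine green-zone feedback.
    #include <algorithm>
    #include <cstdio>

    static int adjust_green(int g, double update_rs_time_ms, double goal_ms,
                            double processed_buffers) {
      const double inc_k = 1.1, dec_k = 0.9;
      if (update_rs_time_ms > goal_ms) {
        g = (int)(g * dec_k);                  // shrink: cache fewer buffers
      } else if (update_rs_time_ms < goal_ms && processed_buffers > g) {
        g = (int)std::max(g * inc_k, g + 1.0); // grow by at least one buffer
      }
      return g;
    }

    int main() {
      int g = 4;
      g = adjust_green(g, 12.0, 10.0, 40.0); // overran a 10ms goal: g = 3
      g = adjust_green(g,  6.0, 10.0, 40.0); // underran: g = max(3.3, 4.0) = 4
      std::printf("green=%d yellow=%d red=%d\n", g, g * 3, g * 6);
      return 0;
    }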
@@ -316,6 +316,10 @@ private:
   bool verify_young_ages(HeapRegion* head, SurvRateGroup *surv_rate_group);
 #endif // PRODUCT
 
+  void adjust_concurrent_refinement(double update_rs_time,
+                                    double update_rs_processed_buffers,
+                                    double goal_ms);
+
 protected:
   double _pause_time_target_ms;
   double _recorded_young_cset_choice_time_ms;
......
@@ -85,7 +85,7 @@
   diagnostic(bool, G1SummarizeZFStats, false,                               \
           "Summarize zero-filling info")                                    \
                                                                             \
-  develop(bool, G1TraceConcurrentRefinement, false,                         \
+  diagnostic(bool, G1TraceConcurrentRefinement, false,                      \
           "Trace G1 concurrent refinement")                                 \
                                                                             \
   product(intx, G1MarkStackSize, 2 * 1024 * 1024,                           \
@@ -94,19 +94,6 @@
   product(intx, G1MarkRegionStackSize, 1024 * 1024,                         \
           "Size of the region stack for concurrent marking.")               \
                                                                             \
-  develop(bool, G1ConcRefine, true,                                         \
-          "If true, run concurrent rem set refinement for G1")              \
-                                                                            \
-  develop(intx, G1ConcRefineTargTraversals, 4,                              \
-          "Number of concurrent refinement we try to achieve")              \
-                                                                            \
-  develop(intx, G1ConcRefineInitialDelta, 4,                                \
-          "Number of heap regions of alloc ahead of starting collection "   \
-          "pause to start concurrent refinement (initially)")               \
-                                                                            \
-  develop(bool, G1SmoothConcRefine, true,                                   \
-          "Attempts to smooth out the overhead of concurrent refinement")   \
-                                                                            \
   develop(bool, G1ConcZeroFill, true,                                       \
           "If true, run concurrent zero-filling thread")                    \
                                                                             \
@@ -178,13 +165,38 @@
   product(intx, G1UpdateBufferSize, 256,                                    \
           "Size of an update buffer")                                       \
                                                                             \
-  product(intx, G1UpdateBufferQueueProcessingThreshold, 5,                  \
-          "Number of enqueued update buffers that will "                    \
-          "trigger concurrent processing")                                  \
+  product(intx, G1ConcRefineYellowZone, 0,                                  \
+          "Number of enqueued update buffers that will "                    \
+          "trigger concurrent processing. Will be selected ergonomically "  \
+          "by default.")                                                    \
                                                                             \
-  product(intx, G1UpdateBufferQueueMaxLength, 30,                           \
-          "Maximum number of enqueued update buffers before mutator "       \
-          "threads start processing new ones instead of enqueueing them")   \
+  product(intx, G1ConcRefineRedZone, 0,                                     \
+          "Maximum number of enqueued update buffers before mutator "       \
+          "threads start processing new ones instead of enqueueing them. "  \
+          "Will be selected ergonomically by default. Zero will disable "   \
+          "concurrent processing.")                                         \
+                                                                            \
+  product(intx, G1ConcRefineGreenZone, 0,                                   \
+          "The number of update buffers that are left in the queue by the " \
+          "concurrent processing threads. Will be selected ergonomically "  \
+          "by default.")                                                    \
+                                                                            \
+  product(intx, G1ConcRefineServiceInterval, 300,                           \
+          "The last concurrent refinement thread wakes up every "           \
+          "specified number of milliseconds to do miscellaneous work.")     \
+                                                                            \
+  product(intx, G1ConcRefineThresholdStep, 0,                               \
+          "Each time the rset update queue increases by this amount "       \
+          "activate the next refinement thread if available. "              \
+          "Will be selected ergonomically by default.")                     \
+                                                                            \
+  product(intx, G1RSUpdatePauseFractionPercent, 10,                         \
+          "A target percentage of time that is allowed to be spent "        \
+          "processing RS update buffers during the collection pause.")      \
+                                                                            \
+  product(bool, G1AdaptiveConcRefine, true,                                 \
+          "Select green, yellow and red zones adaptively to meet the "      \
+          "pause requirements.")                                            \
                                                                             \
   develop(intx, G1ConcRSLogCacheSize, 10,                                   \
           "Log base 2 of the length of conc RS hot-card cache.")            \
......
@@ -64,8 +64,8 @@ void PtrQueue::enqueue_known_active(void* ptr) {
   while (_index == 0) {
     handle_zero_index();
   }
-
   assert(_index > 0, "postcondition");
+
   _index -= oopSize;
   _buf[byte_index_to_index((int)_index)] = ptr;
   assert(0 <= _index && _index <= _sz, "Invariant.");
@@ -99,95 +99,110 @@ void** PtrQueueSet::allocate_buffer() {
   assert(_sz > 0, "Didn't set a buffer size.");
   MutexLockerEx x(_fl_owner->_fl_lock, Mutex::_no_safepoint_check_flag);
   if (_fl_owner->_buf_free_list != NULL) {
-    void** res = _fl_owner->_buf_free_list;
-    _fl_owner->_buf_free_list = (void**)_fl_owner->_buf_free_list[0];
+    void** res = BufferNode::make_buffer_from_node(_fl_owner->_buf_free_list);
+    _fl_owner->_buf_free_list = _fl_owner->_buf_free_list->next();
     _fl_owner->_buf_free_list_sz--;
-    // Just override the next pointer with NULL, just in case we scan this part
-    // of the buffer.
-    res[0] = NULL;
     return res;
   } else {
-    return (void**) NEW_C_HEAP_ARRAY(char, _sz);
+    // Allocate space for the BufferNode in front of the buffer.
+    char *b = NEW_C_HEAP_ARRAY(char, _sz + BufferNode::aligned_size());
+    return BufferNode::make_buffer_from_block(b);
   }
 }
 
 void PtrQueueSet::deallocate_buffer(void** buf) {
   assert(_sz > 0, "Didn't set a buffer size.");
   MutexLockerEx x(_fl_owner->_fl_lock, Mutex::_no_safepoint_check_flag);
-  buf[0] = (void*)_fl_owner->_buf_free_list;
-  _fl_owner->_buf_free_list = buf;
+  BufferNode *node = BufferNode::make_node_from_buffer(buf);
+  node->set_next(_fl_owner->_buf_free_list);
+  _fl_owner->_buf_free_list = node;
   _fl_owner->_buf_free_list_sz++;
 }
 
 void PtrQueueSet::reduce_free_list() {
+  assert(_fl_owner == this, "Free list reduction is allowed only for the owner");
   // For now we'll adopt the strategy of deleting half.
   MutexLockerEx x(_fl_lock, Mutex::_no_safepoint_check_flag);
   size_t n = _buf_free_list_sz / 2;
   while (n > 0) {
     assert(_buf_free_list != NULL, "_buf_free_list_sz must be wrong.");
-    void** head = _buf_free_list;
-    _buf_free_list = (void**)_buf_free_list[0];
-    FREE_C_HEAP_ARRAY(char, head);
+    void* b = BufferNode::make_block_from_node(_buf_free_list);
+    _buf_free_list = _buf_free_list->next();
+    FREE_C_HEAP_ARRAY(char, b);
     _buf_free_list_sz --;
     n--;
   }
 }
 
-void PtrQueueSet::enqueue_complete_buffer(void** buf, size_t index, bool ignore_max_completed) {
-  // I use explicit locking here because there's a bailout in the middle.
-  _cbl_mon->lock_without_safepoint_check();
-
-  Thread* thread = Thread::current();
-  assert( ignore_max_completed ||
-          thread->is_Java_thread() ||
-          SafepointSynchronize::is_at_safepoint(),
-          "invariant" );
-  ignore_max_completed = ignore_max_completed || !thread->is_Java_thread();
-
-  if (!ignore_max_completed && _max_completed_queue > 0 &&
-      _n_completed_buffers >= (size_t) _max_completed_queue) {
-    _cbl_mon->unlock();
-    bool b = mut_process_buffer(buf);
-    if (b) {
-      deallocate_buffer(buf);
-      return;
+void PtrQueue::handle_zero_index() {
+  assert(0 == _index, "Precondition.");
+  // This thread records the full buffer and allocates a new one (while
+  // holding the lock if there is one).
+  if (_buf != NULL) {
+    if (_lock) {
+      locking_enqueue_completed_buffer(_buf);
+    } else {
+      if (qset()->process_or_enqueue_complete_buffer(_buf)) {
+        // Recycle the buffer. No allocation.
+        _sz = qset()->buffer_size();
+        _index = _sz;
+        return;
+      }
     }
+  }
+  // Reallocate the buffer
+  _buf = qset()->allocate_buffer();
+  _sz = qset()->buffer_size();
+  _index = _sz;
+  assert(0 <= _index && _index <= _sz, "Invariant.");
+}
 
-    // Otherwise, go ahead and enqueue the buffer.  Must reaquire the lock.
-    _cbl_mon->lock_without_safepoint_check();
+bool PtrQueueSet::process_or_enqueue_complete_buffer(void** buf) {
+  if (Thread::current()->is_Java_thread()) {
+    // We don't lock. It is fine to be epsilon-precise here.
+    if (_max_completed_queue == 0 || _max_completed_queue > 0 &&
+        _n_completed_buffers >= _max_completed_queue + _completed_queue_padding) {
+      bool b = mut_process_buffer(buf);
+      if (b) {
+        // True here means that the buffer hasn't been deallocated and the caller may reuse it.
+        return true;
+      }
+    }
   }
+  // The buffer will be enqueued. The caller will have to get a new one.
+  enqueue_complete_buffer(buf);
+  return false;
+}
 
-  // Here we still hold the _cbl_mon.
-  CompletedBufferNode* cbn = new CompletedBufferNode;
-  cbn->buf = buf;
-  cbn->next = NULL;
-  cbn->index = index;
+void PtrQueueSet::enqueue_complete_buffer(void** buf, size_t index) {
+  MutexLockerEx x(_cbl_mon, Mutex::_no_safepoint_check_flag);
+  BufferNode* cbn = BufferNode::new_from_buffer(buf);
+  cbn->set_index(index);
   if (_completed_buffers_tail == NULL) {
     assert(_completed_buffers_head == NULL, "Well-formedness");
     _completed_buffers_head = cbn;
     _completed_buffers_tail = cbn;
   } else {
-    _completed_buffers_tail->next = cbn;
+    _completed_buffers_tail->set_next(cbn);
     _completed_buffers_tail = cbn;
   }
   _n_completed_buffers++;
-  if (!_process_completed &&
+
+  if (!_process_completed && _process_completed_threshold >= 0 &&
       _n_completed_buffers >= _process_completed_threshold) {
     _process_completed = true;
     if (_notify_when_complete)
-      _cbl_mon->notify_all();
+      _cbl_mon->notify();
   }
   debug_only(assert_completed_buffer_list_len_correct_locked());
-  _cbl_mon->unlock();
 }
 
 int PtrQueueSet::completed_buffers_list_length() {
   int n = 0;
-  CompletedBufferNode* cbn = _completed_buffers_head;
+  BufferNode* cbn = _completed_buffers_head;
   while (cbn != NULL) {
     n++;
-    cbn = cbn->next;
+    cbn = cbn->next();
   }
   return n;
 }
@@ -198,7 +213,7 @@ void PtrQueueSet::assert_completed_buffer_list_len_correct() {
 }
 
 void PtrQueueSet::assert_completed_buffer_list_len_correct_locked() {
-  guarantee((size_t)completed_buffers_list_length() == _n_completed_buffers,
+  guarantee(completed_buffers_list_length() == _n_completed_buffers,
             "Completed buffer length is wrong.");
 }
 
@@ -207,12 +222,8 @@ void PtrQueueSet::set_buffer_size(size_t sz) {
   _sz = sz * oopSize;
 }
 
-void PtrQueueSet::set_process_completed_threshold(size_t sz) {
-  _process_completed_threshold = sz;
-}
-
-// Merge lists of buffers. Notify waiting threads if the length of the list
-// exceeds threshold. The source queue is emptied as a result. The queues
+// Merge lists of buffers. Notify the processing threads.
+// The source queue is emptied as a result. The queues
 // must share the monitor.
 void PtrQueueSet::merge_bufferlists(PtrQueueSet *src) {
   assert(_cbl_mon == src->_cbl_mon, "Should share the same lock");
@@ -224,7 +235,7 @@ void PtrQueueSet::merge_bufferlists(PtrQueueSet *src) {
   } else {
     assert(_completed_buffers_head != NULL, "Well formedness");
     if (src->_completed_buffers_head != NULL) {
-      _completed_buffers_tail->next = src->_completed_buffers_head;
+      _completed_buffers_tail->set_next(src->_completed_buffers_head);
       _completed_buffers_tail = src->_completed_buffers_tail;
     }
   }
@@ -237,31 +248,13 @@ void PtrQueueSet::merge_bufferlists(PtrQueueSet *src) {
   assert(_completed_buffers_head == NULL && _completed_buffers_tail == NULL ||
          _completed_buffers_head != NULL && _completed_buffers_tail != NULL,
          "Sanity");
+}
 
-  if (!_process_completed &&
-      _n_completed_buffers >= _process_completed_threshold) {
+void PtrQueueSet::notify_if_necessary() {
+  MutexLockerEx x(_cbl_mon, Mutex::_no_safepoint_check_flag);
+  if (_n_completed_buffers >= _process_completed_threshold || _max_completed_queue == 0) {
     _process_completed = true;
     if (_notify_when_complete)
-      _cbl_mon->notify_all();
+      _cbl_mon->notify();
   }
 }
-
-// Merge free lists of the two queues. The free list of the source
-// queue is emptied as a result. The queues must share the same
-// mutex that guards free lists.
-void PtrQueueSet::merge_freelists(PtrQueueSet* src) {
-  assert(_fl_lock == src->_fl_lock, "Should share the same lock");
-  MutexLockerEx x(_fl_lock, Mutex::_no_safepoint_check_flag);
-  if (_buf_free_list != NULL) {
-    void **p = _buf_free_list;
-    while (*p != NULL) {
-      p = (void**)*p;
-    }
-    *p = src->_buf_free_list;
-  } else {
-    _buf_free_list = src->_buf_free_list;
-  }
-  _buf_free_list_sz += src->_buf_free_list_sz;
-  src->_buf_free_list = NULL;
-  src->_buf_free_list_sz = 0;
-}
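On the mutator side, process_or_enqueue_complete_buffer() replaces the old ignore_max_completed protocol: a Java thread processes its own buffer only once the queue exceeds max_completed_queue plus the current padding, where the padding is set to the queue length at the end of an evacuation pause and cleared once the queue drains back into the yellow zone. A hedged sketch of just that decision, outside HotSpot:

    // Sketch of the mutator back-pressure decision in
    // process_or_enqueue_complete_buffer() above (illustrative names).
    #include <cstdio>

    static bool mutator_should_process(int n_completed, int max_completed_queue,
                                       int padding) {
      if (max_completed_queue == 0) return true;   // concurrent processing disabled
      // The padding raises the red zone temporarily after an evacuation pause,
      // so mutators are not penalized for the deferred-update backlog.
      return max_completed_queue > 0 &&
             n_completed >= max_completed_queue + padding;
    }

    int main() {
      // Red zone 24; right after a pause the queue held 30 buffers -> padding 30.
      std::printf("%d\n", mutator_should_process(40, 24, 30)); // 0: still absorbed
      std::printf("%d\n", mutator_should_process(60, 24, 30)); // 1: mutator refines
      return 0;
    }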
@@ -27,8 +27,10 @@
 // the addresses of modified old-generation objects.  This type supports
 // this operation.
 
-class PtrQueueSet;
+// The definition of placement operator new(size_t, void*) in the <new>.
+#include <new>
 
+class PtrQueueSet;
 class PtrQueue VALUE_OBJ_CLASS_SPEC {
 protected:
@@ -77,7 +79,7 @@ public:
     else enqueue_known_active(ptr);
   }
 
-  inline void handle_zero_index();
+  void handle_zero_index();
   void locking_enqueue_completed_buffer(void** buf);
 
   void enqueue_known_active(void* ptr);
@@ -126,34 +128,65 @@ public:
 };
 
+class BufferNode {
+  size_t _index;
+  BufferNode* _next;
+public:
+  BufferNode() : _index(0), _next(NULL) { }
+  BufferNode* next() const     { return _next;  }
+  void set_next(BufferNode* n) { _next = n;     }
+  size_t index() const         { return _index; }
+  void set_index(size_t i)     { _index = i;    }
+
+  // Align the size of the structure to the size of the pointer
+  static size_t aligned_size() {
+    static const size_t alignment = round_to(sizeof(BufferNode), sizeof(void*));
+    return alignment;
+  }
+
+  // BufferNode is allocated before the buffer.
+  // The chunk of memory that holds both of them is a block.
+
+  // Produce a new BufferNode given a buffer.
+  static BufferNode* new_from_buffer(void** buf) {
+    return new (make_block_from_buffer(buf)) BufferNode;
+  }
+
+  // The following are the required conversion routines:
+  static BufferNode* make_node_from_buffer(void** buf) {
+    return (BufferNode*)make_block_from_buffer(buf);
+  }
+  static void** make_buffer_from_node(BufferNode *node) {
+    return make_buffer_from_block(node);
+  }
+  static void* make_block_from_node(BufferNode *node) {
+    return (void*)node;
+  }
+  static void** make_buffer_from_block(void* p) {
+    return (void**)((char*)p + aligned_size());
+  }
+  static void* make_block_from_buffer(void** p) {
+    return (void*)((char*)p - aligned_size());
+  }
+};
+
 // A PtrQueueSet represents resources common to a set of pointer queues.
 // In particular, the individual queues allocate buffers from this shared
 // set, and return completed buffers to the set.
 // All these variables are are protected by the TLOQ_CBL_mon. XXX ???
 class PtrQueueSet VALUE_OBJ_CLASS_SPEC {
 protected:
-  class CompletedBufferNode: public CHeapObj {
-  public:
-    void** buf;
-    size_t index;
-    CompletedBufferNode* next;
-    CompletedBufferNode() : buf(NULL),
-      index(0), next(NULL){ }
-  };
-
   Monitor* _cbl_mon;  // Protects the fields below.
-  CompletedBufferNode* _completed_buffers_head;
-  CompletedBufferNode* _completed_buffers_tail;
-  size_t _n_completed_buffers;
-  size_t _process_completed_threshold;
+  BufferNode* _completed_buffers_head;
+  BufferNode* _completed_buffers_tail;
+  int _n_completed_buffers;
+  int _process_completed_threshold;
   volatile bool _process_completed;
 
   // This (and the interpretation of the first element as a "next"
   // pointer) are protected by the TLOQ_FL_lock.
   Mutex* _fl_lock;
-  void** _buf_free_list;
+  BufferNode* _buf_free_list;
   size_t _buf_free_list_sz;
   // Queue set can share a freelist. The _fl_owner variable
   // specifies the owner. It is set to "this" by default.
@@ -170,6 +203,7 @@ protected:
   // Maximum number of elements allowed on completed queue: after that,
   // enqueuer does the work itself.  Zero indicates no maximum.
   int _max_completed_queue;
+  int _completed_queue_padding;
 
   int completed_buffers_list_length();
   void assert_completed_buffer_list_len_correct_locked();
@@ -191,9 +225,12 @@ public:
   // Because of init-order concerns, we can't pass these as constructor
   // arguments.
   void initialize(Monitor* cbl_mon, Mutex* fl_lock,
-                  int max_completed_queue = 0,
+                  int process_completed_threshold,
+                  int max_completed_queue,
                   PtrQueueSet *fl_owner = NULL) {
     _max_completed_queue = max_completed_queue;
+    _process_completed_threshold = process_completed_threshold;
+    _completed_queue_padding = 0;
     assert(cbl_mon != NULL && fl_lock != NULL, "Init order issue?");
     _cbl_mon = cbl_mon;
     _fl_lock = fl_lock;
@@ -208,14 +245,17 @@ public:
   void deallocate_buffer(void** buf);
 
   // Declares that "buf" is a complete buffer.
-  void enqueue_complete_buffer(void** buf, size_t index = 0,
-                               bool ignore_max_completed = false);
+  void enqueue_complete_buffer(void** buf, size_t index = 0);
+
+  // To be invoked by the mutator.
+  bool process_or_enqueue_complete_buffer(void** buf);
 
   bool completed_buffers_exist_dirty() {
     return _n_completed_buffers > 0;
   }
 
   bool process_completed_buffers() { return _process_completed; }
+  void set_process_completed(bool x) { _process_completed = x; }
 
   bool active() { return _all_active; }
@@ -226,15 +266,24 @@ public:
   // Get the buffer size.
   size_t buffer_size() { return _sz; }
 
-  // Set the number of completed buffers that triggers log processing.
-  void set_process_completed_threshold(size_t sz);
+  // Get/Set the number of completed buffers that triggers log processing.
+  void set_process_completed_threshold(int sz) { _process_completed_threshold = sz; }
+  int process_completed_threshold() const { return _process_completed_threshold; }
 
   // Must only be called at a safe point.  Indicates that the buffer free
   // list size may be reduced, if that is deemed desirable.
   void reduce_free_list();
 
-  size_t completed_buffers_num() { return _n_completed_buffers; }
+  int completed_buffers_num() { return _n_completed_buffers; }
 
   void merge_bufferlists(PtrQueueSet* src);
-  void merge_freelists(PtrQueueSet* src);
+
+  void set_max_completed_queue(int m) { _max_completed_queue = m; }
+  int max_completed_queue() { return _max_completed_queue; }
+
+  void set_completed_queue_padding(int padding) { _completed_queue_padding = padding; }
+  int completed_queue_padding() { return _completed_queue_padding; }
+
+  // Notify the consumer if the number of buffers crossed the threshold
+  void notify_if_necessary();
 };
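BufferNode replaces the old separately heap-allocated CompletedBufferNode: the node header now lives directly in front of the buffer storage, so a block is [node | buffer] and node/buffer conversion is constant pointer arithmetic, saving an allocation and deallocation per completed buffer. A standalone illustration of that layout follows, with a simplified round_to; these are not the HotSpot definitions.

    // Standalone illustration of the block layout used by BufferNode above:
    // a block is [ node header | buffer payload ], so node <-> buffer
    // conversion is a fixed offset.
    #include <cstdio>
    #include <cstdlib>

    struct Node { size_t index; Node* next; };

    static size_t aligned_size() {
      // Round the header up to pointer alignment, as round_to() does.
      return (sizeof(Node) + sizeof(void*) - 1) & ~(sizeof(void*) - 1);
    }
    static void** buffer_from_node(Node* n) {
      return (void**)((char*)n + aligned_size());
    }
    static Node* node_from_buffer(void** buf) {
      return (Node*)((char*)buf - aligned_size());
    }

    int main() {
      const size_t buf_bytes = 256 * sizeof(void*);
      char* block = (char*)std::malloc(buf_bytes + aligned_size());
      void** buf = buffer_from_node((Node*)block);  // hand out only the payload
      Node* n = node_from_buffer(buf);              // recover the header later
      std::printf("round trip ok: %d\n", (int)((char*)n == block)); // 1
      std::free(block);
      return 0;
    }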
@@ -67,9 +67,9 @@ SATBMarkQueueSet::SATBMarkQueueSet() :
 {}
 
 void SATBMarkQueueSet::initialize(Monitor* cbl_mon, Mutex* fl_lock,
-                                  int max_completed_queue,
+                                  int process_completed_threshold,
                                   Mutex* lock) {
-  PtrQueueSet::initialize(cbl_mon, fl_lock, max_completed_queue);
+  PtrQueueSet::initialize(cbl_mon, fl_lock, process_completed_threshold, -1);
   _shared_satb_queue.set_lock(lock);
   if (ParallelGCThreads > 0) {
     _par_closures = NEW_C_HEAP_ARRAY(ObjectClosure*, ParallelGCThreads);
@@ -122,12 +122,12 @@ void SATBMarkQueueSet::par_iterate_closure_all_threads(int worker) {
 
 bool SATBMarkQueueSet::apply_closure_to_completed_buffer_work(bool par,
                                                               int worker) {
-  CompletedBufferNode* nd = NULL;
+  BufferNode* nd = NULL;
   {
     MutexLockerEx x(_cbl_mon, Mutex::_no_safepoint_check_flag);
     if (_completed_buffers_head != NULL) {
       nd = _completed_buffers_head;
-      _completed_buffers_head = nd->next;
+      _completed_buffers_head = nd->next();
       if (_completed_buffers_head == NULL) _completed_buffers_tail = NULL;
       _n_completed_buffers--;
       if (_n_completed_buffers == 0) _process_completed = false;
@@ -135,9 +135,9 @@ bool SATBMarkQueueSet::apply_closure_to_completed_buffer_work(bool par,
   }
   ObjectClosure* cl = (par ? _par_closures[worker] : _closure);
   if (nd != NULL) {
-    ObjPtrQueue::apply_closure_to_buffer(cl, nd->buf, 0, _sz);
-    deallocate_buffer(nd->buf);
-    delete nd;
+    void **buf = BufferNode::make_buffer_from_node(nd);
+    ObjPtrQueue::apply_closure_to_buffer(cl, buf, 0, _sz);
+    deallocate_buffer(buf);
     return true;
   } else {
     return false;
@@ -145,13 +145,13 @@ bool SATBMarkQueueSet::apply_closure_to_completed_buffer_work(bool par,
 }
 
 void SATBMarkQueueSet::abandon_partial_marking() {
-  CompletedBufferNode* buffers_to_delete = NULL;
+  BufferNode* buffers_to_delete = NULL;
   {
     MutexLockerEx x(_cbl_mon, Mutex::_no_safepoint_check_flag);
     while (_completed_buffers_head != NULL) {
-      CompletedBufferNode* nd = _completed_buffers_head;
-      _completed_buffers_head = nd->next;
-      nd->next = buffers_to_delete;
+      BufferNode* nd = _completed_buffers_head;
+      _completed_buffers_head = nd->next();
+      nd->set_next(buffers_to_delete);
       buffers_to_delete = nd;
     }
     _completed_buffers_tail = NULL;
@@ -159,10 +159,9 @@ void SATBMarkQueueSet::abandon_partial_marking() {
     DEBUG_ONLY(assert_completed_buffer_list_len_correct_locked());
   }
   while (buffers_to_delete != NULL) {
-    CompletedBufferNode* nd = buffers_to_delete;
-    buffers_to_delete = nd->next;
-    deallocate_buffer(nd->buf);
-    delete nd;
+    BufferNode* nd = buffers_to_delete;
+    buffers_to_delete = nd->next();
+    deallocate_buffer(BufferNode::make_buffer_from_node(nd));
   }
   assert(SafepointSynchronize::is_at_safepoint(), "Must be at safepoint.");
   // So we can safely manipulate these queues.
......
@@ -60,8 +60,8 @@ public:
   SATBMarkQueueSet();
 
   void initialize(Monitor* cbl_mon, Mutex* fl_lock,
-                  int max_completed_queue = 0,
-                  Mutex* lock = NULL);
+                  int process_completed_threshold,
+                  Mutex* lock);
 
   static void handle_zero_index_for_thread(JavaThread* t);
......
@@ -109,7 +109,6 @@ dirtyCardQueue.cpp atomic.hpp
 dirtyCardQueue.cpp dirtyCardQueue.hpp
 dirtyCardQueue.cpp heapRegionRemSet.hpp
 dirtyCardQueue.cpp mutexLocker.hpp
-dirtyCardQueue.cpp ptrQueue.inline.hpp
 dirtyCardQueue.cpp safepoint.hpp
 dirtyCardQueue.cpp thread.hpp
 dirtyCardQueue.cpp thread_<os_family>.inline.hpp
@@ -319,7 +318,6 @@ ptrQueue.cpp allocation.inline.hpp
 ptrQueue.cpp mutex.hpp
 ptrQueue.cpp mutexLocker.hpp
 ptrQueue.cpp ptrQueue.hpp
-ptrQueue.cpp ptrQueue.inline.hpp
 ptrQueue.cpp thread_<os_family>.inline.hpp
 
 ptrQueue.hpp allocation.hpp
@@ -329,7 +327,6 @@ ptrQueue.inline.hpp ptrQueue.hpp
 satbQueue.cpp allocation.inline.hpp
 satbQueue.cpp mutexLocker.hpp
-satbQueue.cpp ptrQueue.inline.hpp
 satbQueue.cpp satbQueue.hpp
 satbQueue.cpp sharedHeap.hpp
 satbQueue.cpp thread.hpp
......