Commit 27b67e35 authored by Vlad Brezae, committed by GitHub

Merge pull request #5084 from BrzVlad/feature-worker-context

[sgen] Implement sgen worker context
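
In outline: the patch collapses the separate worker pool and concurrent-sweep pool into one global sgen thread pool that multiplexes several contexts (nursery workers, old-generation workers, sweep), and keys the worker API by generation. The creation calls below are collected verbatim from the hunks that follow; only their juxtaposition here is editorial.

	/* sgen-simple-nursery.c: the nursery context is created first so it has
	 * priority over concurrent mark and concurrent sweep. */
	if (parallel)
		sgen_workers_create_context (GENERATION_NURSERY, mono_cpu_count ());

	/* sgen-marksweep.c: one worker context for the concurrent/parallel major,
	 * plus a plain thread-pool context for the sweep thread. */
	if (is_concurrent && is_parallel)
		sgen_workers_create_context (GENERATION_OLD, mono_cpu_count ());
	else if (is_concurrent)
		sgen_workers_create_context (GENERATION_OLD, 1);
	if (concurrent_sweep)
		sweep_pool_context = sgen_thread_pool_create_context (1, NULL, NULL, NULL, NULL, NULL);

	/* sgen-gc.c: the shared pool is started once, after all contexts exist. */
	sgen_thread_pool_start ();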
......@@ -2973,9 +2973,7 @@ mono_gc_base_init (void)
void
mono_gc_base_cleanup (void)
{
sgen_thread_pool_shutdown (major_collector.get_sweep_pool ());
sgen_workers_shutdown ();
sgen_thread_pool_shutdown ();
// We should have consumed any outstanding moves.
g_assert (sgen_pointer_queue_is_empty (&moved_objects_queue));
......
......@@ -230,8 +230,7 @@ sgen_is_thread_in_current_stw (SgenThreadInfo *info, int *reason)
We can't suspend the workers that will do all the heavy lifting.
FIXME Use some state bit in SgenThreadInfo for this.
*/
if (sgen_thread_pool_is_thread_pool_thread (major_collector.get_sweep_pool (), mono_thread_info_get_tid (info)) ||
sgen_workers_is_worker_thread (mono_thread_info_get_tid (info))) {
if (sgen_thread_pool_is_thread_pool_thread (mono_thread_info_get_tid (info))) {
if (reason)
*reason = 4;
return FALSE;
......
......@@ -454,7 +454,7 @@ sgen_workers_get_job_gray_queue (WorkerData *worker_data, SgenGrayQueue *default
static void
gray_queue_redirect (SgenGrayQueue *queue)
{
sgen_workers_take_from_queue (queue);
sgen_workers_take_from_queue (current_collection_generation, queue);
}
void
......@@ -1360,7 +1360,7 @@ typedef struct {
typedef struct {
ScanJob scan_job;
int job_index;
int job_index, job_split_count;
} ParallelScanJob;
static ScanCopyContext
......@@ -1375,7 +1375,7 @@ scan_copy_context_for_scan_job (void *worker_data_untyped, ScanJob *job)
* object ops changes, like forced concurrent finish.
*/
SGEN_ASSERT (0, sgen_workers_is_worker_thread (mono_native_thread_id_get ()), "We need a context for the scan job");
job->ops = sgen_workers_get_idle_func_object_ops ();
job->ops = sgen_workers_get_idle_func_object_ops (worker_data);
}
return CONTEXT_FROM_OBJECT_OPERATIONS (job->ops, sgen_workers_get_job_gray_queue (worker_data, job->gc_thread_gray_queue));
......@@ -1444,7 +1444,7 @@ job_scan_major_card_table (void *worker_data_untyped, SgenThreadPoolJob *job)
ScanCopyContext ctx = scan_copy_context_for_scan_job (worker_data_untyped, (ScanJob*)job_data);
SGEN_TV_GETTIME (atv);
major_collector.scan_card_table (CARDTABLE_SCAN_GLOBAL, ctx, job_data->job_index, sgen_workers_get_job_split_count ());
major_collector.scan_card_table (CARDTABLE_SCAN_GLOBAL, ctx, job_data->job_index, job_data->job_split_count);
SGEN_TV_GETTIME (btv);
time_minor_scan_major_blocks += SGEN_TV_ELAPSED (atv, btv);
}
......@@ -1458,7 +1458,7 @@ job_scan_los_card_table (void *worker_data_untyped, SgenThreadPoolJob *job)
ScanCopyContext ctx = scan_copy_context_for_scan_job (worker_data_untyped, (ScanJob*)job_data);
SGEN_TV_GETTIME (atv);
sgen_los_scan_card_table (CARDTABLE_SCAN_GLOBAL, ctx, job_data->job_index, sgen_workers_get_job_split_count ());
sgen_los_scan_card_table (CARDTABLE_SCAN_GLOBAL, ctx, job_data->job_index, job_data->job_split_count);
SGEN_TV_GETTIME (btv);
time_minor_scan_los += SGEN_TV_ELAPSED (atv, btv);
}
......@@ -1470,7 +1470,7 @@ job_scan_major_mod_union_card_table (void *worker_data_untyped, SgenThreadPoolJo
ScanCopyContext ctx = scan_copy_context_for_scan_job (worker_data_untyped, (ScanJob*)job_data);
g_assert (concurrent_collection_in_progress);
major_collector.scan_card_table (CARDTABLE_SCAN_MOD_UNION, ctx, job_data->job_index, sgen_workers_get_job_split_count ());
major_collector.scan_card_table (CARDTABLE_SCAN_MOD_UNION, ctx, job_data->job_index, job_data->job_split_count);
}
static void
......@@ -1480,7 +1480,7 @@ job_scan_los_mod_union_card_table (void *worker_data_untyped, SgenThreadPoolJob
ScanCopyContext ctx = scan_copy_context_for_scan_job (worker_data_untyped, (ScanJob*)job_data);
g_assert (concurrent_collection_in_progress);
sgen_los_scan_card_table (CARDTABLE_SCAN_MOD_UNION, ctx, job_data->job_index, sgen_workers_get_job_split_count ());
sgen_los_scan_card_table (CARDTABLE_SCAN_MOD_UNION, ctx, job_data->job_index, job_data->job_split_count);
}
static void
......@@ -1491,7 +1491,7 @@ job_major_mod_union_preclean (void *worker_data_untyped, SgenThreadPoolJob *job)
g_assert (concurrent_collection_in_progress);
major_collector.scan_card_table (CARDTABLE_SCAN_MOD_UNION_PRECLEAN, ctx, job_data->job_index, sgen_workers_get_job_split_count ());
major_collector.scan_card_table (CARDTABLE_SCAN_MOD_UNION_PRECLEAN, ctx, job_data->job_index, job_data->job_split_count);
}
static void
......@@ -1502,7 +1502,7 @@ job_los_mod_union_preclean (void *worker_data_untyped, SgenThreadPoolJob *job)
g_assert (concurrent_collection_in_progress);
sgen_los_scan_card_table (CARDTABLE_SCAN_MOD_UNION_PRECLEAN, ctx, job_data->job_index, sgen_workers_get_job_split_count ());
sgen_los_scan_card_table (CARDTABLE_SCAN_MOD_UNION_PRECLEAN, ctx, job_data->job_index, job_data->job_split_count);
}
static void
......@@ -1521,46 +1521,46 @@ workers_finish_callback (void)
{
ParallelScanJob *psj;
ScanJob *sj;
int split_count = sgen_workers_get_job_split_count ();
int split_count = sgen_workers_get_job_split_count (GENERATION_OLD);
int i;
/* Mod union preclean jobs */
for (i = 0; i < split_count; i++) {
psj = (ParallelScanJob*)sgen_thread_pool_job_alloc ("preclean major mod union cardtable", job_major_mod_union_preclean, sizeof (ParallelScanJob));
psj->scan_job.gc_thread_gray_queue = NULL;
psj->job_index = i;
sgen_workers_enqueue_job (&psj->scan_job.job, TRUE);
psj->job_split_count = split_count;
sgen_workers_enqueue_job (GENERATION_OLD, &psj->scan_job.job, TRUE);
}
for (i = 0; i < split_count; i++) {
psj = (ParallelScanJob*)sgen_thread_pool_job_alloc ("preclean los mod union cardtable", job_los_mod_union_preclean, sizeof (ParallelScanJob));
psj->scan_job.gc_thread_gray_queue = NULL;
psj->job_index = i;
sgen_workers_enqueue_job (&psj->scan_job.job, TRUE);
psj->job_split_count = split_count;
sgen_workers_enqueue_job (GENERATION_OLD, &psj->scan_job.job, TRUE);
}
sj = (ScanJob*)sgen_thread_pool_job_alloc ("scan last pinned", job_scan_last_pinned, sizeof (ScanJob));
sj->gc_thread_gray_queue = NULL;
sgen_workers_enqueue_job (&sj->job, TRUE);
sgen_workers_enqueue_job (GENERATION_OLD, &sj->job, TRUE);
}
static void
init_gray_queue (SgenGrayQueue *gc_thread_gray_queue, gboolean use_workers)
init_gray_queue (SgenGrayQueue *gc_thread_gray_queue)
{
if (use_workers)
sgen_workers_init_distribute_gray_queue ();
sgen_gray_object_queue_init (gc_thread_gray_queue, NULL, TRUE);
}
static void
enqueue_scan_remembered_set_jobs (SgenGrayQueue *gc_thread_gray_queue, SgenObjectOperations *ops, gboolean enqueue)
{
int i, split_count = sgen_workers_get_job_split_count ();
int i, split_count = sgen_workers_get_job_split_count (GENERATION_NURSERY);
ScanJob *sj;
sj = (ScanJob*)sgen_thread_pool_job_alloc ("scan wbroots", job_scan_wbroots, sizeof (ScanJob));
sj->ops = ops;
sj->gc_thread_gray_queue = gc_thread_gray_queue;
sgen_workers_enqueue_job (&sj->job, enqueue);
sgen_workers_enqueue_job (GENERATION_NURSERY, &sj->job, enqueue);
for (i = 0; i < split_count; i++) {
ParallelScanJob *psj;
......@@ -1569,13 +1569,15 @@ enqueue_scan_remembered_set_jobs (SgenGrayQueue *gc_thread_gray_queue, SgenObjec
psj->scan_job.ops = ops;
psj->scan_job.gc_thread_gray_queue = gc_thread_gray_queue;
psj->job_index = i;
sgen_workers_enqueue_job (&psj->scan_job.job, enqueue);
psj->job_split_count = split_count;
sgen_workers_enqueue_job (GENERATION_NURSERY, &psj->scan_job.job, enqueue);
psj = (ParallelScanJob*)sgen_thread_pool_job_alloc ("scan LOS remsets", job_scan_los_card_table, sizeof (ParallelScanJob));
psj->scan_job.ops = ops;
psj->scan_job.gc_thread_gray_queue = gc_thread_gray_queue;
psj->job_index = i;
sgen_workers_enqueue_job (&psj->scan_job.job, enqueue);
psj->job_split_count = split_count;
sgen_workers_enqueue_job (GENERATION_NURSERY, &psj->scan_job.job, enqueue);
}
}
......@@ -1594,7 +1596,7 @@ enqueue_scan_from_roots_jobs (SgenGrayQueue *gc_thread_gray_queue, char *heap_st
scrrj->heap_start = heap_start;
scrrj->heap_end = heap_end;
scrrj->root_type = ROOT_TYPE_NORMAL;
sgen_workers_enqueue_job (&scrrj->scan_job.job, enqueue);
sgen_workers_enqueue_job (current_collection_generation, &scrrj->scan_job.job, enqueue);
if (current_collection_generation == GENERATION_OLD) {
/* During minors we scan the cardtable for these roots instead */
......@@ -1604,7 +1606,7 @@ enqueue_scan_from_roots_jobs (SgenGrayQueue *gc_thread_gray_queue, char *heap_st
scrrj->heap_start = heap_start;
scrrj->heap_end = heap_end;
scrrj->root_type = ROOT_TYPE_WBARRIER;
sgen_workers_enqueue_job (&scrrj->scan_job.job, enqueue);
sgen_workers_enqueue_job (current_collection_generation, &scrrj->scan_job.job, enqueue);
}
/* Threads */
......@@ -1614,7 +1616,7 @@ enqueue_scan_from_roots_jobs (SgenGrayQueue *gc_thread_gray_queue, char *heap_st
stdj->scan_job.gc_thread_gray_queue = gc_thread_gray_queue;
stdj->heap_start = heap_start;
stdj->heap_end = heap_end;
sgen_workers_enqueue_job (&stdj->scan_job.job, enqueue);
sgen_workers_enqueue_job (current_collection_generation, &stdj->scan_job.job, enqueue);
/* Scan the list of objects ready for finalization. */
......@@ -1622,13 +1624,13 @@ enqueue_scan_from_roots_jobs (SgenGrayQueue *gc_thread_gray_queue, char *heap_st
sfej->scan_job.ops = ops;
sfej->scan_job.gc_thread_gray_queue = gc_thread_gray_queue;
sfej->queue = &fin_ready_queue;
sgen_workers_enqueue_job (&sfej->scan_job.job, enqueue);
sgen_workers_enqueue_job (current_collection_generation, &sfej->scan_job.job, enqueue);
sfej = (ScanFinalizerEntriesJob*)sgen_thread_pool_job_alloc ("scan critical finalizer entries", job_scan_finalizer_entries, sizeof (ScanFinalizerEntriesJob));
sfej->scan_job.ops = ops;
sfej->scan_job.gc_thread_gray_queue = gc_thread_gray_queue;
sfej->queue = &critical_fin_queue;
sgen_workers_enqueue_job (&sfej->scan_job.job, enqueue);
sgen_workers_enqueue_job (current_collection_generation, &sfej->scan_job.job, enqueue);
}
/*
......@@ -1657,15 +1659,14 @@ collect_nursery (const char *reason, gboolean is_overflow, SgenGrayQueue *unpin_
binary_protocol_collection_begin (gc_stats.minor_gc_count, GENERATION_NURSERY);
if (sgen_concurrent_collection_in_progress ()) {
/* FIXME Support parallel nursery collections with concurrent major */
object_ops_nopar = &sgen_minor_collector.serial_ops_with_concurrent_major;
} else {
object_ops_nopar = &sgen_minor_collector.serial_ops;
if (sgen_minor_collector.is_parallel && sgen_nursery_size >= SGEN_PARALLEL_MINOR_MIN_NURSERY_SIZE) {
object_ops_par = &sgen_minor_collector.parallel_ops;
is_parallel = TRUE;
}
object_ops_nopar = sgen_concurrent_collection_in_progress ()
? &sgen_minor_collector.serial_ops_with_concurrent_major
: &sgen_minor_collector.serial_ops;
if (sgen_minor_collector.is_parallel && sgen_nursery_size >= SGEN_PARALLEL_MINOR_MIN_NURSERY_SIZE) {
object_ops_par = sgen_concurrent_collection_in_progress ()
? &sgen_minor_collector.parallel_ops_with_concurrent_major
: &sgen_minor_collector.parallel_ops;
is_parallel = TRUE;
}
if (do_verify_nursery || do_dump_nursery_content)
......@@ -1696,7 +1697,7 @@ collect_nursery (const char *reason, gboolean is_overflow, SgenGrayQueue *unpin_
sgen_memgov_minor_collection_start ();
init_gray_queue (&gc_thread_gray_queue, is_parallel);
init_gray_queue (&gc_thread_gray_queue);
ctx = CONTEXT_FROM_OBJECT_OPERATIONS (object_ops_nopar, &gc_thread_gray_queue);
gc_stats.minor_gc_count ++;
......@@ -1705,6 +1706,8 @@ collect_nursery (const char *reason, gboolean is_overflow, SgenGrayQueue *unpin_
/* pin from pinned handles */
sgen_init_pinning ();
if (concurrent_collection_in_progress)
sgen_init_pinning_for_conc ();
sgen_client_binary_protocol_mark_start (GENERATION_NURSERY);
pin_from_roots (nursery_section->data, nursery_section->end_data, ctx);
/* pin cemented objects */
......@@ -1715,6 +1718,8 @@ collect_nursery (const char *reason, gboolean is_overflow, SgenGrayQueue *unpin_
pin_objects_in_nursery (FALSE, ctx);
sgen_pinning_trim_queue_to_section (nursery_section);
if (concurrent_collection_in_progress)
sgen_finish_pinning_for_conc ();
if (remset_consistency_checks)
sgen_check_remset_consistency ();
......@@ -1750,8 +1755,8 @@ collect_nursery (const char *reason, gboolean is_overflow, SgenGrayQueue *unpin_
if (is_parallel) {
gray_queue_redirect (&gc_thread_gray_queue);
sgen_workers_start_all_workers (object_ops_nopar, object_ops_par, NULL);
sgen_workers_join ();
sgen_workers_start_all_workers (GENERATION_NURSERY, object_ops_nopar, object_ops_par, NULL);
sgen_workers_join (GENERATION_NURSERY);
}
TV_GETTIME (btv);
......@@ -1904,6 +1909,8 @@ major_copy_or_mark_from_roots (SgenGrayQueue *gc_thread_gray_queue, size_t *old_
TV_GETTIME (atv);
sgen_init_pinning ();
if (mode == COPY_OR_MARK_FROM_ROOTS_START_CONCURRENT)
sgen_init_pinning_for_conc ();
SGEN_LOG (6, "Collecting pinned addresses");
pin_from_roots ((void*)lowest_heap_address, (void*)highest_heap_address, ctx);
if (mode == COPY_OR_MARK_FROM_ROOTS_FINISH_CONCURRENT) {
......@@ -1974,20 +1981,23 @@ major_copy_or_mark_from_roots (SgenGrayQueue *gc_thread_gray_queue, size_t *old_
SGEN_LOG (2, "Finding pinned pointers: %zd in %lld usecs", sgen_get_pinned_count (), (long long)TV_ELAPSED (atv, btv));
SGEN_LOG (4, "Start scan with %zd pinned objects", sgen_get_pinned_count ());
if (mode == COPY_OR_MARK_FROM_ROOTS_START_CONCURRENT)
sgen_finish_pinning_for_conc ();
major_collector.init_to_space ();
SGEN_ASSERT (0, sgen_workers_all_done (), "Why are the workers not done when we start or finish a major collection?");
if (mode == COPY_OR_MARK_FROM_ROOTS_FINISH_CONCURRENT) {
if (object_ops_par != NULL)
sgen_workers_set_num_active_workers (0);
if (sgen_workers_have_idle_work ()) {
sgen_workers_set_num_active_workers (GENERATION_OLD, 0);
if (sgen_workers_have_idle_work (GENERATION_OLD)) {
/*
* We force the finish of the worker with the new object ops context
* which can also do copying. We need to have finished pinning.
*/
sgen_workers_start_all_workers (object_ops_nopar, object_ops_par, NULL);
sgen_workers_start_all_workers (GENERATION_OLD, object_ops_nopar, object_ops_par, NULL);
sgen_workers_join ();
sgen_workers_join (GENERATION_OLD);
}
}
......@@ -2013,17 +2023,17 @@ major_copy_or_mark_from_roots (SgenGrayQueue *gc_thread_gray_queue, size_t *old_
* the roots.
*/
if (mode == COPY_OR_MARK_FROM_ROOTS_START_CONCURRENT) {
sgen_workers_set_num_active_workers (1);
sgen_workers_set_num_active_workers (GENERATION_OLD, 1);
gray_queue_redirect (gc_thread_gray_queue);
if (precleaning_enabled) {
sgen_workers_start_all_workers (object_ops_nopar, object_ops_par, workers_finish_callback);
sgen_workers_start_all_workers (GENERATION_OLD, object_ops_nopar, object_ops_par, workers_finish_callback);
} else {
sgen_workers_start_all_workers (object_ops_nopar, object_ops_par, NULL);
sgen_workers_start_all_workers (GENERATION_OLD, object_ops_nopar, object_ops_par, NULL);
}
}
if (mode == COPY_OR_MARK_FROM_ROOTS_FINISH_CONCURRENT) {
int i, split_count = sgen_workers_get_job_split_count ();
int i, split_count = sgen_workers_get_job_split_count (GENERATION_OLD);
gboolean parallel = object_ops_par != NULL;
/* If we're not parallel we finish the collection on the gc thread */
......@@ -2038,13 +2048,15 @@ major_copy_or_mark_from_roots (SgenGrayQueue *gc_thread_gray_queue, size_t *old_
psj->scan_job.ops = parallel ? NULL : object_ops_nopar;
psj->scan_job.gc_thread_gray_queue = gc_thread_gray_queue;
psj->job_index = i;
sgen_workers_enqueue_job (&psj->scan_job.job, parallel);
psj->job_split_count = split_count;
sgen_workers_enqueue_job (GENERATION_OLD, &psj->scan_job.job, parallel);
psj = (ParallelScanJob*)sgen_thread_pool_job_alloc ("scan LOS mod union cardtable", job_scan_los_mod_union_card_table, sizeof (ParallelScanJob));
psj->scan_job.ops = parallel ? NULL : object_ops_nopar;
psj->scan_job.gc_thread_gray_queue = gc_thread_gray_queue;
psj->job_index = i;
sgen_workers_enqueue_job (&psj->scan_job.job, parallel);
psj->job_split_count = split_count;
sgen_workers_enqueue_job (GENERATION_OLD, &psj->scan_job.job, parallel);
}
if (parallel) {
......@@ -2056,8 +2068,8 @@ major_copy_or_mark_from_roots (SgenGrayQueue *gc_thread_gray_queue, size_t *old_
* stack that contained roots and pinned objects and also scan the mod union card
* table.
*/
sgen_workers_start_all_workers (object_ops_nopar, object_ops_par, NULL);
sgen_workers_join ();
sgen_workers_start_all_workers (GENERATION_OLD, object_ops_nopar, object_ops_par, NULL);
sgen_workers_join (GENERATION_OLD);
}
}
......@@ -2082,7 +2094,7 @@ major_start_collection (SgenGrayQueue *gc_thread_gray_queue, const char *reason,
current_collection_generation = GENERATION_OLD;
sgen_workers_assert_gray_queue_is_empty ();
sgen_workers_assert_gray_queue_is_empty (GENERATION_OLD);
if (!concurrent)
sgen_cement_reset ();
......@@ -2145,7 +2157,7 @@ major_finish_collection (SgenGrayQueue *gc_thread_gray_queue, const char *reason
object_ops_nopar = &major_collector.major_ops_serial;
}
sgen_workers_assert_gray_queue_is_empty ();
sgen_workers_assert_gray_queue_is_empty (GENERATION_OLD);
finish_gray_stack (GENERATION_OLD, CONTEXT_FROM_OBJECT_OPERATIONS (object_ops_nopar, gc_thread_gray_queue));
TV_GETTIME (atv);
......@@ -2237,7 +2249,7 @@ major_finish_collection (SgenGrayQueue *gc_thread_gray_queue, const char *reason
memset (&counts, 0, sizeof (ScannedObjectCounts));
major_collector.finish_major_collection (&counts);
sgen_workers_assert_gray_queue_is_empty ();
sgen_workers_assert_gray_queue_is_empty (GENERATION_OLD);
SGEN_ASSERT (0, sgen_workers_all_done (), "Can't have workers working after major collection has finished");
if (concurrent_collection_in_progress)
......@@ -2271,7 +2283,7 @@ major_do_collection (const char *reason, gboolean is_overflow, gboolean forced)
/* world must be stopped already */
TV_GETTIME (time_start);
init_gray_queue (&gc_thread_gray_queue, FALSE);
init_gray_queue (&gc_thread_gray_queue);
major_start_collection (&gc_thread_gray_queue, reason, FALSE, &old_next_pin_slot);
major_finish_collection (&gc_thread_gray_queue, reason, is_overflow, old_next_pin_slot, forced);
sgen_gray_object_queue_dispose (&gc_thread_gray_queue);
......@@ -2305,7 +2317,7 @@ major_start_concurrent_collection (const char *reason)
binary_protocol_concurrent_start ();
init_gray_queue (&gc_thread_gray_queue, TRUE);
init_gray_queue (&gc_thread_gray_queue);
// FIXME: store reason and pass it when finishing
major_start_collection (&gc_thread_gray_queue, reason, TRUE, NULL);
sgen_gray_object_queue_dispose (&gc_thread_gray_queue);
......@@ -2360,7 +2372,7 @@ major_finish_concurrent_collection (gboolean forced)
* The workers will be resumed with a finishing pause context to avoid
* additional cardtable and object scanning.
*/
sgen_workers_stop_all_workers ();
sgen_workers_stop_all_workers (GENERATION_OLD);
SGEN_TV_GETTIME (time_major_conc_collection_end);
gc_stats.major_gc_time_concurrent += SGEN_TV_ELAPSED (time_major_conc_collection_start, time_major_conc_collection_end);
......@@ -2373,7 +2385,7 @@ major_finish_concurrent_collection (gboolean forced)
current_collection_generation = GENERATION_OLD;
sgen_cement_reset ();
init_gray_queue (&gc_thread_gray_queue, FALSE);
init_gray_queue (&gc_thread_gray_queue);
major_finish_collection (&gc_thread_gray_queue, "finishing", FALSE, -1, forced);
sgen_gray_object_queue_dispose (&gc_thread_gray_queue);
......@@ -3594,19 +3606,7 @@ sgen_gc_init (void)
if (major_collector.post_param_init)
major_collector.post_param_init (&major_collector);
if (major_collector.is_concurrent || sgen_minor_collector.is_parallel) {
int num_workers = 1;
if (major_collector.is_parallel || sgen_minor_collector.is_parallel) {
num_workers = mono_cpu_count ();
if (num_workers <= 1) {
num_workers = 1;
major_collector.is_parallel = FALSE;
sgen_minor_collector.is_parallel = FALSE;
}
}
if (major_collector.is_concurrent || sgen_minor_collector.is_parallel)
sgen_workers_init (num_workers, (SgenWorkerCallback) major_collector.worker_init_cb);
}
sgen_thread_pool_start ();
sgen_memgov_init (max_heap, soft_limit, debug_print_allowance, allowance_ratio, save_target);
......
......@@ -552,6 +552,7 @@ typedef struct {
SgenObjectOperations serial_ops;
SgenObjectOperations serial_ops_with_concurrent_major;
SgenObjectOperations parallel_ops;
SgenObjectOperations parallel_ops_with_concurrent_major;
void (*prepare_to_space) (char *to_space_bitmap, size_t space_bitmap_size);
void (*clear_fragments) (void);
......@@ -678,9 +679,7 @@ struct _SgenMajorCollector {
guint8* (*get_cardtable_mod_union_for_reference) (char *object);
long long (*get_and_reset_num_major_objects_marked) (void);
void (*count_cards) (long long *num_total_cards, long long *num_marked_cards);
SgenThreadPool* (*get_sweep_pool) (void);
void (*worker_init_cb) (gpointer worker);
void (*init_block_free_lists) (gpointer *list_p);
};
extern SgenMajorCollector major_collector;
......
......@@ -32,6 +32,7 @@
#include "mono/sgen/sgen-thread-pool.h"
#include "mono/sgen/sgen-client.h"
#include "mono/utils/mono-memory-model.h"
#include "mono/utils/mono-proclib.h"
static int ms_block_size;
......@@ -193,8 +194,7 @@ static volatile int sweep_state = SWEEP_STATE_SWEPT;
static gboolean concurrent_mark;
static gboolean concurrent_sweep = TRUE;
SgenThreadPool sweep_pool_inst;
SgenThreadPool *sweep_pool;
int sweep_pool_context = -1;
#define BLOCK_IS_TAGGED_HAS_REFERENCES(bl) SGEN_POINTER_IS_TAGGED_1 ((bl))
#define BLOCK_TAG_HAS_REFERENCES(bl) SGEN_POINTER_TAG_1 ((bl))
......@@ -927,7 +927,7 @@ major_finish_sweep_checking (void)
wait:
job = sweep_job;
if (job)
sgen_thread_pool_job_wait (sweep_pool, job);
sgen_thread_pool_job_wait (sweep_pool_context, job);
SGEN_ASSERT (0, !sweep_job, "Why did the sweep job not null itself?");
SGEN_ASSERT (0, sweep_state == SWEEP_STATE_SWEPT, "How is the sweep job done but we're not swept?");
}
......@@ -1599,7 +1599,8 @@ sweep_start (void)
free_blocks [j] = NULL;
}
sgen_workers_foreach (sgen_worker_clear_free_block_lists);
sgen_workers_foreach (GENERATION_NURSERY, sgen_worker_clear_free_block_lists);
sgen_workers_foreach (GENERATION_OLD, sgen_worker_clear_free_block_lists);
}
static void sweep_finish (void);
......@@ -1815,7 +1816,7 @@ sweep_job_func (void *thread_data_untyped, SgenThreadPoolJob *job)
*/
if (concurrent_sweep && lazy_sweep) {
sweep_blocks_job = sgen_thread_pool_job_alloc ("sweep_blocks", sweep_blocks_job_func, sizeof (SgenThreadPoolJob));
sgen_thread_pool_job_enqueue (sweep_pool, sweep_blocks_job);
sgen_thread_pool_job_enqueue (sweep_pool_context, sweep_blocks_job);
}
sweep_finish ();
......@@ -1864,7 +1865,7 @@ major_sweep (void)
SGEN_ASSERT (0, !sweep_job, "We haven't finished the last sweep?");
if (concurrent_sweep) {
sweep_job = sgen_thread_pool_job_alloc ("sweep", sweep_job_func, sizeof (SgenThreadPoolJob));
sgen_thread_pool_job_enqueue (sweep_pool, sweep_job);
sgen_thread_pool_job_enqueue (sweep_pool_context, sweep_job);
} else {
sweep_job_func (NULL, NULL);
}
......@@ -2067,7 +2068,8 @@ major_start_major_collection (void)
}
/* We expect workers to have very few blocks on the freelist, just evacuate them */
sgen_workers_foreach (sgen_worker_clear_free_block_lists_evac);
sgen_workers_foreach (GENERATION_NURSERY, sgen_worker_clear_free_block_lists_evac);
sgen_workers_foreach (GENERATION_OLD, sgen_worker_clear_free_block_lists_evac);
if (lazy_sweep && concurrent_sweep) {
/*
......@@ -2077,7 +2079,7 @@ major_start_major_collection (void)
*/
SgenThreadPoolJob *job = sweep_blocks_job;
if (job)
sgen_thread_pool_job_wait (sweep_pool, job);
sgen_thread_pool_job_wait (sweep_pool_context, job);
}
if (lazy_sweep && !concurrent_sweep)
......@@ -2117,12 +2119,6 @@ major_finish_major_collection (ScannedObjectCounts *counts)
#endif
}
static SgenThreadPool*
major_get_sweep_pool (void)
{
return sweep_pool;
}
static int
compare_pointers (const void *va, const void *vb) {
char *a = *(char**)va, *b = *(char**)vb;
......@@ -2736,28 +2732,38 @@ post_param_init (SgenMajorCollector *collector)
collector->sweeps_lazily = lazy_sweep;
}
/* We are guaranteed to be called by the worker in question */
/*
* We are guaranteed to be called by the worker in question.
* This provides initialization for threads that plan to do
* parallel object allocation. We need to store these lists
* in additional data structures so we can traverse them
* at major/sweep start.
*/
static void
sgen_worker_init_callback (gpointer worker_untyped)
sgen_init_block_free_lists (gpointer *list_p)
{
int i;
WorkerData *worker = (WorkerData*) worker_untyped;
MSBlockInfo ***worker_free_blocks = (MSBlockInfo ***) sgen_alloc_internal_dynamic (sizeof (MSBlockInfo**) * MS_BLOCK_TYPE_MAX, INTERNAL_MEM_MS_TABLES, TRUE);
MSBlockInfo ***worker_free_blocks = (MSBlockInfo ***) mono_native_tls_get_value (worker_block_free_list_key);
/*
* For simplification, a worker thread uses the same free block lists,
* regardless of the context it is part of (major/minor).
*/
if (worker_free_blocks) {
*list_p = (gpointer)worker_free_blocks;
return;
}
worker_free_blocks = (MSBlockInfo ***) sgen_alloc_internal_dynamic (sizeof (MSBlockInfo**) * MS_BLOCK_TYPE_MAX, INTERNAL_MEM_MS_TABLES, TRUE);
for (i = 0; i < MS_BLOCK_TYPE_MAX; i++)
worker_free_blocks [i] = (MSBlockInfo **) sgen_alloc_internal_dynamic (sizeof (MSBlockInfo*) * num_block_obj_sizes, INTERNAL_MEM_MS_TABLES, TRUE);
worker->free_block_lists = worker_free_blocks;
*list_p = (gpointer)worker_free_blocks;
mono_native_tls_set_value (worker_block_free_list_key, worker_free_blocks);
}
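
A minimal sketch of how a worker is expected to pick these lists up; the real call site lives in sgen-workers.c, whose diff is collapsed near the bottom of this page, so the function name and call shape here are assumptions:

	static void
	worker_init_free_lists (WorkerData *worker)
	{
		/* Hypothetical call site: ask the major collector to populate the
		 * worker's free_block_lists. Because the lists live under a TLS key,
		 * a thread serving both the nursery and old-generation contexts gets
		 * the same lists back on the second call. */
		if (major_collector.init_block_free_lists)
			major_collector.init_block_free_lists (&worker->free_block_lists);
	}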
static void
thread_pool_init_func (void *data_untyped)
{
sgen_client_thread_register_worker ();
}
static void
sgen_marksweep_init_internal (SgenMajorCollector *collector, gboolean is_concurrent, gboolean is_parallel)
{
......@@ -2770,6 +2776,9 @@ sgen_marksweep_init_internal (SgenMajorCollector *collector, gboolean is_concurr
sgen_register_fixed_internal_mem_type (INTERNAL_MEM_MS_BLOCK_INFO, SIZEOF_MS_BLOCK_INFO);
if (mono_cpu_count () <= 1)
is_parallel = FALSE;
num_block_obj_sizes = ms_calculate_block_obj_sizes (MS_BLOCK_OBJ_SIZE_FACTOR, NULL);
block_obj_sizes = (int *)sgen_alloc_internal_dynamic (sizeof (int) * num_block_obj_sizes, INTERNAL_MEM_MS_TABLES, TRUE);
ms_calculate_block_obj_sizes (MS_BLOCK_OBJ_SIZE_FACTOR, block_obj_sizes);
......@@ -2800,10 +2809,8 @@ sgen_marksweep_init_internal (SgenMajorCollector *collector, gboolean is_concurr
g_assert (MS_BLOCK_OBJ_SIZE_INDEX (i) == ms_find_block_obj_size_index (i));
/* We can do this because we always init the minor before the major */
if (is_parallel || sgen_get_minor_collector ()->is_parallel) {
if (is_parallel || sgen_get_minor_collector ()->is_parallel)
mono_native_tls_alloc (&worker_block_free_list_key, NULL);
collector->worker_init_cb = sgen_worker_init_callback;
}
mono_counters_register ("# major blocks allocated", MONO_COUNTER_GC | MONO_COUNTER_ULONG, &stat_major_blocks_alloced);
mono_counters_register ("# major blocks freed", MONO_COUNTER_GC | MONO_COUNTER_ULONG, &stat_major_blocks_freed);
......@@ -2863,7 +2870,7 @@ sgen_marksweep_init_internal (SgenMajorCollector *collector, gboolean is_concurr
collector->is_valid_object = major_is_valid_object;
collector->describe_pointer = major_describe_pointer;
collector->count_cards = major_count_cards;
collector->get_sweep_pool = major_get_sweep_pool;
collector->init_block_free_lists = sgen_init_block_free_lists;
collector->major_ops_serial.copy_or_mark_object = major_copy_or_mark_object_canonical;
collector->major_ops_serial.scan_object = major_scan_object_with_evacuation;
......@@ -2924,11 +2931,13 @@ sgen_marksweep_init_internal (SgenMajorCollector *collector, gboolean is_concurr
/*cardtable requires major pages to be 8 cards aligned*/
g_assert ((ms_block_size % (8 * CARD_SIZE_IN_BYTES)) == 0);
if (concurrent_sweep) {
SgenThreadPool **thread_datas = &sweep_pool;
sweep_pool = &sweep_pool_inst;
sgen_thread_pool_init (sweep_pool, 1, thread_pool_init_func, NULL, NULL, NULL, (SgenThreadPoolData**)&thread_datas);
}
if (is_concurrent && is_parallel)
sgen_workers_create_context (GENERATION_OLD, mono_cpu_count ());
else if (is_concurrent)
sgen_workers_create_context (GENERATION_OLD, 1);
if (concurrent_sweep)
sweep_pool_context = sgen_thread_pool_create_context (1, NULL, NULL, NULL, NULL, NULL);
}
void
......
......@@ -15,10 +15,17 @@
#if defined(SGEN_SIMPLE_NURSERY)
#ifdef SGEN_SIMPLE_PAR_NURSERY
/* Not supported with concurrent major yet */
#ifdef SGEN_CONCURRENT_MAJOR
#define SERIAL_COPY_OBJECT simple_par_nursery_with_concurrent_major_copy_object
#define SERIAL_COPY_OBJECT_FROM_OBJ simple_par_nursery_with_concurrent_major_copy_object_from_obj
#else
#define SERIAL_COPY_OBJECT simple_par_nursery_copy_object
#define SERIAL_COPY_OBJECT_FROM_OBJ simple_par_nursery_copy_object_from_obj
#endif
#else
#ifdef SGEN_CONCURRENT_MAJOR
#define SERIAL_COPY_OBJECT simple_nursery_serial_with_concurrent_major_copy_object
#define SERIAL_COPY_OBJECT_FROM_OBJ simple_nursery_serial_with_concurrent_major_copy_object_from_obj
......@@ -26,6 +33,7 @@
#define SERIAL_COPY_OBJECT simple_nursery_serial_copy_object
#define SERIAL_COPY_OBJECT_FROM_OBJ simple_nursery_serial_copy_object_from_obj
#endif
#endif
#elif defined (SGEN_SPLIT_NURSERY)
......
......@@ -19,10 +19,17 @@ extern guint64 stat_scan_object_called_nursery;
#if defined(SGEN_SIMPLE_NURSERY)
#ifdef SGEN_SIMPLE_PAR_NURSERY
#ifdef SGEN_CONCURRENT_MAJOR
#define SERIAL_SCAN_OBJECT simple_par_nursery_serial_with_concurrent_major_scan_object
#define SERIAL_SCAN_VTYPE simple_par_nursery_serial_with_concurrent_major_scan_vtype
#define SERIAL_SCAN_PTR_FIELD simple_par_nursery_serial_with_concurrent_major_scan_ptr_field
#define SERIAL_DRAIN_GRAY_STACK simple_par_nursery_serial_with_concurrent_major_drain_gray_stack
#else
#define SERIAL_SCAN_OBJECT simple_par_nursery_serial_scan_object
#define SERIAL_SCAN_VTYPE simple_par_nursery_serial_scan_vtype
#define SERIAL_SCAN_PTR_FIELD simple_par_nursery_serial_scan_ptr_field
#define SERIAL_DRAIN_GRAY_STACK simple_par_nursery_serial_drain_gray_stack
#endif
#else
#ifdef SGEN_CONCURRENT_MAJOR
#define SERIAL_SCAN_OBJECT simple_nursery_serial_with_concurrent_major_scan_object
......
......@@ -41,9 +41,14 @@ sgen_pinning_init (void)
void
sgen_init_pinning (void)
{
mono_os_mutex_lock (&pin_queue_mutex);
memset (pin_hash_filter, 0, sizeof (pin_hash_filter));
pin_queue.mem_type = INTERNAL_MEM_PIN_QUEUE;
}
void
sgen_init_pinning_for_conc (void)
{
mono_os_mutex_lock (&pin_queue_mutex);
sgen_pointer_queue_clear (&pin_queue_objs);
}
......@@ -52,6 +57,11 @@ sgen_finish_pinning (void)
{
last_num_pinned = pin_queue.next_slot;
sgen_pointer_queue_clear (&pin_queue);
}
void
sgen_finish_pinning_for_conc (void)
{
mono_os_mutex_unlock (&pin_queue_mutex);
}
......
......@@ -23,6 +23,8 @@ void sgen_pinning_init (void);
void sgen_pin_stage_ptr (void *ptr);
void sgen_optimize_pin_queue (void);
void sgen_init_pinning (void);
void sgen_init_pinning_for_conc (void);
void sgen_finish_pinning_for_conc (void);
void sgen_finish_pinning (void);
void sgen_pinning_register_pinned_in_nursery (GCObject *obj);
void sgen_scan_pin_queue_objects (ScanCopyContext ctx);
......
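
Read together with the collect_nursery and major_copy_or_mark_from_roots hunks above, the new pair brackets the pinning phase with pin_queue_mutex only when a concurrent collection is involved; the condensed call order:

	sgen_init_pinning ();
	if (concurrent_collection_in_progress)
		sgen_init_pinning_for_conc ();   /* takes pin_queue_mutex, clears pin_queue_objs */
	/* ... pin from roots, pin cemented objects, trim the pin queue ... */
	if (concurrent_collection_in_progress)
		sgen_finish_pinning_for_conc (); /* releases pin_queue_mutex */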
......@@ -371,9 +371,7 @@ protocol_entry (unsigned char type, gpointer data, int size)
* If the thread is not a worker thread we insert 0, which is interpreted
* as gc thread. Worker indexes are 1 based.
*/
worker_index = sgen_workers_is_worker_thread (tid);
if (!worker_index)
worker_index = sgen_thread_pool_is_thread_pool_thread (major_collector.get_sweep_pool (), tid);
worker_index = sgen_thread_pool_is_thread_pool_thread (tid);
/* FIXME Consider using different index bases for different thread pools */
buffer->buffer [index++] = (unsigned char) worker_index;
}
......
......@@ -19,7 +19,9 @@
#include "mono/sgen/sgen-protocol.h"
#include "mono/sgen/sgen-layout-stats.h"
#include "mono/sgen/sgen-client.h"
#include "mono/sgen/sgen-workers.h"
#include "mono/utils/mono-memory-model.h"
#include "mono/utils/mono-proclib.h"
static inline GCObject*
alloc_for_promotion (GCVTable vtable, GCObject *obj, size_t objsize, gboolean has_references)
......@@ -127,9 +129,24 @@ fill_serial_with_concurrent_major_ops (SgenObjectOperations *ops)
FILL_MINOR_COLLECTOR_SCAN_OBJECT (ops);
}
#define SGEN_SIMPLE_PAR_NURSERY
#include "sgen-minor-copy-object.h"
#include "sgen-minor-scan-object.h"
static void
fill_parallel_with_concurrent_major_ops (SgenObjectOperations *ops)
{
ops->copy_or_mark_object = SERIAL_COPY_OBJECT;
FILL_MINOR_COLLECTOR_SCAN_OBJECT (ops);
}
void
sgen_simple_nursery_init (SgenMinorCollector *collector, gboolean parallel)
{
if (mono_cpu_count () <= 1)
parallel = FALSE;
collector->is_split = FALSE;
collector->is_parallel = parallel;
......@@ -146,6 +163,14 @@ sgen_simple_nursery_init (SgenMinorCollector *collector, gboolean parallel)
fill_serial_ops (&collector->serial_ops);
fill_serial_with_concurrent_major_ops (&collector->serial_ops_with_concurrent_major);
fill_parallel_ops (&collector->parallel_ops);
fill_parallel_with_concurrent_major_ops (&collector->parallel_ops_with_concurrent_major);
/*
* The nursery worker context is created first so it will have priority over
* concurrent mark and concurrent sweep.
*/
if (parallel)
sgen_workers_create_context (GENERATION_NURSERY, mono_cpu_count ());
}
......
......@@ -12,8 +12,22 @@
#include "mono/sgen/sgen-gc.h"
#include "mono/sgen/sgen-thread-pool.h"
#include "mono/sgen/sgen-client.h"
#include "mono/utils/mono-os-mutex.h"
static mono_mutex_t lock;
static mono_cond_t work_cond;
static mono_cond_t done_cond;
static int threads_num;
static MonoNativeThreadId threads [SGEN_THREADPOOL_MAX_NUM_THREADS];
static volatile gboolean threadpool_shutdown;
static volatile int threads_finished;
static int contexts_num;
static SgenThreadPoolContext pool_contexts [SGEN_THREADPOOL_MAX_NUM_CONTEXTS];
enum {
STATE_WAITING,
STATE_IN_PROGRESS,
......@@ -22,10 +36,10 @@ enum {
/* Assumes that the lock is held. */
static SgenThreadPoolJob*
get_job_and_set_in_progress (SgenThreadPool *pool)
get_job_and_set_in_progress (SgenThreadPoolContext *context)
{
for (size_t i = 0; i < pool->job_queue.next_slot; ++i) {
SgenThreadPoolJob *job = (SgenThreadPoolJob *)pool->job_queue.data [i];
for (size_t i = 0; i < context->job_queue.next_slot; ++i) {
SgenThreadPoolJob *job = (SgenThreadPoolJob *)context->job_queue.data [i];
if (job->state == STATE_WAITING) {
job->state = STATE_IN_PROGRESS;
return job;
......@@ -36,10 +50,10 @@ get_job_and_set_in_progress (SgenThreadPool *pool)
/* Assumes that the lock is held. */
static ssize_t
find_job_in_queue (SgenThreadPool *pool, SgenThreadPoolJob *job)
find_job_in_queue (SgenThreadPoolContext *context, SgenThreadPoolJob *job)
{
for (ssize_t i = 0; i < pool->job_queue.next_slot; ++i) {
if (pool->job_queue.data [i] == job)
for (ssize_t i = 0; i < context->job_queue.next_slot; ++i) {
if (context->job_queue.data [i] == job)
return i;
}
return -1;
......@@ -47,100 +61,174 @@ find_job_in_queue (SgenThreadPool *pool, SgenThreadPoolJob *job)
/* Assumes that the lock is held. */
static void
remove_job (SgenThreadPool *pool, SgenThreadPoolJob *job)
remove_job (SgenThreadPoolContext *context, SgenThreadPoolJob *job)
{
ssize_t index;
SGEN_ASSERT (0, job->state == STATE_DONE, "Why are we removing a job that's not done?");
index = find_job_in_queue (pool, job);
index = find_job_in_queue (context, job);
SGEN_ASSERT (0, index >= 0, "Why is the job we're trying to remove not in the queue?");
pool->job_queue.data [index] = NULL;
sgen_pointer_queue_remove_nulls (&pool->job_queue);
context->job_queue.data [index] = NULL;
sgen_pointer_queue_remove_nulls (&context->job_queue);
sgen_thread_pool_job_free (job);
}
static gboolean
continue_idle_job (SgenThreadPool *pool, void *thread_data)
continue_idle_job (SgenThreadPoolContext *context, void *thread_data)
{
if (!pool->continue_idle_job_func)
if (!context->continue_idle_job_func)
return FALSE;
return pool->continue_idle_job_func (thread_data);
return context->continue_idle_job_func (thread_data, context - pool_contexts);
}
static gboolean
should_work (SgenThreadPool *pool, void *thread_data)
should_work (SgenThreadPoolContext *context, void *thread_data)
{
if (!pool->should_work_func)
if (!context->should_work_func)
return TRUE;
return pool->should_work_func (thread_data);
return context->should_work_func (thread_data);
}
static mono_native_thread_return_t
thread_func (SgenThreadPoolData *thread_data)
/*
* Tells whether we should lock and attempt to get work from
* a higher priority context.
*/
static gboolean
has_priority_work (int worker_index, int current_context)
{
SgenThreadPool *pool = thread_data->pool;
pool->thread_init_func (thread_data);
int i;
mono_os_mutex_lock (&pool->lock);
for (;;) {
gboolean do_idle;
SgenThreadPoolJob *job;
for (i = 0; i < current_context; i++) {
SgenThreadPoolContext *context = &pool_contexts [i];
void *thread_data;
if (!should_work (pool, thread_data) && !pool->threadpool_shutdown) {
mono_os_cond_wait (&pool->work_cond, &pool->lock);
if (worker_index >= context->num_threads)
continue;
thread_data = (context->thread_datas) ? context->thread_datas [worker_index] : NULL;
if (!should_work (context, thread_data))
continue;
if (context->job_queue.next_slot > 0)
return TRUE;
if (continue_idle_job (context, thread_data))
return TRUE;
}
/* Return if job enqueued on current context. Jobs have priority over idle work */
if (pool_contexts [current_context].job_queue.next_slot > 0)
return TRUE;
return FALSE;
}
/*
* Gets the highest priority work. If there is none, it waits
* for work_cond. Should always be called with lock held.
*/
static void
get_work (int worker_index, int *work_context, int *do_idle, SgenThreadPoolJob **job)
{
while (!threadpool_shutdown) {
int i;
for (i = 0; i < contexts_num; i++) {
SgenThreadPoolContext *context = &pool_contexts [i];
void *thread_data;
if (worker_index >= context->num_threads)
continue;
thread_data = (context->thread_datas) ? context->thread_datas [worker_index] : NULL;
if (!should_work (context, thread_data))
continue;
/*
* It's important that we check the continue idle flag with the lock held.
* Suppose we didn't check with the lock held, and the result is FALSE. The
* main thread might then set continue idle and signal us before we can take
* the lock, and we'd lose the signal.
*/
*do_idle = continue_idle_job (context, thread_data);
*job = get_job_and_set_in_progress (context);
if (*job || *do_idle) {
*work_context = i;
return;
}
}
/*
* It's important that we check the continue idle flag with the lock held.
* Suppose we didn't check with the lock held, and the result is FALSE. The
* main thread might then set continue idle and signal us before we can take
* the lock, and we'd lose the signal.
* Nothing to do on any context
* pthread_cond_wait() can return successfully despite the condition
* not being signalled, so we have to run this in a loop until we
* really have work to do.
*/
do_idle = continue_idle_job (pool, thread_data);
job = get_job_and_set_in_progress (pool);
mono_os_cond_wait (&work_cond, &lock);
}
}
if (!job && !do_idle && !pool->threadpool_shutdown) {
/*
* pthread_cond_wait() can return successfully despite the condition
* not being signalled, so we have to run this in a loop until we
* really have work to do.
*/
mono_os_cond_wait (&pool->work_cond, &pool->lock);
continue;
static mono_native_thread_return_t
thread_func (int worker_index)
{
int current_context;
void *thread_data = NULL;
sgen_client_thread_register_worker ();
for (current_context = 0; current_context < contexts_num; current_context++) {
if (worker_index >= pool_contexts [current_context].num_threads ||
!pool_contexts [current_context].thread_init_func)
break;
thread_data = (pool_contexts [current_context].thread_datas) ? pool_contexts [current_context].thread_datas [worker_index] : NULL;
pool_contexts [current_context].thread_init_func (thread_data);
}
current_context = 0;
mono_os_mutex_lock (&lock);
for (;;) {
gboolean do_idle = FALSE;
SgenThreadPoolJob *job = NULL;
SgenThreadPoolContext *context = NULL;
get_work (worker_index, &current_context, &do_idle, &job);
if (!threadpool_shutdown) {
context = &pool_contexts [current_context];
thread_data = (context->thread_datas) ? context->thread_datas [worker_index] : NULL;
}
mono_os_mutex_unlock (&pool->lock);
mono_os_mutex_unlock (&lock);
if (job) {
job->func (thread_data, job);
mono_os_mutex_lock (&pool->lock);
mono_os_mutex_lock (&lock);
SGEN_ASSERT (0, job->state == STATE_IN_PROGRESS, "The job should still be in progress.");
job->state = STATE_DONE;
remove_job (pool, job);
remove_job (context, job);
/*
* Only the main GC thread will ever wait on the done condition, so we don't
* have to broadcast.
*/
mono_os_cond_signal (&pool->done_cond);
mono_os_cond_signal (&done_cond);
} else if (do_idle) {
SGEN_ASSERT (0, pool->idle_job_func, "Why do we have idle work when there's no idle job function?");
SGEN_ASSERT (0, context->idle_job_func, "Why do we have idle work when there's no idle job function?");
do {
pool->idle_job_func (thread_data);
do_idle = continue_idle_job (pool, thread_data);
} while (do_idle && !pool->job_queue.next_slot);
context->idle_job_func (thread_data);
do_idle = continue_idle_job (context, thread_data);
} while (do_idle && !has_priority_work (worker_index, current_context));
mono_os_mutex_lock (&pool->lock);
mono_os_mutex_lock (&lock);
if (!do_idle)
mono_os_cond_signal (&pool->done_cond);
mono_os_cond_signal (&done_cond);
} else {
SGEN_ASSERT (0, pool->threadpool_shutdown, "Why did we unlock if no jobs and not shutting down?");
mono_os_mutex_lock (&pool->lock);
pool->threads_finished++;
mono_os_cond_signal (&pool->done_cond);
mono_os_mutex_unlock (&pool->lock);
SGEN_ASSERT (0, threadpool_shutdown, "Why did we unlock if no jobs and not shutting down?");
mono_os_mutex_lock (&lock);
threads_finished++;
mono_os_cond_signal (&done_cond);
mono_os_mutex_unlock (&lock);
return 0;
}
}
......@@ -148,50 +236,71 @@ thread_func (SgenThreadPoolData *thread_data)
return (mono_native_thread_return_t)0;
}
int
sgen_thread_pool_create_context (int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, SgenThreadPoolShouldWorkFunc should_work_func, void **thread_datas)
{
int context_id = contexts_num;
SGEN_ASSERT (0, contexts_num < SGEN_THREADPOOL_MAX_NUM_CONTEXTS, "Maximum sgen thread pool contexts reached");
pool_contexts [context_id].thread_init_func = init_func;
pool_contexts [context_id].idle_job_func = idle_func;
pool_contexts [context_id].continue_idle_job_func = continue_idle_func;
pool_contexts [context_id].should_work_func = should_work_func;
pool_contexts [context_id].thread_datas = thread_datas;
SGEN_ASSERT (0, num_threads <= SGEN_THREADPOOL_MAX_NUM_THREADS, "Maximum sgen thread pool threads exceeded");
pool_contexts [context_id].num_threads = num_threads;
sgen_pointer_queue_init (&pool_contexts [contexts_num].job_queue, 0);
contexts_num++;
return context_id;
}
void
sgen_thread_pool_init (SgenThreadPool *pool, int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, SgenThreadPoolShouldWorkFunc should_work_func_p, SgenThreadPoolData **thread_datas)
sgen_thread_pool_start (void)
{
int i;
SGEN_ASSERT (0, num_threads > 0, "Why are we creating a threadpool with no threads?");
pool->threads_num = (num_threads < MAX_NUM_THREADS) ? num_threads : MAX_NUM_THREADS;
for (i = 0; i < contexts_num; i++) {
if (threads_num < pool_contexts [i].num_threads)
threads_num = pool_contexts [i].num_threads;
}
mono_os_mutex_init (&pool->lock);
mono_os_cond_init (&pool->work_cond);
mono_os_cond_init (&pool->done_cond);
if (!threads_num)
return;
pool->thread_init_func = init_func;
pool->idle_job_func = idle_func;
pool->continue_idle_job_func = continue_idle_func;
pool->should_work_func = should_work_func_p;
mono_os_mutex_init (&lock);
mono_os_cond_init (&work_cond);
mono_os_cond_init (&done_cond);
sgen_pointer_queue_init (&pool->job_queue, 0);
pool->threads_finished = 0;
pool->threadpool_shutdown = FALSE;
threads_finished = 0;
threadpool_shutdown = FALSE;
for (i = 0; i < pool->threads_num; i++) {
thread_datas [i]->pool = pool;
mono_native_thread_create (&pool->threads [i], thread_func, thread_datas [i]);
for (i = 0; i < threads_num; i++) {
mono_native_thread_create (&threads [i], thread_func, (void*)(gsize)i);
}
}
void
sgen_thread_pool_shutdown (SgenThreadPool *pool)
sgen_thread_pool_shutdown (void)
{
if (!pool)
if (!threads_num)
return;
mono_os_mutex_lock (&pool->lock);
pool->threadpool_shutdown = TRUE;
mono_os_cond_broadcast (&pool->work_cond);
while (pool->threads_finished < pool->threads_num)
mono_os_cond_wait (&pool->done_cond, &pool->lock);
mono_os_mutex_unlock (&pool->lock);
mono_os_mutex_lock (&lock);
threadpool_shutdown = TRUE;
mono_os_cond_broadcast (&work_cond);
while (threads_finished < threads_num)
mono_os_cond_wait (&done_cond, &lock);
mono_os_mutex_unlock (&lock);
mono_os_mutex_destroy (&pool->lock);
mono_os_cond_destroy (&pool->work_cond);
mono_os_cond_destroy (&pool->done_cond);
mono_os_mutex_destroy (&lock);
mono_os_cond_destroy (&work_cond);
mono_os_cond_destroy (&done_cond);
}
SgenThreadPoolJob*
......@@ -212,77 +321,74 @@ sgen_thread_pool_job_free (SgenThreadPoolJob *job)
}
void
sgen_thread_pool_job_enqueue (SgenThreadPool *pool, SgenThreadPoolJob *job)
sgen_thread_pool_job_enqueue (int context_id, SgenThreadPoolJob *job)
{
mono_os_mutex_lock (&pool->lock);
mono_os_mutex_lock (&lock);
sgen_pointer_queue_add (&pool->job_queue, job);
mono_os_cond_signal (&pool->work_cond);
sgen_pointer_queue_add (&pool_contexts [context_id].job_queue, job);
mono_os_cond_broadcast (&work_cond);
mono_os_mutex_unlock (&pool->lock);
mono_os_mutex_unlock (&lock);
}
void
sgen_thread_pool_job_wait (SgenThreadPool *pool, SgenThreadPoolJob *job)
sgen_thread_pool_job_wait (int context_id, SgenThreadPoolJob *job)
{
SGEN_ASSERT (0, job, "Where's the job?");
mono_os_mutex_lock (&pool->lock);
mono_os_mutex_lock (&lock);
while (find_job_in_queue (pool, job) >= 0)
mono_os_cond_wait (&pool->done_cond, &pool->lock);
while (find_job_in_queue (&pool_contexts [context_id], job) >= 0)
mono_os_cond_wait (&done_cond, &lock);
mono_os_mutex_unlock (&pool->lock);
mono_os_mutex_unlock (&lock);
}
void
sgen_thread_pool_idle_signal (SgenThreadPool *pool)
sgen_thread_pool_idle_signal (int context_id)
{
SGEN_ASSERT (0, pool->idle_job_func, "Why are we signaling idle without an idle function?");
SGEN_ASSERT (0, pool_contexts [context_id].idle_job_func, "Why are we signaling idle without an idle function?");
mono_os_mutex_lock (&pool->lock);
mono_os_mutex_lock (&lock);
if (pool->continue_idle_job_func (NULL))
mono_os_cond_broadcast (&pool->work_cond);
if (pool_contexts [context_id].continue_idle_job_func (NULL, context_id))
mono_os_cond_broadcast (&work_cond);
mono_os_mutex_unlock (&pool->lock);
mono_os_mutex_unlock (&lock);
}
void
sgen_thread_pool_idle_wait (SgenThreadPool *pool)
sgen_thread_pool_idle_wait (int context_id)
{
SGEN_ASSERT (0, pool->idle_job_func, "Why are we waiting for idle without an idle function?");
SGEN_ASSERT (0, pool_contexts [context_id].idle_job_func, "Why are we waiting for idle without an idle function?");
mono_os_mutex_lock (&pool->lock);
mono_os_mutex_lock (&lock);
while (pool->continue_idle_job_func (NULL))
mono_os_cond_wait (&pool->done_cond, &pool->lock);
while (pool_contexts [context_id].continue_idle_job_func (NULL, context_id))
mono_os_cond_wait (&done_cond, &lock);
mono_os_mutex_unlock (&pool->lock);
mono_os_mutex_unlock (&lock);
}
void
sgen_thread_pool_wait_for_all_jobs (SgenThreadPool *pool)
sgen_thread_pool_wait_for_all_jobs (int context_id)
{
mono_os_mutex_lock (&pool->lock);
mono_os_mutex_lock (&lock);
while (!sgen_pointer_queue_is_empty (&pool->job_queue))
mono_os_cond_wait (&pool->done_cond, &pool->lock);
while (!sgen_pointer_queue_is_empty (&pool_contexts [context_id].job_queue))
mono_os_cond_wait (&done_cond, &lock);
mono_os_mutex_unlock (&pool->lock);
mono_os_mutex_unlock (&lock);
}
/* Return 0 if it is not a thread pool thread, or the (1-based) thread number otherwise */
int
sgen_thread_pool_is_thread_pool_thread (SgenThreadPool *pool, MonoNativeThreadId some_thread)
sgen_thread_pool_is_thread_pool_thread (MonoNativeThreadId some_thread)
{
int i;
if (!pool)
return 0;
for (i = 0; i < pool->threads_num; i++) {
if (some_thread == pool->threads [i])
for (i = 0; i < threads_num; i++) {
if (some_thread == threads [i])
return i + 1;
}
......
......@@ -13,14 +13,16 @@
#include "mono/sgen/sgen-pointer-queue.h"
#include "mono/utils/mono-threads.h"
#define SGEN_THREADPOOL_MAX_NUM_THREADS 8
#define SGEN_THREADPOOL_MAX_NUM_CONTEXTS 3
typedef struct _SgenThreadPoolJob SgenThreadPoolJob;
typedef struct _SgenThreadPool SgenThreadPool;
typedef struct _SgenThreadPoolData SgenThreadPoolData;
typedef struct _SgenThreadPoolContext SgenThreadPoolContext;
typedef void (*SgenThreadPoolJobFunc) (void *thread_data, SgenThreadPoolJob *job);
typedef void (*SgenThreadPoolThreadInitFunc) (void*);
typedef void (*SgenThreadPoolIdleJobFunc) (void*);
typedef gboolean (*SgenThreadPoolContinueIdleJobFunc) (void*);
typedef gboolean (*SgenThreadPoolContinueIdleJobFunc) (void*, int);
typedef gboolean (*SgenThreadPoolShouldWorkFunc) (void*);
struct _SgenThreadPoolJob {
......@@ -30,16 +32,7 @@ struct _SgenThreadPoolJob {
volatile gint32 state;
};
#define MAX_NUM_THREADS 8
struct _SgenThreadPool {
mono_mutex_t lock;
mono_cond_t work_cond;
mono_cond_t done_cond;
int threads_num;
MonoNativeThreadId threads [MAX_NUM_THREADS];
struct _SgenThreadPoolContext {
/* Only accessed with the lock held. */
SgenPointerQueue job_queue;
......@@ -48,31 +41,29 @@ struct _SgenThreadPool {
SgenThreadPoolContinueIdleJobFunc continue_idle_job_func;
SgenThreadPoolShouldWorkFunc should_work_func;
volatile gboolean threadpool_shutdown;
volatile int threads_finished;
void **thread_datas;
int num_threads;
};
struct _SgenThreadPoolData {
SgenThreadPool *pool;
};
void sgen_thread_pool_init (SgenThreadPool *pool, int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, SgenThreadPoolShouldWorkFunc should_work_func, SgenThreadPoolData **thread_datas);
int sgen_thread_pool_create_context (int num_threads, SgenThreadPoolThreadInitFunc init_func, SgenThreadPoolIdleJobFunc idle_func, SgenThreadPoolContinueIdleJobFunc continue_idle_func, SgenThreadPoolShouldWorkFunc should_work_func, void **thread_datas);
void sgen_thread_pool_start (void);
void sgen_thread_pool_shutdown (SgenThreadPool *pool);
void sgen_thread_pool_shutdown (void);
SgenThreadPoolJob* sgen_thread_pool_job_alloc (const char *name, SgenThreadPoolJobFunc func, size_t size);
/* This only needs to be called on jobs that are not enqueued. */
void sgen_thread_pool_job_free (SgenThreadPoolJob *job);
void sgen_thread_pool_job_enqueue (SgenThreadPool *pool, SgenThreadPoolJob *job);
void sgen_thread_pool_job_enqueue (int context_id, SgenThreadPoolJob *job);
/* This must only be called after the job has been enqueued. */
void sgen_thread_pool_job_wait (SgenThreadPool *pool, SgenThreadPoolJob *job);
void sgen_thread_pool_job_wait (int context_id, SgenThreadPoolJob *job);
void sgen_thread_pool_idle_signal (SgenThreadPool *pool);
void sgen_thread_pool_idle_wait (SgenThreadPool *pool);
void sgen_thread_pool_idle_signal (int context_id);
void sgen_thread_pool_idle_wait (int context_id);
void sgen_thread_pool_wait_for_all_jobs (SgenThreadPool *pool);
void sgen_thread_pool_wait_for_all_jobs (int context_id);
int sgen_thread_pool_is_thread_pool_thread (SgenThreadPool *pool, MonoNativeThreadId thread);
int sgen_thread_pool_is_thread_pool_thread (MonoNativeThreadId thread);
#endif
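
A minimal usage sketch of the context-id API declared above, assuming a single-threaded context with no idle work; noop_job_func and the call sequence are illustrative, not part of the patch:

	static void
	noop_job_func (void *thread_data, SgenThreadPoolJob *job)
	{
		/* hypothetical job body */
	}

	static void
	example_usage (void)
	{
		int ctx = sgen_thread_pool_create_context (1, NULL, NULL, NULL, NULL, NULL);
		sgen_thread_pool_start ();

		SgenThreadPoolJob *job = sgen_thread_pool_job_alloc ("noop", noop_job_func, sizeof (SgenThreadPoolJob));
		sgen_thread_pool_job_enqueue (ctx, job);
		sgen_thread_pool_job_wait (ctx, job); /* the pool frees the job once it is done */

		sgen_thread_pool_shutdown ();
	}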
This diff is collapsed.
......@@ -14,12 +14,14 @@
#include "mono/sgen/sgen-thread-pool.h"
typedef struct _WorkerData WorkerData;
typedef struct _WorkerContext WorkerContext;
typedef gint32 State;
typedef void (*SgenWorkersFinishCallback) (void);
typedef void (*SgenWorkerCallback) (WorkerData *data);
struct _WorkerData {
/*
* Threadpool threads receive as their starting argument a WorkerData.
* tp_data is meant for use inside the sgen thread pool and must be first.
*/
SgenThreadPoolData tp_data;
gint32 state;
SgenGrayQueue private_gray_queue; /* only read/written by worker thread */
/*
......@@ -29,29 +31,54 @@ struct _WorkerData {
* starts.
*/
gpointer free_block_lists;
WorkerContext *context;
};
typedef void (*SgenWorkersFinishCallback) (void);
typedef void (*SgenWorkerCallback) (WorkerData *data);
struct _WorkerContext {
int workers_num;
int active_workers_num;
volatile gboolean started;
volatile gboolean forced_stop;
WorkerData *workers_data;
/*
* When using multiple workers, we need to have the last worker
* enqueue the preclean jobs (if there are any). This lock ensures
* that when the last worker takes it, all the other workers have
* gracefully finished, so it can restart them.
*/
mono_mutex_t finished_lock;
volatile gboolean workers_finished;
int worker_awakenings;
SgenSectionGrayQueue workers_distribute_gray_queue;
SgenObjectOperations * volatile idle_func_object_ops;
SgenObjectOperations *idle_func_object_ops_par, *idle_func_object_ops_nopar;
/*
* finished_callback is called only when the workers finish work normally (when they
* are not forced to finish). The callback is used to enqueue preclean jobs.
*/
volatile SgenWorkersFinishCallback finish_callback;
int generation;
int thread_pool_context;
};
void sgen_workers_init (int num_workers, SgenWorkerCallback callback);
void sgen_workers_shutdown (void);
void sgen_workers_stop_all_workers (void);
void sgen_workers_set_num_active_workers (int num_workers);
void sgen_workers_start_all_workers (SgenObjectOperations *object_ops_nopar, SgenObjectOperations *object_ops_par, SgenWorkersFinishCallback finish_job);
void sgen_workers_init_distribute_gray_queue (void);
void sgen_workers_enqueue_job (SgenThreadPoolJob *job, gboolean enqueue);
void sgen_workers_distribute_gray_queue_sections (void);
void sgen_workers_reset_data (void);
void sgen_workers_join (void);
gboolean sgen_workers_have_idle_work (void);
void sgen_workers_create_context (int generation, int num_workers);
void sgen_workers_stop_all_workers (int generation);
void sgen_workers_set_num_active_workers (int generation, int num_workers);
void sgen_workers_start_all_workers (int generation, SgenObjectOperations *object_ops_nopar, SgenObjectOperations *object_ops_par, SgenWorkersFinishCallback finish_job);
void sgen_workers_enqueue_job (int generation, SgenThreadPoolJob *job, gboolean enqueue);
void sgen_workers_join (int generation);
gboolean sgen_workers_have_idle_work (int generation);
gboolean sgen_workers_all_done (void);
gboolean sgen_workers_are_working (void);
void sgen_workers_assert_gray_queue_is_empty (void);
void sgen_workers_take_from_queue (SgenGrayQueue *queue);
SgenObjectOperations* sgen_workers_get_idle_func_object_ops (void);
int sgen_workers_get_job_split_count (void);
void sgen_workers_foreach (SgenWorkerCallback callback);
void sgen_workers_assert_gray_queue_is_empty (int generation);
void sgen_workers_take_from_queue (int generation, SgenGrayQueue *queue);
SgenObjectOperations* sgen_workers_get_idle_func_object_ops (WorkerData *worker);
int sgen_workers_get_job_split_count (int generation);
void sgen_workers_foreach (int generation, SgenWorkerCallback callback);
gboolean sgen_workers_is_worker_thread (MonoNativeThreadId id);
#endif
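
For comparison, the generation-keyed worker flow, condensed from the parallel-minor path in the collect_nursery hunk earlier in this diff; each entry point resolves the generation to its WorkerContext, which records the thread_pool_context it runs on:

	gray_queue_redirect (&gc_thread_gray_queue);
	sgen_workers_start_all_workers (GENERATION_NURSERY, object_ops_nopar, object_ops_par, NULL);
	sgen_workers_join (GENERATION_NURSERY);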