futex-requeue.c 5.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10
/*
 * Copyright (C) 2013  Davidlohr Bueso <davidlohr@hp.com>
 *
 * futex-requeue: Block a bunch of threads on futex1 and requeue them
 *                on futex2, N at a time.
 *
 * This program is particularly useful to measure the latency of nthread
 * requeues without waking up any tasks -- thus mimicking a regular futex_wait.
 */

11
/* For the CPU_*() macros (CPU_ZERO, CPU_SET) */
12
#include <string.h>
13 14
#include <pthread.h>

15
#include <signal.h>
16
#include "../util/stat.h"
17
#include <subcmd/parse-options.h>
18
#include <linux/compiler.h>
19
#include <linux/kernel.h>
20
#include <linux/time64.h>
21
#include <errno.h>
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
#include "bench.h"
#include "futex.h"

#include <err.h>
#include <stdlib.h>
#include <sys/time.h>

/* The two futex words: workers wait on futex1 and get requeued onto futex2. */
static u_int32_t futex1 = 0, futex2 = 0;

/*
 * How many tasks to requeue at a time.
 * Default to 1 in order to make the kernel work more.
 */
static unsigned int nrequeue = 1;

/* Array of nthreads worker thread handles, allocated by bench_futex_requeue(). */
static pthread_t *worker;

/*
 * Set by the SIGINT handler to stop the benchmark before the next run.
 * It is written from signal context, so it must be a volatile sig_atomic_t
 * rather than a plain bool.
 */
static volatile sig_atomic_t done;

/* Command-line switches (see the options table). */
static bool silent = false, fshared = false;

/* Start-up handshake between the parent and the workers. */
static pthread_mutex_t thread_lock;
static pthread_cond_t thread_parent, thread_worker;

/* Per-run samples: elapsed requeue time and number of tasks requeued. */
static struct stats requeuetime_stats, requeued_stats;

/*
 * ncpus:            online CPU count, used to spread worker affinity.
 * threads_starting: workers that have not yet checked in for the current run.
 * nthreads:         number of workers; 0 means "use ncpus".
 */
static unsigned int ncpus, threads_starting, nthreads = 0;

/* Flags passed to every futex call; FUTEX_PRIVATE_FLAG unless -S is given. */
static int futex_flag = 0;

static const struct option options[] = {
	OPT_UINTEGER('t', "threads",  &nthreads, "Specify amount of threads"),
	OPT_UINTEGER('q', "nrequeue", &nrequeue, "Specify amount of threads to requeue at once"),
	OPT_BOOLEAN( 's', "silent",   &silent,   "Silent mode: do not display data/details"),
49
	OPT_BOOLEAN( 'S', "shared",   &fshared,  "Use shared futexes instead of private ones"),
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
	OPT_END()
};

/*
 * NULL-terminated usage synopsis, handed to parse_options() and
 * usage_with_options().
 */
static const char * const bench_futex_requeue_usage[] = {
	"perf bench futex requeue <options>", NULL
};

static void print_summary(void)
{
	double requeuetime_avg = avg_stats(&requeuetime_stats);
	double requeuetime_stddev = stddev_stats(&requeuetime_stats);
	unsigned int requeued_avg = avg_stats(&requeued_stats);

	printf("Requeued %d of %d threads in %.4f ms (+-%.2f%%)\n",
	       requeued_avg,
	       nthreads,
67
	       requeuetime_avg / USEC_PER_MSEC,
68 69 70 71 72 73 74 75 76 77 78 79
	       rel_stddev_stats(requeuetime_stddev, requeuetime_avg));
}

/*
 * Worker thread body.
 *
 * Checks in with the parent by decrementing threads_starting under
 * thread_lock; the worker that brings it to zero signals thread_parent.
 * Every worker then waits on thread_worker for the parent's go-ahead and
 * finally blocks on futex1 (expected value 0) until it is woken.
 *
 * Always returns NULL; the argument is unused.
 */
static void *workerfn(void *arg __maybe_unused)
{
	pthread_mutex_lock(&thread_lock);
	threads_starting--;
	if (!threads_starting)
		pthread_cond_signal(&thread_parent);
	pthread_cond_wait(&thread_worker, &thread_lock);
	pthread_mutex_unlock(&thread_lock);

	futex_wait(&futex1, 0, NULL, futex_flag);
	return NULL;
}

/*
 * Create the nthreads worker threads for one benchmark run.
 *
 * @w:           array with room for nthreads thread handles
 * @thread_attr: attributes used for every worker; only this function's
 *               by-value copy has its affinity rewritten
 *
 * Worker i is restricted to CPU (i % ncpus). threads_starting is reset to
 * nthreads before the first thread is created so that the workers can count
 * themselves in. Any failure terminates the process via err().
 */
static void block_threads(pthread_t *w,
			  pthread_attr_t thread_attr)
{
	cpu_set_t cpu;
	unsigned int i;
	int ret;

	threads_starting = nthreads;

	/* create and block all threads */
	for (i = 0; i < nthreads; i++) {
		CPU_ZERO(&cpu);
		CPU_SET(i % ncpus, &cpu);

		/*
		 * pthread functions return the error number instead of
		 * setting errno, while err() reports strerror(errno).
		 * Copy the code over so the message is meaningful.
		 */
		ret = pthread_attr_setaffinity_np(&thread_attr, sizeof(cpu_set_t), &cpu);
		if (ret) {
			errno = ret;
			err(EXIT_FAILURE, "pthread_attr_setaffinity_np");
		}

		ret = pthread_create(&w[i], &thread_attr, workerfn, NULL);
		if (ret) {
			errno = ret;
			err(EXIT_FAILURE, "pthread_create");
		}
	}
}

/*
 * SIGINT handler installed by bench_futex_requeue(): raises the 'done' flag
 * that the run loop tests before starting another iteration. None of the
 * handler arguments are used.
 */
static void toggle_done(int sig __maybe_unused, siginfo_t *info __maybe_unused,
			void *uc __maybe_unused)
{
	done = true;
}

/*
 * bench_futex_requeue - entry point of "perf bench futex requeue".
 *
 * @argc, @argv: command-line arguments, parsed against the options table
 * @prefix:      unused
 *
 * For each of bench_repeat runs (or until SIGINT sets 'done'):
 *   1. create nthreads workers and let them block on futex1,
 *   2. time how long it takes to requeue all of them onto futex2,
 *      nrequeue tasks per call and without waking any of them,
 *   3. wake everybody on futex2 and join the workers.
 * A summary over all runs is printed at the end.
 *
 * Returns 0 on success. Leftover arguments print the usage text and exit
 * with EXIT_FAILURE; allocation, requeue and join failures terminate the
 * process through err().
 */
int bench_futex_requeue(int argc, const char **argv,
			const char *prefix __maybe_unused)
{
	int ret = 0;
	unsigned int i, j;
	struct sigaction act;
	pthread_attr_t thread_attr;

	argc = parse_options(argc, argv, options, bench_futex_requeue_usage, 0);
	if (argc)
		goto err;

	ncpus = sysconf(_SC_NPROCESSORS_ONLN);

	/*
	 * Clear the whole struct first: sa_flags would otherwise be stack
	 * garbage. The handler uses the three-argument sa_sigaction form,
	 * which requires SA_SIGINFO.
	 */
	memset(&act, 0, sizeof(act));
	sigfillset(&act.sa_mask);
	act.sa_sigaction = toggle_done;
	act.sa_flags = SA_SIGINFO;
	sigaction(SIGINT, &act, NULL);

	if (!nthreads)
		nthreads = ncpus;

	worker = calloc(nthreads, sizeof(*worker));
	if (!worker)
		err(EXIT_FAILURE, "calloc");

	if (!fshared)
		futex_flag = FUTEX_PRIVATE_FLAG;

	/*
	 * Requeueing zero tasks per call never makes progress and would spin
	 * forever in the requeue loop below; treat it as the default of 1.
	 */
	if (!nrequeue)
		nrequeue = 1;
	if (nrequeue > nthreads)
		nrequeue = nthreads;

	printf("Run summary [PID %d]: Requeuing %d threads (from [%s] %p to %p), "
	       "%d at a time.\n\n",  getpid(), nthreads,
	       fshared ? "shared":"private", &futex1, &futex2, nrequeue);

	init_stats(&requeued_stats);
	init_stats(&requeuetime_stats);
	pthread_attr_init(&thread_attr);
	pthread_mutex_init(&thread_lock, NULL);
	pthread_cond_init(&thread_parent, NULL);
	pthread_cond_init(&thread_worker, NULL);

	for (j = 0; j < bench_repeat && !done; j++) {
		unsigned int nrequeued = 0;
		unsigned long long elapsed_us;
		struct timeval start, end, runtime;

		/* create, launch & block all threads */
		block_threads(worker, thread_attr);

		/* make sure all threads are already blocked */
		pthread_mutex_lock(&thread_lock);
		while (threads_starting)
			pthread_cond_wait(&thread_parent, &thread_lock);
		pthread_cond_broadcast(&thread_worker);
		pthread_mutex_unlock(&thread_lock);

		/*
		 * Presumably gives the released workers time to reach
		 * futex_wait() on futex1 before the measurement starts.
		 */
		usleep(100000);

		/* Ok, all threads are patiently blocked, start requeueing */
		gettimeofday(&start, NULL);
		while (nrequeued < nthreads) {
			int r;

			/*
			 * Do not wakeup any tasks blocked on futex1, allowing
			 * us to really measure futex_wait functionality.
			 */
			r = futex_cmp_requeue(&futex1, 0, &futex2, 0,
					      nrequeue, futex_flag);
			/*
			 * Adding a negative result to the unsigned counter
			 * would corrupt it.
			 * NOTE(review): assumes the wrapper follows the
			 * syscall convention (-1 with errno set) - confirm
			 * against futex.h.
			 */
			if (r < 0)
				err(EXIT_FAILURE, "couldn't requeue from %p to %p",
				    &futex1, &futex2);

			nrequeued += r;
		}

		gettimeofday(&end, NULL);
		timersub(&end, &start, &runtime);

		/*
		 * Fold the seconds in as well (1000 ms/s * USEC_PER_MSEC);
		 * tv_usec alone only holds the sub-second remainder.
		 */
		elapsed_us = (unsigned long long)runtime.tv_sec * 1000 * USEC_PER_MSEC +
			     runtime.tv_usec;

		update_stats(&requeued_stats, nrequeued);
		update_stats(&requeuetime_stats, elapsed_us);

		if (!silent) {
			printf("[Run %d]: Requeued %d of %d threads in %.4f ms\n",
			       j + 1, nrequeued, nthreads, elapsed_us / (double)USEC_PER_MSEC);
		}

		/* everybody should be blocked on futex2, wake'em up */
		nrequeued = futex_wake(&futex2, nrequeued, futex_flag);
		if (nthreads != nrequeued)
			warnx("couldn't wakeup all tasks (%d/%d)", nrequeued, nthreads);

		for (i = 0; i < nthreads; i++) {
			ret = pthread_join(worker[i], NULL);
			if (ret) {
				/* pthread_join returns the error, errno is not set */
				errno = ret;
				err(EXIT_FAILURE, "pthread_join");
			}
		}
	}

	/* cleanup & report results */
	pthread_cond_destroy(&thread_parent);
	pthread_cond_destroy(&thread_worker);
	pthread_mutex_destroy(&thread_lock);
	pthread_attr_destroy(&thread_attr);

	print_summary();

	free(worker);
	return ret;
err:
	usage_with_options(bench_futex_requeue_usage, options);
	exit(EXIT_FAILURE);
}