
/*-------------------------------------------------------------------------
 *
 * cdbgang_thread.c
 *	  Gang creation that connects to the segment databases using
 *	  multiple threads.
 *
 * Portions Copyright (c) 2005-2008, Greenplum inc
 * Portions Copyright (c) 2012-Present Pivotal Software, Inc.
 *
 *
 * IDENTIFICATION
 *	    src/backend/cdb/dispatcher/cdbgang_thread.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"
#include <pthread.h>
#include <limits.h>

#include "storage/ipc.h"		/* For proc_exit_inprogress  */
#include "tcop/tcopprot.h"
#include "libpq-fe.h"
#include "libpq-int.h"
#include "cdb/cdbfts.h"
#include "cdb/cdbgang.h"
#include "cdb/cdbvars.h"
#include "miscadmin.h"
#include "utils/resowner.h"
30 31 32 33 34 35 36
/*
 * Parameter structure for the DoConnect threads
 */
typedef struct DoConnectParms
{
	/*
	 * db_count: The number of segdbs that this thread is responsible for
37
	 * connecting to. Equals the count of segdbDescPtrArray below.
38
	 */
39
	int			db_count;
40 41

	/*
42 43
	 * segdbDescPtrArray: Array of SegmentDatabaseDescriptor* 's that this
	 * thread is responsible for connecting to. Has size equal to db_count.
44 45 46 47
	 */
	SegmentDatabaseDescriptor **segdbDescPtrArray;

	/* type of gang. */
48
	GangType	type;
49

50
	int			gangId;
51 52

	/* connect options. GUC etc. */
53
	char	   *connectOptions;
54 55

	/* The pthread_t thread handle. */
56
	pthread_t	thread;
57 58 59 60 61
} DoConnectParms;

static DoConnectParms *makeConnectParms(int parmsCount, GangType type, int gangId);
static void destroyConnectParms(DoConnectParms *doConnectParmsAr, int count);
static void *thread_DoConnect(void *arg);
62 63 64 65
static void checkConnectionStatus(Gang *gp,
					  int *countInRecovery,
					  int *countSuccessful,
					  struct PQExpBufferData *errorMessage);
66 67 68
static Gang *createGang_thread(GangType type, int gang_id, int size, int content);

CreateGangFunc pCreateGangFuncThreaded = createGang_thread;
69

70 71 72 73 74 75 76 77 78
/*
 * Creates a new gang by logging on a session to each segDB involved.
 *
 * The gang's segdbs are split into groups of at most
 * gp_connections_per_thread.  One thread per group runs thread_DoConnect(),
 * and all threads are joined before the results are examined with
 * checkConnectionStatus().
 *
 * If the only failures are QEs in recovery mode, creation of a writer gang
 * is retried up to gp_gang_creation_retry_count times.  Between attempts it
 * sleeps via pg_usleep(gp_gang_creation_retry_timer * 1000); pg_usleep takes
 * microseconds, so the GUC is presumably in milliseconds.
 *
 * Call this function in GangContext memory context.
 *
 * Returns a non-NULL gang on success.  Otherwise it reports ERROR, or FATAL
 * if a connection thread cannot be created or joined.
 */
static Gang *
createGang_thread(GangType type, int gang_id, int size, int content)
{
	Gang	   *newGangDefinition = NULL;
	SegmentDatabaseDescriptor *segdbDesc = NULL;
	DoConnectParms *doConnectParmsAr = NULL;
	DoConnectParms *pParms = NULL;
	int			parmIndex = 0;
	int			threadCount = 0;
	int			i = 0;
	int			create_gang_retry_counter = 0;
	int			in_recovery_mode_count = 0;
	int			successful_connections = 0;
	char	   *errorDetail = NULL;

	PQExpBufferData create_gang_error;

	ELOG_DISPATCHER_DEBUG("createGang type = %d, gang_id = %d, size = %d, content = %d",
						  type, gang_id, size, content);

	/* check arguments */
	Assert(size == 1 || size == getgpsegmentCount());
	Assert(CurrentResourceOwner != NULL);
	Assert(CurrentMemoryContext == GangContext);
	Assert(gp_connections_per_thread > 0);

	/* Writer gang is created before reader gangs. */
	if (type == GANGTYPE_PRIMARY_WRITER)
		Insist(!GangsExist());

	/* Note: PQExpBuffer storage is malloc'd, not palloc'd. */
	initPQExpBuffer(&create_gang_error);

	Assert(CurrentGangCreating == NULL);

create_gang_retry:

	/*
	 * If we're in a retry, we may need to reset our initial state a bit. We
	 * also want to ensure that all resources have been released.
	 */
	Assert(newGangDefinition == NULL);
	Assert(doConnectParmsAr == NULL);
	successful_connections = 0;
	in_recovery_mode_count = 0;
	threadCount = 0;

	/* allocate and initialize a gang structure */
	newGangDefinition = buildGangDefinition(type, gang_id, size, content);
	CurrentGangCreating = newGangDefinition;

	Assert(newGangDefinition != NULL);
	Assert(newGangDefinition->size == size);
	Assert(newGangDefinition->perGangContext != NULL);
	MemoryContextSwitchTo(newGangDefinition->perGangContext);

	resetPQExpBuffer(&create_gang_error);

	/*
	 * The most threads we could have is segdb_count /
	 * gp_connections_per_thread, rounded up. This is equivalent to 1 +
	 * (segdb_count-1) / gp_connections_per_thread. We allocate enough memory
	 * for this many DoConnectParms structures, even though we may not use
	 * them all.
	 */
	threadCount = 1 + (size - 1) / gp_connections_per_thread;
	Assert(threadCount > 0);

	/* initialize connect parameters: segdb i goes to thread i / per-thread */
	doConnectParmsAr = makeConnectParms(threadCount, type, gang_id);
	for (i = 0; i < size; i++)
	{
		parmIndex = i / gp_connections_per_thread;
		pParms = &doConnectParmsAr[parmIndex];
		segdbDesc = &newGangDefinition->db_descriptors[i];
		pParms->segdbDescPtrArray[pParms->db_count++] = segdbDesc;
	}

	/* start threads and doing the connect */
	for (i = 0; i < threadCount; i++)
	{
		int			pthread_err;

		pParms = &doConnectParmsAr[i];

		ELOG_DISPATCHER_DEBUG("createGang creating thread %d of %d for libpq connections",
							  i + 1, threadCount);

		pthread_err = gp_pthread_create(&pParms->thread, thread_DoConnect, pParms, "createGang");
		if (pthread_err != 0)
		{
			int			j;

			/*
			 * Error during thread create (this should be caused by resource
			 * constraints). If we leave the threads running, they'll
			 * immediately have some problems -- so we need to join them, and
			 * *then* we can issue our FATAL error
			 */
			for (j = 0; j < i; j++)
			{
				pthread_join(doConnectParmsAr[j].thread, NULL);
			}

			ereport(FATAL, (errcode(ERRCODE_INTERNAL_ERROR),
							errmsg("failed to create thread %d of %d", i + 1, threadCount),
							errdetail("pthread_create() failed with err %d", pthread_err)));
		}
	}

	/*
	 * wait for all of the DoConnect threads to complete.
	 */
	for (i = 0; i < threadCount; i++)
	{
		ELOG_DISPATCHER_DEBUG("joining to thread %d of %d for libpq connections",
							  i + 1, threadCount);

		if (0 != pthread_join(doConnectParmsAr[i].thread, NULL))
		{
			elog(FATAL, "could not create segworker group");
		}
	}

	/*
	 * Free the memory allocated for the threadParms array
	 */
	destroyConnectParms(doConnectParmsAr, threadCount);
	doConnectParmsAr = NULL;

	SIMPLE_FAULT_INJECTOR(GangCreated);

	/* find out the successful connections and the failed ones */
	checkConnectionStatus(newGangDefinition, &in_recovery_mode_count,
						  &successful_connections, &create_gang_error);

	ELOG_DISPATCHER_DEBUG("createGang: %d processes requested; %d successful connections %d in recovery",
						  size, successful_connections, in_recovery_mode_count);

	MemoryContextSwitchTo(GangContext);

	if (size == successful_connections)
	{
		setLargestGangsize(size);
		termPQExpBuffer(&create_gang_error);
		CurrentGangCreating = NULL;

		return newGangDefinition;
	}

	/* there are failed connections */

	FtsNotifyProber();
	/* FTS shows some segment DBs are down, destroy all gangs. */
	if (FtsTestSegmentDBIsDown(newGangDefinition->db_descriptors, size))
	{
		appendPQExpBufferStr(&create_gang_error, "FTS detected one or more segments are down\n");
		goto exit;
	}

	/* failure due to recovery */
	if (successful_connections + in_recovery_mode_count == size)
	{
		if (gp_gang_creation_retry_count &&
			create_gang_retry_counter++ < gp_gang_creation_retry_count &&
			type == GANGTYPE_PRIMARY_WRITER)
		{
			/*
			 * Retry for non-writer gangs is meaningless because writer gang
			 * must be gone when QE is in recovery mode
			 */
			DisconnectAndDestroyGang(newGangDefinition);
			newGangDefinition = NULL;
			CurrentGangCreating = NULL;

			ELOG_DISPATCHER_DEBUG("createGang: gang creation failed, but retryable.");

			CHECK_FOR_INTERRUPTS();
			pg_usleep(gp_gang_creation_retry_timer * 1000);
			CHECK_FOR_INTERRUPTS();

			goto create_gang_retry;
		}

		appendPQExpBufferStr(&create_gang_error, "segment(s) are in recovery mode\n");
	}

exit:
	if (newGangDefinition != NULL)
	{
		DisconnectAndDestroyGang(newGangDefinition);
		newGangDefinition = NULL;

		/*
		 * As on the retry path, stop advertising the gang we just destroyed
		 * before calling anything else, so no callee can see a dangling
		 * CurrentGangCreating.
		 */
		CurrentGangCreating = NULL;
	}

	if (type == GANGTYPE_PRIMARY_WRITER)
	{
		DisconnectAndDestroyAllGangs(true);
		CheckForResetSession();
	}

	CurrentGangCreating = NULL;

	/*
	 * The PQExpBuffer's storage is malloc'd and would be leaked once
	 * ereport(ERROR) longjmps away.  Copy the text into palloc'd memory in
	 * the current context (done after the gang cleanup above on purpose),
	 * then release the buffer before raising the error.
	 */
	errorDetail = pstrdup(create_gang_error.data != NULL ? create_gang_error.data : "");
	termPQExpBuffer(&create_gang_error);

	ereport(ERROR,
			(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
			 errmsg("failed to acquire resources on one or more segments"),
			 errdetail("%s", errorDetail)));
	return NULL;				/* keep compiler quiet */
}

/*
 *	Thread procedure.
 *	Perform the connect.
 */
static void *
thread_DoConnect(void *arg)
{
	DoConnectParms *pParms = (DoConnectParms *) arg;
	SegmentDatabaseDescriptor **segdbDescPtrArray = pParms->segdbDescPtrArray;
289
	int			db_count = pParms->db_count;
290 291

	SegmentDatabaseDescriptor *segdbDesc = NULL;
292
	int			i = 0;
293 294 295 296

	gp_set_thread_sigmasks();

	/*
297 298
	 * The pParms contains an array of SegmentDatabaseDescriptors to connect
	 * to.
299 300 301
	 */
	for (i = 0; i < db_count; i++)
	{
302
		bool		ret;
303
		char		gpqeid[100];
304 305 306 307 308 309 310 311 312 313 314 315 316 317

		segdbDesc = segdbDescPtrArray[i];

		if (segdbDesc == NULL || segdbDesc->segment_database_info == NULL)
		{
			write_log("thread_DoConnect: bad segment definition during gang creation %d/%d\n", i, db_count);
			continue;
		}

		/*
		 * Build the connection string.  Writer-ness needs to be processed
		 * early enough now some locks are taken before command line options
		 * are recognized.
		 */
318 319 320 321 322 323 324 325 326 327 328 329 330
		ret = build_gpqeid_param(gpqeid, sizeof(gpqeid),
								 pParms->type == GANGTYPE_PRIMARY_WRITER,
								 pParms->gangId,
								 segdbDesc->segment_database_info->hostSegs);

		if (!ret)
		{
			segdbDesc->errcode = ERRCODE_INTERNAL_ERROR;
			appendPQExpBuffer(&segdbDesc->error_message,
							  "Internal error: unable to construct connection string");
			write_log("thread_DoConnect: unable to construct connection string for segdb %i", i);
			continue;
		}
331 332 333 334 335 336 337 338 339 340 341 342 343

		/* check the result in createGang */
		cdbconn_doConnect(segdbDesc, gpqeid, pParms->connectOptions);
	}

	return (NULL);
}

/*
 * Initialize a DoConnectParms structure.
 *
 * Including initialize the connect option string.
 */
344 345
static DoConnectParms *
makeConnectParms(int parmsCount, GangType type, int gangId)
346
{
347 348 349 350 351
	DoConnectParms *doConnectParmsAr =
	(DoConnectParms *) palloc0(parmsCount * sizeof(DoConnectParms));
	DoConnectParms *pParms = NULL;
	int			segdbPerThread = gp_connections_per_thread;
	int			i = 0;
352 353 354 355

	for (i = 0; i < parmsCount; i++)
	{
		pParms = &doConnectParmsAr[i];
356 357
		pParms->segdbDescPtrArray =
			(SegmentDatabaseDescriptor **) palloc0(segdbPerThread * sizeof(SegmentDatabaseDescriptor *));
358 359 360 361 362 363 364 365 366 367 368 369
		MemSet(&pParms->thread, 0, sizeof(pthread_t));
		pParms->db_count = 0;
		pParms->type = type;
		pParms->connectOptions = makeOptions();
		pParms->gangId = gangId;
	}
	return doConnectParmsAr;
}

/*
 * Free all the memory allocated in DoConnectParms.
 */
370 371
static void
destroyConnectParms(DoConnectParms *doConnectParmsAr, int count)
372 373 374
{
	if (doConnectParmsAr != NULL)
	{
375 376
		int			i = 0;

377 378 379
		for (i = 0; i < count; i++)
		{
			DoConnectParms *pParms = &doConnectParmsAr[i];
380

381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401
			if (pParms->connectOptions != NULL)
			{
				pfree(pParms->connectOptions);
				pParms->connectOptions = NULL;
			}

			pfree(pParms->segdbDescPtrArray);
			pParms->segdbDescPtrArray = NULL;
		}

		pfree(doConnectParmsAr);
	}
}

/*
 * Check all the connections of a gang.
 *
 * return the count of successful connections and
 * the count of failed connections due to recovery.
 */
static void
402 403 404 405
checkConnectionStatus(Gang *gp,
					  int *countInRecovery,
					  int *countSuccessful,
					  struct PQExpBufferData *errorMessage)
406
{
407 408 409
	SegmentDatabaseDescriptor *segdbDesc = NULL;
	int			size = gp->size;
	int			i = 0;
410 411

	/*
412 413
	 * In this loop, we check whether the connections were successful. If not,
	 * we recreate the error message with palloc and report it.
414 415 416 417
	 */
	for (i = 0; i < size; i++)
	{
		segdbDesc = &gp->db_descriptors[i];
418

419 420 421 422
		/*
		 * check connection established or not, if not, we may have to
		 * re-build this gang.
		 */
423
		if (segdbDesc->errcode && segdbDesc->error_message.len > 0)
424 425
		{
			/*
426 427
			 * Log failed connections.	Complete failures are taken care of
			 * later.
428 429 430 431 432
			 */
			Assert(segdbDesc->whoami != NULL);
			elog(LOG, "Failed connection to %s", segdbDesc->whoami);

			insist_log(segdbDesc->errcode != 0 && segdbDesc->error_message.len != 0,
433
					   "connection is null, but no error code or error message, for segDB %d", i);
434

435
			ereport(LOG, (errcode(segdbDesc->errcode), errmsg("%s", segdbDesc->error_message.data)));
436 437

			/* this connect failed -- but why ? */
438
			if (segment_failure_due_to_recovery(segdbDesc->error_message.data))
439 440
			{
				elog(LOG, "segment is in recovery mode (%s)", segdbDesc->whoami);
441
				(*countInRecovery)++;
442 443 444
			}
			else
			{
445
				appendPQExpBuffer(errorMessage, "%s (%s)\n", segdbDesc->error_message.data, segdbDesc->whoami);
446 447 448
			}

			cdbconn_resetQEErrorMessage(segdbDesc);
449 450 451 452 453 454 455 456 457 458
		}
		else
		{
			Assert(segdbDesc->errcode == 0 && segdbDesc->error_message.len == 0);

			/* We have a live connection! */
			(*countSuccessful)++;
		}
	}
}