提交 eb40e073 编写于 作者: P Pengzhou Tang

Fix a few dispatch-related bugs

1. Fix primary writer gang leak: primaryWriterGang was accidentally set to NULL, which meant
   disconnectAndDestroyAllGangs() could not destroy the primary writer gang.
2. Fix gang leak: when creating a gang, if the retry count exceeded the limit, the failed gang was never destroyed.
3. Remove the duplicate sanity check before dispatchCommand().
4. Remove the unnecessary error-out when a broken gang is no longer needed.
5. Fix a thread leak.
6. Improve error handling in cdbdisp_finishCommand().
上级 203a4eb5
...@@ -186,7 +186,7 @@ cdbdisp_getDispatchResults(struct CdbDispatcherState *ds, StringInfoData *qeErro ...@@ -186,7 +186,7 @@ cdbdisp_getDispatchResults(struct CdbDispatcherState *ds, StringInfoData *qeErro
void void
cdbdisp_finishCommand(struct CdbDispatcherState *ds) cdbdisp_finishCommand(struct CdbDispatcherState *ds)
{ {
StringInfoData qeErrorMsg; StringInfoData qeErrorMsg = {NULL, 0, 0, 0};
CdbDispatchResults *pr = NULL; CdbDispatchResults *pr = NULL;
/* /*
* If cdbdisp_dispatchToGang() wasn't called, don't wait. * If cdbdisp_dispatchToGang() wasn't called, don't wait.
...@@ -209,28 +209,26 @@ cdbdisp_finishCommand(struct CdbDispatcherState *ds) ...@@ -209,28 +209,26 @@ cdbdisp_finishCommand(struct CdbDispatcherState *ds)
/* /*
* Wait for all QEs to finish. Don't cancel them. * Wait for all QEs to finish. Don't cancel them.
*/ */
initStringInfo(&qeErrorMsg); PG_TRY();
pr = cdbdisp_getDispatchResults(ds, &qeErrorMsg);
if (!pr)
{ {
cdbdisp_destroyDispatcherState(ds); initStringInfo(&qeErrorMsg);
PG_TRY(); pr = cdbdisp_getDispatchResults(ds, &qeErrorMsg);
{
if (!pr)
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
errOmitLocation(true), errmsg("%s", qeErrorMsg.data))); errOmitLocation(true), errmsg("%s", qeErrorMsg.data)));
}
PG_CATCH(); pfree(qeErrorMsg.data);
{ }
PG_CATCH();
{
if (qeErrorMsg.data != NULL)
pfree(qeErrorMsg.data); pfree(qeErrorMsg.data);
PG_RE_THROW(); cdbdisp_destroyDispatcherState(ds);
} PG_RE_THROW();
PG_END_TRY();
} }
PG_END_TRY();
pfree(qeErrorMsg.data);
/* /*
* If no errors, free the CdbDispatchResults objects and return. * If no errors, free the CdbDispatchResults objects and return.
......
...@@ -173,17 +173,6 @@ cdbdisp_dispatchToGang_async(struct CdbDispatcherState *ds, ...@@ -173,17 +173,6 @@ cdbdisp_dispatchToGang_async(struct CdbDispatcherState *ds,
} }
pParms->dispatchResultPtrArray[pParms->dispatchCount++] = qeResult; pParms->dispatchResultPtrArray[pParms->dispatchCount++] = qeResult;
if (cdbconn_isBadConnection(segdbDesc))
{
char *msg = PQerrorMessage(qeResult->segdbDesc->conn);
qeResult->stillRunning = false;
ereport(ERROR,
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Connection lost before dispatch to %s: %s",
segdbDesc->whoami, msg ? msg : "unknown error")));
}
dispatchCommand(qeResult, pParms->query_text, pParms->query_text_len); dispatchCommand(qeResult, pParms->query_text, pParms->query_text_len);
} }
} }
...@@ -294,6 +283,12 @@ checkDispatchResult(CdbDispatcherState *ds, ...@@ -294,6 +283,12 @@ checkDispatchResult(CdbDispatcherState *ds,
dispatchResult = pParms->dispatchResultPtrArray[i]; dispatchResult = pParms->dispatchResultPtrArray[i];
segdbDesc = dispatchResult->segdbDesc; segdbDesc = dispatchResult->segdbDesc;
/*
* Already finished with this QE?
*/
if (!dispatchResult->stillRunning)
continue;
if (cdbconn_isBadConnection(segdbDesc)) if (cdbconn_isBadConnection(segdbDesc))
{ {
char *msg = PQerrorMessage(segdbDesc->conn); char *msg = PQerrorMessage(segdbDesc->conn);
...@@ -301,14 +296,8 @@ checkDispatchResult(CdbDispatcherState *ds, ...@@ -301,14 +296,8 @@ checkDispatchResult(CdbDispatcherState *ds,
cdbdisp_appendMessageNonThread(dispatchResult, LOG, cdbdisp_appendMessageNonThread(dispatchResult, LOG,
"Connection lost during dispatch to %s: %s", "Connection lost during dispatch to %s: %s",
dispatchResult->segdbDesc->whoami, msg ? msg : "unknown error"); dispatchResult->segdbDesc->whoami, msg ? msg : "unknown error");
}
/*
* Already finished with this QE?
*/
if (!dispatchResult->stillRunning)
continue; continue;
}
/* /*
* Add socket to fd_set if still connected. * Add socket to fd_set if still connected.
......
...@@ -275,7 +275,7 @@ cdbdisp_dispatchToGang_internal(struct CdbDispatcherState *ds, ...@@ -275,7 +275,7 @@ cdbdisp_dispatchToGang_internal(struct CdbDispatcherState *ds,
*/ */
pParms->waitMode = DISPATCH_WAIT_CANCEL; pParms->waitMode = DISPATCH_WAIT_CANCEL;
for (j = 0; j < threadStartIndex + (i - 1); j++) for (j = 0; j < threadStartIndex + i; j++)
{ {
DispatchCommandParms *pParms; DispatchCommandParms *pParms;
...@@ -631,6 +631,12 @@ thread_DispatchWait(DispatchCommandParms *pParms) ...@@ -631,6 +631,12 @@ thread_DispatchWait(DispatchCommandParms *pParms)
CdbDispatchResult *dispatchResult = pParms->dispatchResultPtrArray[i]; CdbDispatchResult *dispatchResult = pParms->dispatchResultPtrArray[i];
SegmentDatabaseDescriptor *segdbDesc = dispatchResult->segdbDesc; SegmentDatabaseDescriptor *segdbDesc = dispatchResult->segdbDesc;
/*
* Already finished with this QE?
*/
if (!dispatchResult->stillRunning)
continue;
if (cdbconn_isBadConnection(segdbDesc)) if (cdbconn_isBadConnection(segdbDesc))
{ {
char *msg = PQerrorMessage(segdbDesc->conn); char *msg = PQerrorMessage(segdbDesc->conn);
...@@ -641,12 +647,6 @@ thread_DispatchWait(DispatchCommandParms *pParms) ...@@ -641,12 +647,6 @@ thread_DispatchWait(DispatchCommandParms *pParms)
continue; continue;
} }
/*
* Already finished with this QE?
*/
if (!dispatchResult->stillRunning)
continue;
/* /*
* Add socket to fd_set if still connected. * Add socket to fd_set if still connected.
*/ */
......
...@@ -1370,10 +1370,7 @@ void freeGangsForPortal(char *portal_name) ...@@ -1370,10 +1370,7 @@ void freeGangsForPortal(char *portal_name)
primaryWriterGang != NULL && primaryWriterGang != NULL &&
!cleanupGang(primaryWriterGang)) !cleanupGang(primaryWriterGang))
{ {
primaryWriterGang = NULL;
disconnectAndDestroyAllGangs(true); disconnectAndDestroyAllGangs(true);
elog(ERROR, "could not temporarily connect to one or more segments");
return; return;
} }
......
...@@ -273,6 +273,10 @@ create_gang_retry: ...@@ -273,6 +273,10 @@ create_gang_retry:
goto create_gang_retry; goto create_gang_retry;
} }
else
{
goto exit;
}
} }
PG_END_TRY(); PG_END_TRY();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册