From eaae4a5b6583b92d293838e99ec83cb51ec70426 Mon Sep 17 00:00:00 2001 From: Pengzhou Tang Date: Wed, 8 Nov 2017 22:14:04 -0500 Subject: [PATCH] Bring back c3d8a92e which was reverted by accident --- src/backend/cdb/dispatcher/cdbdisp_async.c | 20 ++++++++++++++++- src/test/regress/input/dispatch.source | 10 +++++++++ src/test/regress/output/dispatch.source | 26 ++++++++++++++++++++++ 3 files changed, 55 insertions(+), 1 deletion(-) diff --git a/src/backend/cdb/dispatcher/cdbdisp_async.c b/src/backend/cdb/dispatcher/cdbdisp_async.c index 2a191bcb05..6e031685a5 100644 --- a/src/backend/cdb/dispatcher/cdbdisp_async.c +++ b/src/backend/cdb/dispatcher/cdbdisp_async.c @@ -418,6 +418,7 @@ checkDispatchResult(CdbDispatcherState *ds, int sock; int n; int nfds = 0; + PGconn *conn; /* * bail-out if we are dying. Once QD dies, QE will recognize it @@ -441,6 +442,7 @@ checkDispatchResult(CdbDispatcherState *ds, { dispatchResult = pParms->dispatchResultPtrArray[i]; segdbDesc = dispatchResult->segdbDesc; + conn = segdbDesc->conn; /* * Already finished with this QE? @@ -450,10 +452,26 @@ checkDispatchResult(CdbDispatcherState *ds, Assert(!cdbconn_isBadConnection(segdbDesc)); + /* + * Flush out buffer in case some commands are not fully + * dispatched to QEs, this can prevent QD from polling + * on such QEs forever. + */ + if (conn->outCount > 0) + { + /* + * Don't error out here, let following poll() routine to + * handle it. + */ + if (pqFlush(conn) < 0) + elog(LOG, "Failed flushing outbound data to %s:%s", + segdbDesc->whoami, PQerrorMessage(conn)); + } + /* * Add socket to fd_set if still connected. */ - sock = PQsocket(segdbDesc->conn); + sock = PQsocket(conn); Assert(sock >= 0); fds[nfds].fd = sock; fds[nfds].events = POLLIN; diff --git a/src/test/regress/input/dispatch.source b/src/test/regress/input/dispatch.source index 665b9d34fd..c6656384ca 100644 --- a/src/test/regress/input/dispatch.source +++ b/src/test/regress/input/dispatch.source @@ -308,3 +308,13 @@ select 1 from gp_dist_random('gp_id') limit 1; -- if previous gang is not destroyed, snapshot collision would happen select 1 from gp_dist_random('gp_id') limit 1; select gp_inject_fault('gang_created', 'reset', 1); + +-- +-- Test that an error happens after a big command is dispatched. +-- +select gp_inject_fault('after_one_slice_dispatched', 'error', 1); +select * from gp_dist_random('gp_id') + where gpname > (select * from repeat('sssss', 10000000)); +select gp_inject_fault('after_one_slice_dispatched', 'reset', 1); +select * from gp_dist_random('gp_id') + where gpname > (select * from repeat('sssss', 10000000)); diff --git a/src/test/regress/output/dispatch.source b/src/test/regress/output/dispatch.source index 7a3f2679be..15d9d4a40f 100644 --- a/src/test/regress/output/dispatch.source +++ b/src/test/regress/output/dispatch.source @@ -554,3 +554,29 @@ NOTICE: Success: t (1 row) +-- +-- Test that an error happens after a big command is dispatched. +-- +select gp_inject_fault('after_one_slice_dispatched', 'error', 1); +NOTICE: Success: + gp_inject_fault +----------------- + t +(1 row) + +select * from gp_dist_random('gp_id') + where gpname > (select * from repeat('sssss', 10000000)); +ERROR: fault triggered, fault name:'after_one_slice_dispatched' fault type:'error' +select gp_inject_fault('after_one_slice_dispatched', 'reset', 1); +NOTICE: Success: + gp_inject_fault +----------------- + t +(1 row) + +select * from gp_dist_random('gp_id') + where gpname > (select * from repeat('sssss', 10000000)); + gpname | numsegments | dbid | content +--------+-------------+------+--------- +(0 rows) + -- GitLab