From bc56363076d5142fd95c41749aaa5e7c9add2d34 Mon Sep 17 00:00:00 2001 From: Pengzhou Tang Date: Tue, 6 Jun 2017 23:57:49 -0400 Subject: [PATCH] Cleanup dispatch and teardown interconnect once portal failure detected Formerly, GPDB do dispatch/interconnect cleanup on executor level which means once an error occurs within executor, it will be catched and dispatch/interconnect will be cleaned. The problem is if an error occurs after an executor started but before the executor run, dispatch/interconnect has no chance to be cleaned up. A problem is that outbound UDP interconnect packets still think the interconnect is active and will access the memory that has been freed. This commit add a few cleanup points on portal level, a higher call level than executor to cover more cases shown as above. mppExecutorCleanup() is reentrant, so it's ok to do double check on both level. --- src/backend/commands/portalcmds.c | 4 ++++ src/backend/tcop/pquery.c | 12 ++++++++++++ src/test/regress/input/dispatch.source | 19 ++++++++++++++++++ src/test/regress/output/dispatch.source | 26 +++++++++++++++++++++++++ src/test/regress/regress.c | 13 +++++++++++++ 5 files changed, 74 insertions(+) diff --git a/src/backend/commands/portalcmds.c b/src/backend/commands/portalcmds.c index 785c8e0b9f..d8436217ff 100644 --- a/src/backend/commands/portalcmds.c +++ b/src/backend/commands/portalcmds.c @@ -521,6 +521,10 @@ PersistHoldablePortal(Portal portal) /* Uncaught error while executing portal: mark it dead */ portal->status = PORTAL_FAILED; + /* GPDB: cleanup dispatch and teardown interconnect */ + if (portal->queryDesc) + mppExecutorCleanup(portal->queryDesc); + /* Restore global vars and propagate error */ ActivePortal = saveActivePortal; ActiveSnapshot = saveActiveSnapshot; diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c index deb985911f..c57668ccf9 100644 --- a/src/backend/tcop/pquery.c +++ b/src/backend/tcop/pquery.c @@ -819,6 +819,10 @@ PortalStart(Portal portal, ParamListInfo params, Snapshot snapshot, /* Uncaught error while executing portal: mark it dead */ portal->status = PORTAL_FAILED; + /* GPDB: cleanup dispatch and teardown interconnect */ + if (portal->queryDesc) + mppExecutorCleanup(portal->queryDesc); + /* Restore global vars and propagate error */ ActivePortal = saveActivePortal; ActiveSnapshot = saveActiveSnapshot; @@ -1047,6 +1051,10 @@ PortalRun(Portal portal, int64 count, bool isTopLevel, /* Uncaught error while executing portal: mark it dead */ portal->status = PORTAL_FAILED; + /* GPDB: cleanup dispatch and teardown interconnect */ + if (portal->queryDesc) + mppExecutorCleanup(portal->queryDesc); + /* Restore global vars and propagate error */ if (saveMemoryContext == saveTopTransactionContext) MemoryContextSwitchTo(TopTransactionContext); @@ -1607,6 +1615,10 @@ PortalRunFetch(Portal portal, /* Uncaught error while executing portal: mark it dead */ portal->status = PORTAL_FAILED; + /* GPDB: cleanup dispatch and teardown interconnect */ + if (portal->queryDesc) + mppExecutorCleanup(portal->queryDesc); + /* Restore global vars and propagate error */ ActivePortal = saveActivePortal; ActiveSnapshot = saveActiveSnapshot; diff --git a/src/test/regress/input/dispatch.source b/src/test/regress/input/dispatch.source index 1cffcd0863..de44983bca 100644 --- a/src/test/regress/input/dispatch.source +++ b/src/test/regress/input/dispatch.source @@ -280,3 +280,22 @@ SELECT dtx_dispatch_f(foo.c1) FROM (SELECT c1 FROM dtx_dispatch_t WHERE c1='1' l DROP FUNCTION dtx_dispatch_f(integer); DROP TABLE dtx_dispatch_t; + +-- Test interconnect is shut down under portal failure +CREATE OR REPLACE FUNCTION numActiveMotionConns() RETURNS INT +AS '@abs_builddir@/regress@DLSUFFIX@', 'numActiveMotionConns' LANGUAGE C; + +CREATE TABLE foo_test AS SELECT i AS c1 FROM generate_series(1, 10) i; + +SELECT c1/0 FROM foo_test WHERE c1 = 1; +SELECT numActiveMotionConns(); + +BEGIN; +DECLARE C1 CURSOR FOR SELECT * FROM foo_test; +FETCH BACKWARD 1 FROM C1; +END; + +SELECT numActiveMotionConns(); + +DROP FUNCTION numActiveMotionConns(); +DROP TABLE foo_test; diff --git a/src/test/regress/output/dispatch.source b/src/test/regress/output/dispatch.source index e7c952ff38..2831a37636 100644 --- a/src/test/regress/output/dispatch.source +++ b/src/test/regress/output/dispatch.source @@ -371,3 +371,29 @@ HINT: dispatching DTX commands to a busy gang CONTEXT: PL/pgSQL function "dtx_dispatch_f" line 1 during statement block entry DROP FUNCTION dtx_dispatch_f(integer); DROP TABLE dtx_dispatch_t; +-- Test interconnect is shut down under portal failure +CREATE OR REPLACE FUNCTION numActiveMotionConns() RETURNS INT +AS '@abs_builddir@/regress@DLSUFFIX@', 'numActiveMotionConns' LANGUAGE C; +CREATE TABLE foo_test AS SELECT i AS c1 FROM generate_series(1, 10) i; +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause. Creating a NULL policy entry. +SELECT c1/0 FROM foo_test WHERE c1 = 1; +ERROR: division by zero (seg1 slice1 11.111.111.116:25433 pid=166651) +SELECT numActiveMotionConns(); + numactivemotionconns +---------------------- + 0 +(1 row) + +BEGIN; +DECLARE C1 CURSOR FOR SELECT * FROM foo_test; +FETCH BACKWARD 1 FROM C1; +ERROR: backward scan is not supported in this version of Greenplum Database +END; +SELECT numActiveMotionConns(); + numactivemotionconns +---------------------- + 0 +(1 row) + +DROP FUNCTION numActiveMotionConns(); +DROP TABLE foo_test; diff --git a/src/test/regress/regress.c b/src/test/regress/regress.c index 72bd6ba358..137790c514 100644 --- a/src/test/regress/regress.c +++ b/src/test/regress/regress.c @@ -19,6 +19,7 @@ #include "cdb/memquota.h" #include "cdb/cdbgang.h" #include "cdb/cdbvars.h" +#include "cdb/ml_ipc.h" #include "commands/sequence.h" #include "commands/trigger.h" #include "executor/executor.h" @@ -86,6 +87,9 @@ extern Datum cleanupAllGangs(PG_FUNCTION_ARGS); /* check if QD has gangs exist */ extern Datum hasGangsExist(PG_FUNCTION_ARGS); +/* return the number of active motion connections */ +extern Datum numActiveMotionConns(PG_FUNCTION_ARGS); + /* * check if backends exist * Args: @@ -2481,6 +2485,15 @@ hasGangsExist(PG_FUNCTION_ARGS) PG_RETURN_BOOL(false); } +PG_FUNCTION_INFO_V1(numActiveMotionConns); +Datum numActiveMotionConns(PG_FUNCTION_ARGS) +{ + uint32 num = 0; + if (Gp_interconnect_type == INTERCONNECT_TYPE_UDPIFC) + num = getActiveMotionConns(); + PG_RETURN_UINT32(num); +} + PG_FUNCTION_INFO_V1(hasBackendsExist); Datum hasBackendsExist(PG_FUNCTION_ARGS) -- GitLab