diff --git a/src/backend/commands/portalcmds.c b/src/backend/commands/portalcmds.c index 785c8e0b9f62d48f50ab9af51f8759cbc289c0e3..d8436217ff426f3de446bdcbd2041ce88a610e87 100644 --- a/src/backend/commands/portalcmds.c +++ b/src/backend/commands/portalcmds.c @@ -521,6 +521,10 @@ PersistHoldablePortal(Portal portal) /* Uncaught error while executing portal: mark it dead */ portal->status = PORTAL_FAILED; + /* GPDB: cleanup dispatch and teardown interconnect */ + if (portal->queryDesc) + mppExecutorCleanup(portal->queryDesc); + /* Restore global vars and propagate error */ ActivePortal = saveActivePortal; ActiveSnapshot = saveActiveSnapshot; diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c index deb985911fd82a8d7429f0a7e15b6d8962d57d6a..c57668ccf99b26c6dcf8cc94539d488b87308ce8 100644 --- a/src/backend/tcop/pquery.c +++ b/src/backend/tcop/pquery.c @@ -819,6 +819,10 @@ PortalStart(Portal portal, ParamListInfo params, Snapshot snapshot, /* Uncaught error while executing portal: mark it dead */ portal->status = PORTAL_FAILED; + /* GPDB: cleanup dispatch and teardown interconnect */ + if (portal->queryDesc) + mppExecutorCleanup(portal->queryDesc); + /* Restore global vars and propagate error */ ActivePortal = saveActivePortal; ActiveSnapshot = saveActiveSnapshot; @@ -1047,6 +1051,10 @@ PortalRun(Portal portal, int64 count, bool isTopLevel, /* Uncaught error while executing portal: mark it dead */ portal->status = PORTAL_FAILED; + /* GPDB: cleanup dispatch and teardown interconnect */ + if (portal->queryDesc) + mppExecutorCleanup(portal->queryDesc); + /* Restore global vars and propagate error */ if (saveMemoryContext == saveTopTransactionContext) MemoryContextSwitchTo(TopTransactionContext); @@ -1607,6 +1615,10 @@ PortalRunFetch(Portal portal, /* Uncaught error while executing portal: mark it dead */ portal->status = PORTAL_FAILED; + /* GPDB: cleanup dispatch and teardown interconnect */ + if (portal->queryDesc) + mppExecutorCleanup(portal->queryDesc); + /* Restore global vars and propagate error */ ActivePortal = saveActivePortal; ActiveSnapshot = saveActiveSnapshot; diff --git a/src/test/regress/input/dispatch.source b/src/test/regress/input/dispatch.source index 1cffcd08639a8f59bb5d82b442faf88e59adca97..de44983bca56fb75f92e7e9d0da8212d7d27dd70 100644 --- a/src/test/regress/input/dispatch.source +++ b/src/test/regress/input/dispatch.source @@ -280,3 +280,22 @@ SELECT dtx_dispatch_f(foo.c1) FROM (SELECT c1 FROM dtx_dispatch_t WHERE c1='1' l DROP FUNCTION dtx_dispatch_f(integer); DROP TABLE dtx_dispatch_t; + +-- Test interconnect is shut down under portal failure +CREATE OR REPLACE FUNCTION numActiveMotionConns() RETURNS INT +AS '@abs_builddir@/regress@DLSUFFIX@', 'numActiveMotionConns' LANGUAGE C; + +CREATE TABLE foo_test AS SELECT i AS c1 FROM generate_series(1, 10) i; + +SELECT c1/0 FROM foo_test WHERE c1 = 1; +SELECT numActiveMotionConns(); + +BEGIN; +DECLARE C1 CURSOR FOR SELECT * FROM foo_test; +FETCH BACKWARD 1 FROM C1; +END; + +SELECT numActiveMotionConns(); + +DROP FUNCTION numActiveMotionConns(); +DROP TABLE foo_test; diff --git a/src/test/regress/output/dispatch.source b/src/test/regress/output/dispatch.source index e7c952ff38be87b552987921eecede4f25d9f8a7..2831a37636ffbc36776fec5483f91cc233f84e41 100644 --- a/src/test/regress/output/dispatch.source +++ b/src/test/regress/output/dispatch.source @@ -371,3 +371,29 @@ HINT: dispatching DTX commands to a busy gang CONTEXT: PL/pgSQL function "dtx_dispatch_f" line 1 during statement block entry DROP FUNCTION dtx_dispatch_f(integer); DROP TABLE dtx_dispatch_t; +-- Test interconnect is shut down under portal failure +CREATE OR REPLACE FUNCTION numActiveMotionConns() RETURNS INT +AS '@abs_builddir@/regress@DLSUFFIX@', 'numActiveMotionConns' LANGUAGE C; +CREATE TABLE foo_test AS SELECT i AS c1 FROM generate_series(1, 10) i; +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause. Creating a NULL policy entry. +SELECT c1/0 FROM foo_test WHERE c1 = 1; +ERROR: division by zero (seg1 slice1 11.111.111.116:25433 pid=166651) +SELECT numActiveMotionConns(); + numactivemotionconns +---------------------- + 0 +(1 row) + +BEGIN; +DECLARE C1 CURSOR FOR SELECT * FROM foo_test; +FETCH BACKWARD 1 FROM C1; +ERROR: backward scan is not supported in this version of Greenplum Database +END; +SELECT numActiveMotionConns(); + numactivemotionconns +---------------------- + 0 +(1 row) + +DROP FUNCTION numActiveMotionConns(); +DROP TABLE foo_test; diff --git a/src/test/regress/regress.c b/src/test/regress/regress.c index 72bd6ba358f1029e1c55de1ef595b8e7b840f9f5..137790c514045987ca914f7bdb199a8579b9560b 100644 --- a/src/test/regress/regress.c +++ b/src/test/regress/regress.c @@ -19,6 +19,7 @@ #include "cdb/memquota.h" #include "cdb/cdbgang.h" #include "cdb/cdbvars.h" +#include "cdb/ml_ipc.h" #include "commands/sequence.h" #include "commands/trigger.h" #include "executor/executor.h" @@ -86,6 +87,9 @@ extern Datum cleanupAllGangs(PG_FUNCTION_ARGS); /* check if QD has gangs exist */ extern Datum hasGangsExist(PG_FUNCTION_ARGS); +/* return the number of active motion connections */ +extern Datum numActiveMotionConns(PG_FUNCTION_ARGS); + /* * check if backends exist * Args: @@ -2481,6 +2485,15 @@ hasGangsExist(PG_FUNCTION_ARGS) PG_RETURN_BOOL(false); } +PG_FUNCTION_INFO_V1(numActiveMotionConns); +Datum numActiveMotionConns(PG_FUNCTION_ARGS) +{ + uint32 num = 0; + if (Gp_interconnect_type == INTERCONNECT_TYPE_UDPIFC) + num = getActiveMotionConns(); + PG_RETURN_UINT32(num); +} + PG_FUNCTION_INFO_V1(hasBackendsExist); Datum hasBackendsExist(PG_FUNCTION_ARGS)