From fb9081fc29bb38e5f0f4bb4dd9fb5a93cf19352a Mon Sep 17 00:00:00 2001 From: xiong-gang Date: Thu, 8 Dec 2016 14:26:05 +0800 Subject: [PATCH] Correct 'extraSeq' in ack packet after stop is requested If the ack packet sent in doSendStopMessageUDPIFC() is lost, the QE will keep sending status packets, and the QD will ack them in handleDataPacket(). We need to make sure that 'extraSeq' is equal to 'seq' in that ack packet so that the QE can update the capacity. Otherwise, the QE will hang forever. --- gpMgmt/bin/gppylib/programs/clsInjectFault.py | 1 + src/backend/cdb/motion/ic_udpifc.c | 17 ++++++++++++++++- src/backend/utils/misc/faultinjector.c | 3 +++ src/include/utils/faultinjector.h | 2 ++ src/test/regress/expected/ic.out | 17 +++++++++++++++++ src/test/regress/sql/ic.sql | 15 +++++++++++++++ 6 files changed, 54 insertions(+), 1 deletion(-) diff --git a/gpMgmt/bin/gppylib/programs/clsInjectFault.py b/gpMgmt/bin/gppylib/programs/clsInjectFault.py index 5c1a432964..30c99196ea 100644 --- a/gpMgmt/bin/gppylib/programs/clsInjectFault.py +++ b/gpMgmt/bin/gppylib/programs/clsInjectFault.py @@ -411,6 +411,7 @@ class GpInjectFaultProgram: "runaway_cleanup (inject fault before starting the cleanup for a runaway query), " \ "opt_task_allocate_string_buffer (inject fault while allocating string buffer), " \ "opt_relcache_translator_catalog_access (inject fault while translating relcache entries), " \ + "interconnect_stop_ack_is_lost (inject fault in interconnect to skip sending the stop ack), " \ "send_qe_details_init_backend (inject fault before sending QE details during backend initialization)" \ "all (affects all faults injected, used for 'status' and 'reset'), ") addTo.add_option("-c", "--ddl_statement", dest="ddlStatement", type="string", diff --git a/src/backend/cdb/motion/ic_udpifc.c b/src/backend/cdb/motion/ic_udpifc.c index 77981dfd4e..1f18a65245 100644 --- a/src/backend/cdb/motion/ic_udpifc.c +++ b/src/backend/cdb/motion/ic_udpifc.c @@ -33,6 +33,7 @@ #include "utils/gp_atomic.h" #include 
"utils/builtins.h" #include "utils/debugbreak.h" +#include "utils/faultinjector.h" #include "utils/pg_crc.h" #include "port/pg_crc32c.h" @@ -5479,6 +5480,18 @@ doSendStopMessageUDPIFC(ChunkTransportState *transportStates, int16 motNodeID) * We will skip sending ACKs to those connections. */ +#ifdef FAULT_INJECTOR + if (FaultInjector_InjectFaultIfSet( + InterconnectStopAckIsLost, + DDLNotSpecified, + "" /* databaseName */, + "" /* tableName */) == FaultInjectorTypeSkip) + { + pthread_mutex_unlock(&ic_control_info.lock); + continue; + } +#endif + if (conn->peer.ss_family == AF_INET || conn->peer.ss_family == AF_INET6) { uint32 seq = conn->conn_info.seq > 0 ? conn->conn_info.seq - 1 : 0; @@ -5720,7 +5733,9 @@ handleDataPacket(MotionConn *conn, icpkthdr *pkt, struct sockaddr_storage *peer, #ifdef AMS_VERBOSE_LOGGING logPkt("STATUS QUERY MESSAGE", pkt); #endif - setAckSendParam(param, conn, UDPIC_FLAGS_CAPACITY | UDPIC_FLAGS_ACK | conn->conn_info.flags, conn->conn_info.seq - 1, conn->conn_info.extraSeq); + uint32 seq = conn->conn_info.seq > 0 ? conn->conn_info.seq - 1 : 0; + uint32 extraSeq = conn->stopRequested ? 
seq : conn->conn_info.extraSeq; + setAckSendParam(param, conn, UDPIC_FLAGS_CAPACITY | UDPIC_FLAGS_ACK | conn->conn_info.flags, seq, extraSeq); return false; } diff --git a/src/backend/utils/misc/faultinjector.c b/src/backend/utils/misc/faultinjector.c index 46559f3e82..93a4182dce 100644 --- a/src/backend/utils/misc/faultinjector.c +++ b/src/backend/utils/misc/faultinjector.c @@ -321,6 +321,8 @@ FaultInjectorIdentifierEnumToString[] = { /* inject fault in quickdie*/ _("after_one_slice_dispatched"), /* inject fault in cdbdisp_dispatchX*/ + _("interconnect_stop_ack_is_lost"), + /* inject fault in interconnect to skip sending the stop ack */ _("not recognized"), }; @@ -1025,6 +1027,7 @@ FaultInjector_NewHashEntry( case FinishPreparedTransactionAbortPass1AbortingCreateNeeded: case FinishPreparedTransactionAbortPass2AbortingCreateNeeded: + case InterconnectStopAckIsLost: case SendQEDetailsInitBackend: break; diff --git a/src/include/utils/faultinjector.h b/src/include/utils/faultinjector.h index 0ccad957fc..47f2f15eb6 100644 --- a/src/include/utils/faultinjector.h +++ b/src/include/utils/faultinjector.h @@ -214,6 +214,8 @@ typedef enum FaultInjectorIdentifier_e { QuickDie, AfterOneSliceDispatched, + InterconnectStopAckIsLost, + /* INSERT has to be done before that line */ FaultInjectorIdMax, diff --git a/src/test/regress/expected/ic.out b/src/test/regress/expected/ic.out index 89aa7aea13..48043f5154 100644 --- a/src/test/regress/expected/ic.out +++ b/src/test/regress/expected/ic.out @@ -448,3 +448,20 @@ DROP TABLE small_table; DROP TABLE a; RESET search_path; DROP SCHEMA ic_udp_test CASCADE; +/* + * If ack packet is lost in doSendStopMessageUDPIFC(), transaction with cursor + * should still be able to commit. 
+*/ +--start_ignore +drop table if exists ic_test_1; +NOTICE: table "ic_test_1" does not exist, skipping +--end_ignore +create table ic_test_1 as select i as c1, i as c2 from generate_series(1, 100000) i; +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) named 'c1' as the Greenplum Database data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. +begin; +declare ic_test_cursor_c1 cursor for select * from ic_test_1; +\! gpfaultinjector -q -f interconnect_stop_ack_is_lost -y reset -s 1 +\! gpfaultinjector -q -f interconnect_stop_ack_is_lost -y skip -s 1 +commit; +drop table ic_test_1; diff --git a/src/test/regress/sql/ic.sql b/src/test/regress/sql/ic.sql index f91b0362df..4bda551cc0 100644 --- a/src/test/regress/sql/ic.sql +++ b/src/test/regress/sql/ic.sql @@ -183,3 +183,18 @@ DROP TABLE a; RESET search_path; DROP SCHEMA ic_udp_test CASCADE; + +/* + * If ack packet is lost in doSendStopMessageUDPIFC(), transaction with cursor + * should still be able to commit. +*/ +--start_ignore +drop table if exists ic_test_1; +--end_ignore +create table ic_test_1 as select i as c1, i as c2 from generate_series(1, 100000) i; +begin; +declare ic_test_cursor_c1 cursor for select * from ic_test_1; +\! gpfaultinjector -q -f interconnect_stop_ack_is_lost -y reset -s 1 +\! gpfaultinjector -q -f interconnect_stop_ack_is_lost -y skip -s 1 +commit; +drop table ic_test_1; -- GitLab