提交 fb9081fc 编写于 作者: X xiong-gang 提交者: GitHub

Correct 'extraSeq' in ack packet after stop is requested

If the ack packet in doSendStopMessageUDPIFC() is lost, QE will keep sending status packet,
 and QD will ack it in handleDataPacket(). We need make sure the 'extraSeq' is equal to 'seq'
 in the ack packet so that QE can update the capacity. Or else, QE will hang for ever.
上级 a12a5725
......@@ -411,6 +411,7 @@ class GpInjectFaultProgram:
"runaway_cleanup (inject fault before starting the cleanup for a runaway query), " \
"opt_task_allocate_string_buffer (inject fault while allocating string buffer), " \
"opt_relcache_translator_catalog_access (inject fault while translating relcache entries), " \
"interconnect_stop_ack_is_lost (inject fault in interconnect to skip sending the stop ack), " \
"send_qe_details_init_backend (inject fault before sending QE details during backend initialization)" \
"all (affects all faults injected, used for 'status' and 'reset'), ")
addTo.add_option("-c", "--ddl_statement", dest="ddlStatement", type="string",
......
......@@ -33,6 +33,7 @@
#include "utils/gp_atomic.h"
#include "utils/builtins.h"
#include "utils/debugbreak.h"
#include "utils/faultinjector.h"
#include "utils/pg_crc.h"
#include "port/pg_crc32c.h"
......@@ -5479,6 +5480,18 @@ doSendStopMessageUDPIFC(ChunkTransportState *transportStates, int16 motNodeID)
* We will skip sending ACKs to those connections.
*/
#ifdef FAULT_INJECTOR
if (FaultInjector_InjectFaultIfSet(
InterconnectStopAckIsLost,
DDLNotSpecified,
"" /* databaseName */,
"" /* tableName */) == FaultInjectorTypeSkip)
{
pthread_mutex_unlock(&ic_control_info.lock);
continue;
}
#endif
if (conn->peer.ss_family == AF_INET || conn->peer.ss_family == AF_INET6)
{
uint32 seq = conn->conn_info.seq > 0 ? conn->conn_info.seq - 1 : 0;
......@@ -5720,7 +5733,9 @@ handleDataPacket(MotionConn *conn, icpkthdr *pkt, struct sockaddr_storage *peer,
#ifdef AMS_VERBOSE_LOGGING
logPkt("STATUS QUERY MESSAGE", pkt);
#endif
setAckSendParam(param, conn, UDPIC_FLAGS_CAPACITY | UDPIC_FLAGS_ACK | conn->conn_info.flags, conn->conn_info.seq - 1, conn->conn_info.extraSeq);
uint32 seq = conn->conn_info.seq > 0 ? conn->conn_info.seq - 1 : 0;
uint32 extraSeq = conn->stopRequested ? seq : conn->conn_info.extraSeq;
setAckSendParam(param, conn, UDPIC_FLAGS_CAPACITY | UDPIC_FLAGS_ACK | conn->conn_info.flags, seq, extraSeq);
return false;
}
......
......@@ -321,6 +321,8 @@ FaultInjectorIdentifierEnumToString[] = {
/* inject fault in quickdie*/
_("after_one_slice_dispatched"),
/* inject fault in cdbdisp_dispatchX*/
_("interconnect_stop_ack_is_lost"),
/* inject fault in interconnect to skip sending the stop ack */
_("not recognized"),
};
......@@ -1025,6 +1027,7 @@ FaultInjector_NewHashEntry(
case FinishPreparedTransactionAbortPass1AbortingCreateNeeded:
case FinishPreparedTransactionAbortPass2AbortingCreateNeeded:
case InterconnectStopAckIsLost:
case SendQEDetailsInitBackend:
break;
......
......@@ -214,6 +214,8 @@ typedef enum FaultInjectorIdentifier_e {
QuickDie,
AfterOneSliceDispatched,
InterconnectStopAckIsLost,
/* INSERT has to be done before that line */
FaultInjectorIdMax,
......
......@@ -448,3 +448,20 @@ DROP TABLE small_table;
DROP TABLE a;
RESET search_path;
DROP SCHEMA ic_udp_test CASCADE;
/*
* If ack packet is lost in doSendStopMessageUDPIFC(), transaction with cursor
* should still be able to commit.
*/
--start_ignore
drop table if exists ic_test_1;
NOTICE: table "ic_test_1" does not exist, skipping
--end_ignore
create table ic_test_1 as select i as c1, i as c2 from generate_series(1, 100000) i;
NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) named 'c1' as the Greenplum Database data distribution key for this table.
HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
begin;
declare ic_test_cursor_c1 cursor for select * from ic_test_1;
\! gpfaultinjector -q -f interconnect_stop_ack_is_lost -y reset -s 1
\! gpfaultinjector -q -f interconnect_stop_ack_is_lost -y skip -s 1
commit;
drop table ic_test_1;
......@@ -183,3 +183,18 @@ DROP TABLE a;
RESET search_path;
DROP SCHEMA ic_udp_test CASCADE;
/*
* If ack packet is lost in doSendStopMessageUDPIFC(), transaction with cursor
* should still be able to commit.
*/
--start_ignore
drop table if exists ic_test_1;
--end_ignore
create table ic_test_1 as select i as c1, i as c2 from generate_series(1, 100000) i;
begin;
declare ic_test_cursor_c1 cursor for select * from ic_test_1;
\! gpfaultinjector -q -f interconnect_stop_ack_is_lost -y reset -s 1
\! gpfaultinjector -q -f interconnect_stop_ack_is_lost -y skip -s 1
commit;
drop table ic_test_1;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册