diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index d3347f4d1fd947003cbb628d994c0c5265e83856..3c6c02ede6e92426479c3c178b551eebccf48289 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -75,6 +75,8 @@ typedef struct SQWDebug { bool lockEnable; bool statusEnable; bool dumpEnable; + bool sleepSimulate; + bool deadSimulate; bool tmp; } SQWDebug; diff --git a/source/libs/qworker/inc/qwMsg.h b/source/libs/qworker/inc/qwMsg.h index acb7004a510cb171562cbe94775f5149fabc7847..16fe6b21c270f89daf4aa1319f1ad50bff4321e5 100644 --- a/source/libs/qworker/inc/qwMsg.h +++ b/source/libs/qworker/inc/qwMsg.h @@ -40,6 +40,7 @@ void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComple int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn); int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx); int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execInfo, int32_t num); +int32_t qwBuildAndSendErrorRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code); void qwFreeFetchRsp(void *msg); int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp); int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *rsp, int32_t code); diff --git a/source/libs/qworker/src/qwDbg.c b/source/libs/qworker/src/qwDbg.c index 893f31acd1feae8a56deb890c1f4371d30c78fbe..cd54c5e5f9ca76ff0f73cacb785e2757b5bd246e 100644 --- a/source/libs/qworker/src/qwDbg.c +++ b/source/libs/qworker/src/qwDbg.c @@ -9,7 +9,7 @@ #include "tmsg.h" #include "tname.h" -SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = false, .tmp = false}; +SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = true, .tmp = false}; int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) { if (!gQWDebug.statusEnable) { @@ -175,29 +175,61 @@ int32_t qwDbgResponseRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx) { return TSDB_CODE_SUCCESS; } +void qwDbgSimulateSleep() { + if (!gQWDebug.sleepSimulate) { + return; + } + + taosSsleep(taosRand() % 10); +} + +void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t msgType) { + if (!gQWDebug.deadSimulate) { + return; + } + + //FETCH OR QUERY USE DIFFERENT CONNINFO + qwBuildAndSendErrorRsp(msgType + 1, ctx->dataConnInfo, TSDB_CODE_RPC_BROKEN_LINK); + + qwDropTask(QW_FPARAMS()); +} + + int32_t qwDbgEnableDebug(char *option) { if (0 == strcasecmp(option, "lock")) { gQWDebug.lockEnable = true; - qDebug("qw lock debug enabled"); + qError("qw lock debug enabled"); return TSDB_CODE_SUCCESS; } if (0 == strcasecmp(option, "status")) { gQWDebug.statusEnable = true; - qDebug("qw status debug enabled"); + qError("qw status debug enabled"); return TSDB_CODE_SUCCESS; } if (0 == strcasecmp(option, "dump")) { gQWDebug.dumpEnable = true; - qDebug("qw dump debug enabled"); + qError("qw dump debug enabled"); + return TSDB_CODE_SUCCESS; + } + + if (0 == strcasecmp(option, "sleep")) { + gQWDebug.sleepSimulate = true; + qError("qw sleep debug enabled"); + return TSDB_CODE_SUCCESS; + } + + if (0 == strcasecmp(option, "dead")) { + gQWDebug.sleepSimulate = true; + qError("qw dead debug enabled"); return TSDB_CODE_SUCCESS; } if (0 == strcasecmp(option, "tmp")) { gQWDebug.tmp = true; - qDebug("qw tmp debug enabled"); + qError("qw tmp debug enabled"); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 147ce76fc27eaa15e31e9ce2d928cf1d78c025f0..8bbd03f7352050b913064a41c9f34e7d70632700 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -43,6 +43,20 @@ void qwFreeFetchRsp(void *msg) { } } +int32_t qwBuildAndSendErrorRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code) { + SRpcMsg rpcRsp = { + .msgType = rspType, + .pCont = NULL, + .contLen = 0, + .code = code, + .info = *pConn, + }; + + tmsgSendRsp(&rpcRsp); + + return TSDB_CODE_SUCCESS; +} + int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx) { STbVerInfo* tbInfo = ctx ? &ctx->tbInfo : NULL; int64_t affectedRows = ctx ? ctx->affectedRows : 0; diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 41d9f46a87acb70a00c64ebcd2fd4cfcf29ff6e5..ec07ee85fdc8ba7c3a988bdf0a825bc8e8921710 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -1010,7 +1010,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, memcpy(pMsg->msg + len, pTask->msg, pTask->msgLen); persistHandle = true; - SCH_SET_TASK_HANDLE(pTask, rpcAllocHandle()); + //SCH_SET_TASK_HANDLE(pTask, rpcAllocHandle()); break; } case TDMT_SCH_FETCH: