提交 e405f01f 编写于 作者: dengyihao's avatar dengyihao

Merge branch 'fix/fixmsgorder' of https://github.com/taosdata/TDengine into msgOrder

...@@ -75,6 +75,8 @@ typedef struct SQWDebug { ...@@ -75,6 +75,8 @@ typedef struct SQWDebug {
bool lockEnable; bool lockEnable;
bool statusEnable; bool statusEnable;
bool dumpEnable; bool dumpEnable;
bool sleepSimulate;
bool deadSimulate;
bool tmp; bool tmp;
} SQWDebug; } SQWDebug;
......
...@@ -40,6 +40,7 @@ void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComple ...@@ -40,6 +40,7 @@ void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComple
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn); int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn);
int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx); int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx);
int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execInfo, int32_t num); int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execInfo, int32_t num);
int32_t qwBuildAndSendErrorRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code);
void qwFreeFetchRsp(void *msg); void qwFreeFetchRsp(void *msg);
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp); int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp);
int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *rsp, int32_t code); int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *rsp, int32_t code);
......
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
#include "tmsg.h" #include "tmsg.h"
#include "tname.h" #include "tname.h"
SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = false, .tmp = false}; SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = true, .tmp = false};
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) { int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) {
if (!gQWDebug.statusEnable) { if (!gQWDebug.statusEnable) {
...@@ -175,29 +175,61 @@ int32_t qwDbgResponseRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx) { ...@@ -175,29 +175,61 @@ int32_t qwDbgResponseRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void qwDbgSimulateSleep() {
if (!gQWDebug.sleepSimulate) {
return;
}
taosSsleep(taosRand() % 10);
}
void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t msgType) {
if (!gQWDebug.deadSimulate) {
return;
}
//FETCH OR QUERY USE DIFFERENT CONNINFO
qwBuildAndSendErrorRsp(msgType + 1, ctx->dataConnInfo, TSDB_CODE_RPC_BROKEN_LINK);
qwDropTask(QW_FPARAMS());
}
int32_t qwDbgEnableDebug(char *option) { int32_t qwDbgEnableDebug(char *option) {
if (0 == strcasecmp(option, "lock")) { if (0 == strcasecmp(option, "lock")) {
gQWDebug.lockEnable = true; gQWDebug.lockEnable = true;
qDebug("qw lock debug enabled"); qError("qw lock debug enabled");
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (0 == strcasecmp(option, "status")) { if (0 == strcasecmp(option, "status")) {
gQWDebug.statusEnable = true; gQWDebug.statusEnable = true;
qDebug("qw status debug enabled"); qError("qw status debug enabled");
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (0 == strcasecmp(option, "dump")) { if (0 == strcasecmp(option, "dump")) {
gQWDebug.dumpEnable = true; gQWDebug.dumpEnable = true;
qDebug("qw dump debug enabled"); qError("qw dump debug enabled");
return TSDB_CODE_SUCCESS;
}
if (0 == strcasecmp(option, "sleep")) {
gQWDebug.sleepSimulate = true;
qError("qw sleep debug enabled");
return TSDB_CODE_SUCCESS;
}
if (0 == strcasecmp(option, "dead")) {
gQWDebug.sleepSimulate = true;
qError("qw dead debug enabled");
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (0 == strcasecmp(option, "tmp")) { if (0 == strcasecmp(option, "tmp")) {
gQWDebug.tmp = true; gQWDebug.tmp = true;
qDebug("qw tmp debug enabled"); qError("qw tmp debug enabled");
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -43,6 +43,20 @@ void qwFreeFetchRsp(void *msg) { ...@@ -43,6 +43,20 @@ void qwFreeFetchRsp(void *msg) {
} }
} }
int32_t qwBuildAndSendErrorRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code) {
SRpcMsg rpcRsp = {
.msgType = rspType,
.pCont = NULL,
.contLen = 0,
.code = code,
.info = *pConn,
};
tmsgSendRsp(&rpcRsp);
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx) { int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx) {
STbVerInfo* tbInfo = ctx ? &ctx->tbInfo : NULL; STbVerInfo* tbInfo = ctx ? &ctx->tbInfo : NULL;
int64_t affectedRows = ctx ? ctx->affectedRows : 0; int64_t affectedRows = ctx ? ctx->affectedRows : 0;
......
...@@ -1010,7 +1010,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, ...@@ -1010,7 +1010,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
memcpy(pMsg->msg + len, pTask->msg, pTask->msgLen); memcpy(pMsg->msg + len, pTask->msg, pTask->msgLen);
persistHandle = true; persistHandle = true;
SCH_SET_TASK_HANDLE(pTask, rpcAllocHandle()); //SCH_SET_TASK_HANDLE(pTask, rpcAllocHandle());
break; break;
} }
case TDMT_SCH_FETCH: case TDMT_SCH_FETCH:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册