#include "dataSinkMgt.h"
#include "executor.h"
#include "planner.h"
#include "query.h"
#include "qwInt.h"
#include "qwMsg.h"
#include "qworker.h"
#include "tcommon.h"
#include "tmsg.h"
#include "tname.h"

/* Number of initial calls each simulation hook lets through untouched. */
#define QW_DBG_SIMULATE_WARMUP_CALLS 10
/* Exclusive upper bound, in seconds, of the random sleep injected by qwDbgSimulateSleep. */
#define QW_DBG_SIMULATE_MAX_SLEEP_SEC 20

/* Runtime switches for the qworker debug helpers; only status validation is on by default. */
SQWDebug gQWDebug = {.statusEnable = true,
                     .dumpEnable = false,
                     .redirectSimulate = false,
                     .deadSimulate = false,
                     .sleepSimulate = false};

/*
 * Counts one call against a simulation hook's warm-up budget.
 *
 * Returns false for the first QW_DBG_SIMULATE_WARMUP_CALLS calls and true afterwards.
 * The counter saturates at the budget instead of being incremented forever, so it can
 * never overflow on a long-running process.
 *
 * NOTE(review): the counter is a plain int32_t with no synchronization, exactly as in the
 * previous inline code; concurrent callers may race on it.
 */
static bool qwDbgWarmupDone(int32_t *pCalls) {
  if (*pCalls < QW_DBG_SIMULATE_WARMUP_CALLS) {
    ++(*pCalls);
    return false;
  }

  return true;
}

/*
 * Validates a task status transition from oriStatus to newStatus.
 *
 * Does nothing (returns success) when gQWDebug.statusEnable is off.
 * A transition to the same status is accepted only for EXEC and FAIL, in which case
 * *ignore is set to true; *ignore is not written on any other path.
 *
 * Returns TSDB_CODE_SUCCESS for an allowed transition, otherwise logs the offending
 * transition and returns TSDB_CODE_QRY_APP_ERROR.
 */
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) {
  if (!gQWDebug.statusEnable) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = 0;

  if (oriStatus == newStatus) {
    if (newStatus == JOB_TASK_STATUS_EXEC || newStatus == JOB_TASK_STATUS_FAIL) {
      *ignore = true;
      return TSDB_CODE_SUCCESS;
    }

    QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

  switch (oriStatus) {
    case JOB_TASK_STATUS_NULL:
      if (newStatus != JOB_TASK_STATUS_EXEC && newStatus != JOB_TASK_STATUS_FAIL &&
          newStatus != JOB_TASK_STATUS_INIT) {
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }
      break;

    case JOB_TASK_STATUS_INIT:
      if (newStatus != JOB_TASK_STATUS_DROP && newStatus != JOB_TASK_STATUS_EXEC &&
          newStatus != JOB_TASK_STATUS_FAIL) {
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }
      break;

    case JOB_TASK_STATUS_EXEC:
      if (newStatus != JOB_TASK_STATUS_PART_SUCC && newStatus != JOB_TASK_STATUS_SUCC &&
          newStatus != JOB_TASK_STATUS_FAIL && newStatus != JOB_TASK_STATUS_DROP) {
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }
      break;

    case JOB_TASK_STATUS_PART_SUCC:
      if (newStatus != JOB_TASK_STATUS_EXEC && newStatus != JOB_TASK_STATUS_SUCC &&
          newStatus != JOB_TASK_STATUS_FAIL && newStatus != JOB_TASK_STATUS_DROP) {
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }
      break;

    case JOB_TASK_STATUS_SUCC:
      if (newStatus != JOB_TASK_STATUS_DROP && newStatus != JOB_TASK_STATUS_FAIL) {
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }
      break;

    case JOB_TASK_STATUS_FAIL:
      if (newStatus != JOB_TASK_STATUS_DROP) {
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }
      break;

    case JOB_TASK_STATUS_DROP:
      if (newStatus != JOB_TASK_STATUS_FAIL && newStatus != JOB_TASK_STATUS_PART_SUCC) {
        QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
      }
      break;

    default:
      /* Unknown origin status: reported separately from an invalid transition. */
      QW_TASK_ELOG("invalid task origStatus:%s", jobTaskStatusStr(oriStatus));
      return TSDB_CODE_QRY_APP_ERROR;
  }

  return TSDB_CODE_SUCCESS;

_return:

  QW_TASK_ELOG("invalid task status update from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
  QW_RET(code);
}

/*
 * Logs every task status entry of one scheduler.
 *
 * Holds sch->tasksLock for reading while sch->tasksHash is iterated.
 * i is the scheduler's ordinal, used only in the header log line.
 */
void qwDbgDumpSchInfo(SQWorker *mgmt, SQWSchStatus *sch, int32_t i) {
  QW_LOCK(QW_READ, &sch->tasksLock);

  int32_t taskNum = taosHashGetSize(sch->tasksHash);
  QW_DLOG("***The %dth scheduler status, hbBrokenTs:%" PRId64 ",taskNum:%d", i, sch->hbBrokenTs, taskNum);

  /* Filled by QW_GET_QTID; presumably also read by QW_TASK_DLOG, so the names must stay as they are. */
  uint64_t qId, tId;
  int32_t  eId;

  SQWTaskStatus *pTask = NULL;
  for (void *pIter = taosHashIterate(sch->tasksHash, NULL); pIter != NULL;
       pIter = taosHashIterate(sch->tasksHash, pIter)) {
    pTask = (SQWTaskStatus *)pIter;

    void *key = taosHashGetKey(pIter, NULL);
    QW_GET_QTID(key, qId, tId, eId);

    QW_TASK_DLOG("job refId:%" PRIx64 ", code:%x, task status:%d", pTask->refId, pTask->code, pTask->status);
  }

  QW_UNLOCK(QW_READ, &sch->tasksLock);
}

/*
 * Logs the number of remaining task contexts and the state of each one in mgmt->ctxHash.
 *
 * No lock is taken around the iteration here (unchanged from the original code).
 */
void qwDbgDumpTasksInfo(SQWorker *mgmt) {
  QW_DUMP("***Total remain ctx num %d", taosHashGetSize(mgmt->ctxHash));

  /* Filled by QW_GET_QTID; presumably also read by QW_TASK_DLOG, so the names must stay as they are. */
  uint64_t qId, tId;
  int32_t  eId;

  SQWTaskCtx *ctx = NULL;
  for (void *pIter = taosHashIterate(mgmt->ctxHash, NULL); pIter != NULL;
       pIter = taosHashIterate(mgmt->ctxHash, pIter)) {
    ctx = (SQWTaskCtx *)pIter;

    void *key = taosHashGetKey(pIter, NULL);
    QW_GET_QTID(key, qId, tId, eId);

    QW_TASK_DLOG("%p lock:%x, phase:%d, type:%d, explain:%d, needFetch:%d, localExec:%d, msgType:%d, fetchType:%d, "
                 "execId:%x, level:%d, queryGotData:%d, queryRsped:%d, queryEnd:%d, queryContinue:%d, queryInQueue:%d, "
                 "rspCode:%x, affectedRows:%" PRId64 ", taskHandle:%p, sinkHandle:%p, tbFName:%s, sver:%d, tver:%d, events:%d,%d,%d,%d,%d",
                 ctx, ctx->lock, ctx->phase, ctx->taskType, ctx->explain, ctx->needFetch, ctx->localExec, ctx->msgType,
                 ctx->fetchType, ctx->execId, ctx->level, ctx->queryGotData, ctx->queryRsped, ctx->queryEnd,
                 ctx->queryContinue, ctx->queryInQueue, ctx->rspCode, ctx->affectedRows, ctx->taskHandle,
                 ctx->sinkHandle, ctx->tbInfo.tbFName, ctx->tbInfo.sversion, ctx->tbInfo.tversion,
                 ctx->events[QW_EVENT_CANCEL], ctx->events[QW_EVENT_READY], ctx->events[QW_EVENT_FETCH],
                 ctx->events[QW_EVENT_DROP], ctx->events[QW_EVENT_CQUERY]);
  }
}

/*
 * Dumps all schedulers (under mgmt->schLock, read mode) and then all task contexts.
 *
 * Does nothing unless gQWDebug.dumpEnable is set.
 */
void qwDbgDumpMgmtInfo(SQWorker *mgmt) {
  if (!gQWDebug.dumpEnable) {
    return;
  }

  QW_LOCK(QW_READ, &mgmt->schLock);

  QW_DUMP("total remain scheduler num %d", taosHashGetSize(mgmt->schHash));

  int32_t i = 0;
  for (void *pIter = taosHashIterate(mgmt->schHash, NULL); pIter != NULL;
       pIter = taosHashIterate(mgmt->schHash, pIter)) {
    qwDbgDumpSchInfo(mgmt, (SQWSchStatus *)pIter, i);
    ++i;
  }

  QW_UNLOCK(QW_READ, &mgmt->schLock);

  qwDbgDumpTasksInfo(mgmt);
}

/*
 * Builds a response of type rspType carrying code and sends it on pConn.
 *
 * When pEpSet is non-NULL it is serialized into the response payload. If sizing the
 * payload or allocating it fails, the failure is logged and the response is still sent,
 * just without a payload (the same shape as a call with pEpSet == NULL), so the peer is
 * never handed a NULL buffer with a non-zero length.
 *
 * rpcRsp.info.hasEpSet is set to 1 in every case, as before.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SEpSet *pEpSet) {
  int32_t contLen = 0;
  char   *rsp = NULL;

  if (pEpSet) {
    /* First pass with a NULL buffer only computes the required length. */
    int32_t len = tSerializeSEpSet(NULL, 0, pEpSet);
    if (len < 0) {
      qError("failed to serialize epSet for %s rsp, len:%d", TMSG_INFO(rspType), len);
    } else {
      rsp = rpcMallocCont(len);
      if (NULL == rsp) {
        qError("failed to allocate %d bytes for %s rsp", len, TMSG_INFO(rspType));
      } else {
        /* Same input that was just sized successfully above. */
        tSerializeSEpSet(rsp, len, pEpSet);
        contLen = len;
      }
    }
  }

  SRpcMsg rpcRsp = {
      .msgType = rspType,
      .pCont = rsp,
      .contLen = contLen,
      .code = code,
      .info = *pConn,
  };
  rpcRsp.info.hasEpSet = 1;

  tmsgSendRsp(&rpcRsp);

  qDebug("response %s msg, code: %s", TMSG_INFO(rspType), tstrerror(code));

  return TSDB_CODE_SUCCESS;
}

/*
 * Randomly answers a request with a TSDB_CODE_SYN_NOT_LEADER redirect response.
 *
 * Active only when gQWDebug.redirectSimulate is set and *rsped is still false; the first
 * QW_DBG_SIMULATE_WARMUP_CALLS eligible calls are always let through.
 * Afterwards a TDMT_SCH_QUERY or TDMT_SCH_MERGE_QUERY message is redirected with
 * probability 1/3 and a TDMT_SCH_FETCH message with probability 1/9. On redirect, *rsped
 * is set to true. The response type is qwMsg->msgType + 1.
 */
void qwDbgSimulateRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped) {
  static int32_t ignoreTime = 0;

  if (*rsped) {
    return;
  }

  if (!gQWDebug.redirectSimulate) {
    return;
  }

  if (!qwDbgWarmupDone(&ignoreTime)) {
    return;
  }

  if (TDMT_SCH_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) {
    /* Fabricated three-node ep set on localhost for the redirect payload. */
    SEpSet epSet = {0};
    epSet.inUse = 1;
    epSet.numOfEps = 3;
    strcpy(epSet.eps[0].fqdn, "localhost");
    epSet.eps[0].port = 7100;
    strcpy(epSet.eps[1].fqdn, "localhost");
    epSet.eps[1].port = 7200;
    strcpy(epSet.eps[2].fqdn, "localhost");
    epSet.eps[2].port = 7300;

    /* NOTE(review): the MERGE_QUERY branch below uses QW_SET_PHASE for the same update;
     * confirm whether this direct assignment is intentional. */
    ctx->phase = QW_PHASE_POST_QUERY;
    qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, &epSet);
    *rsped = true;
    return;
  }

  if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) {
    QW_SET_PHASE(ctx, QW_PHASE_POST_QUERY);
    qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL);
    *rsped = true;
    return;
  }

  if ((TDMT_SCH_FETCH == qwMsg->msgType) && (0 == taosRand() % 9)) {
    qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL);
    *rsped = true;
    return;
  }
}

/*
 * Injects a random sleep of 0..QW_DBG_SIMULATE_MAX_SLEEP_SEC-1 (value passed to
 * taosSsleep) once the warm-up calls have been used up.
 *
 * Does nothing unless gQWDebug.sleepSimulate is set.
 */
void qwDbgSimulateSleep(void) {
  if (!gQWDebug.sleepSimulate) {
    return;
  }

  static int32_t ignoreTime = 0;
  if (!qwDbgWarmupDone(&ignoreTime)) {
    return;
  }

  taosSsleep(taosRand() % QW_DBG_SIMULATE_MAX_SLEEP_SEC);
}

/*
 * Randomly (probability 1/9 after the warm-up calls) answers with
 * TSDB_CODE_RPC_BROKEN_LINK and sends a drop message for the task.
 *
 * Active only when gQWDebug.deadSimulate is set and *rsped is still false. The data
 * connection is used for TDMT_SCH_FETCH / TDMT_SCH_MERGE_FETCH, the control connection
 * otherwise. On simulation, *rsped is set to true.
 */
void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped) {
  if (!gQWDebug.deadSimulate) {
    return;
  }

  if (*rsped) {
    return;
  }

  static int32_t ignoreTime = 0;
  if (!qwDbgWarmupDone(&ignoreTime)) {
    return;
  }

  if (0 != taosRand() % 9) {
    return;
  }

  bool            isFetch = (ctx->msgType == TDMT_SCH_FETCH || ctx->msgType == TDMT_SCH_MERGE_FETCH);
  SRpcHandleInfo *pConn = isFetch ? &ctx->dataConnInfo : &ctx->ctrlConnInfo;

  qwBuildAndSendErrorRsp(ctx->msgType + 1, pConn, TSDB_CODE_RPC_BROKEN_LINK);
  qwBuildAndSendDropMsg(QW_FPARAMS(), pConn);

  *rsped = true;
}

/*
 * Turns on one debug switch in gQWDebug by name (case-insensitive).
 *
 * Recognized options: "lock", "status", "dump", "sleep", "dead", "redirect".
 * Returns TSDB_CODE_SUCCESS when the option is recognized, TSDB_CODE_APP_ERROR for a
 * NULL or unknown option. Each outcome is logged through qError.
 */
int32_t qwDbgEnableDebug(char *option) {
  if (NULL == option) {
    /* Guard before strcasecmp / "%s", neither of which accepts NULL. */
    qError("invalid qw debug option: NULL");
    return TSDB_CODE_APP_ERROR;
  }

  if (0 == strcasecmp(option, "lock")) {
    gQWDebug.lockEnable = true;
    qError("qw lock debug enabled");
    return TSDB_CODE_SUCCESS;
  }

  if (0 == strcasecmp(option, "status")) {
    gQWDebug.statusEnable = true;
    qError("qw status debug enabled");
    return TSDB_CODE_SUCCESS;
  }

  if (0 == strcasecmp(option, "dump")) {
    gQWDebug.dumpEnable = true;
    qError("qw dump debug enabled");
    return TSDB_CODE_SUCCESS;
  }

  if (0 == strcasecmp(option, "sleep")) {
    gQWDebug.sleepSimulate = true;
    qError("qw sleep debug enabled");
    return TSDB_CODE_SUCCESS;
  }

  if (0 == strcasecmp(option, "dead")) {
    gQWDebug.deadSimulate = true;
    qError("qw dead debug enabled");
    return TSDB_CODE_SUCCESS;
  }

  if (0 == strcasecmp(option, "redirect")) {
    gQWDebug.redirectSimulate = true;
    qError("qw redirect debug enabled");
    return TSDB_CODE_SUCCESS;
  }

  qError("invalid qw debug option:%s", option);

  return TSDB_CODE_APP_ERROR;
}