// Patch summary (review header; the patch text below is unchanged).
//
// 1. include/util/tworker.h, source/util/src/tworker.c:
//    the queue-worker struct `SQWorker` is renamed to `SQueueWorker`; the
//    `workers` member of SQWorkerPool, the calloc/taosArrayInit element sizes,
//    the local worker pointers and both thread entry functions
//    (tQWorkerThreadFp, tAutoQWorkerThreadFp) are updated to the new name.
//
// 2. mndInt.h, qndInt.h, vnodeInt.h:
//    the forward typedef `SQHandle` now aliases `struct SQueueWorker` instead
//    of `struct SQWorker`.
//    NOTE(review): qworker.c in this same patch still uses a type named
//    `SQWorker` for its mgmt object (`qwStopAllTasks(SQWorker *mgmt)`), so
//    confirm SQHandle is really meant to alias the queue-worker struct and was
//    not caught by a blanket rename.
//
// 3. source/client/src/clientHb.c (hbBuildQueryDesc):
//    a request is now released and skipped when `pRequest->body.queryJob` is 0,
//    in addition to the existing `pRequest->killed` case.
//
// 4. source/libs/qworker/inc/qwInt.h:
//    adds `bool forceStop` to SQWDebug and the macro
//    `QW_QUERY_NOT_STARTED(ctx)`, which tests `QW_GET_PHASE(ctx) == -1`.
//    NOTE(review): no use of QW_QUERY_NOT_STARTED appears in this patch, and
//    the meaning of phase -1 is not shown here - confirm against the phase enum.
//
// 5. source/libs/qworker/src/qwDbg.c:
//    the gQWDebug initializer now lists `.lockEnable = false` and
//    `.forceStop = false` explicitly; qwDbgEnableDebug accepts the option
//    "forceStop" (compared with strcasecmp), sets gQWDebug.forceStop to true,
//    logs through qError and returns TSDB_CODE_SUCCESS.
//    NOTE(review): nothing in this patch sets forceStop back to false.
//
// 6. source/libs/qworker/src/qworker.c:
//    the per-task stop loop is moved out of qWorkerStopAllTasks into the new
//    `int32_t qwStopAllTasks(SQWorker *mgmt)`. For every entry of
//    mgmt->ctxHash it takes ctx->lock for write, then:
//      - skips the task if a DROP event was already received or processed;
//      - calls qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED) if the query is
//        running;
//      - sets the rsp code to TSDB_CODE_VND_STOPPED and marks DROP received if
//        a fetch is running;
//      - otherwise calls qwDropTask(QW_FPARAMS()).
//    Two QW_TASK_DLOG_E messages are new; the function always returns
//    TSDB_CODE_SUCCESS and both call sites cast the result to void.
//    qWorkerStopAllTasks still stores 1 into mgmt->nodeStopped before calling it.
//    qwProcessHbTimerEvent calls it when gQWDebug.forceStop is set, after
//    qwDbgDumpMgmtInfo(mgmt) and before taking mgmt->schLock for read.
//    NOTE(review): qwStopAllTasks is added without `static` and this patch adds
//    no prototype for it - confirm whether it is meant to be file-local.
//    NOTE(review): the return value of qwDropTask is ignored and ctx->lock is
//    released after the call; verify qwDropTask does not free ctx.
//
diff --git a/include/util/tworker.h b/include/util/tworker.h index 0636f16dbbd50b709de18ce556d8a9dcbdcfba8d..8508adf052bf8eb368dd9846d0a5e1ada1bb6e6e 100644 --- a/include/util/tworker.h +++ b/include/util/tworker.h @@ -26,12 +26,12 @@ extern "C" { typedef struct SQWorkerPool SQWorkerPool; typedef struct SWWorkerPool SWWorkerPool; -typedef struct SQWorker { +typedef struct SQueueWorker { int32_t id; // worker id int64_t pid; // thread pid TdThread thread; // thread id void *pool; -} SQWorker; +} SQueueWorker; typedef struct SQWorkerPool { int32_t max; // max number of workers @@ -39,7 +39,7 @@ typedef struct SQWorkerPool { int32_t num; // current number of workers STaosQset *qset; const char *name; - SQWorker *workers; + SQueueWorker *workers; TdThreadMutex mutex; } SQWorkerPool; diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 47ed2cf035a13314b81f2208718d2c466efb5c68..b01a8717027a06481168759200a89a4adea7fc0a 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -347,7 +347,7 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) { continue; } - if (pRequest->killed) { + if (pRequest->killed || 0 == pRequest->body.queryJob) { releaseRequest(*rid); pIter = taosHashIterate(pObj->pRequests, pIter); continue; diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 785ecc2bf502ca5f2a1e1203f1dc5fbe29b60a21..d851578a744f740cf0e211ee6928b2ec3f0dd705 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -58,7 +58,7 @@ typedef int32_t (*MndInitFp)(SMnode *pMnode); typedef void (*MndCleanupFp)(SMnode *pMnode); typedef int32_t (*ShowRetrieveFp)(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); typedef void (*ShowFreeIterFp)(SMnode *pMnode, void *pIter); -typedef struct SQWorker SQHandle; +typedef struct SQueueWorker SQHandle; typedef struct { const char *name; diff --git a/source/dnode/qnode/inc/qndInt.h 
b/source/dnode/qnode/inc/qndInt.h index e8ccb75040cbc54ab080e7887a877ce9ff67006b..86deda52ad242e371ab59542532234778e26864d 100644 --- a/source/dnode/qnode/inc/qndInt.h +++ b/source/dnode/qnode/inc/qndInt.h @@ -29,7 +29,7 @@ extern "C" { #endif -typedef struct SQWorker SQHandle; +typedef struct SQueueWorker SQHandle; typedef struct SQnode { int32_t qndId; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index ec925087d09c86fe60204c18457e2ea7b6d6823d..adec0272840870a0498adccd7c67036754752772 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -58,7 +58,7 @@ typedef struct STQ STQ; typedef struct SVState SVState; typedef struct SVStatis SVStatis; typedef struct SVBufPool SVBufPool; -typedef struct SQWorker SQHandle; +typedef struct SQueueWorker SQHandle; typedef struct STsdbKeepCfg STsdbKeepCfg; typedef struct SMetaSnapReader SMetaSnapReader; typedef struct SMetaSnapWriter SMetaSnapWriter; diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index f14df8e57c56ac237c4c93528b36a1535569ebbf..283fc7aa10884d408aa3daeaeeab9ff0007f46df 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -76,6 +76,7 @@ typedef struct SQWDebug { bool lockEnable; bool statusEnable; bool dumpEnable; + bool forceStop; bool sleepSimulate; bool deadSimulate; bool redirectSimulate; @@ -248,6 +249,7 @@ typedef struct SQWorkerMgmt { #define QW_QUERY_RUNNING(ctx) (QW_GET_PHASE(ctx) == QW_PHASE_PRE_QUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_CQUERY) #define QW_FETCH_RUNNING(ctx) ((ctx)->inFetch) +#define QW_QUERY_NOT_STARTED(ctx) (QW_GET_PHASE(ctx) == -1) #define QW_SET_QTID(id, qId, tId, eId) \ do { \ diff --git a/source/libs/qworker/src/qwDbg.c b/source/libs/qworker/src/qwDbg.c index 7a755cd36f46f87674096bae5b970a137c4e4655..0ab501ddd5abe806f62ef611900df1455b716e9f 100644 --- a/source/libs/qworker/src/qwDbg.c +++ b/source/libs/qworker/src/qwDbg.c @@ -9,11 
+9,13 @@ #include "tmsg.h" #include "tname.h" -SQWDebug gQWDebug = {.statusEnable = true, +SQWDebug gQWDebug = {.lockEnable = false, + .statusEnable = true, .dumpEnable = false, .redirectSimulate = false, .deadSimulate = false, - .sleepSimulate = false}; + .sleepSimulate = false, + .forceStop = false}; int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) { if (!gQWDebug.statusEnable) { @@ -306,6 +308,12 @@ int32_t qwDbgEnableDebug(char *option) { return TSDB_CODE_SUCCESS; } + if (0 == strcasecmp(option, "forceStop")) { + gQWDebug.forceStop = true; + qError("qw forceStop debug enabled"); + return TSDB_CODE_SUCCESS; + } + qError("invalid qw debug option:%s", option); return TSDB_CODE_APP_ERROR; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 2df0109d1912fa63b7c0719ac4579157939304ae..e2db0b8319e1db910c3e1d84e9c5663a772b9c6b 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -18,6 +18,51 @@ SQWorkerMgmt gQwMgmt = { .qwNum = 0, }; + +int32_t qwStopAllTasks(SQWorker *mgmt) { + uint64_t qId, tId, sId; + int32_t eId; + int64_t rId = 0; + + void *pIter = taosHashIterate(mgmt->ctxHash, NULL); + while (pIter) { + SQWTaskCtx *ctx = (SQWTaskCtx *)pIter; + void *key = taosHashGetKey(pIter, NULL); + QW_GET_QTID(key, qId, tId, eId); + + QW_LOCK(QW_WRITE, &ctx->lock); + + sId = ctx->sId; + + QW_TASK_DLOG_E("start to force stop task"); + + if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { + QW_TASK_WLOG_E("task already dropping"); + QW_UNLOCK(QW_WRITE, &ctx->lock); + + pIter = taosHashIterate(mgmt->ctxHash, pIter); + continue; + } + + if (QW_QUERY_RUNNING(ctx)) { + qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED); + QW_TASK_DLOG_E("task running, async killed"); + } else if (QW_FETCH_RUNNING(ctx)) { + QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_VND_STOPPED); + QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP); + QW_TASK_DLOG_E("task fetching, update 
drop received"); + } else { + qwDropTask(QW_FPARAMS()); + } + + QW_UNLOCK(QW_WRITE, &ctx->lock); + + pIter = taosHashIterate(mgmt->ctxHash, pIter); + } + + return TSDB_CODE_SUCCESS; +} + int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { int32_t code = 0; SSchedulerHbRsp rsp = {0}; @@ -973,6 +1018,10 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { qwDbgDumpMgmtInfo(mgmt); + if (gQWDebug.forceStop) { + (void)qwStopAllTasks(mgmt); + } + QW_LOCK(QW_READ, &mgmt->schLock); int32_t schNum = taosHashGetSize(mgmt->schHash); @@ -1087,6 +1136,7 @@ _return: QW_RET(TSDB_CODE_SUCCESS); } + int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const SMsgCb *pMsgCb) { if (NULL == qWorkerMgmt || (pMsgCb && pMsgCb->mgmt == NULL)) { qError("invalid param to init qworker"); @@ -1185,46 +1235,10 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) { SQWorker *mgmt = (SQWorker *)qWorkerMgmt; QW_DLOG("start to stop all tasks, taskNum:%d", taosHashGetSize(mgmt->ctxHash)); - - uint64_t qId, tId, sId; - int32_t eId; - int64_t rId = 0; - + atomic_store_8(&mgmt->nodeStopped, 1); - void *pIter = taosHashIterate(mgmt->ctxHash, NULL); - while (pIter) { - SQWTaskCtx *ctx = (SQWTaskCtx *)pIter; - void *key = taosHashGetKey(pIter, NULL); - QW_GET_QTID(key, qId, tId, eId); - - QW_LOCK(QW_WRITE, &ctx->lock); - - sId = ctx->sId; - - QW_TASK_DLOG_E("start to force stop task"); - - if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { - QW_TASK_WLOG_E("task already dropping"); - QW_UNLOCK(QW_WRITE, &ctx->lock); - - pIter = taosHashIterate(mgmt->ctxHash, pIter); - continue; - } - - if (QW_QUERY_RUNNING(ctx)) { - qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED); - } else if (QW_FETCH_RUNNING(ctx)) { - QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_VND_STOPPED); - QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP); - } else { - qwDropTask(QW_FPARAMS()); - } - - QW_UNLOCK(QW_WRITE, &ctx->lock); - - pIter = taosHashIterate(mgmt->ctxHash, 
pIter); - } + (void)qwStopAllTasks(mgmt); } void qWorkerDestroy(void **qWorkerMgmt) { diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 5581931178486ba05f36640edcf7500477cbc96c..631bcb443ebecdc89ab3338d5dd5254964232093 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -22,7 +22,7 @@ typedef void *(*ThreadFp)(void *param); int32_t tQWorkerInit(SQWorkerPool *pool) { pool->qset = taosOpenQset(); - pool->workers = taosMemoryCalloc(pool->max, sizeof(SQWorker)); + pool->workers = taosMemoryCalloc(pool->max, sizeof(SQueueWorker)); if (pool->workers == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -31,7 +31,7 @@ int32_t tQWorkerInit(SQWorkerPool *pool) { (void)taosThreadMutexInit(&pool->mutex, NULL); for (int32_t i = 0; i < pool->max; ++i) { - SQWorker *worker = pool->workers + i; + SQueueWorker *worker = pool->workers + i; worker->id = i; worker->pool = pool; } @@ -42,14 +42,14 @@ int32_t tQWorkerInit(SQWorkerPool *pool) { void tQWorkerCleanup(SQWorkerPool *pool) { for (int32_t i = 0; i < pool->max; ++i) { - SQWorker *worker = pool->workers + i; + SQueueWorker *worker = pool->workers + i; if (taosCheckPthreadValid(worker->thread)) { taosQsetThreadResume(pool->qset); } } for (int32_t i = 0; i < pool->max; ++i) { - SQWorker *worker = pool->workers + i; + SQueueWorker *worker = pool->workers + i; if (taosCheckPthreadValid(worker->thread)) { uInfo("worker:%s:%d is stopping", pool->name, worker->id); taosThreadJoin(worker->thread, NULL); @@ -65,7 +65,7 @@ void tQWorkerCleanup(SQWorkerPool *pool) { uInfo("worker:%s is closed", pool->name); } -static void *tQWorkerThreadFp(SQWorker *worker) { +static void *tQWorkerThreadFp(SQueueWorker *worker) { SQWorkerPool *pool = worker->pool; SQueueInfo qinfo = {0}; void *msg = NULL; @@ -106,7 +106,7 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) { // spawn a thread to process queue if (pool->num < pool->max) { do { - SQWorker *worker = pool->workers + 
pool->num; + SQueueWorker *worker = pool->workers + pool->num; TdThreadAttr thAttr; taosThreadAttrInit(&thAttr); @@ -138,7 +138,7 @@ void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) { int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool) { pool->qset = taosOpenQset(); - pool->workers = taosArrayInit(2, sizeof(SQWorker *)); + pool->workers = taosArrayInit(2, sizeof(SQueueWorker *)); if (pool->workers == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -153,14 +153,14 @@ int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool) { void tAutoQWorkerCleanup(SAutoQWorkerPool *pool) { int32_t size = taosArrayGetSize(pool->workers); for (int32_t i = 0; i < size; ++i) { - SQWorker *worker = taosArrayGetP(pool->workers, i); + SQueueWorker *worker = taosArrayGetP(pool->workers, i); if (taosCheckPthreadValid(worker->thread)) { taosQsetThreadResume(pool->qset); } } for (int32_t i = 0; i < size; ++i) { - SQWorker *worker = taosArrayGetP(pool->workers, i); + SQueueWorker *worker = taosArrayGetP(pool->workers, i); if (taosCheckPthreadValid(worker->thread)) { uInfo("worker:%s:%d is stopping", pool->name, worker->id); taosThreadJoin(worker->thread, NULL); @@ -177,7 +177,7 @@ void tAutoQWorkerCleanup(SAutoQWorkerPool *pool) { uInfo("worker:%s is closed", pool->name); } -static void *tAutoQWorkerThreadFp(SQWorker *worker) { +static void *tAutoQWorkerThreadFp(SQueueWorker *worker) { SAutoQWorkerPool *pool = worker->pool; SQueueInfo qinfo = {0}; void *msg = NULL; @@ -222,7 +222,7 @@ STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem // spawn a thread to process queue while (curWorkerNum < dstWorkerNum) { - SQWorker *worker = taosMemoryCalloc(1, sizeof(SQWorker)); + SQueueWorker *worker = taosMemoryCalloc(1, sizeof(SQueueWorker)); if (worker == NULL || taosArrayPush(pool->workers, &worker) == NULL) { uError("worker:%s:%d failed to create", pool->name, curWorkerNum); taosMemoryFree(worker);