未验证 提交 0b0af524 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #20002 from taosdata/fix/forceStopTask

fix: add force stop task and rename conflict structer name
...@@ -26,12 +26,12 @@ extern "C" { ...@@ -26,12 +26,12 @@ extern "C" {
typedef struct SQWorkerPool SQWorkerPool; typedef struct SQWorkerPool SQWorkerPool;
typedef struct SWWorkerPool SWWorkerPool; typedef struct SWWorkerPool SWWorkerPool;
typedef struct SQWorker { typedef struct SQueueWorker {
int32_t id; // worker id int32_t id; // worker id
int64_t pid; // thread pid int64_t pid; // thread pid
TdThread thread; // thread id TdThread thread; // thread id
void *pool; void *pool;
} SQWorker; } SQueueWorker;
typedef struct SQWorkerPool { typedef struct SQWorkerPool {
int32_t max; // max number of workers int32_t max; // max number of workers
...@@ -39,7 +39,7 @@ typedef struct SQWorkerPool { ...@@ -39,7 +39,7 @@ typedef struct SQWorkerPool {
int32_t num; // current number of workers int32_t num; // current number of workers
STaosQset *qset; STaosQset *qset;
const char *name; const char *name;
SQWorker *workers; SQueueWorker *workers;
TdThreadMutex mutex; TdThreadMutex mutex;
} SQWorkerPool; } SQWorkerPool;
......
...@@ -347,7 +347,7 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) { ...@@ -347,7 +347,7 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
continue; continue;
} }
if (pRequest->killed) { if (pRequest->killed || 0 == pRequest->body.queryJob) {
releaseRequest(*rid); releaseRequest(*rid);
pIter = taosHashIterate(pObj->pRequests, pIter); pIter = taosHashIterate(pObj->pRequests, pIter);
continue; continue;
......
...@@ -58,7 +58,7 @@ typedef int32_t (*MndInitFp)(SMnode *pMnode); ...@@ -58,7 +58,7 @@ typedef int32_t (*MndInitFp)(SMnode *pMnode);
typedef void (*MndCleanupFp)(SMnode *pMnode); typedef void (*MndCleanupFp)(SMnode *pMnode);
typedef int32_t (*ShowRetrieveFp)(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); typedef int32_t (*ShowRetrieveFp)(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
typedef void (*ShowFreeIterFp)(SMnode *pMnode, void *pIter); typedef void (*ShowFreeIterFp)(SMnode *pMnode, void *pIter);
typedef struct SQWorker SQHandle; typedef struct SQueueWorker SQHandle;
typedef struct { typedef struct {
const char *name; const char *name;
......
...@@ -29,7 +29,7 @@ ...@@ -29,7 +29,7 @@
extern "C" { extern "C" {
#endif #endif
typedef struct SQWorker SQHandle; typedef struct SQueueWorker SQHandle;
typedef struct SQnode { typedef struct SQnode {
int32_t qndId; int32_t qndId;
......
...@@ -58,7 +58,7 @@ typedef struct STQ STQ; ...@@ -58,7 +58,7 @@ typedef struct STQ STQ;
typedef struct SVState SVState; typedef struct SVState SVState;
typedef struct SVStatis SVStatis; typedef struct SVStatis SVStatis;
typedef struct SVBufPool SVBufPool; typedef struct SVBufPool SVBufPool;
typedef struct SQWorker SQHandle; typedef struct SQueueWorker SQHandle;
typedef struct STsdbKeepCfg STsdbKeepCfg; typedef struct STsdbKeepCfg STsdbKeepCfg;
typedef struct SMetaSnapReader SMetaSnapReader; typedef struct SMetaSnapReader SMetaSnapReader;
typedef struct SMetaSnapWriter SMetaSnapWriter; typedef struct SMetaSnapWriter SMetaSnapWriter;
......
...@@ -76,6 +76,7 @@ typedef struct SQWDebug { ...@@ -76,6 +76,7 @@ typedef struct SQWDebug {
bool lockEnable; bool lockEnable;
bool statusEnable; bool statusEnable;
bool dumpEnable; bool dumpEnable;
bool forceStop;
bool sleepSimulate; bool sleepSimulate;
bool deadSimulate; bool deadSimulate;
bool redirectSimulate; bool redirectSimulate;
...@@ -248,6 +249,7 @@ typedef struct SQWorkerMgmt { ...@@ -248,6 +249,7 @@ typedef struct SQWorkerMgmt {
#define QW_QUERY_RUNNING(ctx) (QW_GET_PHASE(ctx) == QW_PHASE_PRE_QUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_CQUERY) #define QW_QUERY_RUNNING(ctx) (QW_GET_PHASE(ctx) == QW_PHASE_PRE_QUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_CQUERY)
#define QW_FETCH_RUNNING(ctx) ((ctx)->inFetch) #define QW_FETCH_RUNNING(ctx) ((ctx)->inFetch)
#define QW_QUERY_NOT_STARTED(ctx) (QW_GET_PHASE(ctx) == -1)
#define QW_SET_QTID(id, qId, tId, eId) \ #define QW_SET_QTID(id, qId, tId, eId) \
do { \ do { \
......
...@@ -9,11 +9,13 @@ ...@@ -9,11 +9,13 @@
#include "tmsg.h" #include "tmsg.h"
#include "tname.h" #include "tname.h"
SQWDebug gQWDebug = {.statusEnable = true, SQWDebug gQWDebug = {.lockEnable = false,
.statusEnable = true,
.dumpEnable = false, .dumpEnable = false,
.redirectSimulate = false, .redirectSimulate = false,
.deadSimulate = false, .deadSimulate = false,
.sleepSimulate = false}; .sleepSimulate = false,
.forceStop = false};
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) { int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) {
if (!gQWDebug.statusEnable) { if (!gQWDebug.statusEnable) {
...@@ -306,6 +308,12 @@ int32_t qwDbgEnableDebug(char *option) { ...@@ -306,6 +308,12 @@ int32_t qwDbgEnableDebug(char *option) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (0 == strcasecmp(option, "forceStop")) {
gQWDebug.forceStop = true;
qError("qw forceStop debug enabled");
return TSDB_CODE_SUCCESS;
}
qError("invalid qw debug option:%s", option); qError("invalid qw debug option:%s", option);
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
......
...@@ -18,6 +18,51 @@ SQWorkerMgmt gQwMgmt = { ...@@ -18,6 +18,51 @@ SQWorkerMgmt gQwMgmt = {
.qwNum = 0, .qwNum = 0,
}; };
int32_t qwStopAllTasks(SQWorker *mgmt) {
uint64_t qId, tId, sId;
int32_t eId;
int64_t rId = 0;
void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
while (pIter) {
SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
void *key = taosHashGetKey(pIter, NULL);
QW_GET_QTID(key, qId, tId, eId);
QW_LOCK(QW_WRITE, &ctx->lock);
sId = ctx->sId;
QW_TASK_DLOG_E("start to force stop task");
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG_E("task already dropping");
QW_UNLOCK(QW_WRITE, &ctx->lock);
pIter = taosHashIterate(mgmt->ctxHash, pIter);
continue;
}
if (QW_QUERY_RUNNING(ctx)) {
qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED);
QW_TASK_DLOG_E("task running, async killed");
} else if (QW_FETCH_RUNNING(ctx)) {
QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_VND_STOPPED);
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
QW_TASK_DLOG_E("task fetching, update drop received");
} else {
qwDropTask(QW_FPARAMS());
}
QW_UNLOCK(QW_WRITE, &ctx->lock);
pIter = taosHashIterate(mgmt->ctxHash, pIter);
}
return TSDB_CODE_SUCCESS;
}
int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
int32_t code = 0; int32_t code = 0;
SSchedulerHbRsp rsp = {0}; SSchedulerHbRsp rsp = {0};
...@@ -973,6 +1018,10 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { ...@@ -973,6 +1018,10 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
qwDbgDumpMgmtInfo(mgmt); qwDbgDumpMgmtInfo(mgmt);
if (gQWDebug.forceStop) {
(void)qwStopAllTasks(mgmt);
}
QW_LOCK(QW_READ, &mgmt->schLock); QW_LOCK(QW_READ, &mgmt->schLock);
int32_t schNum = taosHashGetSize(mgmt->schHash); int32_t schNum = taosHashGetSize(mgmt->schHash);
...@@ -1087,6 +1136,7 @@ _return: ...@@ -1087,6 +1136,7 @@ _return:
QW_RET(TSDB_CODE_SUCCESS); QW_RET(TSDB_CODE_SUCCESS);
} }
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const SMsgCb *pMsgCb) { int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const SMsgCb *pMsgCb) {
if (NULL == qWorkerMgmt || (pMsgCb && pMsgCb->mgmt == NULL)) { if (NULL == qWorkerMgmt || (pMsgCb && pMsgCb->mgmt == NULL)) {
qError("invalid param to init qworker"); qError("invalid param to init qworker");
...@@ -1185,46 +1235,10 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) { ...@@ -1185,46 +1235,10 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) {
SQWorker *mgmt = (SQWorker *)qWorkerMgmt; SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
QW_DLOG("start to stop all tasks, taskNum:%d", taosHashGetSize(mgmt->ctxHash)); QW_DLOG("start to stop all tasks, taskNum:%d", taosHashGetSize(mgmt->ctxHash));
uint64_t qId, tId, sId;
int32_t eId;
int64_t rId = 0;
atomic_store_8(&mgmt->nodeStopped, 1); atomic_store_8(&mgmt->nodeStopped, 1);
void *pIter = taosHashIterate(mgmt->ctxHash, NULL); (void)qwStopAllTasks(mgmt);
while (pIter) {
SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
void *key = taosHashGetKey(pIter, NULL);
QW_GET_QTID(key, qId, tId, eId);
QW_LOCK(QW_WRITE, &ctx->lock);
sId = ctx->sId;
QW_TASK_DLOG_E("start to force stop task");
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG_E("task already dropping");
QW_UNLOCK(QW_WRITE, &ctx->lock);
pIter = taosHashIterate(mgmt->ctxHash, pIter);
continue;
}
if (QW_QUERY_RUNNING(ctx)) {
qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED);
} else if (QW_FETCH_RUNNING(ctx)) {
QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_VND_STOPPED);
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
} else {
qwDropTask(QW_FPARAMS());
}
QW_UNLOCK(QW_WRITE, &ctx->lock);
pIter = taosHashIterate(mgmt->ctxHash, pIter);
}
} }
void qWorkerDestroy(void **qWorkerMgmt) { void qWorkerDestroy(void **qWorkerMgmt) {
......
...@@ -22,7 +22,7 @@ typedef void *(*ThreadFp)(void *param); ...@@ -22,7 +22,7 @@ typedef void *(*ThreadFp)(void *param);
int32_t tQWorkerInit(SQWorkerPool *pool) { int32_t tQWorkerInit(SQWorkerPool *pool) {
pool->qset = taosOpenQset(); pool->qset = taosOpenQset();
pool->workers = taosMemoryCalloc(pool->max, sizeof(SQWorker)); pool->workers = taosMemoryCalloc(pool->max, sizeof(SQueueWorker));
if (pool->workers == NULL) { if (pool->workers == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
...@@ -31,7 +31,7 @@ int32_t tQWorkerInit(SQWorkerPool *pool) { ...@@ -31,7 +31,7 @@ int32_t tQWorkerInit(SQWorkerPool *pool) {
(void)taosThreadMutexInit(&pool->mutex, NULL); (void)taosThreadMutexInit(&pool->mutex, NULL);
for (int32_t i = 0; i < pool->max; ++i) { for (int32_t i = 0; i < pool->max; ++i) {
SQWorker *worker = pool->workers + i; SQueueWorker *worker = pool->workers + i;
worker->id = i; worker->id = i;
worker->pool = pool; worker->pool = pool;
} }
...@@ -42,14 +42,14 @@ int32_t tQWorkerInit(SQWorkerPool *pool) { ...@@ -42,14 +42,14 @@ int32_t tQWorkerInit(SQWorkerPool *pool) {
void tQWorkerCleanup(SQWorkerPool *pool) { void tQWorkerCleanup(SQWorkerPool *pool) {
for (int32_t i = 0; i < pool->max; ++i) { for (int32_t i = 0; i < pool->max; ++i) {
SQWorker *worker = pool->workers + i; SQueueWorker *worker = pool->workers + i;
if (taosCheckPthreadValid(worker->thread)) { if (taosCheckPthreadValid(worker->thread)) {
taosQsetThreadResume(pool->qset); taosQsetThreadResume(pool->qset);
} }
} }
for (int32_t i = 0; i < pool->max; ++i) { for (int32_t i = 0; i < pool->max; ++i) {
SQWorker *worker = pool->workers + i; SQueueWorker *worker = pool->workers + i;
if (taosCheckPthreadValid(worker->thread)) { if (taosCheckPthreadValid(worker->thread)) {
uInfo("worker:%s:%d is stopping", pool->name, worker->id); uInfo("worker:%s:%d is stopping", pool->name, worker->id);
taosThreadJoin(worker->thread, NULL); taosThreadJoin(worker->thread, NULL);
...@@ -65,7 +65,7 @@ void tQWorkerCleanup(SQWorkerPool *pool) { ...@@ -65,7 +65,7 @@ void tQWorkerCleanup(SQWorkerPool *pool) {
uInfo("worker:%s is closed", pool->name); uInfo("worker:%s is closed", pool->name);
} }
static void *tQWorkerThreadFp(SQWorker *worker) { static void *tQWorkerThreadFp(SQueueWorker *worker) {
SQWorkerPool *pool = worker->pool; SQWorkerPool *pool = worker->pool;
SQueueInfo qinfo = {0}; SQueueInfo qinfo = {0};
void *msg = NULL; void *msg = NULL;
...@@ -106,7 +106,7 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) { ...@@ -106,7 +106,7 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
// spawn a thread to process queue // spawn a thread to process queue
if (pool->num < pool->max) { if (pool->num < pool->max) {
do { do {
SQWorker *worker = pool->workers + pool->num; SQueueWorker *worker = pool->workers + pool->num;
TdThreadAttr thAttr; TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr); taosThreadAttrInit(&thAttr);
...@@ -138,7 +138,7 @@ void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) { ...@@ -138,7 +138,7 @@ void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) {
int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool) { int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool) {
pool->qset = taosOpenQset(); pool->qset = taosOpenQset();
pool->workers = taosArrayInit(2, sizeof(SQWorker *)); pool->workers = taosArrayInit(2, sizeof(SQueueWorker *));
if (pool->workers == NULL) { if (pool->workers == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
...@@ -153,14 +153,14 @@ int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool) { ...@@ -153,14 +153,14 @@ int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool) {
void tAutoQWorkerCleanup(SAutoQWorkerPool *pool) { void tAutoQWorkerCleanup(SAutoQWorkerPool *pool) {
int32_t size = taosArrayGetSize(pool->workers); int32_t size = taosArrayGetSize(pool->workers);
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
SQWorker *worker = taosArrayGetP(pool->workers, i); SQueueWorker *worker = taosArrayGetP(pool->workers, i);
if (taosCheckPthreadValid(worker->thread)) { if (taosCheckPthreadValid(worker->thread)) {
taosQsetThreadResume(pool->qset); taosQsetThreadResume(pool->qset);
} }
} }
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
SQWorker *worker = taosArrayGetP(pool->workers, i); SQueueWorker *worker = taosArrayGetP(pool->workers, i);
if (taosCheckPthreadValid(worker->thread)) { if (taosCheckPthreadValid(worker->thread)) {
uInfo("worker:%s:%d is stopping", pool->name, worker->id); uInfo("worker:%s:%d is stopping", pool->name, worker->id);
taosThreadJoin(worker->thread, NULL); taosThreadJoin(worker->thread, NULL);
...@@ -177,7 +177,7 @@ void tAutoQWorkerCleanup(SAutoQWorkerPool *pool) { ...@@ -177,7 +177,7 @@ void tAutoQWorkerCleanup(SAutoQWorkerPool *pool) {
uInfo("worker:%s is closed", pool->name); uInfo("worker:%s is closed", pool->name);
} }
static void *tAutoQWorkerThreadFp(SQWorker *worker) { static void *tAutoQWorkerThreadFp(SQueueWorker *worker) {
SAutoQWorkerPool *pool = worker->pool; SAutoQWorkerPool *pool = worker->pool;
SQueueInfo qinfo = {0}; SQueueInfo qinfo = {0};
void *msg = NULL; void *msg = NULL;
...@@ -222,7 +222,7 @@ STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem ...@@ -222,7 +222,7 @@ STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem
// spawn a thread to process queue // spawn a thread to process queue
while (curWorkerNum < dstWorkerNum) { while (curWorkerNum < dstWorkerNum) {
SQWorker *worker = taosMemoryCalloc(1, sizeof(SQWorker)); SQueueWorker *worker = taosMemoryCalloc(1, sizeof(SQueueWorker));
if (worker == NULL || taosArrayPush(pool->workers, &worker) == NULL) { if (worker == NULL || taosArrayPush(pool->workers, &worker) == NULL) {
uError("worker:%s:%d failed to create", pool->name, curWorkerNum); uError("worker:%s:%d failed to create", pool->name, curWorkerNum);
taosMemoryFree(worker); taosMemoryFree(worker);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册