提交 c9b81efc 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/main' into fix/TD-21566

......@@ -733,7 +733,7 @@ To prevent system resource from being exhausted by multiple concurrent streams,
| 42 | numOfCommitThreads | Yes | Yes |
| 43 | numOfMnodeReadThreads | No | Yes |
| 44 | numOfVnodeQueryThreads | No | Yes |
| 45 | numOfVnodeStreamThreads | No | Yes |
| 45 | ratioOfVnodeStreamThreads | No | Yes |
| 46 | numOfVnodeFetchThreads | No | Yes |
| 47 | numOfVnodeRsmaThreads | No | Yes |
| 48 | numOfQnodeQueryThreads | No | Yes |
......
......@@ -709,7 +709,7 @@ charset 的有效值是 UTF-8。
| 42 | numOfCommitThreads | 是 | 是 | |
| 43 | numOfMnodeReadThreads | 否 | 是 | |
| 44 | numOfVnodeQueryThreads | 否 | 是 | |
| 45 | numOfVnodeStreamThreads | 否 | 是 | |
| 45 | ratioOfVnodeStreamThreads | 否 | 是 | |
| 46 | numOfVnodeFetchThreads | 否 | 是 | |
| 47 | numOfVnodeRsmaThreads | 否 | 是 | |
| 48 | numOfQnodeQueryThreads | 否 | 是 | |
......
......@@ -55,7 +55,7 @@ extern int32_t tsNumOfMnodeQueryThreads;
extern int32_t tsNumOfMnodeFetchThreads;
extern int32_t tsNumOfMnodeReadThreads;
extern int32_t tsNumOfVnodeQueryThreads;
extern int32_t tsNumOfVnodeStreamThreads;
extern float tsRatioOfVnodeStreamThreads;
extern int32_t tsNumOfVnodeFetchThreads;
extern int32_t tsNumOfVnodeRsmaThreads;
extern int32_t tsNumOfQnodeQueryThreads;
......
......@@ -17,6 +17,7 @@
#define _TD_UTIL_WORKER_H_
#include "tqueue.h"
#include "tarray.h"
#ifdef __cplusplus
extern "C" {
......@@ -26,10 +27,10 @@ typedef struct SQWorkerPool SQWorkerPool;
typedef struct SWWorkerPool SWWorkerPool;
typedef struct SQWorker {
int32_t id; // worker id
int64_t pid; // thread pid
TdThread thread; // thread id
SQWorkerPool *pool;
int32_t id; // worker id
int64_t pid; // thread pid
TdThread thread; // thread id
void *pool;
} SQWorker;
typedef struct SQWorkerPool {
......@@ -42,6 +43,14 @@ typedef struct SQWorkerPool {
TdThreadMutex mutex;
} SQWorkerPool;
typedef struct SAutoQWorkerPool {
float ratio;
STaosQset *qset;
const char *name;
SArray *workers;
TdThreadMutex mutex;
} SAutoQWorkerPool;
typedef struct SWWorker {
int32_t id; // worker id
int64_t pid; // thread pid
......@@ -65,6 +74,11 @@ void tQWorkerCleanup(SQWorkerPool *pool);
STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp);
void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue);
int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool);
void tAutoQWorkerCleanup(SAutoQWorkerPool *pool);
STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp);
void tAutoQWorkerFreeQueue(SAutoQWorkerPool *pool, STaosQueue *queue);
int32_t tWWorkerInit(SWWorkerPool *pool);
void tWWorkerCleanup(SWWorkerPool *pool);
STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp);
......
......@@ -47,7 +47,7 @@ int32_t tsNumOfMnodeQueryThreads = 4;
int32_t tsNumOfMnodeFetchThreads = 1;
int32_t tsNumOfMnodeReadThreads = 1;
int32_t tsNumOfVnodeQueryThreads = 4;
int32_t tsNumOfVnodeStreamThreads = 2;
float tsRatioOfVnodeStreamThreads = 1.0;
int32_t tsNumOfVnodeFetchThreads = 4;
int32_t tsNumOfVnodeRsmaThreads = 2;
int32_t tsNumOfQnodeQueryThreads = 4;
......@@ -392,9 +392,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 4);
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, 0) != 0) return -1;
tsNumOfVnodeStreamThreads = tsNumOfCores / 4;
tsNumOfVnodeStreamThreads = TMAX(tsNumOfVnodeStreamThreads, 4);
if (cfgAddInt32(pCfg, "numOfVnodeStreamThreads", tsNumOfVnodeStreamThreads, 4, 1024, 0) != 0) return -1;
if (cfgAddFloat(pCfg, "ratioOfVnodeStreamThreads", tsRatioOfVnodeStreamThreads, 0.01, 100, 0) != 0) return -1;
tsNumOfVnodeFetchThreads = tsNumOfCores / 4;
tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4);
......@@ -513,11 +511,9 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem->stype = stype;
}
pItem = cfgGetItem(tsCfg, "numOfVnodeStreamThreads");
pItem = cfgGetItem(tsCfg, "ratioOfVnodeStreamThreads");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsNumOfVnodeStreamThreads = numOfCores / 4;
tsNumOfVnodeStreamThreads = TMAX(tsNumOfVnodeStreamThreads, 4);
pItem->i32 = tsNumOfVnodeStreamThreads;
pItem->fval = tsRatioOfVnodeStreamThreads;
pItem->stype = stype;
}
......@@ -710,7 +706,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32;
tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32;
tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32;
tsNumOfVnodeStreamThreads = cfgGetItem(pCfg, "numOfVnodeStreamThreads")->i32;
tsRatioOfVnodeStreamThreads = cfgGetItem(pCfg, "ratioOfVnodeStreamThreads")->fval;
tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32;
tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32;
tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32;
......
......@@ -26,20 +26,20 @@ extern "C" {
#endif
typedef struct SVnodeMgmt {
SDnodeData *pData;
SMsgCb msgCb;
const char *path;
const char *name;
SQWorkerPool queryPool;
SQWorkerPool streamPool;
SWWorkerPool fetchPool;
SSingleWorker mgmtWorker;
SHashObj *hash;
TdThreadRwlock lock;
SVnodesStat state;
STfs *pTfs;
TdThread thread;
bool stop;
SDnodeData *pData;
SMsgCb msgCb;
const char *path;
const char *name;
SQWorkerPool queryPool;
SAutoQWorkerPool streamPool;
SWWorkerPool fetchPool;
SSingleWorker mgmtWorker;
SHashObj *hash;
TdThreadRwlock lock;
SVnodesStat state;
STfs *pTfs;
TdThread thread;
bool stop;
} SVnodeMgmt;
typedef struct {
......
......@@ -318,7 +318,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
(void)tMultiWorkerInit(&pVnode->pApplyW, &acfg);
pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
pVnode->pStreamQ = tQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue);
pVnode->pStreamQ = tAutoQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue);
pVnode->pFetchQ = tWWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItems)vmProcessFetchQueue);
if (pVnode->pWriteW.queue == NULL || pVnode->pSyncW.queue == NULL || pVnode->pSyncCtrlW.queue == NULL ||
......@@ -344,7 +344,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
tQWorkerFreeQueue(&pMgmt->streamPool, pVnode->pStreamQ);
tAutoQWorkerFreeQueue(&pMgmt->streamPool, pVnode->pStreamQ);
tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
pVnode->pQueryQ = NULL;
pVnode->pStreamQ = NULL;
......@@ -359,11 +359,10 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
pQPool->max = tsNumOfVnodeQueryThreads;
if (tQWorkerInit(pQPool) != 0) return -1;
SQWorkerPool *pStreamPool = &pMgmt->streamPool;
SAutoQWorkerPool *pStreamPool = &pMgmt->streamPool;
pStreamPool->name = "vnode-stream";
pStreamPool->min = tsNumOfVnodeStreamThreads;
pStreamPool->max = tsNumOfVnodeStreamThreads;
if (tQWorkerInit(pStreamPool) != 0) return -1;
pStreamPool->ratio = tsRatioOfVnodeStreamThreads;
if (tAutoQWorkerInit(pStreamPool) != 0) return -1;
SWWorkerPool *pFPool = &pMgmt->fetchPool;
pFPool->name = "vnode-fetch";
......@@ -385,7 +384,7 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
void vmStopWorker(SVnodeMgmt *pMgmt) {
tQWorkerCleanup(&pMgmt->queryPool);
tQWorkerCleanup(&pMgmt->streamPool);
tAutoQWorkerCleanup(&pMgmt->streamPool);
tWWorkerCleanup(&pMgmt->fetchPool);
dDebug("vnode workers are closed");
}
......@@ -111,12 +111,12 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
dGError("msg:%p, type:%s pCont is NULL", pRpc, TMSG_INFO(pRpc->msgType));
terrno = TSDB_CODE_INVALID_MSG_LEN;
goto _OVER;
} /* else if ((pRpc->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRpc->code == TSDB_CODE_RPC_BROKEN_LINK) &&
(!IsReq(pRpc)) && (pRpc->pCont == NULL)) {
dGError("msg:%p, type:%s pCont is NULL, err: %s", pRpc, TMSG_INFO(pRpc->msgType), tstrerror(pRpc->code));
terrno = pRpc->code;
goto _OVER;
}*/
} else if ((pRpc->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRpc->code == TSDB_CODE_RPC_BROKEN_LINK) &&
(!IsReq(pRpc)) && (pRpc->pCont == NULL)) {
dGError("msg:%p, type:%s pCont is NULL, err: %s", pRpc, TMSG_INFO(pRpc->msgType), tstrerror(pRpc->code));
terrno = pRpc->code;
goto _OVER;
}
if (pHandle->defaultNtype == NODE_END) {
dGError("msg:%p, type:%s not processed since no handle", pRpc, TMSG_INFO(pRpc->msgType));
......@@ -248,9 +248,9 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { rpcRe
static bool rpcRfp(int32_t code, tmsg_t msgType) {
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_MNODE_NOT_FOUND ||
code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED ||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_VND_STOPPED ||
code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_APP_IS_STOPPING) {
code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED || code == TSDB_CODE_SYN_NOT_LEADER ||
code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_VND_STOPPED || code == TSDB_CODE_APP_IS_STARTING ||
code == TSDB_CODE_APP_IS_STOPPING) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
msgType == TDMT_SCH_MERGE_FETCH) {
return false;
......
......@@ -5640,7 +5640,8 @@ static int32_t addSubtableInfoToCreateStreamQuery(STranslateContext* pCxt, SCrea
return code;
}
static int32_t checkStreamQuery(STranslateContext* pCxt, SSelectStmt* pSelect) {
static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type ||
!pSelect->isTimeLineResult || crossTableWithoutAggOper(pSelect) || NULL != pSelect->pOrderByList ||
crossTableWithUdaf(pSelect)) {
......@@ -5650,6 +5651,10 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SSelectStmt* pSelect) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"SUBTABLE expression must be of VARCHAR type");
}
if (NULL == pSelect->pWindow && STREAM_TRIGGER_AT_ONCE != pStmt->pOptions->triggerType) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"The trigger mode of non window query can only be AT_ONCE");
}
return TSDB_CODE_SUCCESS;
}
......@@ -5663,7 +5668,7 @@ static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt
code = translateQuery(pCxt, pStmt->pQuery);
}
if (TSDB_CODE_SUCCESS == code) {
code = checkStreamQuery(pCxt, (SSelectStmt*)pStmt->pQuery);
code = checkStreamQuery(pCxt, pStmt);
}
if (TSDB_CODE_SUCCESS == code) {
getSourceDatabase(pStmt->pQuery, pCxt->pParseCxt->acctId, pReq->sourceDB);
......
......@@ -487,8 +487,6 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
taosMemoryFree(pReqs);
}
return code;
} else {
ASSERT(0);
}
return 0;
}
......@@ -514,7 +512,6 @@ int32_t streamDispatch(SStreamTask* pTask) {
int32_t code = 0;
if (streamDispatchAllBlocks(pTask, pBlock) < 0) {
ASSERT(0);
code = -1;
streamQueueProcessFail(pTask->outputQueue);
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
......
......@@ -143,6 +143,7 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp*
ASSERT(left >= 0);
if (left == 0) {
taosArrayDestroy(pTask->checkReqIds);
pTask->checkReqIds = NULL;
streamTaskLaunchRecover(pTask, version);
}
} else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
......
......@@ -175,6 +175,8 @@ void tFreeSStreamTask(SStreamTask* pTask) {
}
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
taosArrayDestroy(pTask->checkReqIds);
pTask->checkReqIds = NULL;
}
if (pTask->pState) streamStateClose(pTask->pState);
......
......@@ -36,7 +36,7 @@ int32_t tQWorkerInit(SQWorkerPool *pool) {
worker->pool = pool;
}
uDebug("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max);
uInfo("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max);
return 0;
}
......@@ -51,8 +51,10 @@ void tQWorkerCleanup(SQWorkerPool *pool) {
for (int32_t i = 0; i < pool->max; ++i) {
SQWorker *worker = pool->workers + i;
if (taosCheckPthreadValid(worker->thread)) {
uInfo("worker:%s:%d is stopping", pool->name, worker->id);
taosThreadJoin(worker->thread, NULL);
taosThreadClear(&worker->thread);
uInfo("worker:%s:%d is stopped", pool->name, worker->id);
}
}
......@@ -60,7 +62,7 @@ void tQWorkerCleanup(SQWorkerPool *pool) {
taosCloseQset(pool->qset);
taosThreadMutexDestroy(&pool->mutex);
uDebug("worker:%s is closed", pool->name);
uInfo("worker:%s is closed", pool->name);
}
static void *tQWorkerThreadFp(SQWorker *worker) {
......@@ -119,7 +121,7 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
taosThreadAttrDestroy(&thAttr);
pool->num++;
uDebug("worker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num);
uInfo("worker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num);
} while (pool->num < pool->min);
}
......@@ -130,7 +132,134 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
}
void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) {
uDebug("worker:%s, queue:%p is freed", pool->name, queue);
uInfo("worker:%s, queue:%p is freed", pool->name, queue);
taosCloseQueue(queue);
}
int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool) {
pool->qset = taosOpenQset();
pool->workers = taosArrayInit(2, sizeof(SQWorker *));
if (pool->workers == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
(void)taosThreadMutexInit(&pool->mutex, NULL);
uInfo("worker:%s is initialized as auto", pool->name);
return 0;
}
void tAutoQWorkerCleanup(SAutoQWorkerPool *pool) {
int32_t size = taosArrayGetSize(pool->workers);
for (int32_t i = 0; i < size; ++i) {
SQWorker *worker = taosArrayGetP(pool->workers, i);
if (taosCheckPthreadValid(worker->thread)) {
taosQsetThreadResume(pool->qset);
}
}
for (int32_t i = 0; i < size; ++i) {
SQWorker *worker = taosArrayGetP(pool->workers, i);
if (taosCheckPthreadValid(worker->thread)) {
uInfo("worker:%s:%d is stopping", pool->name, worker->id);
taosThreadJoin(worker->thread, NULL);
taosThreadClear(&worker->thread);
uInfo("worker:%s:%d is stopped", pool->name, worker->id);
}
taosMemoryFree(worker);
}
taosArrayDestroy(pool->workers);
taosCloseQset(pool->qset);
taosThreadMutexDestroy(&pool->mutex);
uInfo("worker:%s is closed", pool->name);
}
static void *tAutoQWorkerThreadFp(SQWorker *worker) {
SAutoQWorkerPool *pool = worker->pool;
SQueueInfo qinfo = {0};
void *msg = NULL;
int32_t code = 0;
taosBlockSIGPIPE();
setThreadName(pool->name);
worker->pid = taosGetSelfPthreadId();
uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);
while (1) {
if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) {
uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, pool->qset,
worker->pid);
break;
}
if (qinfo.fp != NULL) {
qinfo.workerId = worker->id;
qinfo.threadNum = taosArrayGetSize(pool->workers);
(*((FItem)qinfo.fp))(&qinfo, msg);
}
taosUpdateItemSize(qinfo.queue, 1);
}
return NULL;
}
STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp) {
STaosQueue *queue = taosOpenQueue();
if (queue == NULL) return NULL;
taosThreadMutexLock(&pool->mutex);
taosSetQueueFp(queue, fp, NULL);
taosAddIntoQset(pool->qset, queue, ahandle);
int32_t queueNum = taosGetQueueNumber(pool->qset);
int32_t curWorkerNum = taosArrayGetSize(pool->workers);
int32_t dstWorkerNum = ceil(queueNum * pool->ratio);
if (dstWorkerNum < 1) dstWorkerNum = 1;
// spawn a thread to process queue
while (curWorkerNum < dstWorkerNum) {
SQWorker *worker = taosMemoryCalloc(1, sizeof(SQWorker));
if (worker == NULL || taosArrayPush(pool->workers, &worker) == NULL) {
uError("worker:%s:%d failed to create", pool->name, curWorkerNum);
taosMemoryFree(worker);
taosCloseQueue(queue);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
worker->id = curWorkerNum;
worker->pool = pool;
TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tAutoQWorkerThreadFp, worker) != 0) {
uError("worker:%s:%d failed to create thread, total:%d", pool->name, worker->id, curWorkerNum);
(void)taosArrayPop(pool->workers);
taosMemoryFree(worker);
taosCloseQueue(queue);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
taosThreadAttrDestroy(&thAttr);
uInfo("worker:%s:%d is launched, total:%d", pool->name, worker->id, (int32_t)taosArrayGetSize(pool->workers));
curWorkerNum++;
}
taosThreadMutexUnlock(&pool->mutex);
uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
return queue;
}
void tAutoQWorkerFreeQueue(SAutoQWorkerPool *pool, STaosQueue *queue) {
uInfo("worker:%s, queue:%p is freed", pool->name, queue);
taosCloseQueue(queue);
}
......@@ -152,7 +281,7 @@ int32_t tWWorkerInit(SWWorkerPool *pool) {
worker->pool = pool;
}
uDebug("worker:%s is initialized, max:%d", pool->name, pool->max);
uInfo("worker:%s is initialized, max:%d", pool->name, pool->max);
return 0;
}
......@@ -169,17 +298,19 @@ void tWWorkerCleanup(SWWorkerPool *pool) {
for (int32_t i = 0; i < pool->max; ++i) {
SWWorker *worker = pool->workers + i;
if (taosCheckPthreadValid(worker->thread)) {
uInfo("worker:%s:%d is stopping", pool->name, worker->id);
taosThreadJoin(worker->thread, NULL);
taosThreadClear(&worker->thread);
taosFreeQall(worker->qall);
taosCloseQset(worker->qset);
uInfo("worker:%s:%d is stopped", pool->name, worker->id);
}
}
taosMemoryFreeClear(pool->workers);
taosThreadMutexDestroy(&pool->mutex);
uDebug("worker:%s is closed", pool->name);
uInfo("worker:%s is closed", pool->name);
}
static void *tWWorkerThreadFp(SWWorker *worker) {
......@@ -235,7 +366,7 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tWWorkerThreadFp, worker) != 0) goto _OVER;
uDebug("worker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max);
uInfo("worker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max);
pool->nextId = (pool->nextId + 1) % pool->max;
taosThreadAttrDestroy(&thAttr);
......@@ -259,13 +390,14 @@ _OVER:
} else {
while (worker->pid <= 0) taosMsleep(10);
queue->threadId = worker->pid;
uInfo("worker:%s, queue:%p is allocated, ahandle:%p thread:%08" PRId64, pool->name, queue, ahandle, queue->threadId);
uInfo("worker:%s, queue:%p is allocated, ahandle:%p thread:%08" PRId64, pool->name, queue, ahandle,
queue->threadId);
return queue;
}
}
void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) {
uDebug("worker:%s, queue:%p is freed", pool->name, queue);
uInfo("worker:%s, queue:%p is freed", pool->name, queue);
taosCloseQueue(queue);
}
......
#!/bin/bash
ulimit -c unlimited
#======================p1-insert===============
python3 ./test.py -f 0-others/taosShell.py
python3 ./test.py -f 0-others/taosShellError.py
python3 ./test.py -f 0-others/taosShellNetChk.py
python3 ./test.py -f 1-insert/alter_database.py
python3 ./test.py -f 1-insert/influxdb_line_taosc_insert.py
python3 ./test.py -f 1-insert/opentsdb_telnet_line_taosc_insert.py
python3 ./test.py -f 1-insert/opentsdb_json_taosc_insert.py
python3 ./test.py -f 1-insert/test_stmt_muti_insert_query.py
python3 ./test.py -f 1-insert/test_stmt_set_tbname_tag.py
python3 ./test.py -f 1-insert/alter_stable.py
python3 ./test.py -f 1-insert/alter_table.py
python3 ./test.py -f 1-insert/boundary.py
python3 ./test.py -f 2-query/top.py
python3 ./test.py -f 2-query/top.py -R
python3 ./test.py -f 2-query/tsbsQuery.py
python3 ./test.py -f 2-query/tsbsQuery.py -R
python3 ./test.py -f 2-query/ttl_comment.py
python3 ./test.py -f 2-query/ttl_comment.py -R
python3 ./test.py -f 2-query/twa.py
python3 ./test.py -f 2-query/twa.py -R
python3 ./test.py -f 2-query/union.py
python3 ./test.py -f 2-query/union.py -R
python3 ./test.py -f 6-cluster/5dnode1mnode.py
python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5
python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3
python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3 -i False
python3 ./test.py -f 6-cluster/5dnode3mnodeStop2Follower.py -N 5 -M 3
python3 ./test.py -f 6-cluster/5dnode3mnodeStop2Follower.py -N 5 -M 3 -i False
python3 ./test.py -f 6-cluster/5dnode3mnodeStopLoop.py -N 5 -M 3
python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 6 -M 3
python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 6 -M 3 -n 3
python3 ./test.py -f 7-tmq/subscribeStb4.py
python3 ./test.py -f 7-tmq/db.py
python3 ./test.py -f 7-tmq/tmqError.py
python3 ./test.py -f 7-tmq/schema.py
python3 ./test.py -f 7-tmq/stbFilter.py
python3 ./test.py -f 7-tmq/tmqCheckData.py
python3 ./test.py -f 7-tmq/tmqCheckData1.py
python3 ./test.py -f 7-tmq/tmqConsumerGroup.py
python3 ./test.py -f 7-tmq/tmqShow.py
python3 ./test.py -f 7-tmq/tmqAlterSchema.py
python3 ./test.py -f 99-TDcase/TD-20582.py
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册