From a3ac8b647cd312f5953bc8d56dd09908cf9b3e18 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 3 Apr 2023 18:20:35 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 2 +- include/util/tqueue.h | 16 ++++++++-------- source/client/src/clientTmq.c | 3 ++- source/client/test/clientTests.cpp | 8 ++++---- source/dnode/vnode/src/tq/tq.c | 25 ++++++++++++------------- source/libs/stream/src/stream.c | 12 +++++++----- 6 files changed, 34 insertions(+), 32 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 0c822a4007..5b1d1fa1bc 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -356,7 +356,7 @@ SStreamTask* tNewSStreamTask(int64_t streamId); int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask); void tFreeSStreamTask(SStreamTask* pTask); -int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem); +int32_t tAppendDataForStream(SStreamTask* pTask, SStreamQueueItem* pItem); static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); diff --git a/include/util/tqueue.h b/include/util/tqueue.h index 1f6b205cdf..d05b5418b3 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -61,7 +61,7 @@ typedef void (*FItems)(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfItems); typedef struct STaosQnode STaosQnode; -typedef struct STaosQnode { +struct STaosQnode { STaosQnode *next; STaosQueue *queue; int64_t timestamp; @@ -70,9 +70,9 @@ typedef struct STaosQnode { int8_t itype; int8_t reserved[3]; char item[]; -} STaosQnode; +}; -typedef struct STaosQueue { +struct STaosQueue { STaosQnode *head; STaosQnode *tail; STaosQueue *next; // for queue set @@ -84,22 +84,22 @@ typedef struct STaosQueue { int64_t memOfItems; int32_t numOfItems; int64_t threadId; -} STaosQueue; +}; -typedef struct STaosQset { +struct STaosQset { STaosQueue *head; STaosQueue *current; TdThreadMutex mutex; tsem_t sem; int32_t numOfQueues; int32_t numOfItems; -} STaosQset; +}; -typedef struct STaosQall { +struct STaosQall { STaosQnode *current; STaosQnode *start; int32_t numOfItems; -} STaosQall; +}; STaosQueue *taosOpenQueue(); void taosCloseQueue(STaosQueue *queue); diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 71af273243..3e963dd3e8 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1338,8 +1338,9 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { taosMemoryFree(pMsg->pData); taosWriteQitem(tmq->mqueue, pRspWrapper); + int32_t total = taosQueueItemSize(tmq->mqueue); tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64, - tmq->consumerId, rspType, vgId, tmq->mqueue->numOfItems, requestId); + tmq->consumerId, rspType, vgId, total, requestId); tsem_post(&tmq->rspSem); taosReleaseRef(tmqMgmt.rsetId, refId); diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 6cb89c13ec..84e3424264 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -161,10 +161,10 @@ void *queryThread(void *arg) { return NULL; } -static int32_t numOfThreads = 1; +int32_t numOfThreads = 1; void tmq_commit_cb_print(tmq_t *pTmq, int32_t code, void *param) { - printf("success, code:%d\n", code); + printf("auto commit success, code:%d\n\n\n\n", code); } void* doConsumeData(void* param) { @@ -173,7 +173,7 @@ void* doConsumeData(void* param) { tmq_conf_t* conf = tmq_conf_new(); tmq_conf_set(conf, "enable.auto.commit", "true"); tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); - tmq_conf_set(conf, "group.id", "cgrpName12"); + tmq_conf_set(conf, "group.id", "cgrpName41"); tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.pass", "taosdata"); tmq_conf_set(conf, "auto.offset.reset", "earliest"); @@ -1060,7 +1060,7 @@ TEST(clientCase, sub_tb_test) { tmq_conf_t* conf = tmq_conf_new(); tmq_conf_set(conf, "enable.auto.commit", "true"); tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); - tmq_conf_set(conf, "group.id", "cgrpName27"); + tmq_conf_set(conf, "group.id", "cgrpName45"); tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.pass", "taosdata"); tmq_conf_set(conf, "auto.offset.reset", "earliest"); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 5f171eb5ee..606cfc294a 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1332,7 +1332,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { pRefBlock->dataRef = pRef; atomic_add_fetch_32(pRefBlock->dataRef, 1); - if (streamTaskInput(pTask, (SStreamQueueItem*)pRefBlock) < 0) { + if (tAppendDataForStream(pTask, (SStreamQueueItem*)pRefBlock) < 0) { qError("stream task input del failed, task id %d", pTask->taskId); atomic_sub_fetch_32(pRef, 1); @@ -1367,7 +1367,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { taosArrayPush(pStreamBlock->blocks, &block); if (!failed) { - if (streamTaskInput(pTask, (SStreamQueueItem*)pStreamBlock) < 0) { + if (tAppendDataForStream(pTask, (SStreamQueueItem*)pStreamBlock) < 0) { qError("stream task input del failed, task id %d", pTask->taskId); continue; } @@ -1388,13 +1388,13 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) { void* pIter = NULL; - bool failed = false; + bool succ = true; SStreamDataSubmit2* pSubmit = streamDataSubmitNew(submit); if (pSubmit == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; tqError("failed to create data submit for stream since out of memory"); - failed = true; + succ = false; } while (1) { @@ -1409,20 +1409,19 @@ int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) { } if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { - tqDebug("skip push task %d, task status %d", pTask->taskId, pTask->taskStatus); + tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->taskId, pTask->taskStatus); continue; } - tqDebug("data submit enqueue stream task: %d, ver: %" PRId64, pTask->taskId, submit.ver); - - if (!failed) { - if (streamTaskInput(pTask, (SStreamQueueItem*)pSubmit) < 0) { - tqError("stream task input failed, task id %d", pTask->taskId); + tqDebug("data submit enqueue stream task:%d, ver: %" PRId64, pTask->taskId, submit.ver); + if (succ) { + if (tAppendDataForStream(pTask, (SStreamQueueItem*)pSubmit) < 0) { + tqError("stream task:%d failed to put into queue for, too many", pTask->taskId); continue; } if (streamSchedExec(pTask) < 0) { - tqError("stream task launch failed, task id %d", pTask->taskId); + tqError("stream task:%d launch failed, code:%s", pTask->taskId, tstrerror(terrno)); continue; } } else { @@ -1430,12 +1429,12 @@ int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) { } } - if (pSubmit) { + if (pSubmit != NULL) { streamDataSubmitDestroy(pSubmit); taosFreeQitem(pSubmit); } - return failed ? -1 : 0; + return succ ? 0 : -1; } int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index d87d445ba5..fd0a5233a1 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -68,7 +68,7 @@ void streamSchedByTimer(void* param, void* tmrId) { atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE); - if (streamTaskInput(pTask, (SStreamQueueItem*)trigger) < 0) { + if (tAppendDataForStream(pTask, (SStreamQueueItem*)trigger) < 0) { taosFreeQitem(trigger); taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer); return; @@ -123,7 +123,7 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, const SStreamDispatchReq* pReq, SR /*pData->blocks = pReq->data;*/ /*pBlock->sourceVer = pReq->sourceVer;*/ streamDispatchReqToData(pReq, pData); - if (streamTaskInput(pTask, (SStreamQueueItem*)pData) == 0) { + if (tAppendDataForStream(pTask, (SStreamQueueItem*)pData) == 0) { status = TASK_INPUT_STATUS__NORMAL; } else { status = TASK_INPUT_STATUS__FAILED; @@ -164,7 +164,7 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, /*pData->blocks = pReq->data;*/ /*pBlock->sourceVer = pReq->sourceVer;*/ streamRetrieveReqToData(pReq, pData); - if (streamTaskInput(pTask, (SStreamQueueItem*)pData) == 0) { + if (tAppendDataForStream(pTask, (SStreamQueueItem*)pData) == 0) { status = TASK_INPUT_STATUS__NORMAL; } else { status = TASK_INPUT_STATUS__FAILED; @@ -275,7 +275,7 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S return 0; } -int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) { +int32_t tAppendDataForStream(SStreamTask* pTask, SStreamQueueItem* pItem) { int8_t type = pItem->type; if (type == STREAM_INPUT__DATA_SUBMIT) { @@ -288,9 +288,11 @@ int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) { } taosWriteQitem(pTask->inputQueue->queue, pSubmitBlock); + + int32_t total = taosQueueItemSize(pTask->inputQueue->queue); qDebug("stream task:%d %p submit enqueue %p %p %p msgLen:%d ver:%" PRId64 ", total in queue:%d", pTask->taskId, pTask, pItem, pSubmitBlock, pSubmitBlock->submit.msgStr, pSubmitBlock->submit.msgLen, - pSubmitBlock->submit.ver, pTask->inputQueue->queue->numOfItems); + pSubmitBlock->submit.ver, total); } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { taosWriteQitem(pTask->inputQueue->queue, pItem); -- GitLab