diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 9f7d366a4602c25a5f6d9e876ae68dceeba048ec..eea8868b8c3fe60b7b33b69102f1a1f8bd1ebe99 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -31,6 +31,7 @@ extern "C" { #ifndef _STREAM_H_ #define _STREAM_H_ +typedef void (*_free_reader_fn_t)(void*); typedef struct SStreamTask SStreamTask; enum { @@ -218,9 +219,10 @@ void streamDataSubmitDestroy(SStreamDataSubmit2* pDataSubmit); SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit); typedef struct { - char* qmsg; - void* pExecutor; // not applicable to encoder and decoder - struct STqReader* pTqReader; // not applicable to encoder and decoder + char* qmsg; + void* pExecutor; // not applicable to encoder and decoder + struct STqReader* pTqReader; // not applicable to encoder and decoder + struct SWalReader* pWalReader; // not applicable to encoder and decoder } STaskExec; typedef struct { @@ -331,6 +333,7 @@ struct SStreamTask { int64_t checkpointingId; int32_t checkpointAlignCnt; struct SStreamMeta* pMeta; + _free_reader_fn_t freeFp; }; // meta @@ -340,12 +343,14 @@ typedef struct SStreamMeta { TTB* pTaskDb; TTB* pCheckpointDb; SHashObj* pTasks; - SHashObj* pRestoreTasks; + SHashObj* pWalReadTasks; void* ahandle; TXN* txn; FTaskExpand* expandFunc; int32_t vgId; SRWLatch lock; + int8_t walScan; + bool quit; } SStreamMeta; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); @@ -355,7 +360,7 @@ SStreamTask* tNewStreamTask(int64_t streamId); int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask); void tFreeStreamTask(SStreamTask* pTask); -int32_t tAppendDataForStream(SStreamTask* pTask, SStreamQueueItem* pItem); +int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem); bool tInputQueueIsFull(const SStreamTask* pTask); static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) { @@ -568,8 +573,9 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF void streamMetaClose(SStreamMeta* streamMeta); int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); -int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask); +int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask); int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t checkpointVer, char* msg, int32_t msgLen); +int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta); SStreamTask* streamMetaAcquireTaskEx(SStreamMeta* pMeta, int32_t taskId); SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId); diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 0a359bfd42583cf3b5b5c97b71cdb4d488db73f2..b51289de5e46e05d58d5af8a8b130db292439103 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -139,7 +139,7 @@ typedef struct { } SWalFilterCond; // todo hide this struct -typedef struct { +typedef struct SWalReader { SWal *pWal; int64_t readerId; TdFilePtr pLogFile; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 7ccbb3b586d8734c6809631d3695efb46a03e518..4235548e48045041d586206cac3de89a4a5ee93c 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -150,7 +150,7 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) { ASSERT(pTask->taskLevel == TASK_LEVEL__AGG); // 2.save task - code = streamMetaAddTask(pSnode->pMeta, -1, pTask); + code = streamMetaAddDeployedTask(pSnode->pMeta, -1, pTask); if (code < 0) { return -1; } diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index d62eebd2e138034352e499da3dbc13fefefd4fdb..e6e21e1e4a8d0ead0f8b57016d65292464300bfd 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -260,7 +260,8 @@ int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList); int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList); int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id); -void tqNextBlock(STqReader *pReader, SFetchRet *ret); +void tqNextBlock(STqReader *pReader, SFetchRet *ret); +int32_t extractSubmitMsgFromWal(SWalReader *pReader, SPackedData *pPackedData); int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); // int32_t tqReaderSetDataMsg(STqReader *pReader, const SSubmitReq *pMsg, int64_t ver); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 94ba399a0a26e36bb6a5b81db95f5a7b3cd49be7..c007f8479035c2ceaa2528c4093e52a7fa306456 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -176,7 +176,7 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname); // tqStream int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); -int32_t tqDoRestoreSourceStreamTasks(STQ* pTq); +int32_t tqStreamTasksScanWal(STQ* pTq); // tq util void createStreamTaskOffsetKey(char* dst, uint64_t streamId, uint32_t taskId); @@ -187,6 +187,7 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ void doSaveTaskOffset(STqOffsetStore* pOffsetStore, const char* pKey, int64_t ver); void saveOffsetForAllTasks(STQ* pTq, int64_t ver); void initOffsetForAllRestoreTasks(STQ* pTq); +int32_t transferToWalReadTask(SStreamMeta* pStreamMeta, SArray* pTaskList); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 412c2549b5b8e4d84a652fdd296812f659c27745..16dea8aebde7a418c550698294ef0f8131d0b407 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -194,7 +194,7 @@ void tqClose(STQ*); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); int tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp, int32_t type); int tqUnregisterPushHandle(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer); -int tqRestoreStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. +int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. int tqCommit(STQ*); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index b029afc93554e7b08c1590119130585fbea2eb57..2b911befccb4dfd42a08363ea096e4a80d195b29 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -15,7 +15,7 @@ #include "tq.h" -#define ALL_STREAM_TASKS_ID (-1) +#define WAL_READ_TASKS_ID (-1) int32_t tqInit() { int8_t old; @@ -630,6 +630,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { return -1; } + pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); + + pTask->freeFp = (_free_reader_fn_t)tqCloseReader; SArray* pList = qGetQueriedTableListInfo(pTask->exec.pExecutor); tqReaderAddTbUidList(pTask->exec.pTqReader, pList); } @@ -640,6 +643,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { return 0; } +void tFreeStreamTask(SStreamTask* pTask); + int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { char* msgStr = pMsg->pCont; char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); @@ -754,8 +759,10 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms tDecoderClear(&decoder); // 2.save task - code = streamMetaAddTask(pTq->pStreamMeta, sversion, pTask); + code = streamMetaAddDeployedTask(pTq->pStreamMeta, sversion, pTask); if (code < 0) { + tqError("vgId:%d failed to add s-task:%s, total:%d", TD_VID(pTq->pVnode), pTask->id.idStr, + streamMetaGetNumOfTasks(pTq->pStreamMeta)); return -1; } @@ -764,6 +771,8 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms streamTaskCheckDownstream(pTask, sversion); } + tqDebug("vgId:%d s-task:%s is deployed from mnd, status:%d, total:%d", TD_VID(pTq->pVnode), pTask->id.idStr, + pTask->status.taskStatus, streamMetaGetNumOfTasks(pTq->pStreamMeta)); return 0; } @@ -973,7 +982,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { pRefBlock->dataRef = pRef; atomic_add_fetch_32(pRefBlock->dataRef, 1); - if (tAppendDataForStream(pTask, (SStreamQueueItem*)pRefBlock) < 0) { + if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pRefBlock) < 0) { qError("stream task input del failed, task id %d", pTask->id.taskId); atomic_sub_fetch_32(pRef, 1); @@ -1008,7 +1017,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { taosArrayPush(pStreamBlock->blocks, &block); if (!failed) { - if (tAppendDataForStream(pTask, (SStreamQueueItem*)pStreamBlock) < 0) { + if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pStreamBlock) < 0) { qError("stream task input del failed, task id %d", pTask->id.taskId); continue; } @@ -1036,12 +1045,13 @@ static int32_t addSubmitBlockNLaunchTask(STqOffsetStore* pOffsetStore, SStreamTa if (code == TSDB_CODE_SUCCESS) { tqOffsetDelete(pOffsetStore, key); } - return TSDB_CODE_SUCCESS; + + return code; } int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) { +#if 0 void* pIter = NULL; - SStreamDataSubmit2* pSubmit = streamDataSubmitNew(submit, STREAM_INPUT__DATA_SUBMIT); if (pSubmit == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -1050,6 +1060,8 @@ int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) { return -1; } + SArray* pInputQueueFullTasks = taosArrayInit(4, POINTER_BYTES); + while (1) { pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter); if (pIter == NULL) { @@ -1081,47 +1093,23 @@ int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) { ver = pOffset->val.version; } - tqDebug("s-task:%s input queue is full, do nothing, start ver:%" PRId64, pTask->id.idStr, ver); + tqDebug("s-task:%s input queue is full, discard submit block, ver:%" PRId64, pTask->id.idStr, ver); + taosArrayPush(pInputQueueFullTasks, &pTask); continue; } // check if offset value exists STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key); - if (pOffset != NULL) { - // seek the stored version and extract data from WAL - int32_t code = tqSeekVer(pTask->exec.pTqReader, pOffset->val.version, ""); - - // all data has been retrieved from WAL, let's try submit block directly. - if (code == TSDB_CODE_SUCCESS) { // all data retrieved, abort - // append the data for the stream - SFetchRet ret = {.data.info.type = STREAM_NORMAL}; - terrno = 0; - - tqNextBlock(pTask->exec.pTqReader, &ret); - if (ret.fetchType == FETCH_TYPE__DATA) { - code = launchTaskForWalBlock(pTask, &ret, pOffset); - if (code != TSDB_CODE_SUCCESS) { - continue; - } - } else { // FETCH_TYPE__NONE, let's try submit block directly - tqDebug("s-task:%s data in WAL are all consumed, try data in submit message", pTask->id.idStr); - addSubmitBlockNLaunchTask(pTq->pOffsetStore, pTask, pSubmit, key, submit.ver); - } + ASSERT(pOffset == NULL); - // do nothing if failed, since the offset value is kept already - } else { // failed to seek to the WAL version - // todo handle the case where offset has been deleted in WAL, due to stream computing too slow - tqDebug("s-task:%s data in WAL are all consumed, try data in submit msg", pTask->id.idStr); - addSubmitBlockNLaunchTask(pTq->pOffsetStore, pTask, pSubmit, key, submit.ver); - } - } else { - addSubmitBlockNLaunchTask(pTq->pOffsetStore, pTask, pSubmit, key, submit.ver); - } + addSubmitBlockNLaunchTask(pTq->pOffsetStore, pTask, pSubmit, key, submit.ver); } streamDataSubmitDestroy(pSubmit); taosFreeQitem(pSubmit); +#endif + tqStartStreamTasks(pTq); return 0; } @@ -1131,29 +1119,31 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { int32_t taskId = pReq->taskId; int32_t vgId = TD_VID(pTq->pVnode); - if (taskId == ALL_STREAM_TASKS_ID) { // all tasks are restored from the wal - tqDoRestoreSourceStreamTasks(pTq); + if (taskId == WAL_READ_TASKS_ID) { // all tasks are extracted submit data from the wal + tqStreamTasksScanWal(pTq); return 0; - } else { - SStreamTask* pTask = streamMetaAcquireTaskEx(pTq->pStreamMeta, taskId); - if (pTask != NULL) { - if (pTask->status.taskStatus == TASK_STATUS__NORMAL) { - tqDebug("vgId:%d s-task:%s start to process run req", vgId, pTask->id.idStr); - streamProcessRunReq(pTask); - } else if (pTask->status.taskStatus == TASK_STATUS__RESTORE) { - tqDebug("vgId:%d s-task:%s start to process in restore procedure from last chk point:%" PRId64, vgId, - pTask->id.idStr, pTask->chkInfo.version); - streamProcessRunReq(pTask); - } else { - tqDebug("vgId:%d s-task:%s ignore run req since not in ready state", vgId, pTask->id.idStr); - } + } - streamMetaReleaseTask(pTq->pStreamMeta, pTask); - return 0; + SStreamTask* pTask = streamMetaAcquireTaskEx(pTq->pStreamMeta, taskId); + if (pTask != NULL) { + if (pTask->status.taskStatus == TASK_STATUS__NORMAL) { + tqDebug("vgId:%d s-task:%s start to process run req", vgId, pTask->id.idStr); + streamProcessRunReq(pTask); + } else if (pTask->status.taskStatus == TASK_STATUS__RESTORE) { + tqDebug("vgId:%d s-task:%s start to process block from wal, last chk point:%" PRId64, vgId, + pTask->id.idStr, pTask->chkInfo.version); + streamProcessRunReq(pTask); } else { - tqError("vgId:%d failed to found s-task, taskId:%d", vgId, taskId); - return -1; + tqDebug("vgId:%d s-task:%s ignore run req since not in ready state", vgId, pTask->id.idStr); } + + streamMetaReleaseTask(pTq->pStreamMeta, pTask); + + tqStartStreamTasks(pTq); + return 0; + } else { + tqError("vgId:%d failed to found s-task, taskId:%d", vgId, taskId); + return -1; } } @@ -1165,14 +1155,10 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); tDecodeStreamDispatchReq(&decoder, &req); - int32_t taskId = req.taskId; - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId); if (pTask) { - SRpcMsg rsp = { - .info = pMsg->info, - .code = 0, - }; + SRpcMsg rsp = { .info = pMsg->info, .code = 0 }; streamProcessDispatchReq(pTask, &req, &rsp, exec); streamMetaReleaseTask(pTq->pStreamMeta, pTask); return 0; @@ -1294,26 +1280,39 @@ FAIL: int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) { return sversion <= pTq->walLogLastVer; } -int32_t tqRestoreStreamTasks(STQ* pTq) { +int32_t tqStartStreamTasks(STQ* pTq) { int32_t vgId = TD_VID(pTq->pVnode); + SStreamMeta* pMeta = pTq->pStreamMeta; + taosWLockLatch(&pMeta->lock); + pMeta->walScan += 1; + + if (pMeta->walScan > 1) { + tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScan); + taosWUnLockLatch(&pTq->pStreamMeta->lock); + return 0; + } + SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); if (pRunReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; tqError("vgId:%d failed restore stream tasks, code:%s", vgId, terrstr(terrno)); + taosWUnLockLatch(&pTq->pStreamMeta->lock); return -1; } - int32_t numOfTasks = taosHashGetSize(pTq->pStreamMeta->pRestoreTasks); - tqInfo("vgId:%d start restoring stream tasks, total tasks:%d", vgId, numOfTasks); + int32_t numOfTasks = taosHashGetSize(pTq->pStreamMeta->pTasks); + + tqInfo("vgId:%d start wal scan stream tasks, tasks:%d", vgId, numOfTasks); initOffsetForAllRestoreTasks(pTq); pRunReq->head.vgId = vgId; pRunReq->streamId = 0; - pRunReq->taskId = ALL_STREAM_TASKS_ID; + pRunReq->taskId = WAL_READ_TASKS_ID; SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg); + taosWUnLockLatch(&pTq->pStreamMeta->lock); return 0; } diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index d651e945b50e4db10c0cf8fc2b0e44743685268c..df6648a6af30fc01c5773857cecd58b8655101d9 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -322,16 +322,19 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v taosWUnLockLatch(&pTq->lock); } + tqDebug("handle submit, restore:%d, size:%d", pTq->pVnode->restored, (int)taosHashGetSize(pTq->pStreamMeta->pTasks)); + // push data for stream processing: - // 1. the vnode isn't in the restore procedure. + // 1. the vnode has already been restored. // 2. the vnode should be the leader. // 3. the stream is not suspended yet. - if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode) && (!pTq->pVnode->restored)) { + if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode) && pTq->pVnode->restored) { if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) { return 0; } if (msgType == TDMT_VND_SUBMIT) { +#if 0 void* data = taosMemoryMalloc(len); if (data == NULL) { // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry @@ -343,7 +346,10 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v memcpy(data, pReq, len); SPackedData submit = {.msgStr = data, .msgLen = len, .ver = ver}; - tqDebug("tq copy submit msg:%p len:%d ver:%" PRId64 " from %p for stream", data, len, ver, pReq); + tqDebug("vgId:%d tq copy submit msg:%p len:%d ver:%" PRId64 " from %p for stream", vgId, data, len, ver, pReq); + tqProcessSubmitReq(pTq, submit); +#endif + SPackedData submit = {0}; tqProcessSubmitReq(pTq, submit); } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 14d599551dd80f295e4073552e5df48919492f8d..69624f4d1037b31cac5593971f2f99ec6c308213 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -300,6 +300,28 @@ int32_t tqSeekVer(STqReader* pReader, int64_t ver, const char* id) { return 0; } +int32_t extractSubmitMsgFromWal(SWalReader* pReader, SPackedData* pPackedData) { + if (walNextValidMsg(pReader) < 0) { + return -1; + } + + void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SSubmitReq2Msg)); + int32_t len = pReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg); + int64_t ver = pReader->pHead->head.version; + + void* data = taosMemoryMalloc(len); + if (data == NULL) { + // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry + terrno = TSDB_CODE_OUT_OF_MEMORY; + tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0); + return -1; + } + + memcpy(data, pBody, len); + *pPackedData = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data}; + return 0; +} + void tqNextBlock(STqReader* pReader, SFetchRet* ret) { while (1) { if (pReader->msg2.msgStr == NULL) { @@ -434,7 +456,10 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbD SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); pReader->nextBlk++; - if (pSubmitTbDataRet) *pSubmitTbDataRet = pSubmitTbData; + if (pSubmitTbDataRet) { + *pSubmitTbDataRet = pSubmitTbData; + } + int32_t sversion = pSubmitTbData->sver; int64_t suid = pSubmitTbData->suid; int64_t uid = pSubmitTbData->uid; diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 877c686d357b5675e9c3167419e83b9f73771cbe..6ed74ddcc3b10ef5705480e6773ea12c4fd1047c 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -15,60 +15,81 @@ #include "tq.h" -static int32_t restoreStreamTaskImpl(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, SArray* pTaskList); +static int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, bool* pScanIdle); static int32_t transferToNormalTask(SStreamMeta* pStreamMeta, SArray* pTaskList); // this function should be executed by stream threads. // there is a case that the WAL increases more fast than the restore procedure, and this restore procedure // will not stop eventually. -int tqDoRestoreSourceStreamTasks(STQ* pTq) { +int tqStreamTasksScanWal(STQ* pTq) { + int32_t vgId = TD_VID(pTq->pVnode); + SStreamMeta* pMeta = pTq->pStreamMeta; int64_t st = taosGetTimestampMs(); + while (1) { - SArray* pTaskList = taosArrayInit(4, POINTER_BYTES); + tqInfo("vgId:%d continue check if data in wal are available", vgId); // check all restore tasks - restoreStreamTaskImpl(pTq->pStreamMeta, pTq->pOffsetStore, pTaskList); - transferToNormalTask(pTq->pStreamMeta, pTaskList); - taosArrayDestroy(pTaskList); + bool allFull = true; + streamTaskReplayWal(pTq->pStreamMeta, pTq->pOffsetStore, &allFull); - int32_t numOfRestored = taosHashGetSize(pTq->pStreamMeta->pRestoreTasks); - if (numOfRestored <= 0) { - break; - } - } - - int64_t et = taosGetTimestampMs(); - tqInfo("vgId:%d restoring task completed, elapsed time:%" PRId64 " sec.", TD_VID(pTq->pVnode), (et - st)); - return 0; -} + int32_t times = 0; -int32_t transferToNormalTask(SStreamMeta* pStreamMeta, SArray* pTaskList) { - int32_t numOfTask = taosArrayGetSize(pTaskList); - if (numOfTask <= 0) { - return TSDB_CODE_SUCCESS; - } + if (allFull) { + taosWLockLatch(&pMeta->lock); + pMeta->walScan -= 1; + times = pMeta->walScan; - // todo: add lock - for (int32_t i = 0; i < numOfTask; ++i) { - SStreamTask* pTask = taosArrayGetP(pTaskList, i); - tqDebug("vgId:%d transfer s-task:%s state restore -> ready, checkpoint:%" PRId64 " checkpoint id:%" PRId64, - pStreamMeta->vgId, pTask->id.idStr, pTask->chkInfo.version, pTask->chkInfo.id); - taosHashRemove(pStreamMeta->pRestoreTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); + if (pMeta->walScan <= 0) { + taosWUnLockLatch(&pMeta->lock); + break; + } - // NOTE: do not change the following order - atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL); - taosHashPut(pStreamMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES); + taosWUnLockLatch(&pMeta->lock); + tqInfo("vgId:%d scan wal for stream tasks for %d times", vgId, times); + } } - return TSDB_CODE_SUCCESS; + double el = (taosGetTimestampMs() - st) / 1000.0; + tqInfo("vgId:%d scan wal for stream tasks completed, elapsed time:%.2f sec", vgId, el); + + // restore wal scan flag +// atomic_store_8(&pTq->pStreamMeta->walScan, 0); + return 0; } -int32_t restoreStreamTaskImpl(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, SArray* pTaskList) { - // check all restore tasks - void* pIter = NULL; +//int32_t transferToNormalTask(SStreamMeta* pStreamMeta, SArray* pTaskList) { +// int32_t numOfTask = taosArrayGetSize(pTaskList); +// if (numOfTask <= 0) { +// return TSDB_CODE_SUCCESS; +// } +// +// // todo: add lock +// for (int32_t i = 0; i < numOfTask; ++i) { +// SStreamTask* pTask = taosArrayGetP(pTaskList, i); +// tqDebug("vgId:%d transfer s-task:%s state restore -> ready, checkpoint:%" PRId64 " checkpoint id:%" PRId64, +// pStreamMeta->vgId, pTask->id.idStr, pTask->chkInfo.version, pTask->chkInfo.id); +// taosHashRemove(pStreamMeta->pWalReadTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); +// +// // NOTE: do not change the following order +// atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL); +// taosHashPut(pStreamMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES); +// } +// +// return TSDB_CODE_SUCCESS; +//} + +int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, bool* pScanIdle) { + void* pIter = NULL; + int32_t vgId = pStreamMeta->vgId; + + *pScanIdle = true; + + bool allWalChecked = true; + tqDebug("vgId:%d start to check wal to extract new submit block", vgId); while (1) { - pIter = taosHashIterate(pStreamMeta->pRestoreTasks, pIter); + pIter = taosHashIterate(pStreamMeta->pTasks, pIter); if (pIter == NULL) { break; } @@ -78,8 +99,10 @@ int32_t restoreStreamTaskImpl(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetS continue; } - if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { - tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, pTask->status.taskStatus); + if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || + pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { + tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, + pTask->status.taskStatus); continue; } @@ -88,41 +111,57 @@ int32_t restoreStreamTaskImpl(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetS createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId); if (tInputQueueIsFull(pTask)) { - tqDebug("s-task:%s input queue is full, do nothing", pTask->id.idStr); - taosMsleep(10); + tqDebug("vgId:%d s-task:%s input queue is full, do nothing", vgId, pTask->id.idStr); continue; } + *pScanIdle = false; + // check if offset value exists STqOffset* pOffset = tqOffsetRead(pOffsetStore, key); - if (pOffset != NULL) { - // seek the stored version and extract data from WAL - int32_t code = tqSeekVer(pTask->exec.pTqReader, pOffset->val.version, ""); - if (code == TSDB_CODE_SUCCESS) { // all data retrieved, abort - // append the data for the stream - SFetchRet ret = {.data.info.type = STREAM_NORMAL}; - terrno = 0; - - tqNextBlock(pTask->exec.pTqReader, &ret); - if (ret.fetchType == FETCH_TYPE__DATA) { - code = launchTaskForWalBlock(pTask, &ret, pOffset); - if (code != TSDB_CODE_SUCCESS) { - continue; - } - } else { - // FETCH_TYPE__NONE: all data has been retrieved from WAL, let's try submit block directly. - tqDebug("s-task:%s data in WAL are all consumed, transfer this task to be normal state", pTask->id.idStr); - taosArrayPush(pTaskList, &pTask); - } - } else { // failed to seek to the WAL version - tqDebug("s-task:%s data in WAL are all consumed, transfer this task to be normal state", pTask->id.idStr); - taosArrayPush(pTaskList, &pTask); - } + ASSERT(pOffset != NULL); + + // seek the stored version and extract data from WAL + int32_t code = walReadSeekVer(pTask->exec.pWalReader, pOffset->val.version); + if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit + continue; + } + + // append the data for the stream + tqDebug("vgId:%d wal reader seek to ver:%" PRId64 " %s", vgId, pOffset->val.version, pTask->id.idStr); + + SPackedData packData = {0}; + code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData); + if (code != TSDB_CODE_SUCCESS) { // failed, continue + continue; + } + + SStreamDataSubmit2* p = streamDataSubmitNew(packData, STREAM_INPUT__DATA_SUBMIT); + if (p == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tqError("%s failed to create data submit for stream since out of memory", pTask->id.idStr); + continue; + } + + allWalChecked = false; + + tqDebug("s-task:%s submit data extracted from WAL", pTask->id.idStr); + code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)p, packData.ver); + if (code == TSDB_CODE_SUCCESS) { + pOffset->val.version = walReaderGetCurrentVer(pTask->exec.pWalReader); + tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, + pOffset->val.version); } else { - ASSERT(0); + // do nothing } + + streamDataSubmitDestroy(p); + taosFreeQitem(p); } + if (allWalChecked) { + *pScanIdle = true; + } return 0; } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 2e8c6a53bb7858df62e2a26ab07285e3a8aa0210..791bfbe6df5e4bdcaf246542366490f0de64a7d0 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -35,7 +35,7 @@ void createStreamTaskOffsetKey(char* dst, uint64_t streamId, uint32_t taskId) { } int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver) { - int32_t code = tAppendDataForStream(pTask, pQueueItem); + int32_t code = tAppendDataToInputQueue(pTask, pQueueItem); if (code < 0) { tqError("s-task:%s failed to put into queue, too many, next start ver:%" PRId64, pTask->id.idStr, ver); return -1; @@ -79,7 +79,7 @@ void initOffsetForAllRestoreTasks(STQ* pTq) { void* pIter = NULL; while(1) { - pIter = taosHashIterate(pTq->pStreamMeta->pRestoreTasks, pIter); + pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter); if (pIter == NULL) { break; } @@ -103,7 +103,6 @@ void initOffsetForAllRestoreTasks(STQ* pTq) { doSaveTaskOffset(pTq->pOffsetStore, key, pTask->chkInfo.version); } } - } void saveOffsetForAllTasks(STQ* pTq, int64_t ver) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 76ff04b81abf3ae689f111a523721fc701900c82..24ebbe23d2a224bdd1be598c44fa981cc1c3b0ab 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -539,13 +539,10 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { return vnodeGetBatchMeta(pVnode, pMsg); case TDMT_VND_TMQ_CONSUME: return tqProcessPollReq(pVnode->pTq, pMsg); - case TDMT_STREAM_TASK_RUN: return tqProcessTaskRunReq(pVnode->pTq, pMsg); -#if 1 case TDMT_STREAM_TASK_DISPATCH: return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true); -#endif case TDMT_STREAM_TASK_CHECK: return tqProcessStreamTaskCheckReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_DISPATCH_RSP: diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index eb3c5d1f6484d450ea00ae40f0d3b4089b9d0e7d..9f5d722583d8c93fffb3ecb08d80bd6e7dfc45c8 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -551,7 +551,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) vInfo("vgId:%d, sync restore finished", pVnode->config.vgId); // start to restore all stream tasks - tqRestoreStreamTasks(pVnode->pTq); + tqStartStreamTasks(pVnode->pTq); } static void vnodeBecomeFollower(const SSyncFSM *pFsm) { diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 71d4e5efd8c6f8abaf3c116c8e924e02243958eb..5ec5be169e53857e1c505fc301fdafa11a0817be 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -70,7 +70,7 @@ void streamSchedByTimer(void* param, void* tmrId) { trigger->pBlock->info.type = STREAM_GET_ALL; atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE); - if (tAppendDataForStream(pTask, (SStreamQueueItem*)trigger) < 0) { + if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)trigger) < 0) { taosFreeQitem(trigger); taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer); return; @@ -110,16 +110,17 @@ int32_t streamSchedExec(SStreamTask* pTask) { SRpcMsg msg = { .msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq) }; tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg); + qDebug("trigger to run s-task:%s", pTask->id.idStr); } return 0; } -int32_t streamTaskEnqueue(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) { +int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) { SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); int8_t status; - // enqueue + // enqueue data block if (pData != NULL) { pData->type = STREAM_INPUT__DATA_BLOCK; pData->srcVgId = pReq->dataSrcVgId; @@ -127,10 +128,10 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, const SStreamDispatchReq* pReq, SR /*pData->blocks = pReq->data;*/ /*pBlock->sourceVer = pReq->sourceVer;*/ streamDispatchReqToData(pReq, pData); - if (tAppendDataForStream(pTask, (SStreamQueueItem*)pData) == 0) { + if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pData) == 0) { status = TASK_INPUT_STATUS__NORMAL; - } else { - status = TASK_INPUT_STATUS__FAILED; + } else { // input queue is full, upstream is blocked now + status = TASK_INPUT_STATUS__BLOCKED; } } else { streamTaskInputFail(pTask); @@ -148,8 +149,10 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, const SStreamDispatchReq* pReq, SR pCont->downstreamNodeId = htonl(pTask->nodeId); pCont->downstreamTaskId = htonl(pTask->id.taskId); pRsp->pCont = buf; + pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp); tmsgSendRsp(pRsp); + return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1; } @@ -168,7 +171,7 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, /*pData->blocks = pReq->data;*/ /*pBlock->sourceVer = pReq->sourceVer;*/ streamRetrieveReqToData(pReq, pData); - if (tAppendDataForStream(pTask, (SStreamQueueItem*)pData) == 0) { + if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pData) == 0) { status = TASK_INPUT_STATUS__NORMAL; } else { status = TASK_INPUT_STATUS__FAILED; @@ -209,10 +212,10 @@ int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) { } int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) { - qDebug("task %d receive dispatch req from node %d task %d", pTask->id.taskId, pReq->upstreamNodeId, + qDebug("vgId:%d s-task:%s receive dispatch req from taskId:%d", pReq->upstreamNodeId, pTask->id.idStr, pReq->upstreamTaskId); - streamTaskEnqueue(pTask, pReq, pRsp); + streamTaskEnqueueBlocks(pTask, pReq, pRsp); tDeleteStreamDispatchReq(pReq); if (exec) { @@ -232,12 +235,14 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) { ASSERT(pRsp->inputStatus == TASK_OUTPUT_STATUS__NORMAL || pRsp->inputStatus == TASK_OUTPUT_STATUS__BLOCKED); - qDebug("task %d receive dispatch rsp, code: %x", pTask->id.taskId, code); + qDebug("s-task:%s receive dispatch rsp, code: %x", pTask->id.idStr, code); if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); qDebug("task %d is shuffle, left waiting rsp %d", pTask->id.taskId, leftRsp); - if (leftRsp > 0) return 0; + if (leftRsp > 0) { + return 0; + } } int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus); @@ -282,7 +287,7 @@ bool tInputQueueIsFull(const SStreamTask* pTask) { return taosQueueItemSize((pTask->inputQueue->queue)) >= STREAM_TASK_INPUT_QUEUEU_CAPACITY; } -int32_t tAppendDataForStream(SStreamTask* pTask, SStreamQueueItem* pItem) { +int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { int8_t type = pItem->type; if (type == STREAM_INPUT__DATA_SUBMIT) { @@ -295,12 +300,12 @@ int32_t tAppendDataForStream(SStreamTask* pTask, SStreamQueueItem* pItem) { } int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1; - qDebug("s-task:%s submit enqueue %p %p %p msgLen:%d ver:%" PRId64 ", total in queue:%d", pTask->id.idStr, - pItem, pSubmitBlock, pSubmitBlock->submit.msgStr, pSubmitBlock->submit.msgLen, + qDebug("s-task:%s submit enqueue %p %p msgLen:%d ver:%" PRId64 ", total in queue:%d", pTask->id.idStr, + pItem, pSubmitBlock->submit.msgStr, pSubmitBlock->submit.msgLen, pSubmitBlock->submit.ver, total); - if (total > STREAM_TASK_INPUT_QUEUEU_CAPACITY) { - qDebug("s-task:%s input queue is full, capacity:%d, abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY); + if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && total > STREAM_TASK_INPUT_QUEUEU_CAPACITY) { + qError("s-task:%s input queue is full, capacity:%d, abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY); streamDataSubmitDestroy(pSubmitBlock); return -1; } @@ -309,8 +314,8 @@ int32_t tAppendDataForStream(SStreamTask* pTask, SStreamQueueItem* pItem) { } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1; - if (total > 2) { - qDebug("stream task input queue is full, abort"); + if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && total > STREAM_TASK_INPUT_QUEUEU_CAPACITY) { + qError("s-task:%s input queue is full, capacity:%d, abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY); return -1; } @@ -327,7 +332,6 @@ int32_t tAppendDataForStream(SStreamTask* pTask, SStreamQueueItem* pItem) { } #if 0 - // TODO: back pressure atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL); #endif diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 63d15f134d79eeae89d1926e157d843245c43cc4..ae616260f3b38038f103b661b4a05aabace9ac31 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -69,7 +69,6 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type) { SStreamDataSubmit2* pDataSubmit = (SStreamDataSubmit2*)taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, 0); - if (pDataSubmit == NULL) { return NULL; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 4e491f906ad49dbd0ce1fc840d8bcdf3f888586c..a9f6d29bf5f51cf5826fac3c092b3bd1feff0aa7 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -238,7 +238,8 @@ int32_t streamDispatchOneCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* msg.pCont = buf; msg.msgType = TDMT_STREAM_TASK_CHECK; - qDebug("dispatch from task %d to task %d node %d: check msg", pTask->id.taskId, pReq->downstreamTaskId, nodeId); + qDebug("dispatch from s-task:%s to downstream s-task:%"PRIx64":%d node %d: check msg", pTask->id.idStr, + pReq->streamId, pReq->downstreamTaskId, nodeId); tmsgSendReq(pEpSet, &msg); @@ -319,8 +320,7 @@ int32_t streamDispatchOneDataReq(SStreamTask* pTask, const SStreamDispatchReq* p msg.pCont = buf; msg.msgType = pTask->dispatchMsgType; - qDebug("dispatch from task %d to task %d node %d: data msg", pTask->id.taskId, pReq->taskId, vgId); - + qDebug("dispatch from s-task:%s to taskId:%d vgId:%d data msg", pTask->id.idStr, pReq->taskId, vgId); tmsgSendReq(pEpSet, &msg); code = 0; @@ -402,14 +402,15 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat goto FAIL_FIXED_DISPATCH; } } + int32_t vgId = pTask->fixedEpDispatcher.nodeId; SEpSet* pEpSet = &pTask->fixedEpDispatcher.epSet; int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId; req.taskId = downstreamTaskId; - qDebug("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->id.taskId, pTask->selfChildId, - downstreamTaskId, vgId); + qDebug("s-task:%s (child taskId:%d) dispatch blocks:%d to down stream s-task:%d in vgId:%d", pTask->id.idStr, + pTask->selfChildId, blockNum, downstreamTaskId, vgId); if (streamDispatchOneDataReq(pTask, &req, vgId, pEpSet) < 0) { goto FAIL_FIXED_DISPATCH; @@ -494,6 +495,8 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat int32_t streamDispatch(SStreamTask* pTask) { ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); + qDebug("s-task:%s try to dispatch intermediate result block to downstream, numofBlocks in outputQ:%d", pTask->id.idStr, + taosQueueItemSize(pTask->outputQueue->queue)); int8_t old = atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT); @@ -503,13 +506,12 @@ int32_t streamDispatch(SStreamTask* pTask) { SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue); if (pBlock == NULL) { - qDebug("stream stop dispatching since no output: task %d", pTask->id.taskId); + qDebug("s-task:%s stream stop dispatching since no output in output queue", pTask->id.idStr); atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); return 0; } - ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK); - qDebug("stream dispatching: task %d", pTask->id.taskId); + ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK); int32_t code = 0; if (streamDispatchAllBlocks(pTask, pBlock) < 0) { @@ -518,6 +520,7 @@ int32_t streamDispatch(SStreamTask* pTask) { atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); goto FREE; } + FREE: taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); taosFreeQitem(pBlock); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index db9be593c065c695d423e75953625b6b14964ad1..3d896c08ac006799b7d096165f15cc94f8a528b5 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -40,9 +40,9 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE); const SStreamDataSubmit2* pSubmit = (const SStreamDataSubmit2*)data; - qDebug("s-task:%s set submit blocks as input %p %p %d ver:%" PRId64, pTask->id.idStr, pSubmit, pSubmit->submit.msgStr, - pSubmit->submit.msgLen, pSubmit->submit.ver); qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); + qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit, pSubmit->submit.msgStr, + pSubmit->submit.msgLen, pSubmit->submit.ver); } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { const SStreamDataBlock* pBlock = (const SStreamDataBlock*)data; @@ -241,7 +241,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { while (1) { SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); if (qItem == NULL) { - qDebug("s-task:%s stream task exec over, queue empty", pTask->id.idStr); +// qDebug("s-task:%s extract data from input queue, queue is empty, abort", pTask->id.idStr); break; } @@ -280,12 +280,13 @@ int32_t streamExecForAll(SStreamTask* pTask) { if (pTask->taskLevel == TASK_LEVEL__SINK) { ASSERT(((SStreamQueueItem*)pInput)->type == STREAM_INPUT__DATA_BLOCK); + qDebug("s-task:%s sink node start to sink result. numOfBlocks:%d", pTask->id.idStr, batchSize); streamTaskOutput(pTask, pInput); continue; } SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); - qDebug("s-task:%s exec begin, msg batch: %d", pTask->id.idStr, batchSize); + qDebug("s-task:%s exec begin, numOfBlocks:%d", pTask->id.idStr, batchSize); streamTaskExecImpl(pTask, pInput, pRes); @@ -293,13 +294,21 @@ int32_t streamExecForAll(SStreamTask* pTask) { int64_t dataVer = 0; qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId); if (dataVer > pTask->chkInfo.version) { // save it since the checkpoint is updated - qDebug("s-task:%s exec end, checkpoint ver from %"PRId64" to %"PRId64, pTask->id.idStr, pTask->chkInfo.version, dataVer); + qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64 + ", checkPoint id:%" PRId64 " -> %" PRId64, + pTask->id.idStr, pTask->chkInfo.version, dataVer, pTask->chkInfo.id, ckId); + pTask->chkInfo = (SCheckpointInfo) {.version = dataVer, .id = ckId}; - streamMetaSaveTask(pTask->pMeta, pTask); + taosWLockLatch(&pTask->pMeta->lock); + streamMetaSaveTask(pTask->pMeta, pTask); if (streamMetaCommit(pTask->pMeta) < 0) { - qError("failed to commit stream meta, since %s", terrstr()); + taosWUnLockLatch(&pTask->pMeta->lock); + qError("s-task:%s failed to commit stream meta, since %s", pTask->id.idStr, terrstr()); return -1; + } else { + taosWUnLockLatch(&pTask->pMeta->lock); + qDebug("s-task:%s update checkpoint ver succeed", pTask->id.idStr); } } else { qDebug("s-task:%s exec end", pTask->id.idStr); @@ -354,6 +363,7 @@ int32_t streamTryExec(SStreamTask* pTask) { // todo the task should be commit here atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); + qDebug("s-task:%s exec completed", pTask->id.idStr); if (!taosQueueEmpty(pTask->inputQueue->queue)) { streamSchedExec(pTask); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 2e9bb4d762674eb006a64547406a74ca1c210560..4b423cc432f4851a663f5a228b75f391786ce565 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -51,8 +51,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF goto _err; } - pMeta->pRestoreTasks = taosHashInit(64, fp, true, HASH_ENTRY_LOCK); - if (pMeta->pRestoreTasks == NULL) { + pMeta->pWalReadTasks = taosHashInit(64, fp, true, HASH_ENTRY_LOCK); + if (pMeta->pWalReadTasks == NULL) { goto _err; } @@ -60,15 +60,16 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF goto _err; } + pMeta->vgId = vgId; pMeta->ahandle = ahandle; pMeta->expandFunc = expandFunc; - + taosInitRWLatch(&pMeta->lock); return pMeta; _err: taosMemoryFree(pMeta->path); if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks); - if (pMeta->pRestoreTasks) taosHashCleanup(pMeta->pRestoreTasks); + if (pMeta->pWalReadTasks) taosHashCleanup(pMeta->pWalReadTasks); if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb); if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb); if (pMeta->db) tdbClose(pMeta->db); @@ -83,20 +84,29 @@ void streamMetaClose(SStreamMeta* pMeta) { tdbClose(pMeta->db); void* pIter = NULL; + while(pMeta->walScan) { + qDebug("wait stream daemon quit"); + taosMsleep(100); + } + while (1) { pIter = taosHashIterate(pMeta->pTasks, pIter); - if (pIter == NULL) break; + if (pIter == NULL) { + break; + } + SStreamTask* pTask = *(SStreamTask**)pIter; if (pTask->timer) { taosTmrStop(pTask->timer); pTask->timer = NULL; } + tFreeStreamTask(pTask); /*streamMetaReleaseTask(pMeta, pTask);*/ } taosHashCleanup(pMeta->pTasks); - taosHashCleanup(pMeta->pRestoreTasks); + taosHashCleanup(pMeta->pWalReadTasks); taosMemoryFree(pMeta->path); taosMemoryFree(pMeta); } @@ -164,8 +174,8 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { return 0; } -#if 1 -int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) { +// add to the ready tasks hash map, not the restored tasks hash map +int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) { if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { return -1; } @@ -174,10 +184,16 @@ int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) { return -1; } - taosHashPut(pMeta->pRestoreTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, POINTER_BYTES); + pTask->status.taskStatus = STREAM_STATUS__NORMAL; + taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, POINTER_BYTES); return 0; } -#endif + +int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta) { + int32_t numOfReady = taosHashGetSize(pMeta->pTasks); + int32_t numOfRestoring = taosHashGetSize(pMeta->pWalReadTasks); + return numOfReady + numOfRestoring; +} SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) { taosRLockLatch(&pMeta->lock); @@ -206,9 +222,9 @@ SStreamTask* streamMetaAcquireTaskEx(SStreamMeta* pMeta, int32_t taskId) { taosRLockLatch(&pMeta->lock); SStreamTask* pTask = NULL; - int32_t numOfRestored = taosHashGetSize(pMeta->pRestoreTasks); + int32_t numOfRestored = taosHashGetSize(pMeta->pWalReadTasks); if (numOfRestored > 0) { - SStreamTask** p = (SStreamTask**)taosHashGet(pMeta->pRestoreTasks, &taskId, sizeof(int32_t)); + SStreamTask** p = (SStreamTask**)taosHashGet(pMeta->pWalReadTasks, &taskId, sizeof(taskId)); if (p != NULL) { pTask = *p; if (pTask != NULL && (atomic_load_8(&(pTask->status.taskStatus)) != TASK_STATUS__DROPPING)) { @@ -217,15 +233,15 @@ SStreamTask* streamMetaAcquireTaskEx(SStreamMeta* pMeta, int32_t taskId) { return pTask; } } - } else { - SStreamTask** p = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); - if (p != NULL) { - pTask = *p; - if (pTask != NULL && atomic_load_8(&(pTask->status.taskStatus)) != TASK_STATUS__DROPPING) { - atomic_add_fetch_32(&pTask->refCnt, 1); - taosRUnLockLatch(&pMeta->lock); - return pTask; - } + } + + SStreamTask** p = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); + if (p != NULL) { + pTask = *p; + if (pTask != NULL && atomic_load_8(&(pTask->status.taskStatus)) != TASK_STATUS__DROPPING) { + atomic_add_fetch_32(&pTask->refCnt, 1); + taosRUnLockLatch(&pMeta->lock); + return pTask; } } @@ -261,9 +277,12 @@ int32_t streamMetaBegin(SStreamMeta* pMeta) { int32_t streamMetaCommit(SStreamMeta* pMeta) { if (tdbCommit(pMeta->db, pMeta->txn) < 0) { + ASSERT(0); return -1; } + if (tdbPostCommit(pMeta->db, pMeta->txn) < 0) { + ASSERT(0); return -1; } @@ -319,7 +338,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { return -1; } - if (taosHashPut(pMeta->pRestoreTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) { + if (taosHashPut(pMeta->pWalReadTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) { tdbFree(pKey); tdbFree(pVal); tdbTbcClose(pCur); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 834c022a9ae5e817eb3c380a55d4682d51b09d59..7d2d7a666f968255980ee21558478f7577f087aa 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -186,7 +186,8 @@ void tFreeStreamTask(SStreamTask* pTask) { pTask->exec.pExecutor = NULL; } - if (pTask->exec.pTqReader != NULL) { + if (pTask->exec.pTqReader != NULL && pTask->freeFp != NULL) { + pTask->freeFp(pTask->exec.pTqReader); pTask->exec.pTqReader = NULL; } @@ -206,5 +207,9 @@ void tFreeStreamTask(SStreamTask* pTask) { streamStateClose(pTask->pState); } + if (pTask->id.idStr != NULL) { + taosMemoryFree((void*)pTask->id.idStr); + } + taosMemoryFree(pTask); } diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index d57104dd7869690f98f5b5fa2152cfa6537b9edc..a49ff0cd5b26b4527886afebdf03ea1ffd82aa1f 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -248,7 +248,8 @@ STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem } taosThreadAttrDestroy(&thAttr); - uInfo("worker:%s:%d is launched, total:%d", pool->name, worker->id, (int32_t)taosArrayGetSize(pool->workers)); + int32_t numOfThreads = taosArrayGetSize(pool->workers); + uInfo("worker:%s:%d is launched, total:%d, expect:%d", pool->name, worker->id, numOfThreads, dstWorkerNum); curWorkerNum++; } diff --git a/tests/script/tsim/stream/basic1.sim b/tests/script/tsim/stream/basic1.sim index e69875d69f20b01f91ea2b90fd912c84b88d2455..15ca6bf7c924bb7cffba8f0607cb0a5c1fea95cb 100644 --- a/tests/script/tsim/stream/basic1.sim +++ b/tests/script/tsim/stream/basic1.sim @@ -37,7 +37,7 @@ if $loop_count == 20 then endi if $rows != 4 then - print =====rows=$rows + print =====rows=$rows, expect 4 goto loop0 endi @@ -53,7 +53,7 @@ if $data02 != 2 then endi if $data03 != 5 then - print =====data03=$data03 + print =====data03=$data03, expect:5 goto loop0 endi