diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index d9b82c8c59f18c106470c9e00154da12dc0232a6..a3384135020048e815df347a344031bd8332ec2b 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -50,6 +50,7 @@ enum { TASK_STATUS__RECOVER_PREPARE, TASK_STATUS__RECOVER1, TASK_STATUS__RECOVER2, + TASK_STATUS_RESTORE, // only available for source task to replay WAL from the checkpoint }; enum { @@ -576,7 +577,7 @@ typedef struct SStreamMeta { TTB* pTaskDb; TTB* pCheckpointDb; SHashObj* pTasks; - SHashObj* pRecoverStatus; + SHashObj* pRestoreTasks; void* ahandle; TXN* txn; FTaskExpand* expandFunc; diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index fdd21c7092b28c7c6f18da7a6562012b07daa618..0a359bfd42583cf3b5b5c97b71cdb4d488db73f2 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -197,6 +197,7 @@ void walReadReset(SWalReader *pReader); int32_t walReadVer(SWalReader *pRead, int64_t ver); int32_t walReadSeekVer(SWalReader *pRead, int64_t ver); int32_t walNextValidMsg(SWalReader *pRead); +int64_t walReaderGetCurrentVer(const SWalReader* pReader); // only for tq usage void walSetReaderCapacity(SWalReader *pRead, int32_t capacity); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index ab83f29ef99b9d08097dbc45ab1e1912c90d987b..a0118ee7490971c72b0c775fcd9809e8aa861abf 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -35,7 +35,7 @@ static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream); -static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pStream, SStreamObj *pNewStream); +static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream); static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq); static int32_t mndProcessDropStreamReq(SRpcMsg *pReq); static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq); diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 9911752f8e23827b7f3700118d499601569e5980..c713d1e247a8cd358fbe0c8a4a529f884e8810b6 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -57,6 +57,7 @@ target_sources( # tq "src/tq/tq.c" + "src/tq/tqUtil.c" "src/tq/tqScan.c" "src/tq/tqMeta.c" "src/tq/tqRead.c" @@ -64,6 +65,7 @@ target_sources( "src/tq/tqPush.c" "src/tq/tqSink.c" "src/tq/tqCommit.c" + "src/tq/tqRestore.c" "src/tq/tqSnapshot.c" "src/tq/tqOffsetSnapshot.c" ) diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 884c01d397e63b6e6a3e8e56c7a8badff64cd3a9..d4af9ac48192edc5c224ceddb7a3338927e37a39 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -128,6 +128,10 @@ typedef struct { tmr_h timer; } STqMgmt; +typedef struct { + int32_t size; +} STqOffsetHead; + static STqMgmt tqMgmt = {0}; int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle); @@ -154,10 +158,6 @@ int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_ int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key); int32_t tqMetaRestoreCheckInfo(STQ* pTq); -typedef struct { - int32_t size; -} STqOffsetHead; - STqOffsetStore* tqOffsetOpen(STQ* pTq); void tqOffsetClose(STqOffsetStore*); STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey); @@ -176,6 +176,12 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname); // tqStream int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); +int32_t tqDoRestoreSourceStreamTasks(STQ* pTq); + +// tq util +void createStreamTaskOffsetKey(char* dst, uint64_t streamId, uint32_t taskId); +int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver); +int32_t launchTaskForWalBlock(SStreamTask* pTask, SFetchRet* pRet, STqOffset* pOffset); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 8b01ba237f26b4f03bfd8bad978be665a05b3570..412c2549b5b8e4d84a652fdd296812f659c27745 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -192,8 +192,9 @@ void tqCleanUp(); STQ* tqOpen(const char* path, SVnode* pVnode); void tqClose(STQ*); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); -int tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp, int32_t type); -int tqUnregisterPushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer); +int tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp, int32_t type); +int tqUnregisterPushHandle(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer); +int tqRestoreStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. int tqCommit(STQ*); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 44d5e26603a19553bcaf299bdbd3bc1c654957ce..aa8960e9770b0e7cac7734c1f96bd427e31489c4 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -16,6 +16,7 @@ #include "tq.h" #define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0) +#define ALL_STREAM_TASKS_ID (-1) int32_t tqInit() { int8_t old; @@ -85,21 +86,6 @@ static bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) pLeft->val.version <= pRight->val.version; } -// stream_task:stream_id:task_id -static void createStreamTaskOffsetKey(char* dst, uint64_t streamId, uint32_t taskId) { - int32_t n = 12; - char* p = dst; - - memcpy(p, "stream_task:", n); - p += n; - - int32_t inc = tintToHex(streamId, p); - p += inc; - - *(p++) = ':'; - tintToHex(taskId, p); -} - STQ* tqOpen(const char* path, SVnode* pVnode) { STQ* pTq = taosMemoryCalloc(1, sizeof(STQ)); if (pTq == NULL) { @@ -470,7 +456,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, // till now, all data has been transferred to consumer, new data needs to push client once arrived. if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { - code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); + code = tqRegisterPushHandle(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); taosWUnLockLatch(&pTq->lock); return code; } @@ -880,7 +866,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg atomic_store_32(&pHandle->epoch, -1); // remove if it has been register in the push manager, and return one empty block to consumer - tqUnregisterPushEntry(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true); + tqUnregisterPushHandle(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true); atomic_store_64(&pHandle->consumerId, req.newConsumerId); atomic_add_fetch_32(&pHandle->epoch, 1); @@ -925,6 +911,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { // expand executor if (pTask->fillHistory) { pTask->taskStatus = TASK_STATUS__WAIT_DOWNSTREAM; + } else { + pTask->taskStatus = TASK_STATUS_RESTORE; } if (pTask->taskLevel == TASK_LEVEL__SOURCE) { @@ -1382,21 +1370,6 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { return 0; } -static int32_t doAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver) { - int32_t code = tAppendDataForStream(pTask, pQueueItem); - if (code < 0) { - tqError("s-task:%s failed to put into queue, too many, next start ver:%" PRId64, pTask->id.idStr, ver); - return -1; - } - - if (streamSchedExec(pTask) < 0) { - tqError("stream task:%d failed to be launched, code:%s", pTask->id.taskId, tstrerror(terrno)); - return -1; - } - - return TSDB_CODE_SUCCESS; -} - static void doSaveTaskOffset(STqOffsetStore* pOffsetStore, const char* pKey, int64_t ver) { STqOffset offset = {0}; tqOffsetResetToLog(&offset.val, ver); @@ -1410,7 +1383,7 @@ static void doSaveTaskOffset(STqOffsetStore* pOffsetStore, const char* pKey, int static int32_t addSubmitBlockNLaunchTask(STqOffsetStore* pOffsetStore, SStreamTask* pTask, SStreamDataSubmit2* pSubmit, const char* key, int64_t ver) { doSaveTaskOffset(pOffsetStore, key, ver); - int32_t code = doAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)pSubmit, ver); + int32_t code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)pSubmit, ver); // remove the offset, if all functions are completed successfully. if (code == TSDB_CODE_SUCCESS) { @@ -1504,33 +1477,15 @@ int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) { // all data has been retrieved from WAL, let's try submit block directly. if (code == TSDB_CODE_SUCCESS) { // all data retrieved, abort // append the data for the stream - SFetchRet ret = {0}; + SFetchRet ret = {.data.info.type = STREAM_NORMAL}; terrno = 0; tqNextBlock(pTask->exec.pTqReader, &ret); if (ret.fetchType == FETCH_TYPE__DATA) { - SStreamDataBlock* pBlocks = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); - if (pBlocks == NULL) { // failed, do nothing - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = launchTaskForWalBlock(pTask, &ret, pOffset); + if (code != TSDB_CODE_SUCCESS) { continue; } - - ret.data.info.type = STREAM_NORMAL; - pBlocks->type = STREAM_INPUT__DATA_BLOCK; - pBlocks->sourceVer = pOffset->val.version; - pBlocks->blocks = taosArrayInit(0, sizeof(SSDataBlock)); - taosArrayPush(pBlocks->blocks, &ret.data); - - int64_t* ts = (int64_t*)(((SColumnInfoData*)ret.data.pDataBlock->pData)->pData); -// tqDebug("-----------%ld\n", ts[0]); - - code = doAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)pBlocks, pBlocks->sourceVer); - if (code == TSDB_CODE_SUCCESS) { - pOffset->val.version = pTask->exec.pTqReader->pWalReader->curVersion; - tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, - pOffset->val.version); - } - } else { // FETCH_TYPE__NONE, let's try submit block directly tqDebug("s-task:%s data in WAL are all consumed, try data in submit message", pTask->id.idStr); addSubmitBlockNLaunchTask(pTq->pOffsetStore, pTask, pSubmit, key, submit.ver); @@ -1555,15 +1510,29 @@ int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) { int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTaskRunReq* pReq = pMsg->pCont; - int32_t taskId = pReq->taskId; - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); - if (pTask) { - tqDebug("stream task:%d start to process run req", pTask->id.taskId); - streamProcessRunReq(pTask); - streamMetaReleaseTask(pTq->pStreamMeta, pTask); + + int32_t taskId = pReq->taskId; + int32_t vgId = TD_VID(pTq->pVnode); + + if (taskId == ALL_STREAM_TASKS_ID) { // all tasks are restored from the wal + tqDoRestoreSourceStreamTasks(pTq); return 0; } else { - return -1; + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); + if (pTask != NULL) { + if (pTask->taskStatus == TASK_STATUS__NORMAL) { + tqDebug("vgId:%d s-task:%s start to process run req", vgId, pTask->id.idStr); + streamProcessRunReq(pTask); + } else { + tqDebug("vgId:%d s-task:%s ignore run req since not in ready state", vgId, pTask->id.idStr); + } + + streamMetaReleaseTask(pTq->pStreamMeta, pTask); + return 0; + } else { + tqError("vgId:%d failed to found s-task, taskId:%d", vgId, taskId); + return -1; + } } } @@ -1703,3 +1672,25 @@ FAIL: } int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) { return sversion <= pTq->walLogLastVer; } + +int32_t tqRestoreStreamTasks(STQ* pTq) { + int32_t vgId = TD_VID(pTq->pVnode); + + SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); + if (pRunReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tqError("vgId:%d failed restore stream tasks, code:%s", vgId, terrstr(terrno)); + return -1; + } + + tqInfo("vgId:%d start to restore all stream tasks", vgId); + + pRunReq->head.vgId = vgId; + pRunReq->streamId = 0; + pRunReq->taskId = ALL_STREAM_TASKS_ID; + + SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; + tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg); + + return 0; +} diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index dd003aec98eb42099a4dc3e3fbea9c5dfde91632..d651e945b50e4db10c0cf8fc2b0e44743685268c 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -322,8 +322,11 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v taosWUnLockLatch(&pTq->lock); } - // push data for stream processing - if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode)) { + // push data for stream processing: + // 1. the vnode isn't in the restore procedure. + // 2. the vnode should be the leader. + // 3. the stream is not suspended yet. + if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode) && (!pTq->pVnode->restored)) { if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) { return 0; } @@ -352,7 +355,7 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v return 0; } -int32_t tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, +int32_t tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp, int32_t type) { uint64_t consumerId = pRequest->consumerId; int32_t vgId = TD_VID(pTq->pVnode); @@ -389,7 +392,7 @@ int32_t tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, return 0; } -int32_t tqUnregisterPushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer) { +int32_t tqUnregisterPushHandle(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer) { int32_t vgId = TD_VID(pTq->pVnode); STqPushEntry** pEntry = taosHashGet(pTq->pPushMgr, pKey, keyLen); diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c new file mode 100644 index 0000000000000000000000000000000000000000..50fcea2b54e6b1c8dc2054ed6090479667133157 --- /dev/null +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -0,0 +1,132 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tq.h" + +static int32_t restoreStreamTask(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, SArray* pTaskList); +static int32_t transferToNormalTask(SStreamMeta* pStreamMeta, SArray* pTaskList); + +// this function should be executed by stream threads. +// there is a case that the WAL increases more fast than the restore procedure, and this restore procedure +// will not stop eventually. +int tqDoRestoreSourceStreamTasks(STQ* pTq) { + + // todo set the offset value from the previous check point offset + int64_t st = taosGetTimestampMs(); + int32_t vgId = TD_VID(pTq->pVnode); + int32_t numOfTasks = taosHashGetSize(pTq->pStreamMeta->pRestoreTasks); + tqInfo("vgId:%d start restoring stream tasks, total tasks:%d", vgId, numOfTasks); + + while (1) { + SArray* pTaskList = taosArrayInit(4, POINTER_BYTES); + + // check all restore tasks + restoreStreamTask(pTq->pStreamMeta, pTq->pOffsetStore, pTaskList); + transferToNormalTask(pTq->pStreamMeta, pTaskList); + taosArrayDestroy(pTaskList); + + int32_t numOfRestored = taosHashGetSize(pTq->pStreamMeta->pRestoreTasks); + if (numOfRestored <= 0) { + break; + } + } + + int64_t et = taosGetTimestampMs(); + tqInfo("vgId:%d restoring task completed, elapsed time:%" PRId64 " sec.", vgId, (et - st)); + return 0; +} + +int32_t transferToNormalTask(SStreamMeta* pStreamMeta, SArray* pTaskList) { + int32_t numOfTask = taosArrayGetSize(pTaskList); + if (numOfTask <= 0) { + return TSDB_CODE_SUCCESS; + } + + // todo: add lock + for(int32_t i = 0; i < numOfTask; ++i){ + SStreamTask* pTask = taosArrayGetP(pTaskList, i); + tqDebug("vgId:%d transfer s-task:%s state restore -> ready", pStreamMeta->vgId, pTask->id.idStr); + taosHashRemove(pStreamMeta->pRestoreTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); + + // NOTE: do not change the following order + atomic_store_8(&pTask->taskStatus, TASK_STATUS__NORMAL); + taosHashPut(pStreamMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES); + } + + return TSDB_CODE_SUCCESS; +} + +int32_t restoreStreamTask(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, SArray* pTaskList) { + // check all restore tasks + void* pIter = NULL; + + while (1) { + pIter = taosHashIterate(pStreamMeta->pRestoreTasks, pIter); + if (pIter == NULL) { + break; + } + + SStreamTask* pTask = *(SStreamTask**)pIter; + if (pTask->taskLevel != TASK_LEVEL__SOURCE) { + continue; + } + + if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { + tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, pTask->taskStatus); + continue; + } + + // check if offset value exists + char key[128] = {0}; + createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId); + + if (tInputQueueIsFull(pTask)) { + tqDebug("s-task:%s input queue is full, do nothing" PRId64, pTask->id.idStr); + continue; + } + + // check if offset value exists + STqOffset* pOffset = tqOffsetRead(pOffsetStore, key); + if (pOffset != NULL) { + // seek the stored version and extract data from WAL + int32_t code = tqSeekVer(pTask->exec.pTqReader, pOffset->val.version, ""); + if (code == TSDB_CODE_SUCCESS) { // all data retrieved, abort + // append the data for the stream + SFetchRet ret = {.data.info.type = STREAM_NORMAL}; + terrno = 0; + + tqNextBlock(pTask->exec.pTqReader, &ret); + if (ret.fetchType == FETCH_TYPE__DATA) { + code = launchTaskForWalBlock(pTask, &ret, pOffset); + if (code != TSDB_CODE_SUCCESS) { + continue; + } + } else { + // FETCH_TYPE__NONE: all data has been retrieved from WAL, let's try submit block directly. + tqDebug("s-task:%s data in WAL are all consumed, transfer this task to be normal state", pTask->id.idStr); + taosArrayPush(pTaskList, &pTask); + } + } else { // failed to seek to the WAL version + tqDebug("s-task:%s data in WAL are all consumed, transfer this task to be normal state", pTask->id.idStr); + taosArrayPush(pTaskList, &pTask); + } + } else { + ASSERT(0); + } + } + + return 0; +} + diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c new file mode 100644 index 0000000000000000000000000000000000000000..ac88cf1916f80aff334590e4c6947fcd47d83972 --- /dev/null +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tq.h" + +// stream_task:stream_id:task_id +void createStreamTaskOffsetKey(char* dst, uint64_t streamId, uint32_t taskId) { + int32_t n = 12; + char* p = dst; + + memcpy(p, "stream_task:", n); + p += n; + + int32_t inc = tintToHex(streamId, p); + p += inc; + + *(p++) = ':'; + tintToHex(taskId, p); +} + +int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver) { + int32_t code = tAppendDataForStream(pTask, pQueueItem); + if (code < 0) { + tqError("s-task:%s failed to put into queue, too many, next start ver:%" PRId64, pTask->id.idStr, ver); + return -1; + } + + if (streamSchedExec(pTask) < 0) { + tqError("stream task:%d failed to be launched, code:%s", pTask->id.taskId, tstrerror(terrno)); + return -1; + } + + return TSDB_CODE_SUCCESS; +} + +int32_t launchTaskForWalBlock(SStreamTask* pTask, SFetchRet* pRet, STqOffset* pOffset) { + SStreamDataBlock* pBlocks = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); + if (pBlocks == NULL) { // failed, do nothing + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + pRet->data.info.type = STREAM_NORMAL; + pBlocks->type = STREAM_INPUT__DATA_BLOCK; + pBlocks->sourceVer = pOffset->val.version; + pBlocks->blocks = taosArrayInit(0, sizeof(SSDataBlock)); + taosArrayPush(pBlocks->blocks, &pRet->data); + +// int64_t* ts = (int64_t*)(((SColumnInfoData*)ret.data.pDataBlock->pData)->pData); +// tqDebug("-----------%ld\n", ts[0]); + + int32_t code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)pBlocks, pBlocks->sourceVer); + if (code == TSDB_CODE_SUCCESS) { + pOffset->val.version = walReaderGetCurrentVer(pTask->exec.pTqReader->pWalReader); + tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, + pOffset->val.version); + } + + return 0; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index d681f5b65e2228e6d2b16c88688ba672ad26564d..eb3c5d1f6484d450ea00ae40f0d3b4089b9d0e7d 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -549,6 +549,9 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) pVnode->restored = true; vInfo("vgId:%d, sync restore finished", pVnode->config.vgId); + + // start to restore all stream tasks + tqRestoreStreamTasks(pVnode->pTq); } static void vnodeBecomeFollower(const SSyncFSM *pFsm) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 577f6d6e00febf6c7a1d9b0c6285a3b617e61d0e..8693915c46fb1f19fcbb9a14e993288d12c8c186 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -45,11 +45,17 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF goto _err; } - pMeta->pTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT); + pMeta->pTasks = taosHashInit(64, fp, true, HASH_ENTRY_LOCK); if (pMeta->pTasks == NULL) { goto _err; } + pMeta->pRestoreTasks = taosHashInit(64, fp, true, HASH_ENTRY_LOCK); + if (pMeta->pRestoreTasks == NULL) { + goto _err; + } + if (streamMetaBegin(pMeta) < 0) { goto _err; } @@ -62,6 +68,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF _err: taosMemoryFree(pMeta->path); if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks); + if (pMeta->pRestoreTasks) taosHashCleanup(pMeta->pRestoreTasks); if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb); if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb); if (pMeta->db) tdbClose(pMeta->db); @@ -87,8 +94,9 @@ void streamMetaClose(SStreamMeta* pMeta) { tFreeStreamTask(pTask); /*streamMetaReleaseTask(pMeta, pTask);*/ } + taosHashCleanup(pMeta->pTasks); - taosHashCleanup(pMeta->pRecoverStatus); + taosHashCleanup(pMeta->pRestoreTasks); taosMemoryFree(pMeta->path); taosMemoryFree(pMeta); } @@ -166,8 +174,7 @@ int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) { return -1; } - taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*)); - + taosHashPut(pMeta->pRestoreTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, POINTER_BYTES); return 0; } #endif @@ -298,12 +305,13 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { return -1; } - if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) { + if (taosHashPut(pMeta->pRestoreTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) { tdbFree(pKey); tdbFree(pVal); tdbTbcClose(pCur); return -1; } + /*pTask->taskStatus = TASK_STATUS__NORMAL;*/ if (pTask->fillHistory) { pTask->taskStatus = TASK_STATUS__WAIT_DOWNSTREAM; diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index ad6127ead23d0713c9fa963741d460a30f50c249..e20299be38ecb94b058356e57c8ddbda5df80102 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -100,6 +100,8 @@ int32_t walNextValidMsg(SWalReader *pReader) { return -1; } +int64_t walReaderGetCurrentVer(const SWalReader *pReader) { return pReader->curVersion; } + static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int64_t ver) { int64_t ret = 0;