enh(stream): the stream will start only after the vnode restore has completed.

TASK_STATUS_RESTORE, // only available for source task to replay WAL from the checkpoint
enum {
TTB* pTaskDb;
TTB* pCheckpointDb;
SHashObj* pTasks;
SHashObj* pRecoverStatus;
SHashObj* pRestoreTasks;
void* ahandle;
TXN* txn;
FTaskExpand* expandFunc;
int32_t walReadVer(SWalReader *pRead, int64_t ver);
int32_t walReadSeekVer(SWalReader *pRead, int64_t ver);
int32_t walNextValidMsg(SWalReader *pRead);
int64_t walReaderGetCurrentVer(const SWalReader* pReader);
// only for tq usage
void walSetReaderCapacity(SWalReader *pRead, int32_t capacity);
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream);
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream);
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq);
# tq
tmr_h timer;
} STqMgmt;
typedef struct {
int32_t size;
} STqOffsetHead;
static STqMgmt tqMgmt = {0};
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle);
int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key);
int32_t tqMetaRestoreCheckInfo(STQ* pTq);
typedef struct {
int32_t size;
} STqOffsetHead;
STqOffsetStore* tqOffsetOpen(STQ* pTq);
void tqOffsetClose(STqOffsetStore*);
STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey);
// tqStream
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
int32_t tqDoRestoreSourceStreamTasks(STQ* pTq);
// tq util
void createStreamTaskOffsetKey(char* dst, uint64_t streamId, uint32_t taskId);
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver);
int32_t launchTaskForWalBlock(SStreamTask* pTask, SFetchRet* pRet, STqOffset* pOffset);
#ifdef __cplusplus
STQ* tqOpen(const char* path, SVnode* pVnode);
void tqClose(STQ*);
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
int tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp, int32_t type);
int tqUnregisterPushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer);
int tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp, int32_t type);
int tqUnregisterPushHandle(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer);
int tqRestoreStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed.
int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
#include "tq.h"
// Evaluates to non-zero when the given offset type value is negative.
// NOTE(review): presumably the "reset" offset types are the negative enum values -- confirm against their definition.
#define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0)
// Sentinel task id: tqRestoreStreamTasks() places it in the run request it enqueues, and tqProcessTaskRunReq()
// checks for it to handle "all stream tasks" instead of acquiring a single task by id.
#define ALL_STREAM_TASKS_ID (-1)
int32_t tqInit() {
int8_t old;
pLeft->val.version <= pRight->val.version;
// stream_task:stream_id:task_id
static void createStreamTaskOffsetKey(char* dst, uint64_t streamId, uint32_t taskId) {
int32_t n = 12;
char* p = dst;
memcpy(p, "stream_task:", n);
p += n;
int32_t inc = tintToHex(streamId, p);
p += inc;
*(p++) = ':';
tintToHex(taskId, p);
STQ* tqOpen(const char* path, SVnode* pVnode) {
STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
if (pTq == NULL) {
......@@ -470,7 +456,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
code = tqRegisterPushHandle(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
code = tqRegisterPushHandle(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
return code;
......@@ -880,7 +866,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
atomic_store_32(&pHandle->epoch, -1);
// remove if it has been register in the push manager, and return one empty block to consumer
tqUnregisterPushEntry(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true);
tqUnregisterPushHandle(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true);
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
atomic_add_fetch_32(&pHandle->epoch, 1);
......@@ -925,6 +911,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
// expand executor
if (pTask->fillHistory) {
} else {
pTask->taskStatus = TASK_STATUS_RESTORE;
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
return 0;
static int32_t doAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver) {
int32_t code = tAppendDataForStream(pTask, pQueueItem);
if (code < 0) {
tqError("s-task:%s failed to put into queue, too many, next start ver:%" PRId64, pTask->id.idStr, ver);
return -1;
if (streamSchedExec(pTask) < 0) {
tqError("stream task:%d failed to be launched, code:%s", pTask->id.taskId, tstrerror(terrno));
return -1;
static void doSaveTaskOffset(STqOffsetStore* pOffsetStore, const char* pKey, int64_t ver) {
STqOffset offset = {0};
tqOffsetResetToLog(&offset.val, ver);
......@@ -1410,7 +1383,7 @@ static void doSaveTaskOffset(STqOffsetStore* pOffsetStore, const char* pKey, int
static int32_t addSubmitBlockNLaunchTask(STqOffsetStore* pOffsetStore, SStreamTask* pTask, SStreamDataSubmit2* pSubmit,
const char* key, int64_t ver) {
doSaveTaskOffset(pOffsetStore, key, ver);
int32_t code = doAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)pSubmit, ver);
int32_t code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)pSubmit, ver);
// remove the offset, if all functions are completed successfully.
if (code == TSDB_CODE_SUCCESS) {
// all data has been retrieved from WAL, let's try submit block directly.
if (code == TSDB_CODE_SUCCESS) { // all data retrieved, abort
// append the data for the stream
SFetchRet ret = {0};
SFetchRet ret = {.data.info.type = STREAM_NORMAL};
terrno = 0;
tqNextBlock(pTask->exec.pTqReader, &ret);
if (ret.fetchType == FETCH_TYPE__DATA) {
SStreamDataBlock* pBlocks = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
if (pBlocks == NULL) { // failed, do nothing
code = launchTaskForWalBlock(pTask, &ret, pOffset);
if (code != TSDB_CODE_SUCCESS) {
ret.data.info.type = STREAM_NORMAL;
pBlocks->sourceVer = pOffset->val.version;
pBlocks->blocks = taosArrayInit(0, sizeof(SSDataBlock));
taosArrayPush(pBlocks->blocks, &ret.data);
int64_t* ts = (int64_t*)(((SColumnInfoData*)ret.data.pDataBlock->pData)->pData);
// tqDebug("-----------%ld\n", ts[0]);
code = doAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)pBlocks, pBlocks->sourceVer);
if (code == TSDB_CODE_SUCCESS) {
pOffset->val.version = pTask->exec.pTqReader->pWalReader->curVersion;
tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr,
} else { // FETCH_TYPE__NONE, let's try submit block directly
tqDebug("s-task:%s data in WAL are all consumed, try data in submit message", pTask->id.idStr);
addSubmitBlockNLaunchTask(pTq->pOffsetStore, pTask, pSubmit, key, submit.ver);
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRunReq* pReq = pMsg->pCont;
int32_t taskId = pReq->taskId;
int32_t vgId = TD_VID(pTq->pVnode);
if (taskId == ALL_STREAM_TASKS_ID) { // all tasks are restored from the wal
return 0;
} else {
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
if (pTask) {
tqDebug("stream task:%d start to process run req", pTask->id.taskId);
if (pTask != NULL) {
if (pTask->taskStatus == TASK_STATUS__NORMAL) {
tqDebug("vgId:%d s-task:%s start to process run req", vgId, pTask->id.idStr);
} else {
tqDebug("vgId:%d s-task:%s ignore run req since not in ready state", vgId, pTask->id.idStr);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
} else {
tqError("vgId:%d failed to found s-task, taskId:%d", vgId, taskId);
return -1;
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
// Compares the given version against the walLogLastVer field of the tq handle.
// Returns 1 when sversion <= pTq->walLogLastVer, otherwise 0.
int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) {
  const int32_t inWal = (sversion <= pTq->walLogLastVer);
  return inWal;
}
int32_t tqRestoreStreamTasks(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) {
tqError("vgId:%d failed restore stream tasks, code:%s", vgId, terrstr(terrno));
return -1;
tqInfo("vgId:%d start to restore all stream tasks", vgId);
pRunReq->head.vgId = vgId;
pRunReq->streamId = 0;
pRunReq->taskId = ALL_STREAM_TASKS_ID;
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
return 0;
// push data for stream processing
if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode)) {
// push data for stream processing:
// 1. the vnode isn't in the restore procedure.
// 2. the vnode should be the leader.
// 3. the stream is not suspended yet.
if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode) && (!pTq->pVnode->restored)) {
if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) {
return 0;
return 0;
int32_t tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg,
int32_t tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg,
SMqDataRsp* pDataRsp, int32_t type) {
uint64_t consumerId = pRequest->consumerId;
int32_t vgId = TD_VID(pTq->pVnode);
return 0;
int32_t tqUnregisterPushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer) {
int32_t tqUnregisterPushHandle(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer) {
int32_t vgId = TD_VID(pTq->pVnode);
STqPushEntry** pEntry = taosHashGet(pTq->pPushMgr, pKey, keyLen);
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
#include "tq.h"
static int32_t restoreStreamTask(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, SArray* pTaskList);
static int32_t transferToNormalTask(SStreamMeta* pStreamMeta, SArray* pTaskList);
// this function should be executed by stream threads.
// There is a case where the WAL grows faster than the restore procedure can consume it; in that case this
// restore procedure will never terminate.
int tqDoRestoreSourceStreamTasks(STQ* pTq) {
// todo set the offset value from the previous check point offset
int64_t st = taosGetTimestampMs();
int32_t vgId = TD_VID(pTq->pVnode);
int32_t numOfTasks = taosHashGetSize(pTq->pStreamMeta->pRestoreTasks);
tqInfo("vgId:%d start restoring stream tasks, total tasks:%d", vgId, numOfTasks);
while (1) {
SArray* pTaskList = taosArrayInit(4, POINTER_BYTES);
// check all restore tasks
restoreStreamTask(pTq->pStreamMeta, pTq->pOffsetStore, pTaskList);
transferToNormalTask(pTq->pStreamMeta, pTaskList);
int32_t numOfRestored = taosHashGetSize(pTq->pStreamMeta->pRestoreTasks);
if (numOfRestored <= 0) {
int64_t et = taosGetTimestampMs();
tqInfo("vgId:%d restoring task completed, elapsed time:%" PRId64 " sec.", vgId, (et - st));
return 0;
int32_t transferToNormalTask(SStreamMeta* pStreamMeta, SArray* pTaskList) {
int32_t numOfTask = taosArrayGetSize(pTaskList);
if (numOfTask <= 0) {
// todo: add lock
for(int32_t i = 0; i < numOfTask; ++i){
SStreamTask* pTask = taosArrayGetP(pTaskList, i);
tqDebug("vgId:%d transfer s-task:%s state restore -> ready", pStreamMeta->vgId, pTask->id.idStr);
taosHashRemove(pStreamMeta->pRestoreTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
// NOTE: do not change the following order
atomic_store_8(&pTask->taskStatus, TASK_STATUS__NORMAL);
taosHashPut(pStreamMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES);
int32_t restoreStreamTask(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, SArray* pTaskList) {
// check all restore tasks
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pStreamMeta->pRestoreTasks, pIter);
if (pIter == NULL) {
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, pTask->taskStatus);
// check if offset value exists
char key[128] = {0};
createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId);
if (tInputQueueIsFull(pTask)) {
tqDebug("s-task:%s input queue is full, do nothing" PRId64, pTask->id.idStr);
// check if offset value exists
STqOffset* pOffset = tqOffsetRead(pOffsetStore, key);
if (pOffset != NULL) {
// seek the stored version and extract data from WAL
int32_t code = tqSeekVer(pTask->exec.pTqReader, pOffset->val.version, "");
if (code == TSDB_CODE_SUCCESS) { // all data retrieved, abort
// append the data for the stream
SFetchRet ret = {.data.info.type = STREAM_NORMAL};
terrno = 0;
tqNextBlock(pTask->exec.pTqReader, &ret);
if (ret.fetchType == FETCH_TYPE__DATA) {
code = launchTaskForWalBlock(pTask, &ret, pOffset);
if (code != TSDB_CODE_SUCCESS) {
} else {
// FETCH_TYPE__NONE: all data has been retrieved from WAL, let's try submit block directly.
tqDebug("s-task:%s data in WAL are all consumed, transfer this task to be normal state", pTask->id.idStr);
taosArrayPush(pTaskList, &pTask);
} else { // failed to seek to the WAL version
tqDebug("s-task:%s data in WAL are all consumed, transfer this task to be normal state", pTask->id.idStr);
taosArrayPush(pTaskList, &pTask);
} else {
return 0;
#include "tq.h"
// stream_task:stream_id:task_id
void createStreamTaskOffsetKey(char* dst, uint64_t streamId, uint32_t taskId) {
int32_t n = 12;
char* p = dst;
memcpy(p, "stream_task:", n);
p += n;
int32_t inc = tintToHex(streamId, p);
p += inc;
*(p++) = ':';
tintToHex(taskId, p);
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver) {
int32_t code = tAppendDataForStream(pTask, pQueueItem);
if (code < 0) {
tqError("s-task:%s failed to put into queue, too many, next start ver:%" PRId64, pTask->id.idStr, ver);
return -1;
if (streamSchedExec(pTask) < 0) {
tqError("stream task:%d failed to be launched, code:%s", pTask->id.taskId, tstrerror(terrno));
return -1;
int32_t launchTaskForWalBlock(SStreamTask* pTask, SFetchRet* pRet, STqOffset* pOffset) {
SStreamDataBlock* pBlocks = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
if (pBlocks == NULL) { // failed, do nothing
return -1;
pRet->data.info.type = STREAM_NORMAL;
pBlocks->sourceVer = pOffset->val.version;
pBlocks->blocks = taosArrayInit(0, sizeof(SSDataBlock));
taosArrayPush(pBlocks->blocks, &pRet->data);
// int64_t* ts = (int64_t*)(((SColumnInfoData*)ret.data.pDataBlock->pData)->pData);
// tqDebug("-----------%ld\n", ts[0]);
int32_t code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)pBlocks, pBlocks->sourceVer);
if (code == TSDB_CODE_SUCCESS) {
pOffset->val.version = walReaderGetCurrentVer(pTask->exec.pTqReader->pWalReader);
tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr,
return 0;
\ No newline at end of file
pVnode->restored = true;
vInfo("vgId:%d, sync restore finished", pVnode->config.vgId);
// start to restore all stream tasks
static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
......@@ -45,11 +45,17 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
goto _err;
pMeta->pTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
_hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT);
pMeta->pTasks = taosHashInit(64, fp, true, HASH_ENTRY_LOCK);
if (pMeta->pTasks == NULL) {
goto _err;
pMeta->pRestoreTasks = taosHashInit(64, fp, true, HASH_ENTRY_LOCK);
if (pMeta->pRestoreTasks == NULL) {
goto _err;
if (streamMetaBegin(pMeta) < 0) {
goto _err;
if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks);
if (pMeta->pRestoreTasks) taosHashCleanup(pMeta->pRestoreTasks);
if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb);
if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb);
if (pMeta->db) tdbClose(pMeta->db);
/*streamMetaReleaseTask(pMeta, pTask);*/
return -1;
taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*));
taosHashPut(pMeta->pRestoreTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, POINTER_BYTES);
return 0;
return -1;
if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) {
if (taosHashPut(pMeta->pRestoreTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) {
return -1;
/*pTask->taskStatus = TASK_STATUS__NORMAL;*/
if (pTask->fillHistory) {
return -1;
// Accessor for the reader's curVersion field; the reader itself is not modified.
int64_t walReaderGetCurrentVer(const SWalReader *pReader) {
  const int64_t currentVer = pReader->curVersion;
  return currentVer;
}
static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int64_t ver) {
int64_t ret = 0;
