提交 70b45a4c 编写于 作者: H Haojun Liao

fix(stream): all data should be extracted from wal.

上级 f6315af0
...@@ -31,6 +31,7 @@ extern "C" { ...@@ -31,6 +31,7 @@ extern "C" {
#ifndef _STREAM_H_ #ifndef _STREAM_H_
#define _STREAM_H_ #define _STREAM_H_
typedef void (*_free_reader_fn_t)(void*);
typedef struct SStreamTask SStreamTask; typedef struct SStreamTask SStreamTask;
enum { enum {
...@@ -218,9 +219,10 @@ void streamDataSubmitDestroy(SStreamDataSubmit2* pDataSubmit); ...@@ -218,9 +219,10 @@ void streamDataSubmitDestroy(SStreamDataSubmit2* pDataSubmit);
SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit); SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit);
typedef struct { typedef struct {
char* qmsg; char* qmsg;
void* pExecutor; // not applicable to encoder and decoder void* pExecutor; // not applicable to encoder and decoder
struct STqReader* pTqReader; // not applicable to encoder and decoder struct STqReader* pTqReader; // not applicable to encoder and decoder
struct SWalReader* pWalReader; // not applicable to encoder and decoder
} STaskExec; } STaskExec;
typedef struct { typedef struct {
...@@ -331,6 +333,7 @@ struct SStreamTask { ...@@ -331,6 +333,7 @@ struct SStreamTask {
int64_t checkpointingId; int64_t checkpointingId;
int32_t checkpointAlignCnt; int32_t checkpointAlignCnt;
struct SStreamMeta* pMeta; struct SStreamMeta* pMeta;
_free_reader_fn_t freeFp;
}; };
// meta // meta
...@@ -340,12 +343,14 @@ typedef struct SStreamMeta { ...@@ -340,12 +343,14 @@ typedef struct SStreamMeta {
TTB* pTaskDb; TTB* pTaskDb;
TTB* pCheckpointDb; TTB* pCheckpointDb;
SHashObj* pTasks; SHashObj* pTasks;
SHashObj* pRestoreTasks; SHashObj* pWalReadTasks;
void* ahandle; void* ahandle;
TXN* txn; TXN* txn;
FTaskExpand* expandFunc; FTaskExpand* expandFunc;
int32_t vgId; int32_t vgId;
SRWLatch lock; SRWLatch lock;
int8_t walScan;
bool quit;
} SStreamMeta; } SStreamMeta;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
...@@ -355,7 +360,7 @@ SStreamTask* tNewStreamTask(int64_t streamId); ...@@ -355,7 +360,7 @@ SStreamTask* tNewStreamTask(int64_t streamId);
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask); int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void tFreeStreamTask(SStreamTask* pTask); void tFreeStreamTask(SStreamTask* pTask);
int32_t tAppendDataForStream(SStreamTask* pTask, SStreamQueueItem* pItem); int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem);
bool tInputQueueIsFull(const SStreamTask* pTask); bool tInputQueueIsFull(const SStreamTask* pTask);
static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) { static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) {
...@@ -568,8 +573,9 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF ...@@ -568,8 +573,9 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
void streamMetaClose(SStreamMeta* streamMeta); void streamMetaClose(SStreamMeta* streamMeta);
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask); int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask);
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t checkpointVer, char* msg, int32_t msgLen); int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t checkpointVer, char* msg, int32_t msgLen);
int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta);
SStreamTask* streamMetaAcquireTaskEx(SStreamMeta* pMeta, int32_t taskId); SStreamTask* streamMetaAcquireTaskEx(SStreamMeta* pMeta, int32_t taskId);
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId); SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId);
......
...@@ -139,7 +139,7 @@ typedef struct { ...@@ -139,7 +139,7 @@ typedef struct {
} SWalFilterCond; } SWalFilterCond;
// todo hide this struct // todo hide this struct
typedef struct { typedef struct SWalReader {
SWal *pWal; SWal *pWal;
int64_t readerId; int64_t readerId;
TdFilePtr pLogFile; TdFilePtr pLogFile;
......
...@@ -150,7 +150,7 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) { ...@@ -150,7 +150,7 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
ASSERT(pTask->taskLevel == TASK_LEVEL__AGG); ASSERT(pTask->taskLevel == TASK_LEVEL__AGG);
// 2.save task // 2.save task
code = streamMetaAddTask(pSnode->pMeta, -1, pTask); code = streamMetaAddDeployedTask(pSnode->pMeta, -1, pTask);
if (code < 0) { if (code < 0) {
return -1; return -1;
} }
......
...@@ -260,7 +260,8 @@ int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList); ...@@ -260,7 +260,8 @@ int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList);
int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList); int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id); int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id);
void tqNextBlock(STqReader *pReader, SFetchRet *ret); void tqNextBlock(STqReader *pReader, SFetchRet *ret);
int32_t extractSubmitMsgFromWal(SWalReader *pReader, SPackedData *pPackedData);
int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
// int32_t tqReaderSetDataMsg(STqReader *pReader, const SSubmitReq *pMsg, int64_t ver); // int32_t tqReaderSetDataMsg(STqReader *pReader, const SSubmitReq *pMsg, int64_t ver);
......
...@@ -176,7 +176,7 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname); ...@@ -176,7 +176,7 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
// tqStream // tqStream
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
int32_t tqDoRestoreSourceStreamTasks(STQ* pTq); int32_t tqStreamTasksScanWal(STQ* pTq);
// tq util // tq util
void createStreamTaskOffsetKey(char* dst, uint64_t streamId, uint32_t taskId); void createStreamTaskOffsetKey(char* dst, uint64_t streamId, uint32_t taskId);
...@@ -187,6 +187,7 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ ...@@ -187,6 +187,7 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ
void doSaveTaskOffset(STqOffsetStore* pOffsetStore, const char* pKey, int64_t ver); void doSaveTaskOffset(STqOffsetStore* pOffsetStore, const char* pKey, int64_t ver);
void saveOffsetForAllTasks(STQ* pTq, int64_t ver); void saveOffsetForAllTasks(STQ* pTq, int64_t ver);
void initOffsetForAllRestoreTasks(STQ* pTq); void initOffsetForAllRestoreTasks(STQ* pTq);
int32_t transferToWalReadTask(SStreamMeta* pStreamMeta, SArray* pTaskList);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -194,7 +194,7 @@ void tqClose(STQ*); ...@@ -194,7 +194,7 @@ void tqClose(STQ*);
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
int tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp, int32_t type); int tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp, int32_t type);
int tqUnregisterPushHandle(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer); int tqUnregisterPushHandle(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer);
int tqRestoreStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed.
int tqCommit(STQ*); int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#include "tq.h" #include "tq.h"
#define ALL_STREAM_TASKS_ID (-1) #define WAL_READ_TASKS_ID (-1)
int32_t tqInit() { int32_t tqInit() {
int8_t old; int8_t old;
...@@ -630,6 +630,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ...@@ -630,6 +630,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
return -1; return -1;
} }
pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
pTask->freeFp = (_free_reader_fn_t)tqCloseReader;
SArray* pList = qGetQueriedTableListInfo(pTask->exec.pExecutor); SArray* pList = qGetQueriedTableListInfo(pTask->exec.pExecutor);
tqReaderAddTbUidList(pTask->exec.pTqReader, pList); tqReaderAddTbUidList(pTask->exec.pTqReader, pList);
} }
...@@ -640,6 +643,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ...@@ -640,6 +643,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
return 0; return 0;
} }
void tFreeStreamTask(SStreamTask* pTask);
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
char* msgStr = pMsg->pCont; char* msgStr = pMsg->pCont;
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
...@@ -754,8 +759,10 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms ...@@ -754,8 +759,10 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
tDecoderClear(&decoder); tDecoderClear(&decoder);
// 2.save task // 2.save task
code = streamMetaAddTask(pTq->pStreamMeta, sversion, pTask); code = streamMetaAddDeployedTask(pTq->pStreamMeta, sversion, pTask);
if (code < 0) { if (code < 0) {
tqError("vgId:%d failed to add s-task:%s, total:%d", TD_VID(pTq->pVnode), pTask->id.idStr,
streamMetaGetNumOfTasks(pTq->pStreamMeta));
return -1; return -1;
} }
...@@ -764,6 +771,8 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms ...@@ -764,6 +771,8 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
streamTaskCheckDownstream(pTask, sversion); streamTaskCheckDownstream(pTask, sversion);
} }
tqDebug("vgId:%d s-task:%s is deployed from mnd, status:%d, total:%d", TD_VID(pTq->pVnode), pTask->id.idStr,
pTask->status.taskStatus, streamMetaGetNumOfTasks(pTq->pStreamMeta));
return 0; return 0;
} }
...@@ -973,7 +982,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { ...@@ -973,7 +982,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
pRefBlock->dataRef = pRef; pRefBlock->dataRef = pRef;
atomic_add_fetch_32(pRefBlock->dataRef, 1); atomic_add_fetch_32(pRefBlock->dataRef, 1);
if (tAppendDataForStream(pTask, (SStreamQueueItem*)pRefBlock) < 0) { if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pRefBlock) < 0) {
qError("stream task input del failed, task id %d", pTask->id.taskId); qError("stream task input del failed, task id %d", pTask->id.taskId);
atomic_sub_fetch_32(pRef, 1); atomic_sub_fetch_32(pRef, 1);
...@@ -1008,7 +1017,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { ...@@ -1008,7 +1017,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
taosArrayPush(pStreamBlock->blocks, &block); taosArrayPush(pStreamBlock->blocks, &block);
if (!failed) { if (!failed) {
if (tAppendDataForStream(pTask, (SStreamQueueItem*)pStreamBlock) < 0) { if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pStreamBlock) < 0) {
qError("stream task input del failed, task id %d", pTask->id.taskId); qError("stream task input del failed, task id %d", pTask->id.taskId);
continue; continue;
} }
...@@ -1036,12 +1045,13 @@ static int32_t addSubmitBlockNLaunchTask(STqOffsetStore* pOffsetStore, SStreamTa ...@@ -1036,12 +1045,13 @@ static int32_t addSubmitBlockNLaunchTask(STqOffsetStore* pOffsetStore, SStreamTa
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
tqOffsetDelete(pOffsetStore, key); tqOffsetDelete(pOffsetStore, key);
} }
return TSDB_CODE_SUCCESS;
return code;
} }
int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) { int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) {
#if 0
void* pIter = NULL; void* pIter = NULL;
SStreamDataSubmit2* pSubmit = streamDataSubmitNew(submit, STREAM_INPUT__DATA_SUBMIT); SStreamDataSubmit2* pSubmit = streamDataSubmitNew(submit, STREAM_INPUT__DATA_SUBMIT);
if (pSubmit == NULL) { if (pSubmit == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -1050,6 +1060,8 @@ int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) { ...@@ -1050,6 +1060,8 @@ int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) {
return -1; return -1;
} }
SArray* pInputQueueFullTasks = taosArrayInit(4, POINTER_BYTES);
while (1) { while (1) {
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter); pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
if (pIter == NULL) { if (pIter == NULL) {
...@@ -1081,47 +1093,23 @@ int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) { ...@@ -1081,47 +1093,23 @@ int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) {
ver = pOffset->val.version; ver = pOffset->val.version;
} }
tqDebug("s-task:%s input queue is full, do nothing, start ver:%" PRId64, pTask->id.idStr, ver); tqDebug("s-task:%s input queue is full, discard submit block, ver:%" PRId64, pTask->id.idStr, ver);
taosArrayPush(pInputQueueFullTasks, &pTask);
continue; continue;
} }
// check if offset value exists // check if offset value exists
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key); STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key);
if (pOffset != NULL) { ASSERT(pOffset == NULL);
// seek the stored version and extract data from WAL
int32_t code = tqSeekVer(pTask->exec.pTqReader, pOffset->val.version, "");
// all data has been retrieved from WAL, let's try submit block directly.
if (code == TSDB_CODE_SUCCESS) { // all data retrieved, abort
// append the data for the stream
SFetchRet ret = {.data.info.type = STREAM_NORMAL};
terrno = 0;
tqNextBlock(pTask->exec.pTqReader, &ret);
if (ret.fetchType == FETCH_TYPE__DATA) {
code = launchTaskForWalBlock(pTask, &ret, pOffset);
if (code != TSDB_CODE_SUCCESS) {
continue;
}
} else { // FETCH_TYPE__NONE, let's try submit block directly
tqDebug("s-task:%s data in WAL are all consumed, try data in submit message", pTask->id.idStr);
addSubmitBlockNLaunchTask(pTq->pOffsetStore, pTask, pSubmit, key, submit.ver);
}
// do nothing if failed, since the offset value is kept already addSubmitBlockNLaunchTask(pTq->pOffsetStore, pTask, pSubmit, key, submit.ver);
} else { // failed to seek to the WAL version
// todo handle the case where offset has been deleted in WAL, due to stream computing too slow
tqDebug("s-task:%s data in WAL are all consumed, try data in submit msg", pTask->id.idStr);
addSubmitBlockNLaunchTask(pTq->pOffsetStore, pTask, pSubmit, key, submit.ver);
}
} else {
addSubmitBlockNLaunchTask(pTq->pOffsetStore, pTask, pSubmit, key, submit.ver);
}
} }
streamDataSubmitDestroy(pSubmit); streamDataSubmitDestroy(pSubmit);
taosFreeQitem(pSubmit); taosFreeQitem(pSubmit);
#endif
tqStartStreamTasks(pTq);
return 0; return 0;
} }
...@@ -1131,29 +1119,31 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1131,29 +1119,31 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t taskId = pReq->taskId; int32_t taskId = pReq->taskId;
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
if (taskId == ALL_STREAM_TASKS_ID) { // all tasks are restored from the wal if (taskId == WAL_READ_TASKS_ID) { // all tasks are extracted submit data from the wal
tqDoRestoreSourceStreamTasks(pTq); tqStreamTasksScanWal(pTq);
return 0; return 0;
} else { }
SStreamTask* pTask = streamMetaAcquireTaskEx(pTq->pStreamMeta, taskId);
if (pTask != NULL) {
if (pTask->status.taskStatus == TASK_STATUS__NORMAL) {
tqDebug("vgId:%d s-task:%s start to process run req", vgId, pTask->id.idStr);
streamProcessRunReq(pTask);
} else if (pTask->status.taskStatus == TASK_STATUS__RESTORE) {
tqDebug("vgId:%d s-task:%s start to process in restore procedure from last chk point:%" PRId64, vgId,
pTask->id.idStr, pTask->chkInfo.version);
streamProcessRunReq(pTask);
} else {
tqDebug("vgId:%d s-task:%s ignore run req since not in ready state", vgId, pTask->id.idStr);
}
streamMetaReleaseTask(pTq->pStreamMeta, pTask); SStreamTask* pTask = streamMetaAcquireTaskEx(pTq->pStreamMeta, taskId);
return 0; if (pTask != NULL) {
if (pTask->status.taskStatus == TASK_STATUS__NORMAL) {
tqDebug("vgId:%d s-task:%s start to process run req", vgId, pTask->id.idStr);
streamProcessRunReq(pTask);
} else if (pTask->status.taskStatus == TASK_STATUS__RESTORE) {
tqDebug("vgId:%d s-task:%s start to process block from wal, last chk point:%" PRId64, vgId,
pTask->id.idStr, pTask->chkInfo.version);
streamProcessRunReq(pTask);
} else { } else {
tqError("vgId:%d failed to found s-task, taskId:%d", vgId, taskId); tqDebug("vgId:%d s-task:%s ignore run req since not in ready state", vgId, pTask->id.idStr);
return -1;
} }
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
tqStartStreamTasks(pTq);
return 0;
} else {
tqError("vgId:%d failed to found s-task, taskId:%d", vgId, taskId);
return -1;
} }
} }
...@@ -1165,14 +1155,10 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { ...@@ -1165,14 +1155,10 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
tDecodeStreamDispatchReq(&decoder, &req); tDecodeStreamDispatchReq(&decoder, &req);
int32_t taskId = req.taskId;
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
if (pTask) { if (pTask) {
SRpcMsg rsp = { SRpcMsg rsp = { .info = pMsg->info, .code = 0 };
.info = pMsg->info,
.code = 0,
};
streamProcessDispatchReq(pTask, &req, &rsp, exec); streamProcessDispatchReq(pTask, &req, &rsp, exec);
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0; return 0;
...@@ -1294,26 +1280,39 @@ FAIL: ...@@ -1294,26 +1280,39 @@ FAIL:
int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) { return sversion <= pTq->walLogLastVer; } int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) { return sversion <= pTq->walLogLastVer; }
int32_t tqRestoreStreamTasks(STQ* pTq) { int32_t tqStartStreamTasks(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
taosWLockLatch(&pMeta->lock);
pMeta->walScan += 1;
if (pMeta->walScan > 1) {
tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScan);
taosWUnLockLatch(&pTq->pStreamMeta->lock);
return 0;
}
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) { if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d failed restore stream tasks, code:%s", vgId, terrstr(terrno)); tqError("vgId:%d failed restore stream tasks, code:%s", vgId, terrstr(terrno));
taosWUnLockLatch(&pTq->pStreamMeta->lock);
return -1; return -1;
} }
int32_t numOfTasks = taosHashGetSize(pTq->pStreamMeta->pRestoreTasks); int32_t numOfTasks = taosHashGetSize(pTq->pStreamMeta->pTasks);
tqInfo("vgId:%d start restoring stream tasks, total tasks:%d", vgId, numOfTasks);
tqInfo("vgId:%d start wal scan stream tasks, tasks:%d", vgId, numOfTasks);
initOffsetForAllRestoreTasks(pTq); initOffsetForAllRestoreTasks(pTq);
pRunReq->head.vgId = vgId; pRunReq->head.vgId = vgId;
pRunReq->streamId = 0; pRunReq->streamId = 0;
pRunReq->taskId = ALL_STREAM_TASKS_ID; pRunReq->taskId = WAL_READ_TASKS_ID;
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg); tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
taosWUnLockLatch(&pTq->pStreamMeta->lock);
return 0; return 0;
} }
...@@ -322,16 +322,19 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v ...@@ -322,16 +322,19 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
taosWUnLockLatch(&pTq->lock); taosWUnLockLatch(&pTq->lock);
} }
tqDebug("handle submit, restore:%d, size:%d", pTq->pVnode->restored, (int)taosHashGetSize(pTq->pStreamMeta->pTasks));
// push data for stream processing: // push data for stream processing:
// 1. the vnode isn't in the restore procedure. // 1. the vnode has already been restored.
// 2. the vnode should be the leader. // 2. the vnode should be the leader.
// 3. the stream is not suspended yet. // 3. the stream is not suspended yet.
if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode) && (!pTq->pVnode->restored)) { if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode) && pTq->pVnode->restored) {
if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) { if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) {
return 0; return 0;
} }
if (msgType == TDMT_VND_SUBMIT) { if (msgType == TDMT_VND_SUBMIT) {
#if 0
void* data = taosMemoryMalloc(len); void* data = taosMemoryMalloc(len);
if (data == NULL) { if (data == NULL) {
// todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry
...@@ -343,7 +346,10 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v ...@@ -343,7 +346,10 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
memcpy(data, pReq, len); memcpy(data, pReq, len);
SPackedData submit = {.msgStr = data, .msgLen = len, .ver = ver}; SPackedData submit = {.msgStr = data, .msgLen = len, .ver = ver};
tqDebug("tq copy submit msg:%p len:%d ver:%" PRId64 " from %p for stream", data, len, ver, pReq); tqDebug("vgId:%d tq copy submit msg:%p len:%d ver:%" PRId64 " from %p for stream", vgId, data, len, ver, pReq);
tqProcessSubmitReq(pTq, submit);
#endif
SPackedData submit = {0};
tqProcessSubmitReq(pTq, submit); tqProcessSubmitReq(pTq, submit);
} }
......
...@@ -300,6 +300,28 @@ int32_t tqSeekVer(STqReader* pReader, int64_t ver, const char* id) { ...@@ -300,6 +300,28 @@ int32_t tqSeekVer(STqReader* pReader, int64_t ver, const char* id) {
return 0; return 0;
} }
int32_t extractSubmitMsgFromWal(SWalReader* pReader, SPackedData* pPackedData) {
if (walNextValidMsg(pReader) < 0) {
return -1;
}
void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SSubmitReq2Msg));
int32_t len = pReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
int64_t ver = pReader->pHead->head.version;
void* data = taosMemoryMalloc(len);
if (data == NULL) {
// todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0);
return -1;
}
memcpy(data, pBody, len);
*pPackedData = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data};
return 0;
}
void tqNextBlock(STqReader* pReader, SFetchRet* ret) { void tqNextBlock(STqReader* pReader, SFetchRet* ret) {
while (1) { while (1) {
if (pReader->msg2.msgStr == NULL) { if (pReader->msg2.msgStr == NULL) {
...@@ -434,7 +456,10 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbD ...@@ -434,7 +456,10 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbD
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
pReader->nextBlk++; pReader->nextBlk++;
if (pSubmitTbDataRet) *pSubmitTbDataRet = pSubmitTbData; if (pSubmitTbDataRet) {
*pSubmitTbDataRet = pSubmitTbData;
}
int32_t sversion = pSubmitTbData->sver; int32_t sversion = pSubmitTbData->sver;
int64_t suid = pSubmitTbData->suid; int64_t suid = pSubmitTbData->suid;
int64_t uid = pSubmitTbData->uid; int64_t uid = pSubmitTbData->uid;
......
...@@ -15,60 +15,81 @@ ...@@ -15,60 +15,81 @@
#include "tq.h" #include "tq.h"
static int32_t restoreStreamTaskImpl(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, SArray* pTaskList); static int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, bool* pScanIdle);
static int32_t transferToNormalTask(SStreamMeta* pStreamMeta, SArray* pTaskList); static int32_t transferToNormalTask(SStreamMeta* pStreamMeta, SArray* pTaskList);
// this function should be executed by stream threads. // this function should be executed by stream threads.
// there is a case that the WAL increases more fast than the restore procedure, and this restore procedure // there is a case that the WAL increases more fast than the restore procedure, and this restore procedure
// will not stop eventually. // will not stop eventually.
int tqDoRestoreSourceStreamTasks(STQ* pTq) { int tqStreamTasksScanWal(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
while (1) { while (1) {
SArray* pTaskList = taosArrayInit(4, POINTER_BYTES); tqInfo("vgId:%d continue check if data in wal are available", vgId);
// check all restore tasks // check all restore tasks
restoreStreamTaskImpl(pTq->pStreamMeta, pTq->pOffsetStore, pTaskList); bool allFull = true;
transferToNormalTask(pTq->pStreamMeta, pTaskList); streamTaskReplayWal(pTq->pStreamMeta, pTq->pOffsetStore, &allFull);
taosArrayDestroy(pTaskList);
int32_t numOfRestored = taosHashGetSize(pTq->pStreamMeta->pRestoreTasks); int32_t times = 0;
if (numOfRestored <= 0) {
break;
}
}
int64_t et = taosGetTimestampMs();
tqInfo("vgId:%d restoring task completed, elapsed time:%" PRId64 " sec.", TD_VID(pTq->pVnode), (et - st));
return 0;
}
int32_t transferToNormalTask(SStreamMeta* pStreamMeta, SArray* pTaskList) { if (allFull) {
int32_t numOfTask = taosArrayGetSize(pTaskList); taosWLockLatch(&pMeta->lock);
if (numOfTask <= 0) { pMeta->walScan -= 1;
return TSDB_CODE_SUCCESS; times = pMeta->walScan;
}
// todo: add lock if (pMeta->walScan <= 0) {
for (int32_t i = 0; i < numOfTask; ++i) { taosWUnLockLatch(&pMeta->lock);
SStreamTask* pTask = taosArrayGetP(pTaskList, i); break;
tqDebug("vgId:%d transfer s-task:%s state restore -> ready, checkpoint:%" PRId64 " checkpoint id:%" PRId64, }
pStreamMeta->vgId, pTask->id.idStr, pTask->chkInfo.version, pTask->chkInfo.id);
taosHashRemove(pStreamMeta->pRestoreTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
// NOTE: do not change the following order taosWUnLockLatch(&pMeta->lock);
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL); tqInfo("vgId:%d scan wal for stream tasks for %d times", vgId, times);
taosHashPut(pStreamMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES); }
} }
return TSDB_CODE_SUCCESS; double el = (taosGetTimestampMs() - st) / 1000.0;
tqInfo("vgId:%d scan wal for stream tasks completed, elapsed time:%.2f sec", vgId, el);
// restore wal scan flag
// atomic_store_8(&pTq->pStreamMeta->walScan, 0);
return 0;
} }
int32_t restoreStreamTaskImpl(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, SArray* pTaskList) { //int32_t transferToNormalTask(SStreamMeta* pStreamMeta, SArray* pTaskList) {
// check all restore tasks // int32_t numOfTask = taosArrayGetSize(pTaskList);
void* pIter = NULL; // if (numOfTask <= 0) {
// return TSDB_CODE_SUCCESS;
// }
//
// // todo: add lock
// for (int32_t i = 0; i < numOfTask; ++i) {
// SStreamTask* pTask = taosArrayGetP(pTaskList, i);
// tqDebug("vgId:%d transfer s-task:%s state restore -> ready, checkpoint:%" PRId64 " checkpoint id:%" PRId64,
// pStreamMeta->vgId, pTask->id.idStr, pTask->chkInfo.version, pTask->chkInfo.id);
// taosHashRemove(pStreamMeta->pWalReadTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
//
// // NOTE: do not change the following order
// atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
// taosHashPut(pStreamMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES);
// }
//
// return TSDB_CODE_SUCCESS;
//}
int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, bool* pScanIdle) {
void* pIter = NULL;
int32_t vgId = pStreamMeta->vgId;
*pScanIdle = true;
bool allWalChecked = true;
tqDebug("vgId:%d start to check wal to extract new submit block", vgId);
while (1) { while (1) {
pIter = taosHashIterate(pStreamMeta->pRestoreTasks, pIter); pIter = taosHashIterate(pStreamMeta->pTasks, pIter);
if (pIter == NULL) { if (pIter == NULL) {
break; break;
} }
...@@ -78,8 +99,10 @@ int32_t restoreStreamTaskImpl(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetS ...@@ -78,8 +99,10 @@ int32_t restoreStreamTaskImpl(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetS
continue; continue;
} }
if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE ||
tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, pTask->status.taskStatus); pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr,
pTask->status.taskStatus);
continue; continue;
} }
...@@ -88,41 +111,57 @@ int32_t restoreStreamTaskImpl(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetS ...@@ -88,41 +111,57 @@ int32_t restoreStreamTaskImpl(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetS
createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId); createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId);
if (tInputQueueIsFull(pTask)) { if (tInputQueueIsFull(pTask)) {
tqDebug("s-task:%s input queue is full, do nothing", pTask->id.idStr); tqDebug("vgId:%d s-task:%s input queue is full, do nothing", vgId, pTask->id.idStr);
taosMsleep(10);
continue; continue;
} }
*pScanIdle = false;
// check if offset value exists // check if offset value exists
STqOffset* pOffset = tqOffsetRead(pOffsetStore, key); STqOffset* pOffset = tqOffsetRead(pOffsetStore, key);
if (pOffset != NULL) { ASSERT(pOffset != NULL);
// seek the stored version and extract data from WAL
int32_t code = tqSeekVer(pTask->exec.pTqReader, pOffset->val.version, ""); // seek the stored version and extract data from WAL
if (code == TSDB_CODE_SUCCESS) { // all data retrieved, abort int32_t code = walReadSeekVer(pTask->exec.pWalReader, pOffset->val.version);
// append the data for the stream if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit
SFetchRet ret = {.data.info.type = STREAM_NORMAL}; continue;
terrno = 0; }
tqNextBlock(pTask->exec.pTqReader, &ret); // append the data for the stream
if (ret.fetchType == FETCH_TYPE__DATA) { tqDebug("vgId:%d wal reader seek to ver:%" PRId64 " %s", vgId, pOffset->val.version, pTask->id.idStr);
code = launchTaskForWalBlock(pTask, &ret, pOffset);
if (code != TSDB_CODE_SUCCESS) { SPackedData packData = {0};
continue; code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData);
} if (code != TSDB_CODE_SUCCESS) { // failed, continue
} else { continue;
// FETCH_TYPE__NONE: all data has been retrieved from WAL, let's try submit block directly. }
tqDebug("s-task:%s data in WAL are all consumed, transfer this task to be normal state", pTask->id.idStr);
taosArrayPush(pTaskList, &pTask); SStreamDataSubmit2* p = streamDataSubmitNew(packData, STREAM_INPUT__DATA_SUBMIT);
} if (p == NULL) {
} else { // failed to seek to the WAL version terrno = TSDB_CODE_OUT_OF_MEMORY;
tqDebug("s-task:%s data in WAL are all consumed, transfer this task to be normal state", pTask->id.idStr); tqError("%s failed to create data submit for stream since out of memory", pTask->id.idStr);
taosArrayPush(pTaskList, &pTask); continue;
} }
allWalChecked = false;
tqDebug("s-task:%s submit data extracted from WAL", pTask->id.idStr);
code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)p, packData.ver);
if (code == TSDB_CODE_SUCCESS) {
pOffset->val.version = walReaderGetCurrentVer(pTask->exec.pWalReader);
tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr,
pOffset->val.version);
} else { } else {
ASSERT(0); // do nothing
} }
streamDataSubmitDestroy(p);
taosFreeQitem(p);
} }
if (allWalChecked) {
*pScanIdle = true;
}
return 0; return 0;
} }
...@@ -35,7 +35,7 @@ void createStreamTaskOffsetKey(char* dst, uint64_t streamId, uint32_t taskId) { ...@@ -35,7 +35,7 @@ void createStreamTaskOffsetKey(char* dst, uint64_t streamId, uint32_t taskId) {
} }
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver) { int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver) {
int32_t code = tAppendDataForStream(pTask, pQueueItem); int32_t code = tAppendDataToInputQueue(pTask, pQueueItem);
if (code < 0) { if (code < 0) {
tqError("s-task:%s failed to put into queue, too many, next start ver:%" PRId64, pTask->id.idStr, ver); tqError("s-task:%s failed to put into queue, too many, next start ver:%" PRId64, pTask->id.idStr, ver);
return -1; return -1;
...@@ -79,7 +79,7 @@ void initOffsetForAllRestoreTasks(STQ* pTq) { ...@@ -79,7 +79,7 @@ void initOffsetForAllRestoreTasks(STQ* pTq) {
void* pIter = NULL; void* pIter = NULL;
while(1) { while(1) {
pIter = taosHashIterate(pTq->pStreamMeta->pRestoreTasks, pIter); pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
if (pIter == NULL) { if (pIter == NULL) {
break; break;
} }
...@@ -103,7 +103,6 @@ void initOffsetForAllRestoreTasks(STQ* pTq) { ...@@ -103,7 +103,6 @@ void initOffsetForAllRestoreTasks(STQ* pTq) {
doSaveTaskOffset(pTq->pOffsetStore, key, pTask->chkInfo.version); doSaveTaskOffset(pTq->pOffsetStore, key, pTask->chkInfo.version);
} }
} }
} }
void saveOffsetForAllTasks(STQ* pTq, int64_t ver) { void saveOffsetForAllTasks(STQ* pTq, int64_t ver) {
......
...@@ -539,13 +539,10 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { ...@@ -539,13 +539,10 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
return vnodeGetBatchMeta(pVnode, pMsg); return vnodeGetBatchMeta(pVnode, pMsg);
case TDMT_VND_TMQ_CONSUME: case TDMT_VND_TMQ_CONSUME:
return tqProcessPollReq(pVnode->pTq, pMsg); return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_RUN: case TDMT_STREAM_TASK_RUN:
return tqProcessTaskRunReq(pVnode->pTq, pMsg); return tqProcessTaskRunReq(pVnode->pTq, pMsg);
#if 1
case TDMT_STREAM_TASK_DISPATCH: case TDMT_STREAM_TASK_DISPATCH:
return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true); return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true);
#endif
case TDMT_STREAM_TASK_CHECK: case TDMT_STREAM_TASK_CHECK:
return tqProcessStreamTaskCheckReq(pVnode->pTq, pMsg); return tqProcessStreamTaskCheckReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_DISPATCH_RSP: case TDMT_STREAM_TASK_DISPATCH_RSP:
......
...@@ -551,7 +551,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) ...@@ -551,7 +551,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
vInfo("vgId:%d, sync restore finished", pVnode->config.vgId); vInfo("vgId:%d, sync restore finished", pVnode->config.vgId);
// start to restore all stream tasks // start to restore all stream tasks
tqRestoreStreamTasks(pVnode->pTq); tqStartStreamTasks(pVnode->pTq);
} }
static void vnodeBecomeFollower(const SSyncFSM *pFsm) { static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
......
...@@ -70,7 +70,7 @@ void streamSchedByTimer(void* param, void* tmrId) { ...@@ -70,7 +70,7 @@ void streamSchedByTimer(void* param, void* tmrId) {
trigger->pBlock->info.type = STREAM_GET_ALL; trigger->pBlock->info.type = STREAM_GET_ALL;
atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE); atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE);
if (tAppendDataForStream(pTask, (SStreamQueueItem*)trigger) < 0) { if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)trigger) < 0) {
taosFreeQitem(trigger); taosFreeQitem(trigger);
taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer); taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer);
return; return;
...@@ -110,16 +110,17 @@ int32_t streamSchedExec(SStreamTask* pTask) { ...@@ -110,16 +110,17 @@ int32_t streamSchedExec(SStreamTask* pTask) {
SRpcMsg msg = { .msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq) }; SRpcMsg msg = { .msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq) };
tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg); tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg);
qDebug("trigger to run s-task:%s", pTask->id.idStr);
} }
return 0; return 0;
} }
int32_t streamTaskEnqueue(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) { int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
int8_t status; int8_t status;
// enqueue // enqueue data block
if (pData != NULL) { if (pData != NULL) {
pData->type = STREAM_INPUT__DATA_BLOCK; pData->type = STREAM_INPUT__DATA_BLOCK;
pData->srcVgId = pReq->dataSrcVgId; pData->srcVgId = pReq->dataSrcVgId;
...@@ -127,10 +128,10 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, const SStreamDispatchReq* pReq, SR ...@@ -127,10 +128,10 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, const SStreamDispatchReq* pReq, SR
/*pData->blocks = pReq->data;*/ /*pData->blocks = pReq->data;*/
/*pBlock->sourceVer = pReq->sourceVer;*/ /*pBlock->sourceVer = pReq->sourceVer;*/
streamDispatchReqToData(pReq, pData); streamDispatchReqToData(pReq, pData);
if (tAppendDataForStream(pTask, (SStreamQueueItem*)pData) == 0) { if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pData) == 0) {
status = TASK_INPUT_STATUS__NORMAL; status = TASK_INPUT_STATUS__NORMAL;
} else { } else { // input queue is full, upstream is blocked now
status = TASK_INPUT_STATUS__FAILED; status = TASK_INPUT_STATUS__BLOCKED;
} }
} else { } else {
streamTaskInputFail(pTask); streamTaskInputFail(pTask);
...@@ -148,8 +149,10 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, const SStreamDispatchReq* pReq, SR ...@@ -148,8 +149,10 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, const SStreamDispatchReq* pReq, SR
pCont->downstreamNodeId = htonl(pTask->nodeId); pCont->downstreamNodeId = htonl(pTask->nodeId);
pCont->downstreamTaskId = htonl(pTask->id.taskId); pCont->downstreamTaskId = htonl(pTask->id.taskId);
pRsp->pCont = buf; pRsp->pCont = buf;
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp); pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
tmsgSendRsp(pRsp); tmsgSendRsp(pRsp);
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1; return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
} }
...@@ -168,7 +171,7 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, ...@@ -168,7 +171,7 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
/*pData->blocks = pReq->data;*/ /*pData->blocks = pReq->data;*/
/*pBlock->sourceVer = pReq->sourceVer;*/ /*pBlock->sourceVer = pReq->sourceVer;*/
streamRetrieveReqToData(pReq, pData); streamRetrieveReqToData(pReq, pData);
if (tAppendDataForStream(pTask, (SStreamQueueItem*)pData) == 0) { if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pData) == 0) {
status = TASK_INPUT_STATUS__NORMAL; status = TASK_INPUT_STATUS__NORMAL;
} else { } else {
status = TASK_INPUT_STATUS__FAILED; status = TASK_INPUT_STATUS__FAILED;
...@@ -209,10 +212,10 @@ int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) { ...@@ -209,10 +212,10 @@ int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) {
} }
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) { int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
qDebug("task %d receive dispatch req from node %d task %d", pTask->id.taskId, pReq->upstreamNodeId, qDebug("vgId:%d s-task:%s receive dispatch req from taskId:%d", pReq->upstreamNodeId, pTask->id.idStr,
pReq->upstreamTaskId); pReq->upstreamTaskId);
streamTaskEnqueue(pTask, pReq, pRsp); streamTaskEnqueueBlocks(pTask, pReq, pRsp);
tDeleteStreamDispatchReq(pReq); tDeleteStreamDispatchReq(pReq);
if (exec) { if (exec) {
...@@ -232,12 +235,14 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S ...@@ -232,12 +235,14 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) { int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
ASSERT(pRsp->inputStatus == TASK_OUTPUT_STATUS__NORMAL || pRsp->inputStatus == TASK_OUTPUT_STATUS__BLOCKED); ASSERT(pRsp->inputStatus == TASK_OUTPUT_STATUS__NORMAL || pRsp->inputStatus == TASK_OUTPUT_STATUS__BLOCKED);
qDebug("task %d receive dispatch rsp, code: %x", pTask->id.taskId, code); qDebug("s-task:%s receive dispatch rsp, code: %x", pTask->id.idStr, code);
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
qDebug("task %d is shuffle, left waiting rsp %d", pTask->id.taskId, leftRsp); qDebug("task %d is shuffle, left waiting rsp %d", pTask->id.taskId, leftRsp);
if (leftRsp > 0) return 0; if (leftRsp > 0) {
return 0;
}
} }
int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus); int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus);
...@@ -282,7 +287,7 @@ bool tInputQueueIsFull(const SStreamTask* pTask) { ...@@ -282,7 +287,7 @@ bool tInputQueueIsFull(const SStreamTask* pTask) {
return taosQueueItemSize((pTask->inputQueue->queue)) >= STREAM_TASK_INPUT_QUEUEU_CAPACITY; return taosQueueItemSize((pTask->inputQueue->queue)) >= STREAM_TASK_INPUT_QUEUEU_CAPACITY;
} }
int32_t tAppendDataForStream(SStreamTask* pTask, SStreamQueueItem* pItem) { int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
int8_t type = pItem->type; int8_t type = pItem->type;
if (type == STREAM_INPUT__DATA_SUBMIT) { if (type == STREAM_INPUT__DATA_SUBMIT) {
...@@ -295,12 +300,12 @@ int32_t tAppendDataForStream(SStreamTask* pTask, SStreamQueueItem* pItem) { ...@@ -295,12 +300,12 @@ int32_t tAppendDataForStream(SStreamTask* pTask, SStreamQueueItem* pItem) {
} }
int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1; int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1;
qDebug("s-task:%s submit enqueue %p %p %p msgLen:%d ver:%" PRId64 ", total in queue:%d", pTask->id.idStr, qDebug("s-task:%s submit enqueue %p %p msgLen:%d ver:%" PRId64 ", total in queue:%d", pTask->id.idStr,
pItem, pSubmitBlock, pSubmitBlock->submit.msgStr, pSubmitBlock->submit.msgLen, pItem, pSubmitBlock->submit.msgStr, pSubmitBlock->submit.msgLen,
pSubmitBlock->submit.ver, total); pSubmitBlock->submit.ver, total);
if (total > STREAM_TASK_INPUT_QUEUEU_CAPACITY) { if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && total > STREAM_TASK_INPUT_QUEUEU_CAPACITY) {
qDebug("s-task:%s input queue is full, capacity:%d, abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY); qError("s-task:%s input queue is full, capacity:%d, abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY);
streamDataSubmitDestroy(pSubmitBlock); streamDataSubmitDestroy(pSubmitBlock);
return -1; return -1;
} }
...@@ -309,8 +314,8 @@ int32_t tAppendDataForStream(SStreamTask* pTask, SStreamQueueItem* pItem) { ...@@ -309,8 +314,8 @@ int32_t tAppendDataForStream(SStreamTask* pTask, SStreamQueueItem* pItem) {
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
type == STREAM_INPUT__REF_DATA_BLOCK) { type == STREAM_INPUT__REF_DATA_BLOCK) {
int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1; int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1;
if (total > 2) { if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && total > STREAM_TASK_INPUT_QUEUEU_CAPACITY) {
qDebug("stream task input queue is full, abort"); qError("s-task:%s input queue is full, capacity:%d, abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY);
return -1; return -1;
} }
...@@ -327,7 +332,6 @@ int32_t tAppendDataForStream(SStreamTask* pTask, SStreamQueueItem* pItem) { ...@@ -327,7 +332,6 @@ int32_t tAppendDataForStream(SStreamTask* pTask, SStreamQueueItem* pItem) {
} }
#if 0 #if 0
// TODO: back pressure
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL); atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL);
#endif #endif
......
...@@ -69,7 +69,6 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock ...@@ -69,7 +69,6 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock
SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type) { SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type) {
SStreamDataSubmit2* pDataSubmit = (SStreamDataSubmit2*)taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, 0); SStreamDataSubmit2* pDataSubmit = (SStreamDataSubmit2*)taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, 0);
if (pDataSubmit == NULL) { if (pDataSubmit == NULL) {
return NULL; return NULL;
} }
......
...@@ -238,7 +238,8 @@ int32_t streamDispatchOneCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* ...@@ -238,7 +238,8 @@ int32_t streamDispatchOneCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq*
msg.pCont = buf; msg.pCont = buf;
msg.msgType = TDMT_STREAM_TASK_CHECK; msg.msgType = TDMT_STREAM_TASK_CHECK;
qDebug("dispatch from task %d to task %d node %d: check msg", pTask->id.taskId, pReq->downstreamTaskId, nodeId); qDebug("dispatch from s-task:%s to downstream s-task:%"PRIx64":%d node %d: check msg", pTask->id.idStr,
pReq->streamId, pReq->downstreamTaskId, nodeId);
tmsgSendReq(pEpSet, &msg); tmsgSendReq(pEpSet, &msg);
...@@ -319,8 +320,7 @@ int32_t streamDispatchOneDataReq(SStreamTask* pTask, const SStreamDispatchReq* p ...@@ -319,8 +320,7 @@ int32_t streamDispatchOneDataReq(SStreamTask* pTask, const SStreamDispatchReq* p
msg.pCont = buf; msg.pCont = buf;
msg.msgType = pTask->dispatchMsgType; msg.msgType = pTask->dispatchMsgType;
qDebug("dispatch from task %d to task %d node %d: data msg", pTask->id.taskId, pReq->taskId, vgId); qDebug("dispatch from s-task:%s to taskId:%d vgId:%d data msg", pTask->id.idStr, pReq->taskId, vgId);
tmsgSendReq(pEpSet, &msg); tmsgSendReq(pEpSet, &msg);
code = 0; code = 0;
...@@ -402,14 +402,15 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat ...@@ -402,14 +402,15 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
goto FAIL_FIXED_DISPATCH; goto FAIL_FIXED_DISPATCH;
} }
} }
int32_t vgId = pTask->fixedEpDispatcher.nodeId; int32_t vgId = pTask->fixedEpDispatcher.nodeId;
SEpSet* pEpSet = &pTask->fixedEpDispatcher.epSet; SEpSet* pEpSet = &pTask->fixedEpDispatcher.epSet;
int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId; int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId;
req.taskId = downstreamTaskId; req.taskId = downstreamTaskId;
qDebug("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->id.taskId, pTask->selfChildId, qDebug("s-task:%s (child taskId:%d) dispatch blocks:%d to down stream s-task:%d in vgId:%d", pTask->id.idStr,
downstreamTaskId, vgId); pTask->selfChildId, blockNum, downstreamTaskId, vgId);
if (streamDispatchOneDataReq(pTask, &req, vgId, pEpSet) < 0) { if (streamDispatchOneDataReq(pTask, &req, vgId, pEpSet) < 0) {
goto FAIL_FIXED_DISPATCH; goto FAIL_FIXED_DISPATCH;
...@@ -494,6 +495,8 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat ...@@ -494,6 +495,8 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
int32_t streamDispatch(SStreamTask* pTask) { int32_t streamDispatch(SStreamTask* pTask) {
ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH);
qDebug("s-task:%s try to dispatch intermediate result block to downstream, numofBlocks in outputQ:%d", pTask->id.idStr,
taosQueueItemSize(pTask->outputQueue->queue));
int8_t old = int8_t old =
atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT); atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
...@@ -503,13 +506,12 @@ int32_t streamDispatch(SStreamTask* pTask) { ...@@ -503,13 +506,12 @@ int32_t streamDispatch(SStreamTask* pTask) {
SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue); SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue);
if (pBlock == NULL) { if (pBlock == NULL) {
qDebug("stream stop dispatching since no output: task %d", pTask->id.taskId); qDebug("s-task:%s stream stop dispatching since no output in output queue", pTask->id.idStr);
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
return 0; return 0;
} }
ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK);
qDebug("stream dispatching: task %d", pTask->id.taskId); ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK);
int32_t code = 0; int32_t code = 0;
if (streamDispatchAllBlocks(pTask, pBlock) < 0) { if (streamDispatchAllBlocks(pTask, pBlock) < 0) {
...@@ -518,6 +520,7 @@ int32_t streamDispatch(SStreamTask* pTask) { ...@@ -518,6 +520,7 @@ int32_t streamDispatch(SStreamTask* pTask) {
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
goto FREE; goto FREE;
} }
FREE: FREE:
taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
taosFreeQitem(pBlock); taosFreeQitem(pBlock);
......
...@@ -40,9 +40,9 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* ...@@ -40,9 +40,9 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
} else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE); ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
const SStreamDataSubmit2* pSubmit = (const SStreamDataSubmit2*)data; const SStreamDataSubmit2* pSubmit = (const SStreamDataSubmit2*)data;
qDebug("s-task:%s set submit blocks as input %p %p %d ver:%" PRId64, pTask->id.idStr, pSubmit, pSubmit->submit.msgStr,
pSubmit->submit.msgLen, pSubmit->submit.ver);
qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit, pSubmit->submit.msgStr,
pSubmit->submit.msgLen, pSubmit->submit.ver);
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
const SStreamDataBlock* pBlock = (const SStreamDataBlock*)data; const SStreamDataBlock* pBlock = (const SStreamDataBlock*)data;
...@@ -241,7 +241,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { ...@@ -241,7 +241,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
while (1) { while (1) {
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
if (qItem == NULL) { if (qItem == NULL) {
qDebug("s-task:%s stream task exec over, queue empty", pTask->id.idStr); // qDebug("s-task:%s extract data from input queue, queue is empty, abort", pTask->id.idStr);
break; break;
} }
...@@ -280,12 +280,13 @@ int32_t streamExecForAll(SStreamTask* pTask) { ...@@ -280,12 +280,13 @@ int32_t streamExecForAll(SStreamTask* pTask) {
if (pTask->taskLevel == TASK_LEVEL__SINK) { if (pTask->taskLevel == TASK_LEVEL__SINK) {
ASSERT(((SStreamQueueItem*)pInput)->type == STREAM_INPUT__DATA_BLOCK); ASSERT(((SStreamQueueItem*)pInput)->type == STREAM_INPUT__DATA_BLOCK);
qDebug("s-task:%s sink node start to sink result. numOfBlocks:%d", pTask->id.idStr, batchSize);
streamTaskOutput(pTask, pInput); streamTaskOutput(pTask, pInput);
continue; continue;
} }
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
qDebug("s-task:%s exec begin, msg batch: %d", pTask->id.idStr, batchSize); qDebug("s-task:%s exec begin, numOfBlocks:%d", pTask->id.idStr, batchSize);
streamTaskExecImpl(pTask, pInput, pRes); streamTaskExecImpl(pTask, pInput, pRes);
...@@ -293,13 +294,21 @@ int32_t streamExecForAll(SStreamTask* pTask) { ...@@ -293,13 +294,21 @@ int32_t streamExecForAll(SStreamTask* pTask) {
int64_t dataVer = 0; int64_t dataVer = 0;
qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId); qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId);
if (dataVer > pTask->chkInfo.version) { // save it since the checkpoint is updated if (dataVer > pTask->chkInfo.version) { // save it since the checkpoint is updated
qDebug("s-task:%s exec end, checkpoint ver from %"PRId64" to %"PRId64, pTask->id.idStr, pTask->chkInfo.version, dataVer); qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64
", checkPoint id:%" PRId64 " -> %" PRId64,
pTask->id.idStr, pTask->chkInfo.version, dataVer, pTask->chkInfo.id, ckId);
pTask->chkInfo = (SCheckpointInfo) {.version = dataVer, .id = ckId}; pTask->chkInfo = (SCheckpointInfo) {.version = dataVer, .id = ckId};
streamMetaSaveTask(pTask->pMeta, pTask);
taosWLockLatch(&pTask->pMeta->lock);
streamMetaSaveTask(pTask->pMeta, pTask);
if (streamMetaCommit(pTask->pMeta) < 0) { if (streamMetaCommit(pTask->pMeta) < 0) {
qError("failed to commit stream meta, since %s", terrstr()); taosWUnLockLatch(&pTask->pMeta->lock);
qError("s-task:%s failed to commit stream meta, since %s", pTask->id.idStr, terrstr());
return -1; return -1;
} else {
taosWUnLockLatch(&pTask->pMeta->lock);
qDebug("s-task:%s update checkpoint ver succeed", pTask->id.idStr);
} }
} else { } else {
qDebug("s-task:%s exec end", pTask->id.idStr); qDebug("s-task:%s exec end", pTask->id.idStr);
...@@ -354,6 +363,7 @@ int32_t streamTryExec(SStreamTask* pTask) { ...@@ -354,6 +363,7 @@ int32_t streamTryExec(SStreamTask* pTask) {
// todo the task should be commit here // todo the task should be commit here
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
qDebug("s-task:%s exec completed", pTask->id.idStr);
if (!taosQueueEmpty(pTask->inputQueue->queue)) { if (!taosQueueEmpty(pTask->inputQueue->queue)) {
streamSchedExec(pTask); streamSchedExec(pTask);
......
...@@ -51,8 +51,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF ...@@ -51,8 +51,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
goto _err; goto _err;
} }
pMeta->pRestoreTasks = taosHashInit(64, fp, true, HASH_ENTRY_LOCK); pMeta->pWalReadTasks = taosHashInit(64, fp, true, HASH_ENTRY_LOCK);
if (pMeta->pRestoreTasks == NULL) { if (pMeta->pWalReadTasks == NULL) {
goto _err; goto _err;
} }
...@@ -60,15 +60,16 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF ...@@ -60,15 +60,16 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
goto _err; goto _err;
} }
pMeta->vgId = vgId;
pMeta->ahandle = ahandle; pMeta->ahandle = ahandle;
pMeta->expandFunc = expandFunc; pMeta->expandFunc = expandFunc;
taosInitRWLatch(&pMeta->lock);
return pMeta; return pMeta;
_err: _err:
taosMemoryFree(pMeta->path); taosMemoryFree(pMeta->path);
if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks); if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks);
if (pMeta->pRestoreTasks) taosHashCleanup(pMeta->pRestoreTasks); if (pMeta->pWalReadTasks) taosHashCleanup(pMeta->pWalReadTasks);
if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb); if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb);
if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb); if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb);
if (pMeta->db) tdbClose(pMeta->db); if (pMeta->db) tdbClose(pMeta->db);
...@@ -83,20 +84,29 @@ void streamMetaClose(SStreamMeta* pMeta) { ...@@ -83,20 +84,29 @@ void streamMetaClose(SStreamMeta* pMeta) {
tdbClose(pMeta->db); tdbClose(pMeta->db);
void* pIter = NULL; void* pIter = NULL;
while(pMeta->walScan) {
qDebug("wait stream daemon quit");
taosMsleep(100);
}
while (1) { while (1) {
pIter = taosHashIterate(pMeta->pTasks, pIter); pIter = taosHashIterate(pMeta->pTasks, pIter);
if (pIter == NULL) break; if (pIter == NULL) {
break;
}
SStreamTask* pTask = *(SStreamTask**)pIter; SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->timer) { if (pTask->timer) {
taosTmrStop(pTask->timer); taosTmrStop(pTask->timer);
pTask->timer = NULL; pTask->timer = NULL;
} }
tFreeStreamTask(pTask); tFreeStreamTask(pTask);
/*streamMetaReleaseTask(pMeta, pTask);*/ /*streamMetaReleaseTask(pMeta, pTask);*/
} }
taosHashCleanup(pMeta->pTasks); taosHashCleanup(pMeta->pTasks);
taosHashCleanup(pMeta->pRestoreTasks); taosHashCleanup(pMeta->pWalReadTasks);
taosMemoryFree(pMeta->path); taosMemoryFree(pMeta->path);
taosMemoryFree(pMeta); taosMemoryFree(pMeta);
} }
...@@ -164,8 +174,8 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { ...@@ -164,8 +174,8 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
return 0; return 0;
} }
#if 1 // add to the ready tasks hash map, not the restored tasks hash map
int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) { int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) {
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
return -1; return -1;
} }
...@@ -174,10 +184,16 @@ int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) { ...@@ -174,10 +184,16 @@ int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) {
return -1; return -1;
} }
taosHashPut(pMeta->pRestoreTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, POINTER_BYTES); pTask->status.taskStatus = STREAM_STATUS__NORMAL;
taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, POINTER_BYTES);
return 0; return 0;
} }
#endif
int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta) {
int32_t numOfReady = taosHashGetSize(pMeta->pTasks);
int32_t numOfRestoring = taosHashGetSize(pMeta->pWalReadTasks);
return numOfReady + numOfRestoring;
}
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) { SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) {
taosRLockLatch(&pMeta->lock); taosRLockLatch(&pMeta->lock);
...@@ -206,9 +222,9 @@ SStreamTask* streamMetaAcquireTaskEx(SStreamMeta* pMeta, int32_t taskId) { ...@@ -206,9 +222,9 @@ SStreamTask* streamMetaAcquireTaskEx(SStreamMeta* pMeta, int32_t taskId) {
taosRLockLatch(&pMeta->lock); taosRLockLatch(&pMeta->lock);
SStreamTask* pTask = NULL; SStreamTask* pTask = NULL;
int32_t numOfRestored = taosHashGetSize(pMeta->pRestoreTasks); int32_t numOfRestored = taosHashGetSize(pMeta->pWalReadTasks);
if (numOfRestored > 0) { if (numOfRestored > 0) {
SStreamTask** p = (SStreamTask**)taosHashGet(pMeta->pRestoreTasks, &taskId, sizeof(int32_t)); SStreamTask** p = (SStreamTask**)taosHashGet(pMeta->pWalReadTasks, &taskId, sizeof(taskId));
if (p != NULL) { if (p != NULL) {
pTask = *p; pTask = *p;
if (pTask != NULL && (atomic_load_8(&(pTask->status.taskStatus)) != TASK_STATUS__DROPPING)) { if (pTask != NULL && (atomic_load_8(&(pTask->status.taskStatus)) != TASK_STATUS__DROPPING)) {
...@@ -217,15 +233,15 @@ SStreamTask* streamMetaAcquireTaskEx(SStreamMeta* pMeta, int32_t taskId) { ...@@ -217,15 +233,15 @@ SStreamTask* streamMetaAcquireTaskEx(SStreamMeta* pMeta, int32_t taskId) {
return pTask; return pTask;
} }
} }
} else { }
SStreamTask** p = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (p != NULL) { SStreamTask** p = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
pTask = *p; if (p != NULL) {
if (pTask != NULL && atomic_load_8(&(pTask->status.taskStatus)) != TASK_STATUS__DROPPING) { pTask = *p;
atomic_add_fetch_32(&pTask->refCnt, 1); if (pTask != NULL && atomic_load_8(&(pTask->status.taskStatus)) != TASK_STATUS__DROPPING) {
taosRUnLockLatch(&pMeta->lock); atomic_add_fetch_32(&pTask->refCnt, 1);
return pTask; taosRUnLockLatch(&pMeta->lock);
} return pTask;
} }
} }
...@@ -261,9 +277,12 @@ int32_t streamMetaBegin(SStreamMeta* pMeta) { ...@@ -261,9 +277,12 @@ int32_t streamMetaBegin(SStreamMeta* pMeta) {
int32_t streamMetaCommit(SStreamMeta* pMeta) { int32_t streamMetaCommit(SStreamMeta* pMeta) {
if (tdbCommit(pMeta->db, pMeta->txn) < 0) { if (tdbCommit(pMeta->db, pMeta->txn) < 0) {
ASSERT(0);
return -1; return -1;
} }
if (tdbPostCommit(pMeta->db, pMeta->txn) < 0) { if (tdbPostCommit(pMeta->db, pMeta->txn) < 0) {
ASSERT(0);
return -1; return -1;
} }
...@@ -319,7 +338,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { ...@@ -319,7 +338,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
return -1; return -1;
} }
if (taosHashPut(pMeta->pRestoreTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) { if (taosHashPut(pMeta->pWalReadTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) {
tdbFree(pKey); tdbFree(pKey);
tdbFree(pVal); tdbFree(pVal);
tdbTbcClose(pCur); tdbTbcClose(pCur);
......
...@@ -186,7 +186,8 @@ void tFreeStreamTask(SStreamTask* pTask) { ...@@ -186,7 +186,8 @@ void tFreeStreamTask(SStreamTask* pTask) {
pTask->exec.pExecutor = NULL; pTask->exec.pExecutor = NULL;
} }
if (pTask->exec.pTqReader != NULL) { if (pTask->exec.pTqReader != NULL && pTask->freeFp != NULL) {
pTask->freeFp(pTask->exec.pTqReader);
pTask->exec.pTqReader = NULL; pTask->exec.pTqReader = NULL;
} }
...@@ -206,5 +207,9 @@ void tFreeStreamTask(SStreamTask* pTask) { ...@@ -206,5 +207,9 @@ void tFreeStreamTask(SStreamTask* pTask) {
streamStateClose(pTask->pState); streamStateClose(pTask->pState);
} }
if (pTask->id.idStr != NULL) {
taosMemoryFree((void*)pTask->id.idStr);
}
taosMemoryFree(pTask); taosMemoryFree(pTask);
} }
...@@ -248,7 +248,8 @@ STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem ...@@ -248,7 +248,8 @@ STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem
} }
taosThreadAttrDestroy(&thAttr); taosThreadAttrDestroy(&thAttr);
uInfo("worker:%s:%d is launched, total:%d", pool->name, worker->id, (int32_t)taosArrayGetSize(pool->workers)); int32_t numOfThreads = taosArrayGetSize(pool->workers);
uInfo("worker:%s:%d is launched, total:%d, expect:%d", pool->name, worker->id, numOfThreads, dstWorkerNum);
curWorkerNum++; curWorkerNum++;
} }
......
...@@ -37,7 +37,7 @@ if $loop_count == 20 then ...@@ -37,7 +37,7 @@ if $loop_count == 20 then
endi endi
if $rows != 4 then if $rows != 4 then
print =====rows=$rows print =====rows=$rows, expect 4
goto loop0 goto loop0
endi endi
...@@ -53,7 +53,7 @@ if $data02 != 2 then ...@@ -53,7 +53,7 @@ if $data02 != 2 then
endi endi
if $data03 != 5 then if $data03 != 5 then
print =====data03=$data03 print =====data03=$data03, expect:5
goto loop0 goto loop0
endi endi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册