提交 eb0e1f84 编写于 作者: H Haojun Liao

fix(stream): remove unused tqreader, do some internal refactor, set the meta value for streamtask.

上级 f8b672f0
......@@ -123,7 +123,7 @@ int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks,
* @param isAdd
* @return
*/
int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd, SArray* pList);
int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd);
/**
* Create the exec task object according to task json
......
......@@ -221,7 +221,6 @@ SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit);
typedef struct {
char* qmsg;
void* pExecutor; // not applicable to encoder and decoder
struct STqReader* pTqReader; // not applicable to encoder and decoder
struct SWalReader* pWalReader; // not applicable to encoder and decoder
} STaskExec;
......@@ -333,7 +332,6 @@ struct SStreamTask {
int64_t checkpointingId;
int32_t checkpointAlignCnt;
struct SStreamMeta* pMeta;
_free_reader_fn_t freeFp;
};
// meta
......@@ -343,7 +341,6 @@ typedef struct SStreamMeta {
TTB* pTaskDb;
TTB* pCheckpointDb;
SHashObj* pTasks;
SHashObj* pWalReadTasks;
void* ahandle;
TXN* txn;
FTaskExpand* expandFunc;
......
......@@ -55,6 +55,7 @@ int32_t smOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
smClose(pMgmt);
return -1;
}
tmsgReportStartup("snode-impl", "initialized");
if (smStartWorker(pMgmt) != 0) {
......
......@@ -356,6 +356,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
qDestroyQueryPlan(pPlan);
return -1;
}
pInnerTask->fillHistory = pStream->fillHistory;
mndAddTaskToTaskSet(taskInnerLevel, pInnerTask);
......
......@@ -32,6 +32,7 @@ void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) {
tDecoderClear(&decoder);
goto FAIL;
}
tDecoderClear(&decoder);
int32_t taskId = req.taskId;
......@@ -77,6 +78,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
pTask->pMsgCb = &pSnode->msgCb;
pTask->chkInfo.version = ver;
pTask->pMeta = pSnode->pMeta;
pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1);
if (pTask->pState == NULL) {
......@@ -137,6 +139,7 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
if (pTask == NULL) {
return -1;
}
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t *)msg, msgLen);
code = tDecodeStreamTask(&decoder, pTask);
......
......@@ -168,7 +168,7 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids,
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pRSmaInfo->taskInfo[i]) {
if ((terrno = qUpdateTableListForStreamScanner(pRSmaInfo->taskInfo[i], tbUids, isAdd, NULL)) < 0) {
if ((terrno = qUpdateTableListForStreamScanner(pRSmaInfo->taskInfo[i], tbUids, isAdd)) < 0) {
tdReleaseRSmaInfo(pSma, pRSmaInfo);
smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " level %d since %s", SMA_VID(pSma), *suid, i,
terrstr());
......
......@@ -567,6 +567,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
pTask->pMsgCb = &pTq->pVnode->msgCb;
pTask->pMeta = pTq->pStreamMeta;
pTask->chkInfo.version = ver;
// expand executor
if (pTask->fillHistory) {
......@@ -628,18 +629,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
}
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
pTask->exec.pTqReader = tqOpenReader(pTq->pVnode);
if (pTask->exec.pTqReader == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
pTask->freeFp = (_free_reader_fn_t)tqCloseReader;
SArray* pList = qGetQueriedTableListInfo(pTask->exec.pExecutor);
tqReaderAddTbUidList(pTask->exec.pTqReader, pList);
taosArrayDestroy(pList);
}
streamSetupTrigger(pTask);
......@@ -1141,7 +1131,6 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
}
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
tqStartStreamTasks(pTq);
return 0;
} else {
......
......@@ -973,7 +973,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
STqHandle* pTqHandle = (STqHandle*)pIter;
if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
int32_t code = qUpdateTableListForStreamScanner(pTqHandle->execHandle.task, tbUidList, isAdd, NULL);
int32_t code = qUpdateTableListForStreamScanner(pTqHandle->execHandle.task, tbUidList, isAdd);
if (code != 0) {
tqError("update qualified table error for %s", pTqHandle->subKey);
continue;
......@@ -1031,18 +1031,11 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
SArray* pList = NULL;
int32_t code = qUpdateTableListForStreamScanner(pTask->exec.pExecutor, tbUidList, isAdd, pList);
int32_t code = qUpdateTableListForStreamScanner(pTask->exec.pExecutor, tbUidList, isAdd);
if (code != 0) {
tqError("vgId:%d, s-task:%s update qualified table error for stream task", vgId, pTask->id.idStr);
continue;
}
if (isAdd) { // only add qualified tables
tqReaderAddTbUidList(pTask->exec.pTqReader, pList);
} else {
tqReaderRemoveTbUidList(pTask->exec.pTqReader, tbUidList);
}
}
}
......
......@@ -49,32 +49,6 @@ int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueI
return TSDB_CODE_SUCCESS;
}
int32_t launchTaskForWalBlock(SStreamTask* pTask, SFetchRet* pRet, STqOffset* pOffset) {
SStreamDataBlock* pBlocks = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
if (pBlocks == NULL) { // failed, do nothing
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pRet->data.info.type = STREAM_NORMAL;
pBlocks->type = STREAM_INPUT__DATA_BLOCK;
pBlocks->sourceVer = pOffset->val.version;
pBlocks->blocks = taosArrayInit(0, sizeof(SSDataBlock));
taosArrayPush(pBlocks->blocks, &pRet->data);
// int64_t* ts = (int64_t*)(((SColumnInfoData*)ret.data.pDataBlock->pData)->pData);
// tqDebug("-----------%ld\n", ts[0]);
int32_t code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)pBlocks, pBlocks->sourceVer);
if (code == TSDB_CODE_SUCCESS) {
pOffset->val.version = walReaderGetCurrentVer(pTask->exec.pTqReader->pWalReader);
tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr,
pOffset->val.version);
}
return 0;
}
void initOffsetForAllRestoreTasks(STQ* pTq) {
void* pIter = NULL;
......@@ -90,8 +64,7 @@ void initOffsetForAllRestoreTasks(STQ* pTq) {
}
if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->id.taskId,
pTask->status.taskStatus);
tqDebug("s-task:%s skip push data, since not ready, status %d", pTask->id.idStr, pTask->status.taskStatus);
continue;
}
......@@ -120,7 +93,7 @@ void saveOffsetForAllTasks(STQ* pTq, int64_t ver) {
}
if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->id.taskId,
tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr,
pTask->status.taskStatus);
continue;
}
......
......@@ -370,7 +370,7 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S
return qa;
}
int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd, SArray* pList) {
int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
const char* id = GET_TASKID(pTaskInfo);
int32_t code = 0;
......@@ -386,11 +386,6 @@ int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableI
if (isAdd) { // add new table id
SArray* qa = filterUnqualifiedTables(pScanInfo, tableIdList, GET_TASKID(pTaskInfo));
int32_t numOfQualifiedTables = taosArrayGetSize(qa);
if (pList != NULL) {
taosArrayAddAll(pList, qa);
}
qDebug("%d qualified child tables added into stream scanner, %s", numOfQualifiedTables, id);
code = tqReaderAddTbUidList(pScanInfo->tqReader, qa);
if (code != TSDB_CODE_SUCCESS) {
......
......@@ -57,11 +57,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
goto _err;
}
pMeta->pWalReadTasks = taosHashInit(64, fp, true, HASH_ENTRY_LOCK);
if (pMeta->pWalReadTasks == NULL) {
goto _err;
}
if (streamMetaBegin(pMeta) < 0) {
goto _err;
}
......@@ -75,7 +70,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
_err:
taosMemoryFree(pMeta->path);
if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks);
if (pMeta->pWalReadTasks) taosHashCleanup(pMeta->pWalReadTasks);
if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb);
if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb);
if (pMeta->db) tdbClose(pMeta->db);
......@@ -112,7 +106,6 @@ void streamMetaClose(SStreamMeta* pMeta) {
}
taosHashCleanup(pMeta->pTasks);
taosHashCleanup(pMeta->pWalReadTasks);
taosMemoryFree(pMeta->path);
taosMemoryFree(pMeta);
}
......@@ -196,9 +189,7 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask*
}
int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta) {
int32_t numOfReady = taosHashGetSize(pMeta->pTasks);
int32_t numOfRestoring = taosHashGetSize(pMeta->pWalReadTasks);
return numOfReady + numOfRestoring;
return (int32_t) taosHashGetSize(pMeta->pTasks);
}
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) {
......@@ -225,34 +216,19 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) {
}
SStreamTask* streamMetaAcquireTaskEx(SStreamMeta* pMeta, int32_t taskId) {
taosRLockLatch(&pMeta->lock);
SStreamTask* pTask = NULL;
int32_t numOfRestored = taosHashGetSize(pMeta->pWalReadTasks);
if (numOfRestored > 0) {
SStreamTask** p = (SStreamTask**)taosHashGet(pMeta->pWalReadTasks, &taskId, sizeof(taskId));
if (p != NULL) {
pTask = *p;
if (pTask != NULL && (atomic_load_8(&(pTask->status.taskStatus)) != TASK_STATUS__DROPPING)) {
atomic_add_fetch_32(&pTask->refCnt, 1);
taosRUnLockLatch(&pMeta->lock);
return pTask;
}
}
}
taosRLockLatch(&pMeta->lock);
SStreamTask** p = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (p != NULL) {
pTask = *p;
if (pTask != NULL && atomic_load_8(&(pTask->status.taskStatus)) != TASK_STATUS__DROPPING) {
if ((*p) != NULL && atomic_load_8(&((*p)->status.taskStatus)) != TASK_STATUS__DROPPING) {
pTask = *p;
atomic_add_fetch_32(&pTask->refCnt, 1);
taosRUnLockLatch(&pMeta->lock);
return pTask;
}
}
taosRUnLockLatch(&pMeta->lock);
return NULL;
return pTask;
}
void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
......@@ -344,7 +320,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
return -1;
}
if (taosHashPut(pMeta->pWalReadTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) {
if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) {
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCur);
......
......@@ -17,6 +17,7 @@
int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) {
qDebug("s-task:%s at node %d launch recover", pTask->id.idStr, pTask->nodeId);
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__RECOVER_PREPARE);
streamSetParamForRecover(pTask);
......@@ -33,12 +34,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) {
memcpy(serializedReq, &req, len);
SRpcMsg rpcMsg = {
.contLen = len,
.pCont = serializedReq,
.msgType = TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE,
};
SRpcMsg rpcMsg = { .contLen = len, .pCont = serializedReq, .msgType = TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE };
if (tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg) < 0) {
/*ASSERT(0);*/
}
......@@ -61,6 +57,7 @@ int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) {
.upstreamNodeId = pTask->nodeId,
.childId = pTask->selfChildId,
};
// serialize
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
req.reqId = tGenIdPI64();
......
......@@ -187,11 +187,6 @@ void tFreeStreamTask(SStreamTask* pTask) {
pTask->exec.pExecutor = NULL;
}
if (pTask->exec.pTqReader != NULL && pTask->freeFp != NULL) {
pTask->freeFp(pTask->exec.pTqReader);
pTask->exec.pTqReader = NULL;
}
if (pTask->exec.pWalReader != NULL) {
walCloseReader(pTask->exec.pWalReader);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册