提交 06cf3588 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 6a3c935b
......@@ -47,7 +47,6 @@ enum {
TASK_STATUS__WAIT_DOWNSTREAM,
TASK_STATUS__RECOVER_PREPARE,
TASK_STATUS__RECOVER1,
TASK_STATUS__RECOVER2,
TASK_STATUS__PAUSE,
};
......@@ -203,7 +202,7 @@ static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) {
atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
}
void* streamQueueNextItem(SStreamQueue* queue);
void* streamQueueNextItem(SStreamQueue* pQueue);
SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type);
void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit);
......@@ -251,7 +250,7 @@ typedef struct {
int8_t reserved;
} STaskSinkFetch;
typedef struct {
typedef struct SStreamChildEpInfo {
int32_t nodeId;
int32_t childId;
int32_t taskId;
......@@ -276,32 +275,38 @@ typedef struct SStreamStatus {
int8_t keepTaskStatus;
} SStreamStatus;
typedef struct SHistoryDataRange {
typedef struct SHistDataRange {
SVersionRange range;
STimeWindow window;
} SHistoryDataRange;
} SHistDataRange;
struct SStreamTask {
SStreamId id;
typedef struct SSTaskBasicInfo {
int32_t nodeId; // vgroup id or snode id
SEpSet epSet;
int32_t selfChildId;
int32_t totalLevel;
int8_t taskLevel;
int8_t outputType;
int16_t dispatchMsgType;
SStreamStatus status;
int32_t selfChildId;
int32_t nodeId; // vgroup id
SEpSet epSet;
SCheckpointInfo chkInfo;
STaskExec exec;
int8_t fillHistory; // fill history
int8_t fillHistory; // is fill history task or not
} SSTaskBasicInfo;
SHistoryDataRange dataRange;
SStreamId historyTaskId;
typedef struct SDispatchMsgInfo {
void* pData; // current dispatch data
int16_t msgType; // dispatch msg type
} SDispatchMsgInfo;
// children info
SArray* childEpInfo; // SArray<SStreamChildEpInfo*>
int32_t nextCheckId;
SArray* checkpointInfo; // SArray<SStreamCheckpointInfo>
struct SStreamTask {
SStreamId id;
SSTaskBasicInfo info;
int8_t outputType;
SDispatchMsgInfo msgInfo;
SStreamStatus status;
SCheckpointInfo chkInfo;
STaskExec exec;
SHistDataRange dataRange;
SStreamId historyTaskId;
SArray* pUpstreamEpInfoList; // SArray<SStreamChildEpInfo*>, // children info
int32_t nextCheckId;
SArray* checkpointInfo; // SArray<SStreamCheckpointInfo>
// output
union {
......@@ -326,7 +331,7 @@ struct SStreamTask {
// the followings attributes don't be serialized
int32_t recoverTryingDownstream;
int32_t recoverWaitingUpstream;
int32_t numOfWaitingUpstream;
int64_t checkReqId;
SArray* checkReqIds; // shuffle
int32_t refCnt;
......
......@@ -110,7 +110,7 @@ int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SAr
isShuffle = true;
pTask->outputType = TASK_OUTPUT__SHUFFLE_DISPATCH;
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
return -1;
}
......@@ -131,7 +131,7 @@ int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SAr
for (int32_t j = 0; j < numOfSinkNodes; j++) {
SStreamTask* pSinkTask = taosArrayGetP(pSinkNodeList, j);
if (pSinkTask->nodeId == pVgInfo->vgId) {
if (pSinkTask->info.nodeId == pVgInfo->vgId) {
pVgInfo->taskId = pSinkTask->id.taskId;
break;
}
......@@ -148,11 +148,11 @@ int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SAr
int32_t mndAssignStreamTaskToVgroup(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
int32_t msgLen;
pTask->nodeId = pVgroup->vgId;
pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
pTask->info.nodeId = pVgroup->vgId;
pTask->info.epSet = mndGetVgroupEpset(pMnode, pVgroup);
plan->execNode.nodeId = pTask->nodeId;
plan->execNode.epSet = pTask->epSet;
plan->execNode.nodeId = pTask->info.nodeId;
plan->execNode.epSet = pTask->info.epSet;
if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
......@@ -172,11 +172,11 @@ SSnodeObj* mndSchedFetchOneSnode(SMnode* pMnode) {
int32_t mndAssignStreamTaskToSnode(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SSnodeObj* pSnode) {
int32_t msgLen;
pTask->nodeId = SNODE_HANDLE;
pTask->epSet = mndAcquireEpFromSnode(pMnode, pSnode);
pTask->info.nodeId = SNODE_HANDLE;
pTask->info.epSet = mndAcquireEpFromSnode(pMnode, pSnode);
plan->execNode.nodeId = SNODE_HANDLE;
plan->execNode.epSet = pTask->epSet;
plan->execNode.epSet = pTask->info.epSet;
if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
......@@ -232,8 +232,8 @@ int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* p
return -1;
}
pTask->nodeId = vgId;
pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
pTask->info.nodeId = vgId;
pTask->info.epSet = mndGetVgroupEpset(pMnode, pVgroup);
mndSetSinkTaskInfo(pStream, pTask);
return 0;
}
......@@ -273,9 +273,9 @@ static SStreamChildEpInfo* createStreamTaskEpInfo(SStreamTask* pTask) {
return NULL;
}
pEpInfo->childId = pTask->selfChildId;
pEpInfo->epSet = pTask->epSet;
pEpInfo->nodeId = pTask->nodeId;
pEpInfo->childId = pTask->info.selfChildId;
pEpInfo->epSet = pTask->info.epSet;
pEpInfo->nodeId = pTask->info.nodeId;
pEpInfo->taskId = pTask->id.taskId;
return pEpInfo;
......@@ -284,11 +284,11 @@ static SStreamChildEpInfo* createStreamTaskEpInfo(SStreamTask* pTask) {
void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask) {
STaskDispatcherFixedEp* pDispatcher = &pDstTask->fixedEpDispatcher;
pDispatcher->taskId = pTask->id.taskId;
pDispatcher->nodeId = pTask->nodeId;
pDispatcher->epSet = pTask->epSet;
pDispatcher->nodeId = pTask->info.nodeId;
pDispatcher->epSet = pTask->info.epSet;
pDstTask->outputType = TASK_OUTPUT__FIXED_DISPATCH;
pDstTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
pDstTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
}
int32_t setEpToDownstreamTask(SStreamTask* pTask, SStreamTask* pDownstream) {
......@@ -297,11 +297,11 @@ int32_t setEpToDownstreamTask(SStreamTask* pTask, SStreamTask* pDownstream) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if(pDownstream->childEpInfo == NULL) {
pDownstream->childEpInfo = taosArrayInit(4, POINTER_BYTES);
if(pDownstream->pUpstreamEpInfoList == NULL) {
pDownstream->pUpstreamEpInfoList = taosArrayInit(4, POINTER_BYTES);
}
taosArrayPush(pDownstream->childEpInfo, &pEpInfo);
taosArrayPush(pDownstream->pUpstreamEpInfoList, &pEpInfo);
return TSDB_CODE_SUCCESS;
}
......
......@@ -444,7 +444,7 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, const SStreamTask *pTask) {
return -1;
}
((SMsgHead *)buf)->vgId = htonl(pTask->nodeId);
((SMsgHead *)buf)->vgId = htonl(pTask->info.nodeId);
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncoderInit(&encoder, abuf, size);
......@@ -454,7 +454,7 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, const SStreamTask *pTask) {
STransAction action = {0};
action.mTraceId = pTrans->mTraceId;
memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet));
memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet));
action.pCont = buf;
action.contLen = tlen;
action.msgType = TDMT_STREAM_TASK_DEPLOY;
......@@ -637,17 +637,17 @@ _OVER:
static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
// vnode
/*if (pTask->nodeId > 0) {*/
/*if (pTask->info.nodeId > 0) {*/
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pReq->head.vgId = htonl(pTask->nodeId);
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
STransAction action = {0};
memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet));
memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet));
action.pCont = pReq;
action.contLen = sizeof(SVDropStreamTaskReq);
action.msgType = TDMT_STREAM_TASK_DROP;
......@@ -870,7 +870,7 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, con
SMStreamDoCheckpointMsg *pMsg) {
SStreamCheckpointSourceReq req = {0};
req.checkpointId = pMsg->checkpointId;
req.nodeId = pTask->nodeId;
req.nodeId = pTask->info.nodeId;
req.expireTime = -1;
req.streamId = pTask->streamId;
req.taskId = pTask->taskId;
......@@ -899,7 +899,7 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, con
SMsgHead *pMsgHead = (SMsgHead *)buf;
pMsgHead->contLen = htonl(tlen);
pMsgHead->vgId = htonl(pTask->nodeId);
pMsgHead->vgId = htonl(pTask->info.nodeId);
tEncoderClear(&encoder);
......@@ -938,12 +938,12 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
for (int32_t i = 0; i < totLevel; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
SStreamTask *pTask = taosArrayGetP(pLevel, 0);
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
int32_t sz = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
/*A(pTask->nodeId > 0);*/
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->nodeId);
/*A(pTask->info.nodeId > 0);*/
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
if (pVgObj == NULL) {
taosRUnLockLatch(&pStream->lock);
mndReleaseStream(pMnode, pStream);
......@@ -1262,7 +1262,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
char nodeType[20 + VARSTR_HEADER_SIZE] = {0};
varDataSetLen(nodeType, 5);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
if (pTask->nodeId > 0) {
if (pTask->info.nodeId > 0) {
memcpy(varDataVal(nodeType), "vnode", 5);
} else {
memcpy(varDataVal(nodeType), "snode", 5);
......@@ -1271,21 +1271,21 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
// node id
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
int64_t nodeId = TMAX(pTask->nodeId, 0);
int64_t nodeId = TMAX(pTask->info.nodeId, 0);
colDataSetVal(pColInfo, numOfRows, (const char *)&nodeId, false);
// level
char level[20 + VARSTR_HEADER_SIZE] = {0};
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
memcpy(varDataVal(level), "source", 6);
varDataSetLen(level, 6);
} else if (pTask->taskLevel == TASK_LEVEL__AGG) {
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
memcpy(varDataVal(level), "agg", 3);
varDataSetLen(level, 3);
} else if (pTask->taskLevel == TASK_LEVEL__SINK) {
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
memcpy(varDataVal(level), "sink", 4);
varDataSetLen(level, 4);
} else if (pTask->taskLevel == TASK_LEVEL__SINK) {
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
}
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&level, false);
......@@ -1323,10 +1323,10 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pReq->head.vgId = htonl(pTask->nodeId);
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
STransAction action = {0};
memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet));
memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet));
action.pCont = pReq;
action.contLen = sizeof(SVPauseStreamTaskReq);
action.msgType = TDMT_STREAM_TASK_PAUSE;
......@@ -1344,7 +1344,7 @@ int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) {
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
if (pTask->taskLevel != TASK_LEVEL__SINK && mndPauseStreamTask(pTrans, pTask) < 0) {
if (pTask->info.taskLevel != TASK_LEVEL__SINK && mndPauseStreamTask(pTrans, pTask) < 0) {
return -1;
}
}
......@@ -1446,11 +1446,11 @@ static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t ig
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pReq->head.vgId = htonl(pTask->nodeId);
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
pReq->igUntreated = igUntreated;
STransAction action = {0};
memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet));
memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet));
action.pCont = pReq;
action.contLen = sizeof(SVResumeStreamTaskReq);
action.msgType = TDMT_STREAM_TASK_RESUME;
......@@ -1468,7 +1468,7 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUn
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
if (pTask->taskLevel != TASK_LEVEL__SINK && mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) {
if (pTask->info.taskLevel != TASK_LEVEL__SINK && mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) {
return -1;
}
}
......
......@@ -62,7 +62,7 @@ FAIL:
}
int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
ASSERT(pTask->taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->childEpInfo) != 0);
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->pUpstreamEpInfoList) != 0);
pTask->refCnt = 1;
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
......@@ -85,7 +85,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
return -1;
}
int32_t numOfChildEp = taosArrayGetSize(pTask->childEpInfo);
int32_t numOfChildEp = taosArrayGetSize(pTask->pUpstreamEpInfoList);
SReadHandle handle = { .vnode = NULL, .numOfVgroups = numOfChildEp, .pStateBackend = pTask->pState };
initStreamStateAPI(&handle.api);
......@@ -151,7 +151,7 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
}
tDecoderClear(&decoder);
ASSERT(pTask->taskLevel == TASK_LEVEL__AGG);
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG);
// 2.save task
taosWLockLatch(&pSnode->pMeta->lock);
......@@ -164,7 +164,7 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
taosWUnLockLatch(&pSnode->pMeta->lock);
// 3.go through recover steps to fill history
if (pTask->fillHistory) {
if (pTask->info.fillHistory) {
streamSetParamForRecover(pTask);
streamAggRecoverPrepare(pTask);
}
......
......@@ -820,9 +820,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pTask->dataRange.range.minVer = ver;
// expand executor
pTask->status.taskStatus = /*(pTask->fillHistory) ? */TASK_STATUS__WAIT_DOWNSTREAM /*: TASK_STATUS__NORMAL*/;
pTask->status.taskStatus = /*(pTask->info.fillHistory) ? */TASK_STATUS__WAIT_DOWNSTREAM /*: TASK_STATUS__NORMAL*/;
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
if (pTask->pState == NULL) {
return -1;
......@@ -837,13 +837,13 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
}
qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
} else if (pTask->taskLevel == TASK_LEVEL__AGG) {
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
if (pTask->pState == NULL) {
return -1;
}
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo);
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->pUpstreamEpInfoList);
SReadHandle handle = {.vnode = NULL, .numOfVgroups = numOfVgroups, .pStateBackend = pTask->pState};
initStorageAPI(&handle.api);
......@@ -879,7 +879,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
tSimpleHashSetFreeFp(pTask->tbSink.pTblInfo, freePtr);
}
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
SWalFilterCond cond = {.deleteMsg = 1}; // delete msg also extract from wal files
pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond);
}
......@@ -887,7 +887,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
streamSetupTrigger(pTask);
tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", vgId,
pTask->id.idStr, pTask->chkInfo.version, pTask->selfChildId, pTask->taskLevel);
pTask->id.idStr, pTask->chkInfo.version, pTask->info.selfChildId, pTask->info.taskLevel);
// next valid version will add one
pTask->chkInfo.version += 1;
......@@ -1028,7 +1028,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
taosWUnLockLatch(&pStreamMeta->lock);
// 3. It's an fill history task, do nothing. wait for the main task to start it
if (pTask->fillHistory) {
if (pTask->info.fillHistory) {
tqDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr);
} else {
// calculate the correct start time window, and start the handle the history data for the main task.
......@@ -1037,7 +1037,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
streamTaskStartHistoryTask(pTask, sversion);
// launch current task
SHistoryDataRange* pRange = &pTask->dataRange;
SHistDataRange* pRange = &pTask->dataRange;
int64_t ekey = pRange->window.ekey;
int64_t ver = pRange->range.minVer;
......@@ -1093,7 +1093,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
double el = (taosGetTimestampMs() - st) / 1000.0;
tqDebug("s-task:%s history scan stage(step 1) ended, elapsed time:%.2fs", pTask->id.idStr, el);
if (pTask->fillHistory) {
if (pTask->info.fillHistory) {
// todo transfer the executor status, and then destroy this stream task
} else {
// todo update the chkInfo version for current task.
......@@ -1199,7 +1199,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t
return -1;
}
atomic_store_8(&pTask->fillHistory, 0);
atomic_store_8(&pTask->info.fillHistory, 0);
streamMetaSaveTask(pTq->pStreamMeta, pTask);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
......@@ -1392,7 +1392,7 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus);
// no lock needs to secure the access of the version
if (pReq->igUntreated && pTask->taskLevel == TASK_LEVEL__SOURCE) {
if (pReq->igUntreated && pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
// discard all the data when the stream task is suspended.
walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
......@@ -1403,7 +1403,7 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
}
if (pTask->taskLevel == TASK_LEVEL__SOURCE && taosQueueItemSize(pTask->inputQueue->queue) == 0) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && taosQueueItemSize(pTask->inputQueue->queue) == 0) {
tqStartStreamTasks(pTq);
} else {
streamSchedExec(pTask);
......
......@@ -1109,7 +1109,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
}
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
int32_t code = qUpdateTableListForStreamScanner(pTask->exec.pExecutor, tbUidList, isAdd);
if (code != 0) {
tqError("vgId:%d, s-task:%s update qualified table error for stream task", vgId, pTask->id.idStr);
......
......@@ -129,8 +129,8 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
}
int32_t status = pTask->status.taskStatus;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
// tqTrace("s-task:%s level:%d not source task, no need to start", pTask->id.idStr, pTask->taskLevel);
if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
// tqTrace("s-task:%s level:%d not source task, no need to start", pTask->id.idStr, pTask->info.taskLevel);
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
......
......@@ -7,15 +7,9 @@ target_include_directories(
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
IF (TD_GRANT)
TARGET_LINK_LIBRARIES(qworker
PRIVATE os util transport nodes planner qcom executor index grant
)
ELSE ()
TARGET_LINK_LIBRARIES(qworker
PRIVATE os util transport nodes planner qcom executor index
)
ENDIF()
TARGET_LINK_LIBRARIES(qworker
PRIVATE os util transport nodes planner qcom executor index
)
if(${BUILD_TEST})
ADD_SUBDIRECTORY(test)
......
......@@ -40,8 +40,6 @@ SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamT
void destroyStreamDataBlock(SStreamDataBlock* pBlock);
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData);
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* data);
int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock);
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
......
......@@ -110,7 +110,7 @@ int32_t streamSchedExec(SStreamTask* pTask) {
return -1;
}
pRunReq->head.vgId = pTask->nodeId;
pRunReq->head.vgId = pTask->info.nodeId;
pRunReq->streamId = pTask->id.streamId;
pRunReq->taskId = pTask->id.taskId;
......@@ -146,7 +146,7 @@ int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pR
pDispatchRsp->streamId = htobe64(pReq->streamId);
pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId);
pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId);
pDispatchRsp->downstreamNodeId = htonl(pTask->nodeId);
pDispatchRsp->downstreamNodeId = htonl(pTask->info.nodeId);
pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId);
pRsp->pCont = buf;
......@@ -162,7 +162,7 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
// enqueue
if (pData != NULL) {
qDebug("s-task:%s (child %d) recv retrieve req from task:0x%x, reqId %" PRId64, pTask->id.idStr, pTask->selfChildId,
qDebug("s-task:%s (child %d) recv retrieve req from task:0x%x, reqId %" PRId64, pTask->id.idStr, pTask->info.selfChildId,
pReq->srcTaskId, pReq->reqId);
pData->type = STREAM_INPUT__DATA_RETRIEVE;
......@@ -278,10 +278,10 @@ int32_t streamProcessRunReq(SStreamTask* pTask) {
}
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
qDebug("s-task:%s receive retrieve req from node %d taskId:0x%x", pTask->id.idStr, pReq->srcNodeId, pReq->srcTaskId);
qDebug("s-task:%s receive retrieve req from taskId:0x%x (vgId:%d)", pTask->id.idStr, pReq->srcTaskId, pReq->srcNodeId);
streamTaskEnqueueRetrieve(pTask, pReq, pRsp);
ASSERT(pTask->taskLevel != TASK_LEVEL__SINK);
ASSERT(pTask->info.taskLevel != TASK_LEVEL__SINK);
streamSchedExec(pTask);
return 0;
}
......@@ -299,7 +299,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
if (type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && tInputQueueIsFull(pTask)) {
if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && tInputQueueIsFull(pTask)) {
qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total,
size);
......@@ -319,7 +319,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
px->submit.msgLen, px->submit.ver, total, size + px->submit.msgLen/1048576.0);
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
type == STREAM_INPUT__REF_DATA_BLOCK) {
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) {
if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) {
qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total,
size);
......@@ -350,19 +350,21 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
static void* streamQueueCurItem(SStreamQueue* queue) { return queue->qItem; }
void* streamQueueNextItem(SStreamQueue* queue) {
int8_t dequeueFlag = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING);
if (dequeueFlag == STREAM_QUEUE__FAILED) {
ASSERT(queue->qItem != NULL);
return streamQueueCurItem(queue);
void* streamQueueNextItem(SStreamQueue* pQueue) {
int8_t flag = atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING);
if (flag == STREAM_QUEUE__FAILED) {
ASSERT(pQueue->qItem != NULL);
return streamQueueCurItem(pQueue);
} else {
queue->qItem = NULL;
taosGetQitem(queue->qall, &queue->qItem);
if (queue->qItem == NULL) {
taosReadAllQitems(queue->queue, queue->qall);
taosGetQitem(queue->qall, &queue->qItem);
pQueue->qItem = NULL;
taosGetQitem(pQueue->qall, &pQueue->qItem);
if (pQueue->qItem == NULL) {
taosReadAllQitems(pQueue->queue, pQueue->qall);
taosGetQitem(pQueue->qall, &pQueue->qItem);
}
return streamQueueCurItem(queue);
return streamQueueCurItem(pQueue);
}
}
......
......@@ -123,7 +123,7 @@ int32_t tDecodeSStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pR
static int32_t streamAlignCheckpoint(SStreamTask* pTask, int64_t checkpointId, int32_t childId) {
if (pTask->checkpointingId == 0) {
pTask->checkpointingId = checkpointId;
pTask->checkpointAlignCnt = taosArrayGetSize(pTask->childEpInfo);
pTask->checkpointAlignCnt = taosArrayGetSize(pTask->pUpstreamEpInfoList);
}
ASSERT(pTask->checkpointingId == checkpointId);
......@@ -165,7 +165,7 @@ int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStre
int64_t checkpointId = pReq->checkpointId;
int32_t childId = pReq->childId;
if (taosArrayGetSize(pTask->childEpInfo) > 0) {
if (taosArrayGetSize(pTask->pUpstreamEpInfoList) > 0) {
code = streamAlignCheckpoint(pTask, checkpointId, childId);
if (code > 0) {
return 0;
......
......@@ -64,11 +64,11 @@ SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamT
if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pItem;
pStreamBlocks->childId = pTask->selfChildId;
pStreamBlocks->childId = pTask->info.selfChildId;
pStreamBlocks->sourceVer = pSubmit->ver;
} else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)pItem;
pStreamBlocks->childId = pTask->selfChildId;
pStreamBlocks->childId = pTask->info.selfChildId;
pStreamBlocks->sourceVer = pMerged->ver;
}
......
......@@ -15,7 +15,9 @@
#include "streamInc.h"
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
static int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData);
static int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
......@@ -37,6 +39,37 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p
return pEncoder->pos;
}
static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) {
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
void* buf = taosMemoryCalloc(1, dataStrLen);
if (buf == NULL) return -1;
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
pRetrieve->useconds = 0;
pRetrieve->precision = TSDB_DEFAULT_PRECISION;
pRetrieve->compressed = 0;
pRetrieve->completed = 1;
pRetrieve->streamBlockType = pBlock->info.type;
pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
pRetrieve->skey = htobe64(pBlock->info.window.skey);
pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
pRetrieve->version = htobe64(pBlock->info.version);
pRetrieve->watermark = htobe64(pBlock->info.watermark);
memcpy(pRetrieve->parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
pRetrieve->numOfCols = htonl(numOfCols);
int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols);
actualLen += sizeof(SRetrieveTableRsp);
ASSERT(actualLen <= dataStrLen);
taosArrayPush(pReq->dataLen, &actualLen);
taosArrayPush(pReq->data, &buf);
pReq->totalLen += dataStrLen;
return 0;
}
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
......@@ -125,17 +158,17 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
SStreamRetrieveReq req = {
.streamId = pTask->id.streamId,
.srcNodeId = pTask->nodeId,
.srcNodeId = pTask->info.nodeId,
.srcTaskId = pTask->id.taskId,
.pRetrieve = pRetrieve,
.retrieveLen = dataStrLen,
};
int32_t sz = taosArrayGetSize(pTask->childEpInfo);
int32_t sz = taosArrayGetSize(pTask->pUpstreamEpInfoList);
ASSERT(sz > 0);
for (int32_t i = 0; i < sz; i++) {
req.reqId = tGenIdPI64();
SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->childEpInfo, i);
SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->pUpstreamEpInfoList, i);
req.dstNodeId = pEpInfo->nodeId;
req.dstTaskId = pEpInfo->taskId;
int32_t len;
......@@ -165,7 +198,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
buf = NULL;
qDebug("s-task:%s (child %d) send retrieve req to task:0x%x (vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr,
pTask->selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req.reqId);
pTask->info.selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req.reqId);
}
code = 0;
......@@ -175,37 +208,6 @@ CLEAR:
return code;
}
static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) {
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
void* buf = taosMemoryCalloc(1, dataStrLen);
if (buf == NULL) return -1;
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
pRetrieve->useconds = 0;
pRetrieve->precision = TSDB_DEFAULT_PRECISION;
pRetrieve->compressed = 0;
pRetrieve->completed = 1;
pRetrieve->streamBlockType = pBlock->info.type;
pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
pRetrieve->skey = htobe64(pBlock->info.window.skey);
pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
pRetrieve->version = htobe64(pBlock->info.version);
pRetrieve->watermark = htobe64(pBlock->info.watermark);
memcpy(pRetrieve->parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
pRetrieve->numOfCols = htonl(numOfCols);
int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols);
actualLen += sizeof(SRetrieveTableRsp);
ASSERT(actualLen <= dataStrLen);
taosArrayPush(pReq->dataLen, &actualLen);
taosArrayPush(pReq->data, &buf);
pReq->totalLen += dataStrLen;
return 0;
}
int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet) {
void* buf = NULL;
int32_t code = -1;
......@@ -315,7 +317,7 @@ int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, in
msg.contLen = tlen + sizeof(SMsgHead);
msg.pCont = buf;
msg.msgType = pTask->dispatchMsgType;
msg.msgType = pTask->msgInfo.msgType;
qDebug("s-task:%s dispatch msg to taskId:0x%x vgId:%d data msg", pTask->id.idStr, pReq->taskId, vgId);
tmsgSendReq(pEpSet, &msg);
......@@ -383,12 +385,12 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
.streamId = pTask->id.streamId,
.dataSrcVgId = pData->srcVgId,
.upstreamTaskId = pTask->id.taskId,
.upstreamChildId = pTask->selfChildId,
.upstreamNodeId = pTask->nodeId,
.upstreamChildId = pTask->info.selfChildId,
.upstreamNodeId = pTask->info.nodeId,
.blockNum = numOfBlocks,
};
req.data = taosArrayInit(numOfBlocks, sizeof(void*));
req.data = taosArrayInit(numOfBlocks, POINTER_BYTES);
req.dataLen = taosArrayInit(numOfBlocks, sizeof(int32_t));
if (req.data == NULL || req.dataLen == NULL) {
taosArrayDestroyP(req.data, taosMemoryFree);
......@@ -413,8 +415,8 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
req.taskId = downstreamTaskId;
qDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to down stream s-task:0x%x in vgId:%d", pTask->id.idStr,
pTask->selfChildId, numOfBlocks, downstreamTaskId, vgId);
qDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d)", pTask->id.idStr,
pTask->info.selfChildId, numOfBlocks, downstreamTaskId, vgId);
code = doSendDispatchMsg(pTask, &req, vgId, pEpSet);
taosArrayDestroyP(req.data, taosMemoryFree);
......@@ -436,8 +438,8 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
pReqs[i].streamId = pTask->id.streamId;
pReqs[i].dataSrcVgId = pData->srcVgId;
pReqs[i].upstreamTaskId = pTask->id.taskId;
pReqs[i].upstreamChildId = pTask->selfChildId;
pReqs[i].upstreamNodeId = pTask->nodeId;
pReqs[i].upstreamChildId = pTask->info.selfChildId;
pReqs[i].upstreamNodeId = pTask->info.nodeId;
pReqs[i].blockNum = 0;
pReqs[i].data = taosArrayInit(0, sizeof(void*));
pReqs[i].dataLen = taosArrayInit(0, sizeof(int32_t));
......@@ -471,13 +473,13 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
}
}
qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroups", pTask->id.idStr, pTask->selfChildId,
qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroups", pTask->id.idStr, pTask->info.selfChildId,
numOfBlocks, vgSz);
for (int32_t i = 0; i < vgSz; i++) {
if (pReqs[i].blockNum > 0) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, pTask->selfChildId,
qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, pTask->info.selfChildId,
pReqs[i].blockNum, pVgInfo->vgId);
if (doSendDispatchMsg(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet) < 0) {
......@@ -506,26 +508,26 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
numOfElems);
}
// to make sure only one dispatch is running
int8_t old =
atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
if (old != TASK_OUTPUT_STATUS__NORMAL) {
qDebug("s-task:%s task wait for dispatch rsp, not dispatch now, output status:%d", pTask->id.idStr, old);
qDebug("s-task:%s wait for dispatch rsp, not dispatch now, output status:%d", pTask->id.idStr, old);
return 0;
}
qDebug("s-task:%s start to dispatch msg, set output status:%d", pTask->id.idStr, pTask->outputStatus);
SStreamDataBlock* pDispatchedBlock = streamQueueNextItem(pTask->outputQueue);
if (pDispatchedBlock == NULL) {
SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue);
if (pBlock == NULL) {
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
qDebug("s-task:%s stop dispatching since no output in output queue, output status:%d", pTask->id.idStr,
pTask->outputStatus);
qDebug("s-task:%s not dispatch since no elems in outputQ, output status:%d", pTask->id.idStr, pTask->outputStatus);
return 0;
}
ASSERT(pDispatchedBlock->type == STREAM_INPUT__DATA_BLOCK);
ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK);
int32_t code = streamDispatchAllBlocks(pTask, pDispatchedBlock);
int32_t code = streamDispatchAllBlocks(pTask, pBlock);
if (code != TSDB_CODE_SUCCESS) {
streamQueueProcessFail(pTask->outputQueue);
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
......@@ -533,6 +535,6 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
}
// this block can be freed only when it has been pushed to down stream.
destroyStreamDataBlock(pDispatchedBlock);
destroyStreamDataBlock(pBlock);
return code;
}
......@@ -108,10 +108,10 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0));
block.info.type = STREAM_PULL_OVER;
block.info.childId = pTask->selfChildId;
block.info.childId = pTask->info.selfChildId;
taosArrayPush(pRes, &block);
numOfBlocks += 1;
qDebug("s-task:%s(child %d) processed retrieve, reqId:0x%" PRIx64, pTask->id.idStr, pTask->selfChildId,
qDebug("s-task:%s(child %d) processed retrieve, reqId:0x%" PRIx64, pTask->id.idStr, pTask->info.selfChildId,
pRetrieveBlock->reqId);
}
......@@ -127,7 +127,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
SSDataBlock block = {0};
assignOneDataBlock(&block, output);
block.info.childId = pTask->selfChildId;
block.info.childId = pTask->info.selfChildId;
size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
numOfBlocks += 1;
......@@ -135,7 +135,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
taosArrayPush(pRes, &block);
qDebug("s-task:%s (child %d) executed and get %d result blocks, size:%.2fMiB", pTask->id.idStr,
pTask->selfChildId, numOfBlocks, size / 1048576.0);
pTask->info.selfChildId, numOfBlocks, size / 1048576.0);
// current output should be dispatched to down stream nodes
if (numOfBlocks >= MAX_STREAM_RESULT_DUMP_THRESHOLD) {
......@@ -164,7 +164,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
int32_t code = 0;
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
void* exec = pTask->exec.pExecutor;
qSetStreamOpOpen(exec);
......@@ -200,7 +200,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
SSDataBlock block = {0};
assignOneDataBlock(&block, output);
block.info.childId = pTask->selfChildId;
block.info.childId = pTask->info.selfChildId;
taosArrayPush(pRes, &block);
batchCnt++;
......@@ -275,7 +275,7 @@ int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) {
return -1;
}
if (pTask->taskLevel == TASK_LEVEL__SINK) {
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
ASSERT(((SStreamQueueItem*)pItem)->type == STREAM_INPUT__DATA_BLOCK);
streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pItem);
}
......@@ -344,7 +344,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
if (qItem == NULL) {
if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) {
times++;
taosMsleep(10);
qDebug("===stream===try again batchSize:%d", batchSize);
......@@ -358,7 +358,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
if (pInput == NULL) {
pInput = qItem;
streamQueueProcessSuccess(pTask->inputQueue);
if (pTask->taskLevel == TASK_LEVEL__SINK) {
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
break;
}
} else {
......@@ -392,7 +392,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
break;
}
if (pTask->taskLevel == TASK_LEVEL__SINK) {
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK);
qDebug("s-task:%s sink task start to sink %d blocks", id, batchSize);
streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput);
......@@ -400,7 +400,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
}
// wait for the task to be ready to go
while (pTask->taskLevel == TASK_LEVEL__SOURCE) {
while (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
int8_t status = atomic_load_8(&pTask->status.taskStatus);
if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) {
qError("stream task wait for the end of fill history, s-task:%s, status:%d", id,
......@@ -423,7 +423,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput;
qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput;
qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit,
......
......@@ -388,7 +388,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
// todo handle the fill history task
ASSERT(0);
if (pTask->fillHistory) {
if (pTask->info.fillHistory) {
ASSERT(pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM);
streamTaskCheckDownstreamTasks(pTask);
}
......
......@@ -27,9 +27,9 @@ const char* streamGetTaskStatusStr(int32_t status) {
}
}
int32_t streamTaskLaunchRecover(SStreamTask* pTask) {
qDebug("s-task:%s (vgId:%d) launch recover", pTask->id.idStr, pTask->nodeId);
qDebug("s-task:%s (vgId:%d) launch recover", pTask->id.idStr, pTask->info.nodeId);
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__RECOVER_PREPARE);
SVersionRange* pRange = &pTask->dataRange.range;
......@@ -56,11 +56,11 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask) {
/*ASSERT(0);*/
}
} else if (pTask->taskLevel == TASK_LEVEL__AGG) {
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
streamSetStatusNormal(pTask);
streamSetParamForRecover(pTask);
streamAggRecoverPrepare(pTask);
} else if (pTask->taskLevel == TASK_LEVEL__SINK) {
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
streamSetStatusNormal(pTask);
qDebug("s-task:%s sink task convert to normal immediately", pTask->id.idStr);
}
......@@ -77,8 +77,8 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
SStreamTaskCheckReq req = {
.streamId = pTask->id.streamId,
.upstreamTaskId = pTask->id.taskId,
.upstreamNodeId = pTask->nodeId,
.childId = pTask->selfChildId,
.upstreamNodeId = pTask->info.nodeId,
.childId = pTask->info.selfChildId,
};
// serialize
......@@ -89,7 +89,7 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
pTask->checkReqId = req.reqId;
qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d)", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId,
qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d)", pTask->id.idStr, pTask->info.nodeId, req.downstreamTaskId,
req.downstreamNodeId);
streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
......@@ -105,12 +105,12 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
taosArrayPush(pTask->checkReqIds, &req.reqId);
req.downstreamNodeId = pVgInfo->vgId;
req.downstreamTaskId = pVgInfo->taskId;
qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (shuffle)", pTask->id.idStr, pTask->nodeId,
qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (shuffle)", pTask->id.idStr, pTask->info.nodeId,
req.downstreamTaskId, req.downstreamNodeId);
streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
}
} else {
qDebug("s-task:%s (vgId:%d) direct launch recover since no downstream", pTask->id.idStr, pTask->nodeId);
qDebug("s-task:%s (vgId:%d) direct launch recover since no downstream", pTask->id.idStr, pTask->info.nodeId);
streamTaskLaunchRecover(pTask);
}
......@@ -128,7 +128,7 @@ int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp
.childId = pRsp->childId,
};
qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (recheck)", pTask->id.idStr, pTask->nodeId,
qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (recheck)", pTask->id.idStr, pTask->info.nodeId,
req.downstreamTaskId, req.downstreamNodeId);
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
......@@ -229,7 +229,7 @@ int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, SVersionRange *pVerR
}
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req* pReq) {
pReq->msgHead.vgId = pTask->nodeId;
pReq->msgHead.vgId = pTask->info.nodeId;
pReq->streamId = pTask->id.streamId;
pReq->taskId = pTask->id.taskId;
return 0;
......@@ -240,7 +240,7 @@ int32_t streamSourceRecoverScanStep1(SStreamTask* pTask) {
}
int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* pReq) {
pReq->msgHead.vgId = pTask->nodeId;
pReq->msgHead.vgId = pTask->info.nodeId;
pReq->streamId = pTask->id.streamId;
pReq->taskId = pTask->id.taskId;
return 0;
......@@ -264,7 +264,7 @@ int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) {
}
int32_t streamDispatchRecoverFinishMsg(SStreamTask* pTask) {
SStreamRecoverFinishReq req = { .streamId = pTask->id.streamId, .childId = pTask->selfChildId };
SStreamRecoverFinishReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId };
// serialize
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
......@@ -287,9 +287,9 @@ int32_t streamDispatchRecoverFinishMsg(SStreamTask* pTask) {
// agg
int32_t streamAggRecoverPrepare(SStreamTask* pTask) {
pTask->recoverWaitingUpstream = taosArrayGetSize(pTask->childEpInfo);
pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList);
qDebug("s-task:%s agg task is ready and wait for %d upstream tasks complete fill history procedure", pTask->id.idStr,
pTask->recoverWaitingUpstream);
pTask->numOfWaitingUpstream);
return 0;
}
......@@ -306,8 +306,8 @@ int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask) {
}
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId) {
if (pTask->taskLevel == TASK_LEVEL__AGG) {
int32_t left = atomic_sub_fetch_32(&pTask->recoverWaitingUpstream, 1);
if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
int32_t left = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1);
qDebug("s-task:%s remain unfinished child tasks:%d", pTask->id.idStr, left);
ASSERT(left >= 0);
if (left == 0) {
......
......@@ -19,7 +19,7 @@
static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
int32_t childId = taosArrayGetSize(pArray);
pTask->selfChildId = childId;
pTask->info.selfChildId = childId;
taosArrayPush(pArray, &pTask);
return 0;
}
......@@ -33,8 +33,8 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHisto
pTask->id.taskId = tGenIdPI32();
pTask->id.streamId = streamId;
pTask->taskLevel = taskLevel;
pTask->fillHistory = fillHistory;
pTask->info.taskLevel = taskLevel;
pTask->info.fillHistory = fillHistory;
pTask->triggerParam = triggerParam;
char buf[128] = {0};
......@@ -71,21 +71,21 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->id.streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->id.taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->totalLevel) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->taskLevel) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->info.totalLevel) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->info.taskLevel) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->outputType) < 0) return -1;
if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1;
if (tEncodeI16(pEncoder, pTask->msgInfo.msgType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->status.taskStatus) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->status.schedStatus) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->selfChildId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->info.selfChildId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->info.nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->info.epSet) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->chkInfo.id) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->chkInfo.version) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->fillHistory) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->info.fillHistory) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->historyTaskId.streamId)) return -1;
if (tEncodeI32(pEncoder, pTask->historyTaskId.taskId)) return -1;
......@@ -94,14 +94,14 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeI64(pEncoder, pTask->dataRange.window.skey)) return -1;
if (tEncodeI64(pEncoder, pTask->dataRange.window.ekey)) return -1;
int32_t epSz = taosArrayGetSize(pTask->childEpInfo);
int32_t epSz = taosArrayGetSize(pTask->pUpstreamEpInfoList);
if (tEncodeI32(pEncoder, epSz) < 0) return -1;
for (int32_t i = 0; i < epSz; i++) {
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->childEpInfo, i);
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamEpInfoList, i);
if (tEncodeStreamEpInfo(pEncoder, pInfo) < 0) return -1;
}
if (pTask->taskLevel != TASK_LEVEL__SINK) {
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1;
}
......@@ -131,21 +131,21 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->id.streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->id.taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->totalLevel) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->taskLevel) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->info.totalLevel) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->info.taskLevel) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->outputType) < 0) return -1;
if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1;
if (tDecodeI16(pDecoder, &pTask->msgInfo.msgType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->status.taskStatus) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->status.schedStatus) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->selfChildId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->info.selfChildId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->info.nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->info.epSet) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->chkInfo.id) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->chkInfo.version) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->fillHistory) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->info.fillHistory) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->historyTaskId.streamId)) return -1;
if (tDecodeI32(pDecoder, &pTask->historyTaskId.taskId)) return -1;
......@@ -156,7 +156,8 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
int32_t epSz;
if (tDecodeI32(pDecoder, &epSz) < 0) return -1;
pTask->childEpInfo = taosArrayInit(epSz, sizeof(void*));
pTask->pUpstreamEpInfoList = taosArrayInit(epSz, POINTER_BYTES);
for (int32_t i = 0; i < epSz; i++) {
SStreamChildEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamChildEpInfo));
if (pInfo == NULL) return -1;
......@@ -164,10 +165,10 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
taosMemoryFreeClear(pInfo);
return -1;
}
taosArrayPush(pTask->childEpInfo, &pInfo);
taosArrayPush(pTask->pUpstreamEpInfoList, &pInfo);
}
if (pTask->taskLevel != TASK_LEVEL__SINK) {
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1;
}
......@@ -217,7 +218,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
walCloseReader(pTask->exec.pWalReader);
}
taosArrayDestroyP(pTask->childEpInfo, taosMemoryFree);
taosArrayDestroyP(pTask->pUpstreamEpInfoList, taosMemoryFree);
if (pTask->outputType == TASK_OUTPUT__TABLE) {
tDeleteSchemaWrapper(pTask->tbSink.pSchemaWrapper);
taosMemoryFree(pTask->tbSink.pTSchema);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册