提交 ce842ec8 编写于 作者: K kailixu

Merge branch '3.0' into enh/TD-24987-3.0

...@@ -45,8 +45,8 @@ enum { ...@@ -45,8 +45,8 @@ enum {
TASK_STATUS__FAIL, TASK_STATUS__FAIL,
TASK_STATUS__STOP, TASK_STATUS__STOP,
TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner
TASK_STATUS__HALT, // stream task will handle all data in the input queue, and then paused TASK_STATUS__HALT, // stream task will handle all data in the input queue, and then paused, todo remove it?
TASK_STATUS__PAUSE, TASK_STATUS__PAUSE, // pause
}; };
enum { enum {
...@@ -272,6 +272,7 @@ typedef struct SStreamStatus { ...@@ -272,6 +272,7 @@ typedef struct SStreamStatus {
int8_t keepTaskStatus; int8_t keepTaskStatus;
bool transferState; bool transferState;
int8_t timerActive; // timer is active int8_t timerActive; // timer is active
int8_t pauseAllowed; // allowed task status to be set to be paused
} SStreamStatus; } SStreamStatus;
typedef struct SHistDataRange { typedef struct SHistDataRange {
...@@ -296,15 +297,15 @@ typedef struct SDispatchMsgInfo { ...@@ -296,15 +297,15 @@ typedef struct SDispatchMsgInfo {
} SDispatchMsgInfo; } SDispatchMsgInfo;
typedef struct { typedef struct {
int8_t outputType; int8_t type;
int8_t outputStatus; int8_t status;
SStreamQueue* outputQueue; SStreamQueue* queue;
} SSTaskOutputInfo; } STaskOutputInfo;
struct SStreamTask { struct SStreamTask {
SStreamId id; SStreamId id;
SSTaskBasicInfo info; SSTaskBasicInfo info;
int8_t outputType; STaskOutputInfo outputInfo;
SDispatchMsgInfo msgInfo; SDispatchMsgInfo msgInfo;
SStreamStatus status; SStreamStatus status;
SCheckpointInfo chkInfo; SCheckpointInfo chkInfo;
...@@ -315,7 +316,7 @@ struct SStreamTask { ...@@ -315,7 +316,7 @@ struct SStreamTask {
SArray* pUpstreamEpInfoList; // SArray<SStreamChildEpInfo*>, // children info SArray* pUpstreamEpInfoList; // SArray<SStreamChildEpInfo*>, // children info
int32_t nextCheckId; int32_t nextCheckId;
SArray* checkpointInfo; // SArray<SStreamCheckpointInfo> SArray* checkpointInfo; // SArray<SStreamCheckpointInfo>
int64_t initTs;
// output // output
union { union {
STaskDispatcherFixedEp fixedEpDispatcher; STaskDispatcherFixedEp fixedEpDispatcher;
...@@ -326,9 +327,7 @@ struct SStreamTask { ...@@ -326,9 +327,7 @@ struct SStreamTask {
}; };
int8_t inputStatus; int8_t inputStatus;
int8_t outputStatus;
SStreamQueue* inputQueue; SStreamQueue* inputQueue;
SStreamQueue* outputQueue;
// trigger // trigger
int8_t triggerStatus; int8_t triggerStatus;
...@@ -337,6 +336,8 @@ struct SStreamTask { ...@@ -337,6 +336,8 @@ struct SStreamTask {
void* launchTaskTimer; void* launchTaskTimer;
SMsgCb* pMsgCb; // msg handle SMsgCb* pMsgCb; // msg handle
SStreamState* pState; // state backend SStreamState* pState; // state backend
SArray* pRspMsgList;
TdThreadMutex lock;
// the followings attributes don't be serialized // the followings attributes don't be serialized
int32_t notReadyTasks; int32_t notReadyTasks;
...@@ -458,7 +459,9 @@ typedef struct { ...@@ -458,7 +459,9 @@ typedef struct {
typedef struct { typedef struct {
int64_t streamId; int64_t streamId;
int32_t taskId; int32_t upstreamTaskId;
int32_t downstreamTaskId;
int32_t upstreamNodeId;
int32_t childId; int32_t childId;
} SStreamScanHistoryFinishReq, SStreamTransferReq; } SStreamScanHistoryFinishReq, SStreamTransferReq;
...@@ -519,6 +522,17 @@ int32_t tDecodeSStreamCheckpointReq(SDecoder* pDecoder, SStreamCheckpointReq* pR ...@@ -519,6 +522,17 @@ int32_t tDecodeSStreamCheckpointReq(SDecoder* pDecoder, SStreamCheckpointReq* pR
int32_t tEncodeSStreamCheckpointRsp(SEncoder* pEncoder, const SStreamCheckpointRsp* pRsp); int32_t tEncodeSStreamCheckpointRsp(SEncoder* pEncoder, const SStreamCheckpointRsp* pRsp);
int32_t tDecodeSStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRsp); int32_t tDecodeSStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRsp);
typedef struct {
int64_t streamId;
int32_t upstreamTaskId;
int32_t upstreamNodeId;
int32_t downstreamId;
int32_t downstreamNode;
} SStreamCompleteHistoryMsg;
int32_t tEncodeCompleteHistoryDataMsg(SEncoder* pEncoder, const SStreamCompleteHistoryMsg* pReq);
int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistoryMsg* pReq);
typedef struct { typedef struct {
int64_t streamId; int64_t streamId;
int32_t downstreamTaskId; int32_t downstreamTaskId;
...@@ -559,7 +573,6 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S ...@@ -559,7 +573,6 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code); int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code);
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg); int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
// int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp);
void streamTaskInputFail(SStreamTask* pTask); void streamTaskInputFail(SStreamTask* pTask);
int32_t streamTryExec(SStreamTask* pTask); int32_t streamTryExec(SStreamTask* pTask);
...@@ -569,17 +582,20 @@ bool streamTaskShouldStop(const SStreamStatus* pStatus); ...@@ -569,17 +582,20 @@ bool streamTaskShouldStop(const SStreamStatus* pStatus);
bool streamTaskShouldPause(const SStreamStatus* pStatus); bool streamTaskShouldPause(const SStreamStatus* pStatus);
bool streamTaskIsIdle(const SStreamTask* pTask); bool streamTaskIsIdle(const SStreamTask* pTask);
SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId);
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz); int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
// recover and fill history // recover and fill history
void streamPrepareNdoCheckDownstream(SStreamTask* pTask); void streamTaskCheckDownstreamTasks(SStreamTask* pTask);
int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask); int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask);
int32_t streamTaskLaunchScanHistory(SStreamTask* pTask); int32_t streamTaskLaunchScanHistory(SStreamTask* pTask);
int32_t streamTaskCheckStatus(SStreamTask* pTask); int32_t streamTaskCheckStatus(SStreamTask* pTask);
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
SRpcHandleInfo* pRpcInfo, int32_t taskId);
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp); int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp);
int32_t streamCheckHistoryTaskDownstream(SStreamTask* pTask); int32_t streamLaunchFillHistoryTask(SStreamTask* pTask);
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated); int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated);
void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask); void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask);
...@@ -593,6 +609,9 @@ int32_t streamSetParamForScanHistory(SStreamTask* pTask); ...@@ -593,6 +609,9 @@ int32_t streamSetParamForScanHistory(SStreamTask* pTask);
int32_t streamRestoreParam(SStreamTask* pTask); int32_t streamRestoreParam(SStreamTask* pTask);
int32_t streamSetStatusNormal(SStreamTask* pTask); int32_t streamSetStatusNormal(SStreamTask* pTask);
const char* streamGetTaskStatusStr(int32_t status); const char* streamGetTaskStatusStr(int32_t status);
void streamTaskPause(SStreamTask* pTask);
void streamTaskDisablePause(SStreamTask* pTask);
void streamTaskEnablePause(SStreamTask* pTask);
// source level // source level
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
...@@ -604,8 +623,9 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask); ...@@ -604,8 +623,9 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
int32_t streamDispatchTransferStateMsg(SStreamTask* pTask); int32_t streamDispatchTransferStateMsg(SStreamTask* pTask);
// agg level // agg level
int32_t streamAggScanHistoryPrepare(SStreamTask* pTask); int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask);
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, int32_t taskId, int32_t childId); int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq *pReq, SRpcHandleInfo* pRpcInfo);
int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask);
// stream task meta // stream task meta
void streamMetaInit(); void streamMetaInit();
......
...@@ -79,6 +79,7 @@ SArray *smGetMsgHandles() { ...@@ -79,6 +79,7 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
code = 0; code = 0;
_OVER: _OVER:
......
...@@ -740,6 +740,7 @@ SArray *vmGetMsgHandles() { ...@@ -740,6 +740,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRANSFER_STATE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRANSFER_STATE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
......
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#define SINK_NODE_LEVEL (0) #define SINK_NODE_LEVEL (0)
extern bool tsDeployOnSnode; extern bool tsDeployOnSnode;
static int32_t setTaskUpstreamEpInfo(const SStreamTask* pTask, SStreamTask* pDownstream);
static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId,
SVgObj* pVgroup, int32_t fillHistory); SVgObj* pVgroup, int32_t fillHistory);
static void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask); static void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask);
...@@ -87,10 +88,10 @@ END: ...@@ -87,10 +88,10 @@ END:
int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) { int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) {
if (pStream->smaId != 0) { if (pStream->smaId != 0) {
pTask->outputType = TASK_OUTPUT__SMA; pTask->outputInfo.type = TASK_OUTPUT__SMA;
pTask->smaSink.smaId = pStream->smaId; pTask->smaSink.smaId = pStream->smaId;
} else { } else {
pTask->outputType = TASK_OUTPUT__TABLE; pTask->outputInfo.type = TASK_OUTPUT__TABLE;
pTask->tbSink.stbUid = pStream->targetStbUid; pTask->tbSink.stbUid = pStream->targetStbUid;
memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
...@@ -110,7 +111,7 @@ int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SAr ...@@ -110,7 +111,7 @@ int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SAr
SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb); SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
if (pDb != NULL && pDb->cfg.numOfVgroups > 1) { if (pDb != NULL && pDb->cfg.numOfVgroups > 1) {
isShuffle = true; isShuffle = true;
pTask->outputType = TASK_OUTPUT__SHUFFLE_DISPATCH; pTask->outputInfo.type = TASK_OUTPUT__SHUFFLE_DISPATCH;
pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH; pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) { if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
return -1; return -1;
...@@ -267,10 +268,15 @@ static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTas ...@@ -267,10 +268,15 @@ static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTas
return terrno; return terrno;
} }
for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) {
SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i);
setTaskUpstreamEpInfo(pTask, pSinkTask);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static SStreamChildEpInfo* createStreamTaskEpInfo(SStreamTask* pTask) { static SStreamChildEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) {
SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo)); SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo));
if (pEpInfo == NULL) { if (pEpInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -291,11 +297,11 @@ void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask) { ...@@ -291,11 +297,11 @@ void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask) {
pDispatcher->nodeId = pTask->info.nodeId; pDispatcher->nodeId = pTask->info.nodeId;
pDispatcher->epSet = pTask->info.epSet; pDispatcher->epSet = pTask->info.epSet;
pDstTask->outputType = TASK_OUTPUT__FIXED_DISPATCH; pDstTask->outputInfo.type = TASK_OUTPUT__FIXED_DISPATCH;
pDstTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH; pDstTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
} }
int32_t setEpToDownstreamTask(SStreamTask* pTask, SStreamTask* pDownstream) { int32_t setTaskUpstreamEpInfo(const SStreamTask* pTask, SStreamTask* pDownstream) {
SStreamChildEpInfo* pEpInfo = createStreamTaskEpInfo(pTask); SStreamChildEpInfo* pEpInfo = createStreamTaskEpInfo(pTask);
if (pEpInfo == NULL) { if (pEpInfo == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
...@@ -418,7 +424,7 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui ...@@ -418,7 +424,7 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui
return -1; return -1;
} }
return setEpToDownstreamTask(pTask, pDownstreamTask); return setTaskUpstreamEpInfo(pTask, pDownstreamTask);
} }
static int32_t doAddAggTask(uint64_t uid, SArray* pTaskList, SArray* pSinkNodeList, SMnode* pMnode, SStreamObj* pStream, static int32_t doAddAggTask(uint64_t uid, SArray* pTaskList, SArray* pSinkNodeList, SMnode* pMnode, SStreamObj* pStream,
...@@ -586,6 +592,14 @@ static int32_t addSinkTasks(SArray* pTasksList, SMnode* pMnode, SStreamObj* pStr ...@@ -586,6 +592,14 @@ static int32_t addSinkTasks(SArray* pTasksList, SMnode* pMnode, SStreamObj* pStr
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void setSinkTaskUpstreamInfo(SArray* pTasksList, const SStreamTask* pUpstreamTask) {
SArray* pSinkTaskList = taosArrayGetP(pTasksList, SINK_NODE_LEVEL);
for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) {
SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i);
setTaskUpstreamEpInfo(pUpstreamTask, pSinkTask);
}
}
static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t nextWindowSkey) { static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t nextWindowSkey) {
SSdb* pSdb = pMnode->pSdb; SSdb* pSdb = pMnode->pSdb;
int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans); int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
...@@ -637,6 +651,9 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* ...@@ -637,6 +651,9 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
return code; return code;
} }
setSinkTaskUpstreamInfo(pStream->tasks, pAggTask);
setSinkTaskUpstreamInfo(pStream->pHTasksList, pHAggTask);
// source level // source level
return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pAggTask, pHAggTask, nextWindowSkey); return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pAggTask, pHAggTask, nextWindowSkey);
} else if (numOfPlanLevel == 1) { } else if (numOfPlanLevel == 1) {
......
...@@ -66,14 +66,15 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { ...@@ -66,14 +66,15 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->inputQueue = streamQueueOpen(512 << 10); pTask->inputQueue = streamQueueOpen(512 << 10);
pTask->outputQueue = streamQueueOpen(512 << 10); pTask->outputInfo.queue = streamQueueOpen(512 << 10);
if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) { if (pTask->inputQueue == NULL || pTask->outputInfo.queue == NULL) {
return -1; return -1;
} }
pTask->initTs = taosGetTimestampMs();
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL;
pTask->pMsgCb = &pSnode->msgCb; pTask->pMsgCb = &pSnode->msgCb;
pTask->chkInfo.version = ver; pTask->chkInfo.version = ver;
pTask->pMeta = pSnode->pMeta; pTask->pMeta = pSnode->pMeta;
...@@ -90,6 +91,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { ...@@ -90,6 +91,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0); pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0);
ASSERT(pTask->exec.pExecutor); ASSERT(pTask->exec.pExecutor);
taosThreadMutexInit(&pTask->lock, NULL);
streamSetupScheduleTrigger(pTask); streamSetupScheduleTrigger(pTask);
qDebug("snode:%d expand stream task on snode, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", SNODE_HANDLE, qDebug("snode:%d expand stream task on snode, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", SNODE_HANDLE,
...@@ -166,11 +168,10 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) { ...@@ -166,11 +168,10 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta);
taosWUnLockLatch(&pSnode->pMeta->lock); taosWUnLockLatch(&pSnode->pMeta->lock);
streamPrepareNdoCheckDownstream(pTask);
qDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE, pTask->id.idStr, qDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE, pTask->id.idStr,
streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks); streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks);
streamTaskCheckDownstreamTasks(pTask);
return 0; return 0;
} }
...@@ -274,7 +275,7 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) { ...@@ -274,7 +275,7 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
return 0; return 0;
} }
int32_t sndProcessTaskRecoverFinishReq(SSnode *pSnode, SRpcMsg *pMsg) { int32_t sndProcessStreamTaskScanHistoryFinishReq(SSnode *pSnode, SRpcMsg *pMsg) {
char *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); char *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
...@@ -287,12 +288,12 @@ int32_t sndProcessTaskRecoverFinishReq(SSnode *pSnode, SRpcMsg *pMsg) { ...@@ -287,12 +288,12 @@ int32_t sndProcessTaskRecoverFinishReq(SSnode *pSnode, SRpcMsg *pMsg) {
tDecoderClear(&decoder); tDecoderClear(&decoder);
// find task // find task
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.taskId); SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.downstreamTaskId);
if (pTask == NULL) { if (pTask == NULL) {
return -1; return -1;
} }
// do process request // do process request
if (streamProcessScanHistoryFinishReq(pTask, req.taskId, req.childId) < 0) { if (streamProcessScanHistoryFinishReq(pTask, &req, &pMsg->info) < 0) {
streamMetaReleaseTask(pSnode->pMeta, pTask); streamMetaReleaseTask(pSnode->pMeta, pTask);
return -1; return -1;
} }
...@@ -415,7 +416,7 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) { ...@@ -415,7 +416,7 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
case TDMT_STREAM_RETRIEVE_RSP: case TDMT_STREAM_RETRIEVE_RSP:
return sndProcessTaskRetrieveRsp(pSnode, pMsg); return sndProcessTaskRetrieveRsp(pSnode, pMsg);
case TDMT_STREAM_SCAN_HISTORY_FINISH: case TDMT_STREAM_SCAN_HISTORY_FINISH:
return sndProcessTaskRecoverFinishReq(pSnode, pMsg); return sndProcessStreamTaskScanHistoryFinishReq(pSnode, pMsg);
case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP: case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP:
return sndProcessTaskRecoverFinishRsp(pSnode, pMsg); return sndProcessTaskRecoverFinishRsp(pSnode, pMsg);
case TDMT_STREAM_TASK_CHECK: case TDMT_STREAM_TASK_CHECK:
......
...@@ -250,8 +250,8 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg); ...@@ -250,8 +250,8 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqCheckLogInWal(STQ* pTq, int64_t version); int32_t tqCheckLogInWal(STQ* pTq, int64_t version);
// sma // sma
......
此差异已折叠。
...@@ -80,11 +80,17 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq) { ...@@ -80,11 +80,17 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq) {
continue; continue;
} }
streamTaskCheckDownstreamTasks(pTask); if (pTask->info.fillHistory == 1) {
tqDebug("s-task:%s fill-history task, wait for related stream task:0x%x to launch it", pTask->id.idStr,
pTask->streamTaskId.taskId);
continue;
}
streamTaskDoCheckDownstreamTasks(pTask);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
} }
taosArrayDestroy(pTaskList);
taosArrayDestroy(pTaskList);
return 0; return 0;
} }
......
...@@ -664,9 +664,9 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) ...@@ -664,9 +664,9 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
case TDMT_STREAM_TRANSFER_STATE: case TDMT_STREAM_TRANSFER_STATE:
return tqProcessTaskTransferStateReq(pVnode->pTq, pMsg); return tqProcessTaskTransferStateReq(pVnode->pTq, pMsg);
case TDMT_STREAM_SCAN_HISTORY_FINISH: case TDMT_STREAM_SCAN_HISTORY_FINISH:
return tqProcessStreamTaskScanHistoryFinishReq(pVnode->pTq, pMsg); return tqProcessTaskScanHistoryFinishReq(pVnode->pTq, pMsg);
case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP: case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP:
return tqProcessTaskRecoverFinishRsp(pVnode->pTq, pMsg); return tqProcessTaskScanHistoryFinishRsp(pVnode->pTq, pMsg);
default: default:
vError("unknown msg type:%d in stream queue", pMsg->msgType); vError("unknown msg type:%d in stream queue", pMsg->msgType);
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
......
...@@ -122,7 +122,7 @@ void qResetStreamInfoTimeWindow(qTaskInfo_t tinfo) { ...@@ -122,7 +122,7 @@ void qResetStreamInfoTimeWindow(qTaskInfo_t tinfo) {
return; return;
} }
qDebug("%s set fill history start key:%"PRId64, GET_TASKID(pTaskInfo), INT64_MIN); qDebug("%s set fill history start key:%" PRId64, GET_TASKID(pTaskInfo), INT64_MIN);
pTaskInfo->streamInfo.fillHistoryWindow.skey = INT64_MIN; pTaskInfo->streamInfo.fillHistoryWindow.skey = INT64_MIN;
} }
......
...@@ -31,6 +31,12 @@ typedef struct { ...@@ -31,6 +31,12 @@ typedef struct {
void* timer; void* timer;
} SStreamGlobalEnv; } SStreamGlobalEnv;
typedef struct {
SEpSet epset;
int32_t taskId;
SRpcMsg msg;
} SStreamContinueExecInfo;
extern SStreamGlobalEnv streamEnv; extern SStreamGlobalEnv streamEnv;
void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration); void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration);
...@@ -54,6 +60,9 @@ int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamSc ...@@ -54,6 +60,9 @@ int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamSc
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq);
int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
extern int32_t streamBackendId; extern int32_t streamBackendId;
extern int32_t streamBackendCfWrapperId; extern int32_t streamBackendCfWrapperId;
......
...@@ -216,15 +216,16 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, ...@@ -216,15 +216,16 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
// todo add log // todo add log
int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) { int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
int32_t code = 0; int32_t code = 0;
if (pTask->outputType == TASK_OUTPUT__TABLE) { int32_t type = pTask->outputInfo.type;
if (type == TASK_OUTPUT__TABLE) {
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks); pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
destroyStreamDataBlock(pBlock); destroyStreamDataBlock(pBlock);
} else if (pTask->outputType == TASK_OUTPUT__SMA) { } else if (type == TASK_OUTPUT__SMA) {
pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks); pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
destroyStreamDataBlock(pBlock); destroyStreamDataBlock(pBlock);
} else { } else {
ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); ASSERT(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH);
code = taosWriteQitem(pTask->outputQueue->queue, pBlock); code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock);
if (code != 0) { // todo failed to add it into the output queue, free it. if (code != 0) { // todo failed to add it into the output queue, free it.
return code; return code;
} }
...@@ -274,7 +275,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i ...@@ -274,7 +275,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
qDebug("s-task:%s receive dispatch rsp, output status:%d code:%d", pTask->id.idStr, pRsp->inputStatus, code); qDebug("s-task:%s receive dispatch rsp, output status:%d code:%d", pTask->id.idStr, pRsp->inputStatus, code);
// there are other dispatch message not response yet // there are other dispatch message not response yet
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
qDebug("s-task:%s is shuffle, left waiting rsp %d", pTask->id.idStr, leftRsp); qDebug("s-task:%s is shuffle, left waiting rsp %d", pTask->id.idStr, leftRsp);
if (leftRsp > 0) { if (leftRsp > 0) {
...@@ -283,9 +284,9 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i ...@@ -283,9 +284,9 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
} }
pTask->msgInfo.retryCount = 0; pTask->msgInfo.retryCount = 0;
ASSERT(pTask->outputStatus == TASK_OUTPUT_STATUS__WAIT); ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT);
qDebug("s-task:%s output status is set to:%d", pTask->id.idStr, pTask->outputStatus); qDebug("s-task:%s output status is set to:%d", pTask->id.idStr, pTask->outputInfo.status);
// the input queue of the (down stream) task that receive the output data is full, // the input queue of the (down stream) task that receive the output data is full,
// so the TASK_INPUT_STATUS_BLOCKED is rsp // so the TASK_INPUT_STATUS_BLOCKED is rsp
...@@ -309,7 +310,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i ...@@ -309,7 +310,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
} }
// now ready for next data output // now ready for next data output
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL);
// otherwise, continue dispatch the first block to down stream task in pipeline // otherwise, continue dispatch the first block to down stream task in pipeline
streamDispatchStreamBlock(pTask); streamDispatchStreamBlock(pTask);
...@@ -418,4 +419,16 @@ void* streamQueueNextItem(SStreamQueue* pQueue) { ...@@ -418,4 +419,16 @@ void* streamQueueNextItem(SStreamQueue* pQueue) {
} }
} }
void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); } void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); }
\ No newline at end of file
SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId) {
int32_t num = taosArrayGetSize(pTask->pUpstreamEpInfoList);
for(int32_t i = 0; i < num; ++i) {
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamEpInfoList, i);
if (pInfo->taskId == taskId) {
return pInfo;
}
}
return NULL;
}
\ No newline at end of file
...@@ -25,6 +25,12 @@ typedef struct SBlockName { ...@@ -25,6 +25,12 @@ typedef struct SBlockName {
char parTbName[TSDB_TABLE_NAME_LEN]; char parTbName[TSDB_TABLE_NAME_LEN];
} SBlockName; } SBlockName;
static void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) {
pMsg->msgType = msgType;
pMsg->pCont = pCont;
pMsg->contLen = contLen;
}
static int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) { static int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1; if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
...@@ -311,13 +317,12 @@ int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamSc ...@@ -311,13 +317,12 @@ int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamSc
msg.contLen = tlen + sizeof(SMsgHead); msg.contLen = tlen + sizeof(SMsgHead);
msg.pCont = buf; msg.pCont = buf;
msg.msgType = TDMT_STREAM_SCAN_HISTORY_FINISH; msg.msgType = TDMT_STREAM_SCAN_HISTORY_FINISH;
msg.info.noResp = 1;
tmsgSendReq(pEpSet, &msg); tmsgSendReq(pEpSet, &msg);
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
qDebug("s-task:%s status:%s dispatch scan-history-data finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pStatus, qDebug("s-task:%s status:%s dispatch scan-history finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pStatus,
pReq->taskId, vgId); pReq->downstreamTaskId, vgId);
return 0; return 0;
} }
...@@ -437,7 +442,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat ...@@ -437,7 +442,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
int32_t numOfBlocks = taosArrayGetSize(pData->blocks); int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
ASSERT(numOfBlocks != 0); ASSERT(numOfBlocks != 0);
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
SStreamDispatchReq req = {0}; SStreamDispatchReq req = {0};
int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId; int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId;
...@@ -467,7 +472,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat ...@@ -467,7 +472,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
taosArrayDestroyP(req.data, taosMemoryFree); taosArrayDestroyP(req.data, taosMemoryFree);
taosArrayDestroy(req.dataLen); taosArrayDestroy(req.dataLen);
return code; return code;
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
int32_t rspCnt = atomic_load_32(&pTask->shuffleDispatcher.waitingRspCnt); int32_t rspCnt = atomic_load_32(&pTask->shuffleDispatcher.waitingRspCnt);
ASSERT(rspCnt == 0); ASSERT(rspCnt == 0);
...@@ -545,7 +550,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat ...@@ -545,7 +550,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
static void doRetryDispatchData(void* param, void* tmrId) { static void doRetryDispatchData(void* param, void* tmrId) {
SStreamTask* pTask = param; SStreamTask* pTask = param;
ASSERT(pTask->outputStatus == TASK_OUTPUT_STATUS__WAIT); ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT);
int32_t code = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData); int32_t code = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -561,29 +566,29 @@ void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration) { ...@@ -561,29 +566,29 @@ void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration) {
} }
int32_t streamDispatchStreamBlock(SStreamTask* pTask) { int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
ASSERT((pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH)); STaskOutputInfo* pInfo = &pTask->outputInfo;
ASSERT((pInfo->type == TASK_OUTPUT__FIXED_DISPATCH || pInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH));
int32_t numOfElems = taosQueueItemSize(pTask->outputQueue->queue); int32_t numOfElems = taosQueueItemSize(pInfo->queue->queue);
if (numOfElems > 0) { if (numOfElems > 0) {
qDebug("s-task:%s try to dispatch intermediate result block to downstream, elem in outputQ:%d", pTask->id.idStr, qDebug("s-task:%s try to dispatch intermediate result block to downstream, elem in outputQ:%d", pTask->id.idStr,
numOfElems); numOfElems);
} }
// to make sure only one dispatch is running // to make sure only one dispatch is running
int8_t old = int8_t old = atomic_val_compare_exchange_8(&pInfo->status, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
if (old != TASK_OUTPUT_STATUS__NORMAL) { if (old != TASK_OUTPUT_STATUS__NORMAL) {
qDebug("s-task:%s wait for dispatch rsp, not dispatch now, output status:%d", pTask->id.idStr, old); qDebug("s-task:%s wait for dispatch rsp, not dispatch now, output status:%d", pTask->id.idStr, old);
return 0; return 0;
} }
ASSERT(pTask->msgInfo.pData == NULL); ASSERT(pTask->msgInfo.pData == NULL);
qDebug("s-task:%s start to dispatch msg, set output status:%d", pTask->id.idStr, pTask->outputStatus); qDebug("s-task:%s start to dispatch msg, set output status:%d", pTask->id.idStr, pInfo->status);
SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue); SStreamDataBlock* pBlock = streamQueueNextItem(pInfo->queue);
if (pBlock == NULL) { if (pBlock == NULL) {
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); atomic_store_8(&pInfo->status, TASK_OUTPUT_STATUS__NORMAL);
qDebug("s-task:%s not dispatch since no elems in outputQ, output status:%d", pTask->id.idStr, pTask->outputStatus); qDebug("s-task:%s not dispatch since no elems in outputQ, output status:%d", pTask->id.idStr, pInfo->status);
return 0; return 0;
} }
...@@ -599,19 +604,19 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { ...@@ -599,19 +604,19 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
} }
qDebug("s-task:%s failed to dispatch msg to downstream, code:%s, output status:%d, retry cnt:%d", pTask->id.idStr, qDebug("s-task:%s failed to dispatch msg to downstream, code:%s, output status:%d, retry cnt:%d", pTask->id.idStr,
tstrerror(terrno), pTask->outputStatus, retryCount); tstrerror(terrno), pInfo->status, retryCount);
// todo deal with only partially success dispatch case // todo deal with only partially success dispatch case
atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0); atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0);
if (terrno == TSDB_CODE_APP_IS_STOPPING) { // in case of this error, do not retry anymore if (terrno == TSDB_CODE_APP_IS_STOPPING) { // in case of this error, do not retry anymore
destroyStreamDataBlock(pTask->msgInfo.pData); destroyStreamDataBlock(pTask->msgInfo.pData);
pTask->msgInfo.pData = NULL; pTask->msgInfo.pData = NULL;
return code; return code;
} }
if (++retryCount > MAX_CONTINUE_RETRY_COUNT) { // add to timer to retry if (++retryCount > MAX_CONTINUE_RETRY_COUNT) { // add to timer to retry
qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms", pTask->id.idStr, qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms",
retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS); pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS);
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
break; break;
} }
...@@ -620,3 +625,93 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { ...@@ -620,3 +625,93 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
// this block can not be deleted until it has been sent to downstream task successfully. // this block can not be deleted until it has been sent to downstream task successfully.
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tEncodeCompleteHistoryDataMsg(SEncoder* pEncoder, const SStreamCompleteHistoryMsg* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->downstreamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->downstreamNode) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistoryMsg* pRsp) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->downstreamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->downstreamNode) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq) {
int32_t len = 0;
int32_t code = 0;
SEncoder encoder;
SStreamCompleteHistoryMsg msg = {
.streamId = pReq->streamId,
.upstreamTaskId = pReq->upstreamTaskId,
.upstreamNodeId = pReq->upstreamNodeId,
.downstreamId = pReq->downstreamTaskId,
.downstreamNode = pTask->pMeta->vgId,
};
tEncodeSize(tEncodeCompleteHistoryDataMsg, &msg, len, code);
if (code < 0) {
return code;
}
void* pBuf = rpcMallocCont(sizeof(SMsgHead) + len);
if (pBuf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
((SMsgHead*)pBuf)->vgId = htonl(pReq->upstreamNodeId);
void* abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead));
tEncoderInit(&encoder, (uint8_t*)abuf, len);
tEncodeCompleteHistoryDataMsg(&encoder, &msg);
tEncoderClear(&encoder);
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId);
SStreamContinueExecInfo info = {.taskId = pReq->upstreamTaskId, .epset = pInfo->epSet};
initRpcMsg(&info.msg, 0, pBuf, sizeof(SMsgHead) + len);
info.msg.info = *pRpcInfo;
taosThreadMutexLock(&pTask->lock);
if (pTask->pRspMsgList == NULL) {
pTask->pRspMsgList = taosArrayInit(4, sizeof(SStreamContinueExecInfo));
}
taosArrayPush(pTask->pRspMsgList, &info);
taosThreadMutexUnlock(&pTask->lock);
int32_t num = taosArrayGetSize(pTask->pRspMsgList);
qDebug("s-task:%s add scan history finish rsp msg for task:0x%x, total:%d", pTask->id.idStr, pReq->upstreamTaskId,
num);
return TSDB_CODE_SUCCESS;
}
int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG || pTask->info.taskLevel == TASK_LEVEL__SINK);
int32_t num = taosArrayGetSize(pTask->pRspMsgList);
for (int32_t i = 0; i < num; ++i) {
SStreamContinueExecInfo* pInfo = taosArrayGet(pTask->pRspMsgList, i);
tmsgSendRsp(&pInfo->msg);
qDebug("s-task:%s level:%d notify upstream:0x%x to continue process data from WAL", pTask->id.idStr, pTask->info.taskLevel,
pInfo->taskId);
}
taosArrayClear(pTask->pRspMsgList);
qDebug("s-task:%s level:%d checkpoint ready msg sent to all %d upstreams", pTask->id.idStr, pTask->info.taskLevel,
num);
return 0;
}
...@@ -351,30 +351,40 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { ...@@ -351,30 +351,40 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
} }
static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId); SStreamMeta* pMeta = pTask->pMeta;
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
if (pStreamTask == NULL) { if (pStreamTask == NULL) {
qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->status.transferState = false; // reset this value, to avoid transfer state again
pTask->id.idStr, pTask->streamTaskId.taskId);
qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->id.idStr,
pTask->streamTaskId.taskId);
return TSDB_CODE_STREAM_TASK_NOT_EXIST; return TSDB_CODE_STREAM_TASK_NOT_EXIST;
} else { } else {
qDebug("s-task:%s fill-history task end, update related stream task:%s info, transfer exec state", pTask->id.idStr, qDebug("s-task:%s fill-history task end, update related stream task:%s info, transfer exec state", pTask->id.idStr,
pStreamTask->id.idStr); pStreamTask->id.idStr);
} }
ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId); // todo fix race condition
streamTaskDisablePause(pTask);
streamTaskDisablePause(pStreamTask);
ASSERT(pStreamTask->historyTaskId.taskId == pTask->id.taskId && pTask->status.transferState == true);
STimeWindow* pTimeWindow = &pStreamTask->dataRange.window; STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;
// It must be halted for a source stream task, since when the related scan-history-data task start scan the history // It must be halted for a source stream task, since when the related scan-history-data task start scan the history
// for the step 2. For a agg task // for the step 2. For a agg task
int8_t status = pStreamTask->status.taskStatus;
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__HALT); ASSERT(status == TASK_STATUS__HALT);
} else { } else {
ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__NORMAL); ASSERT(status == TASK_STATUS__SCAN_HISTORY);
pStreamTask->status.taskStatus = TASK_STATUS__HALT; pStreamTask->status.taskStatus = TASK_STATUS__HALT;
qDebug("s-task:%s status: halt by related fill history task:%s", pStreamTask->id.idStr, pTask->id.idStr); qDebug("s-task:%s halt by related fill history task:%s", pStreamTask->id.idStr, pTask->id.idStr);
} }
// wait for the stream task to be idle // wait for the stream task to handle all in the inputQ, and to be idle
waitForTaskIdle(pTask, pStreamTask); waitForTaskIdle(pTask, pStreamTask);
// In case of sink tasks, no need to be halted for them. // In case of sink tasks, no need to be halted for them.
...@@ -399,10 +409,27 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { ...@@ -399,10 +409,27 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
streamTaskReleaseState(pTask); streamTaskReleaseState(pTask);
streamTaskReloadState(pStreamTask); streamTaskReloadState(pStreamTask);
// reset the status of stream task
streamSetStatusNormal(pStreamTask); streamSetStatusNormal(pStreamTask);
pTask->status.taskStatus = TASK_STATUS__DROPPING;
qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);
// save to disk
taosWLockLatch(&pMeta->lock);
streamMetaSaveTask(pMeta, pTask);
streamMetaSaveTask(pMeta, pStreamTask);
if (streamMetaCommit(pMeta) < 0) {
// persist to disk
}
taosWUnLockLatch(&pMeta->lock);
// pause allowed
streamTaskEnablePause(pStreamTask);
streamTaskEnablePause(pTask);
streamSchedExec(pStreamTask); streamSchedExec(pStreamTask);
streamMetaReleaseTask(pTask->pMeta, pStreamTask); streamMetaReleaseTask(pMeta, pStreamTask);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -480,7 +507,6 @@ int32_t streamExecForAll(SStreamTask* pTask) { ...@@ -480,7 +507,6 @@ int32_t streamExecForAll(SStreamTask* pTask) {
ASSERT(batchSize == 0); ASSERT(batchSize == 0);
if (pTask->info.fillHistory && pTask->status.transferState) { if (pTask->info.fillHistory && pTask->status.transferState) {
int32_t code = streamTransferStateToStreamTask(pTask); int32_t code = streamTransferStateToStreamTask(pTask);
pTask->status.transferState = false; // reset this value, to avoid transfer state again
if (code != TSDB_CODE_SUCCESS) { // todo handle this if (code != TSDB_CODE_SUCCESS) { // todo handle this
return 0; return 0;
} }
...@@ -550,22 +576,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { ...@@ -550,22 +576,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
} }
bool streamTaskIsIdle(const SStreamTask* pTask) { bool streamTaskIsIdle(const SStreamTask* pTask) {
int32_t numOfItems = taosQueueItemSize(pTask->inputQueue->queue); return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE/* && pTask->status.taskStatus != TASK_STATUS__HALT*/);
if (numOfItems > 0) {
return false;
}
numOfItems = taosQallItemSize(pTask->inputQueue->qall);
if (numOfItems > 0) {
return false;
}
// blocked by downstream task
if (pTask->outputStatus == TASK_OUTPUT_STATUS__BLOCKED) {
return false;
}
return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE);
} }
int32_t streamTryExec(SStreamTask* pTask) { int32_t streamTryExec(SStreamTask* pTask) {
......
...@@ -266,7 +266,7 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) { ...@@ -266,7 +266,7 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) {
if (!streamTaskShouldStop(&(*ppTask)->status)) { if (!streamTaskShouldStop(&(*ppTask)->status)) {
int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1); int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1);
taosRUnLockLatch(&pMeta->lock); taosRUnLockLatch(&pMeta->lock);
qDebug("s-task:%s acquire task, ref:%d", (*ppTask)->id.idStr, ref); qTrace("s-task:%s acquire task, ref:%d", (*ppTask)->id.idStr, ref);
return *ppTask; return *ppTask;
} }
} }
...@@ -278,7 +278,7 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) { ...@@ -278,7 +278,7 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) {
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) { void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) {
int32_t ref = atomic_sub_fetch_32(&pTask->refCnt, 1); int32_t ref = atomic_sub_fetch_32(&pTask->refCnt, 1);
if (ref > 0) { if (ref > 0) {
qDebug("s-task:%s release task, ref:%d", pTask->id.idStr, ref); qTrace("s-task:%s release task, ref:%d", pTask->id.idStr, ref);
} else if (ref == 0) { } else if (ref == 0) {
ASSERT(streamTaskShouldStop(&pTask->status)); ASSERT(streamTaskShouldStop(&pTask->status));
tFreeStreamTask(pTask); tFreeStreamTask(pTask);
......
...@@ -13,6 +13,8 @@ ...@@ -13,6 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <libs/transport/trpc.h>
#include <streamInt.h>
#include "executor.h" #include "executor.h"
#include "tstream.h" #include "tstream.h"
#include "wal.h" #include "wal.h"
...@@ -44,7 +46,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHisto ...@@ -44,7 +46,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHisto
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY; pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY;
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL;
addToTaskset(pTaskList, pTask); addToTaskset(pTaskList, pTask);
return pTask; return pTask;
...@@ -74,7 +76,7 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { ...@@ -74,7 +76,7 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeI32(pEncoder, pTask->id.taskId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->id.taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->info.totalLevel) < 0) return -1; if (tEncodeI32(pEncoder, pTask->info.totalLevel) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->info.taskLevel) < 0) return -1; if (tEncodeI8(pEncoder, pTask->info.taskLevel) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->outputType) < 0) return -1; if (tEncodeI8(pEncoder, pTask->outputInfo.type) < 0) return -1;
if (tEncodeI16(pEncoder, pTask->msgInfo.msgType) < 0) return -1; if (tEncodeI16(pEncoder, pTask->msgInfo.msgType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->status.taskStatus) < 0) return -1; if (tEncodeI8(pEncoder, pTask->status.taskStatus) < 0) return -1;
...@@ -109,19 +111,19 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { ...@@ -109,19 +111,19 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1; if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1;
} }
if (pTask->outputType == TASK_OUTPUT__TABLE) { if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
if (tEncodeI64(pEncoder, pTask->tbSink.stbUid) < 0) return -1; if (tEncodeI64(pEncoder, pTask->tbSink.stbUid) < 0) return -1;
if (tEncodeCStr(pEncoder, pTask->tbSink.stbFullName) < 0) return -1; if (tEncodeCStr(pEncoder, pTask->tbSink.stbFullName) < 0) return -1;
if (tEncodeSSchemaWrapper(pEncoder, pTask->tbSink.pSchemaWrapper) < 0) return -1; if (tEncodeSSchemaWrapper(pEncoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
} else if (pTask->outputType == TASK_OUTPUT__SMA) { } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1; if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1;
} else if (pTask->outputType == TASK_OUTPUT__FETCH) { } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
if (tEncodeI8(pEncoder, pTask->fetchSink.reserved) < 0) return -1; if (tEncodeI8(pEncoder, pTask->fetchSink.reserved) < 0) return -1;
} else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.taskId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.nodeId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
if (tSerializeSUseDbRspImp(pEncoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1; if (tSerializeSUseDbRspImp(pEncoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1;
if (tEncodeCStr(pEncoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1; if (tEncodeCStr(pEncoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1;
} }
...@@ -137,7 +139,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { ...@@ -137,7 +139,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tDecodeI32(pDecoder, &pTask->id.taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->id.taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->info.totalLevel) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->info.totalLevel) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->info.taskLevel) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->info.taskLevel) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->outputType) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->outputInfo.type) < 0) return -1;
if (tDecodeI16(pDecoder, &pTask->msgInfo.msgType) < 0) return -1; if (tDecodeI16(pDecoder, &pTask->msgInfo.msgType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->status.taskStatus) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->status.taskStatus) < 0) return -1;
...@@ -179,21 +181,21 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { ...@@ -179,21 +181,21 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1;
} }
if (pTask->outputType == TASK_OUTPUT__TABLE) { if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
if (tDecodeI64(pDecoder, &pTask->tbSink.stbUid) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->tbSink.stbUid) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pTask->tbSink.stbFullName) < 0) return -1; if (tDecodeCStrTo(pDecoder, pTask->tbSink.stbFullName) < 0) return -1;
pTask->tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); pTask->tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if (pTask->tbSink.pSchemaWrapper == NULL) return -1; if (pTask->tbSink.pSchemaWrapper == NULL) return -1;
if (tDecodeSSchemaWrapper(pDecoder, pTask->tbSink.pSchemaWrapper) < 0) return -1; if (tDecodeSSchemaWrapper(pDecoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
} else if (pTask->outputType == TASK_OUTPUT__SMA) { } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
if (tDecodeI64(pDecoder, &pTask->smaSink.smaId) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->smaSink.smaId) < 0) return -1;
} else if (pTask->outputType == TASK_OUTPUT__FETCH) { } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
if (tDecodeI8(pDecoder, &pTask->fetchSink.reserved) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->fetchSink.reserved) < 0) return -1;
} else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.nodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
if (tDeserializeSUseDbRspImp(pDecoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1; if (tDeserializeSUseDbRspImp(pDecoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1; if (tDecodeCStrTo(pDecoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1;
} }
...@@ -203,6 +205,11 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { ...@@ -203,6 +205,11 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
return 0; return 0;
} }
static void freeItem(void* p) {
SStreamContinueExecInfo* pInfo = p;
rpcFreeCont(pInfo->msg.pCont);
}
void tFreeStreamTask(SStreamTask* pTask) { void tFreeStreamTask(SStreamTask* pTask) {
qDebug("free s-task:%s", pTask->id.idStr); qDebug("free s-task:%s", pTask->id.idStr);
...@@ -211,8 +218,8 @@ void tFreeStreamTask(SStreamTask* pTask) { ...@@ -211,8 +218,8 @@ void tFreeStreamTask(SStreamTask* pTask) {
streamQueueClose(pTask->inputQueue); streamQueueClose(pTask->inputQueue);
} }
if (pTask->outputQueue) { if (pTask->outputInfo.queue) {
streamQueueClose(pTask->outputQueue); streamQueueClose(pTask->outputInfo.queue);
} }
if (pTask->exec.qmsg) { if (pTask->exec.qmsg) {
...@@ -229,11 +236,11 @@ void tFreeStreamTask(SStreamTask* pTask) { ...@@ -229,11 +236,11 @@ void tFreeStreamTask(SStreamTask* pTask) {
} }
taosArrayDestroyP(pTask->pUpstreamEpInfoList, taosMemoryFree); taosArrayDestroyP(pTask->pUpstreamEpInfoList, taosMemoryFree);
if (pTask->outputType == TASK_OUTPUT__TABLE) { if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
tDeleteSchemaWrapper(pTask->tbSink.pSchemaWrapper); tDeleteSchemaWrapper(pTask->tbSink.pSchemaWrapper);
taosMemoryFree(pTask->tbSink.pTSchema); taosMemoryFree(pTask->tbSink.pTSchema);
tSimpleHashCleanup(pTask->tbSink.pTblInfo); tSimpleHashCleanup(pTask->tbSink.pTblInfo);
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos); taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
taosArrayDestroy(pTask->checkReqIds); taosArrayDestroy(pTask->checkReqIds);
pTask->checkReqIds = NULL; pTask->checkReqIds = NULL;
...@@ -251,5 +258,11 @@ void tFreeStreamTask(SStreamTask* pTask) { ...@@ -251,5 +258,11 @@ void tFreeStreamTask(SStreamTask* pTask) {
tSimpleHashCleanup(pTask->pNameMap); tSimpleHashCleanup(pTask->pNameMap);
} }
if (pTask->pRspMsgList != NULL) {
taosArrayDestroyEx(pTask->pRspMsgList, freeItem);
pTask->pRspMsgList = NULL;
}
taosThreadMutexDestroy(&pTask->lock);
taosMemoryFree(pTask); taosMemoryFree(pTask);
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册