未验证 提交 b3fa64a0 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #15782 from taosdata/feature/stream

refactor(stream): unify sink and dispatch
......@@ -66,6 +66,25 @@ enum {
TASK_OUTPUT_STATUS__BLOCKED,
};
enum {
TASK_TRIGGER_STATUS__INACTIVE = 1,
TASK_TRIGGER_STATUS__ACTIVE,
};
enum {
TASK_LEVEL__SOURCE = 1,
TASK_LEVEL__AGG,
TASK_LEVEL__SINK,
};
enum {
TASK_OUTPUT__FIXED_DISPATCH = 1,
TASK_OUTPUT__SHUFFLE_DISPATCH,
TASK_OUTPUT__TABLE,
TASK_OUTPUT__SMA,
TASK_OUTPUT__FETCH,
};
typedef struct {
int8_t type;
} SStreamQueueItem;
......@@ -202,29 +221,6 @@ typedef struct {
int8_t reserved;
} STaskSinkFetch;
enum {
TASK_EXEC__NONE = 1,
TASK_EXEC__PIPE,
};
enum {
TASK_DISPATCH__NONE = 1,
TASK_DISPATCH__FIXED,
TASK_DISPATCH__SHUFFLE,
};
enum {
TASK_SINK__NONE = 1,
TASK_SINK__TABLE,
TASK_SINK__SMA,
TASK_SINK__FETCH,
};
enum {
TASK_TRIGGER_STATUS__IN_ACTIVE = 1,
TASK_TRIGGER_STATUS__ACTIVE,
};
typedef struct {
int32_t nodeId;
int32_t childId;
......@@ -237,11 +233,8 @@ typedef struct {
typedef struct SStreamTask {
int64_t streamId;
int32_t taskId;
int8_t isDataScan;
int8_t execType;
int8_t sinkType;
int8_t dispatchType;
int8_t isStreamDistributed;
int8_t taskLevel;
int8_t outputType;
int16_t dispatchMsgType;
int8_t taskStatus;
......@@ -252,13 +245,12 @@ typedef struct SStreamTask {
int32_t nodeId;
SEpSet epSet;
// used for semi or single task,
// while final task should have processedVer for each child
// used for task source and sink,
// while task agg should have processedVer for each child
int64_t recoverSnapVer;
int64_t startVer;
int64_t checkpointVer;
int64_t processedVer;
// int32_t numOfVgroups;
// children info
SArray* childEpInfo; // SArray<SStreamChildEpInfo*>
......@@ -266,19 +258,13 @@ typedef struct SStreamTask {
// exec
STaskExec exec;
// TODO: unify sink and dispatch
// local sink
union {
STaskSinkTb tbSink;
STaskSinkSma smaSink;
STaskSinkFetch fetchSink;
};
// remote dispatcher
// output
union {
STaskDispatcherFixedEp fixedEpDispatcher;
STaskDispatcherShuffle shuffleDispatcher;
STaskSinkTb tbSink;
STaskSinkSma smaSink;
STaskSinkFetch fetchSink;
};
int8_t inputStatus;
......@@ -292,9 +278,6 @@ typedef struct SStreamTask {
int64_t triggerParam;
void* timer;
// application storage
// void* ahandle;
// msg handle
SMsgCb* pMsgCb;
} SStreamTask;
......@@ -331,7 +314,7 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem
}
if (pItem->type != STREAM_INPUT__GET_RES && pItem->type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {
atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
}
#if 0
......@@ -346,18 +329,15 @@ static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) {
}
static FORCE_INLINE int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) {
if (pTask->sinkType == TASK_SINK__TABLE) {
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
if (pTask->outputType == TASK_OUTPUT__TABLE) {
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
taosFreeQitem(pBlock);
} else if (pTask->sinkType == TASK_SINK__SMA) {
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
} else if (pTask->outputType == TASK_OUTPUT__SMA) {
pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
taosFreeQitem(pBlock);
} else {
ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);
taosWriteQitem(pTask->outputQueue->queue, pBlock);
}
return 0;
......
......@@ -98,13 +98,11 @@ END:
}
int32_t mndAddSinkToTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) {
pTask->dispatchType = TASK_DISPATCH__NONE;
// sink
if (pStream->smaId != 0) {
pTask->sinkType = TASK_SINK__SMA;
pTask->outputType = TASK_OUTPUT__SMA;
pTask->smaSink.smaId = pStream->smaId;
} else {
pTask->sinkType = TASK_SINK__TABLE;
pTask->outputType = TASK_OUTPUT__TABLE;
pTask->tbSink.stbUid = pStream->targetStbUid;
memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
......@@ -113,8 +111,6 @@ int32_t mndAddSinkToTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask
}
int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) {
pTask->sinkType = TASK_SINK__NONE;
bool isShuffle = false;
if (pStream->fixedSinkVgId == 0) {
......@@ -122,7 +118,7 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream
ASSERT(pDb);
if (pDb->cfg.numOfVgroups > 1) {
isShuffle = true;
pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
pTask->outputType = TASK_OUTPUT__SHUFFLE_DISPATCH;
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
ASSERT(0);
......@@ -152,7 +148,7 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream
}
}
} else {
pTask->dispatchType = TASK_DISPATCH__FIXED;
pTask->outputType = TASK_OUTPUT__FIXED_DISPATCH;
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
SArray* pArray = taosArrayGetP(pStream->tasks, 0);
// one sink only
......@@ -178,7 +174,6 @@ int32_t mndAssignTaskToVg(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, co
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
}
ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE || pTask->sinkType != TASK_SINK__NONE);
return 0;
}
......@@ -249,26 +244,20 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) {
pTask->nodeId = pVgroup->vgId;
pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
// source
pTask->isDataScan = 0;
// exec
pTask->execType = TASK_EXEC__NONE;
// type
pTask->taskLevel = TASK_LEVEL__SINK;
// sink
if (pStream->smaId != 0) {
pTask->sinkType = TASK_SINK__SMA;
pTask->outputType = TASK_OUTPUT__SMA;
pTask->smaSink.smaId = pStream->smaId;
} else {
pTask->sinkType = TASK_SINK__TABLE;
pTask->outputType = TASK_OUTPUT__TABLE;
pTask->tbSink.stbUid = pStream->targetStbUid;
memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
ASSERT(pTask->tbSink.pSchemaWrapper);
}
// dispatch
pTask->dispatchType = TASK_DISPATCH__NONE;
}
return 0;
}
......@@ -295,25 +284,19 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, SStreamObj* pStream) {
#endif
pTask->epSet = mndGetVgroupEpset(pMnode, &pStream->fixedSinkVg);
// source
pTask->isDataScan = 0;
// exec
pTask->execType = TASK_EXEC__NONE;
pTask->taskLevel = TASK_LEVEL__SINK;
// sink
if (pStream->smaId != 0) {
pTask->sinkType = TASK_SINK__SMA;
pTask->outputType = TASK_OUTPUT__SMA;
pTask->smaSink.smaId = pStream->smaId;
} else {
pTask->sinkType = TASK_SINK__TABLE;
pTask->outputType = TASK_OUTPUT__TABLE;
pTask->tbSink.stbUid = pStream->targetStbUid;
memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
}
// dispatch
pTask->dispatchType = TASK_DISPATCH__NONE;
return 0;
}
......@@ -338,6 +321,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
bool multiTarget = pDbObj->cfg.numOfVgroups > 1;
if (totLevel == 2 || externalTargetDB || multiTarget) {
/*if (true) {*/
SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
taosArrayPush(pStream->tasks, &taskOneLevel);
// add extra sink
......@@ -376,8 +360,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
pInnerTask->childEpInfo = taosArrayInit(0, sizeof(void*));
// source
pInnerTask->isDataScan = 0;
pInnerTask->taskLevel = TASK_LEVEL__AGG;
// trigger
pInnerTask->triggerParam = pStream->triggerParam;
......@@ -388,9 +371,6 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
return -1;
}
// exec
pInnerTask->execType = TASK_EXEC__PIPE;
#if 0
SDbObj* pSourceDb = mndAcquireDb(pMnode, pStream->sourceDb);
ASSERT(pDbObj != NULL);
......@@ -452,19 +432,16 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
mndAddTaskToTaskSet(taskSourceLevel, pTask);
// source
pTask->isDataScan = 1;
pTask->taskLevel = TASK_LEVEL__SOURCE;
// add fixed vg dispatch
pTask->sinkType = TASK_SINK__NONE;
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
pTask->dispatchType = TASK_DISPATCH__FIXED;
pTask->outputType = TASK_OUTPUT__FIXED_DISPATCH;
pTask->fixedEpDispatcher.taskId = pInnerTask->taskId;
pTask->fixedEpDispatcher.nodeId = pInnerTask->nodeId;
pTask->fixedEpDispatcher.epSet = pInnerTask->epSet;
// exec
pTask->execType = TASK_EXEC__PIPE;
if (mndAssignTaskToVg(pMnode, pTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
......@@ -515,7 +492,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
mndAddTaskToTaskSet(taskOneLevel, pTask);
// source
pTask->isDataScan = 1;
pTask->taskLevel = TASK_LEVEL__SOURCE;
// trigger
pTask->triggerParam = pStream->triggerParam;
......@@ -527,8 +504,6 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
mndAddSinkToTask(pMnode, pStream, pTask);
}
// exec
pTask->execType = TASK_EXEC__PIPE;
if (mndAssignTaskToVg(pMnode, pTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
......
......@@ -323,8 +323,7 @@ FAIL:
}
int32_t mndPersistTaskDeployReq(STrans *pTrans, const SStreamTask *pTask) {
ASSERT(pTask->isDataScan == 0 || pTask->isDataScan == 1);
if (pTask->isDataScan == 0 && pTask->sinkType == TASK_SINK__NONE) {
if (pTask->taskLevel == TASK_LEVEL__AGG) {
ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0);
}
SEncoder encoder;
......@@ -548,7 +547,7 @@ int32_t mndRecoverStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStrea
SArray *pTasks = taosArrayGetP(pStream->tasks, i);
int32_t sz = taosArrayGetSize(pTasks);
SStreamTask *pTask = taosArrayGetP(pTasks, 0);
if (!pTask->isDataScan && pTask->execType != TASK_EXEC__NONE) {
if (pTask->taskLevel == TASK_LEVEL__AGG) {
ASSERT(sz == 1);
if (mndPersistTaskRecoverReq(pTrans, pTask) < 0) {
return -1;
......@@ -564,8 +563,8 @@ int32_t mndRecoverStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStrea
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
if (!pTask->isDataScan) break;
ASSERT(pTask->execType != TASK_EXEC__NONE);
if (pTask->taskLevel != TASK_LEVEL__SOURCE) break;
ASSERT(pTask->taskLevel != TASK_LEVEL__SINK);
if (mndPersistTaskRecoverReq(pTrans, pTask) < 0) {
return -1;
}
......
......@@ -110,9 +110,6 @@ static int32_t sndProcessTaskDeployReq(SSnode *pNode, SRpcMsg *pMsg) {
pTask->pMsgCb = &pNode->msgCb;
ASSERT(pTask->execType != TASK_EXEC__NONE);
ASSERT(pTask->isDataScan == 0);
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, NULL);
ASSERT(pTask->exec.executor);
......
......@@ -604,8 +604,8 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
int32_t code = 0;
ASSERT(pTask->isDataScan == 0 || pTask->isDataScan == 1);
if (pTask->isDataScan == 0 && pTask->sinkType == TASK_SINK__NONE) {
if (pTask->taskLevel == TASK_LEVEL__AGG) {
ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0);
}
......@@ -624,32 +624,30 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
pTask->pMsgCb = &pTq->pVnode->msgCb;
// exec
if (pTask->execType != TASK_EXEC__NONE) {
// expand runners
if (pTask->isDataScan) {
SReadHandle handle = {
.meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode,
.initTqReader = 1,
};
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
} else {
SReadHandle mgHandle = {
.vnode = NULL,
.numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo),
};
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle);
}
// expand executor
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
SReadHandle handle = {
.meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode,
.initTqReader = 1,
};
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
ASSERT(pTask->exec.executor);
} else if (pTask->taskLevel == TASK_LEVEL__AGG) {
SReadHandle mgHandle = {
.vnode = NULL,
.numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo),
};
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle);
ASSERT(pTask->exec.executor);
}
// sink
/*pTask->ahandle = pTq->pVnode;*/
if (pTask->sinkType == TASK_SINK__SMA) {
if (pTask->outputType == TASK_OUTPUT__SMA) {
pTask->smaSink.vnode = pTq->pVnode;
pTask->smaSink.smaSink = smaHandleRes;
} else if (pTask->sinkType == TASK_SINK__TABLE) {
} else if (pTask->outputType == TASK_OUTPUT__TABLE) {
pTask->tbSink.vnode = pTq->pVnode;
pTask->tbSink.tbSinkFunc = tqTableSink;
......@@ -715,7 +713,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
pIter = taosHashIterate(pTq->pStreamTasks, pIter);
if (pIter == NULL) break;
SStreamTask* pTask = *(SStreamTask**)pIter;
if (!pTask->isDataScan) continue;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue;
qDebug("data submit enqueue stream task: %d, ver: %" PRId64, pTask->taskId, ver);
......
......@@ -416,7 +416,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
pIter = taosHashIterate(pTq->pStreamTasks, pIter);
if (pIter == NULL) break;
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->isDataScan) {
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
int32_t code = qUpdateQualifiedTableId(pTask->exec.executor, tbUidList, isAdd);
ASSERT(code == 0);
}
......
......@@ -65,7 +65,7 @@ void streamSchedByTimer(void* param, void* tmrId) {
}
trigger->pBlock->info.type = STREAM_GET_ALL;
atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE);
atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE);
streamTaskInput(pTask, (SStreamQueueItem*)trigger);
streamSchedExec(pTask);
......@@ -77,7 +77,7 @@ void streamSchedByTimer(void* param, void* tmrId) {
int32_t streamSetupTrigger(SStreamTask* pTask) {
if (pTask->triggerParam != 0) {
pTask->timer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer);
pTask->triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE;
pTask->triggerStatus = TASK_TRIGGER_STATUS__INACTIVE;
}
return 0;
}
......@@ -186,7 +186,7 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
if (exec) {
streamTryExec(pTask);
if (pTask->dispatchType != TASK_DISPATCH__NONE) {
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
streamDispatch(pTask);
}
} else {
......@@ -201,7 +201,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) {
qDebug("task %d receive dispatch rsp", pTask->taskId);
if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
qDebug("task %d is shuffle, left waiting rsp %d", pTask->taskId, leftRsp);
if (leftRsp > 0) return 0;
......@@ -222,7 +222,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) {
int32_t streamProcessRunReq(SStreamTask* pTask) {
streamTryExec(pTask);
if (pTask->dispatchType != TASK_DISPATCH__NONE) {
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
streamDispatch(pTask);
}
return 0;
......@@ -250,7 +250,7 @@ int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp)
streamProcessRunReq(pTask);
if (pTask->isDataScan) {
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
// scan data to recover
pTask->inputStatus = TASK_INPUT_STATUS__RECOVER;
pTask->taskStatus = TASK_STATUS__RECOVERING;
......@@ -272,12 +272,11 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S
streamTaskEnqueueRetrieve(pTask, pReq, pRsp);
ASSERT(pTask->execType != TASK_EXEC__NONE);
ASSERT(pTask->taskLevel != TASK_LEVEL__SINK);
streamSchedExec(pTask);
/*streamTryExec(pTask);*/
/*ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);*/
/*streamDispatch(pTask);*/
return 0;
......
......@@ -242,7 +242,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
int32_t blockNum = taosArrayGetSize(pData->blocks);
ASSERT(blockNum != 0);
if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
SStreamDispatchReq req = {
.streamId = pTask->streamId,
.dataSrcVgId = pData->srcVgId,
......@@ -282,7 +282,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
taosArrayDestroy(req.dataLen);
return code;
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
int32_t rspCnt = atomic_load_32(&pTask->shuffleDispatcher.waitingRspCnt);
ASSERT(rspCnt == 0);
......@@ -393,11 +393,11 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* data,
int32_t vgId = 0;
int32_t downstreamTaskId = 0;
// find ep
if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
vgId = pTask->fixedEpDispatcher.nodeId;
*ppEpSet = &pTask->fixedEpDispatcher.epSet;
downstreamTaskId = pTask->fixedEpDispatcher.taskId;
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
// TODO get ctbName for each block
SSDataBlock* pBlock = taosArrayGet(data->blocks, 0);
char* ctbName = buildCtbNameByGroupId(pTask->shuffleDispatcher.stbFullName, pBlock->info.groupId);
......@@ -439,8 +439,7 @@ FAIL:
}
int32_t streamDispatch(SStreamTask* pTask) {
ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);
ASSERT(pTask->sinkType == TASK_SINK__NONE);
ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH);
int8_t old =
atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
......
......@@ -24,7 +24,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
SStreamTrigger* pTrigger = (SStreamTrigger*)data;
qSetMultiStreamInput(exec, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
ASSERT(pTask->isDataScan);
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
qDebug("task %d %p set submit input %p %p %d 1", pTask->taskId, pTask, pSubmit, pSubmit->data, *pSubmit->dataRef);
qSetMultiStreamInput(exec, pSubmit->data, 1, STREAM_INPUT__DATA_SUBMIT);
......@@ -92,7 +92,7 @@ static FORCE_INLINE int32_t streamUpdateVer(SStreamTask* pTask, SStreamDataBlock
}
int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) {
ASSERT(pTask->execType != TASK_EXEC__NONE);
ASSERT(pTask->taskLevel != TASK_LEVEL__SINK);
void* exec = pTask->exec.executor;
......@@ -139,8 +139,7 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) {
return -1;
}
if (pTask->dispatchType != TASK_DISPATCH__NONE) {
ASSERT(pTask->sinkType == TASK_SINK__NONE);
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
streamDispatch(pTask);
}
}
......@@ -161,7 +160,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
if (data == NULL) {
data = qItem;
streamQueueProcessSuccess(pTask->inputQueue);
if (pTask->execType == TASK_EXEC__NONE) {
if (pTask->taskLevel == TASK_LEVEL__SINK) {
break;
}
} else {
......@@ -187,7 +186,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
break;
}
if (pTask->execType == TASK_EXEC__NONE) {
if (pTask->taskLevel == TASK_LEVEL__SINK) {
ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK);
streamTaskOutput(pTask, data);
continue;
......
......@@ -52,15 +52,16 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->ahandle = ahandle;
pMeta->expandFunc = expandFunc;
return pMeta;
_err:
return NULL;
}
void streamMetaClose(SStreamMeta* pMeta) {
//
return;
tdbCommit(pMeta->db, &pMeta->txn);
tdbTbClose(pMeta->pTaskDb);
tdbTbClose(pMeta->pStateDb);
tdbClose(pMeta->db);
}
int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask) {
......@@ -123,13 +124,32 @@ int32_t streamMetaCommit(SStreamMeta* pMeta) {
if (tdbCommit(pMeta->db, &pMeta->txn) < 0) {
return -1;
}
memset(&pMeta->txn, 0, sizeof(TXN));
if (tdbTxnOpen(&pMeta->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
0) {
return -1;
}
if (tdbBegin(pMeta->db, &pMeta->txn) < 0) {
return -1;
}
return 0;
}
int32_t streamMetaRollBack(SStreamMeta* pMeta) {
// TODO tdb rollback
int32_t streamMetaAbort(SStreamMeta* pMeta) {
if (tdbAbort(pMeta->db, &pMeta->txn) < 0) {
return -1;
}
memset(&pMeta->txn, 0, sizeof(TXN));
if (tdbTxnOpen(&pMeta->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
0) {
return -1;
}
if (tdbBegin(pMeta->db, &pMeta->txn) < 0) {
return -1;
}
return 0;
}
int32_t streamRestoreTask(SStreamMeta* pMeta) {
TBC* pCur = NULL;
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
......@@ -153,6 +173,18 @@ int32_t streamRestoreTask(SStreamMeta* pMeta) {
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
tDecodeSStreamTask(&decoder, pTask);
tDecoderClear(&decoder);
if (pMeta->expandFunc(pMeta->ahandle, pTask) < 0) {
return -1;
}
if (taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) {
return -1;
}
}
if (tdbTbcClose(pCur) < 0) {
return -1;
}
return 0;
......
......@@ -88,14 +88,15 @@ int32_t tDecodeSMStreamTaskRecoverRsp(SDecoder* pDecoder, SMStreamTaskRecoverRsp
}
int32_t streamProcessFailRecoverReq(SStreamTask* pTask, SMStreamTaskRecoverReq* pReq, SRpcMsg* pRsp) {
#if 0
if (pTask->taskStatus != TASK_STATUS__FAIL) {
return 0;
}
if (pTask->isStreamDistributed) {
if (pTask->isDataScan) {
if (pTask->taskType == TASK_TYPE__SOURCE) {
pTask->taskStatus = TASK_STATUS__PREPARE_RECOVER;
} else if (pTask->execType != TASK_EXEC__NONE) {
} else if (pTask->taskType != TASK_TYPE__SINK) {
pTask->taskStatus = TASK_STATUS__PREPARE_RECOVER;
bool hasCheckpoint = false;
int32_t childSz = taosArrayGetSize(pTask->childEpInfo);
......@@ -113,7 +114,7 @@ int32_t streamProcessFailRecoverReq(SStreamTask* pTask, SMStreamTaskRecoverReq*
}
}
} else {
if (pTask->isDataScan) {
if (pTask->taskType == TASK_TYPE__SOURCE) {
if (pTask->checkpointVer != -1) {
// load from checkpoint
} else {
......@@ -133,5 +134,6 @@ int32_t streamProcessFailRecoverReq(SStreamTask* pTask, SMStreamTaskRecoverReq*
}
}
#endif
return 0;
}
......@@ -52,10 +52,8 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
/*if (tStartEncode(pEncoder) < 0) return -1;*/
if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->isDataScan) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->execType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->sinkType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->dispatchType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->taskLevel) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->outputType) < 0) return -1;
if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->taskStatus) < 0) return -1;
......@@ -73,27 +71,23 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeStreamEpInfo(pEncoder, pInfo) < 0) return -1;
}
if (pTask->execType != TASK_EXEC__NONE) {
if (pTask->taskLevel != TASK_LEVEL__SINK) {
if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1;
}
if (pTask->sinkType == TASK_SINK__TABLE) {
if (pTask->outputType == TASK_OUTPUT__TABLE) {
if (tEncodeI64(pEncoder, pTask->tbSink.stbUid) < 0) return -1;
if (tEncodeCStr(pEncoder, pTask->tbSink.stbFullName) < 0) return -1;
if (tEncodeSSchemaWrapper(pEncoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__SMA) {
} else if (pTask->outputType == TASK_OUTPUT__SMA) {
if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__FETCH) {
} else if (pTask->outputType == TASK_OUTPUT__FETCH) {
if (tEncodeI8(pEncoder, pTask->fetchSink.reserved) < 0) return -1;
} else {
ASSERT(pTask->sinkType == TASK_SINK__NONE);
}
if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
} else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
if (tSerializeSUseDbRspImp(pEncoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1;
if (tEncodeCStr(pEncoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1;
}
......@@ -107,10 +101,8 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
/*if (tStartDecode(pDecoder) < 0) return -1;*/
if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->isDataScan) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->execType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->sinkType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->dispatchType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->taskLevel) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->outputType) < 0) return -1;
if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->taskStatus) < 0) return -1;
......@@ -131,29 +123,25 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
taosArrayPush(pTask->childEpInfo, &pInfo);
}
if (pTask->execType != TASK_EXEC__NONE) {
if (pTask->taskLevel != TASK_LEVEL__SINK) {
if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1;
}
if (pTask->sinkType == TASK_SINK__TABLE) {
if (pTask->outputType == TASK_OUTPUT__TABLE) {
if (tDecodeI64(pDecoder, &pTask->tbSink.stbUid) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pTask->tbSink.stbFullName) < 0) return -1;
pTask->tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if (pTask->tbSink.pSchemaWrapper == NULL) return -1;
if (tDecodeSSchemaWrapper(pDecoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__SMA) {
} else if (pTask->outputType == TASK_OUTPUT__SMA) {
if (tDecodeI64(pDecoder, &pTask->smaSink.smaId) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__FETCH) {
} else if (pTask->outputType == TASK_OUTPUT__FETCH) {
if (tDecodeI8(pDecoder, &pTask->fetchSink.reserved) < 0) return -1;
} else {
ASSERT(pTask->sinkType == TASK_SINK__NONE);
}
if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
} else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
if (tDeserializeSUseDbRspImp(pDecoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册