未验证 提交 ab01fec5 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #14134 from taosdata/feature/stream

feat(stream): stream support multiple type input
...@@ -239,17 +239,14 @@ typedef struct { ...@@ -239,17 +239,14 @@ typedef struct {
struct SStreamTask { struct SStreamTask {
int64_t streamId; int64_t streamId;
int32_t taskId; int32_t taskId;
int8_t inputType; int8_t isDataScan;
int8_t taskStatus;
int8_t execStatus;
int8_t execType; int8_t execType;
int8_t sinkType; int8_t sinkType;
int8_t dispatchType; int8_t dispatchType;
int16_t dispatchMsgType; int16_t dispatchMsgType;
int8_t isDataScan; int8_t taskStatus;
int8_t execStatus;
// node info // node info
int32_t selfChildId; int32_t selfChildId;
......
...@@ -270,10 +270,8 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, STrans* pTrans, SStreamOb ...@@ -270,10 +270,8 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, STrans* pTrans, SStreamOb
pTask->nodeId = pVgroup->vgId; pTask->nodeId = pVgroup->vgId;
pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup); pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
pTask->isDataScan = 0;
// source // source
pTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK; pTask->isDataScan = 0;
// exec // exec
pTask->execType = TASK_EXEC__NONE; pTask->execType = TASK_EXEC__NONE;
...@@ -320,10 +318,8 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* ...@@ -320,10 +318,8 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, STrans* pTrans, SStreamObj*
#endif #endif
pTask->epSet = mndGetVgroupEpset(pMnode, &pStream->fixedSinkVg); pTask->epSet = mndGetVgroupEpset(pMnode, &pStream->fixedSinkVg);
pTask->isDataScan = 0;
// source // source
pTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK; pTask->isDataScan = 0;
// exec // exec
pTask->execType = TASK_EXEC__NONE; pTask->execType = TASK_EXEC__NONE;
...@@ -392,12 +388,10 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -392,12 +388,10 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
pInnerTask = tNewSStreamTask(pStream->uid); pInnerTask = tNewSStreamTask(pStream->uid);
mndAddTaskToTaskSet(taskInnerLevel, pInnerTask); mndAddTaskToTaskSet(taskInnerLevel, pInnerTask);
pInnerTask->isDataScan = 0;
pInnerTask->childEpInfo = taosArrayInit(0, sizeof(void*)); pInnerTask->childEpInfo = taosArrayInit(0, sizeof(void*));
// input // source
pInnerTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK; pInnerTask->isDataScan = 0;
// trigger // trigger
pInnerTask->triggerParam = pStream->triggerParam; pInnerTask->triggerParam = pStream->triggerParam;
...@@ -458,11 +452,9 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -458,11 +452,9 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
SStreamTask* pTask = tNewSStreamTask(pStream->uid); SStreamTask* pTask = tNewSStreamTask(pStream->uid);
mndAddTaskToTaskSet(taskSourceLevel, pTask); mndAddTaskToTaskSet(taskSourceLevel, pTask);
// source
pTask->isDataScan = 1; pTask->isDataScan = 1;
// input
pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK;
// add fixed vg dispatch // add fixed vg dispatch
pTask->sinkType = TASK_SINK__NONE; pTask->sinkType = TASK_SINK__NONE;
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
...@@ -517,10 +509,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -517,10 +509,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
SStreamTask* pTask = tNewSStreamTask(pStream->uid); SStreamTask* pTask = tNewSStreamTask(pStream->uid);
mndAddTaskToTaskSet(taskOneLevel, pTask); mndAddTaskToTaskSet(taskOneLevel, pTask);
pTask->isDataScan = 1;
// input // input
pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK; pTask->isDataScan = 1;
// trigger // trigger
pTask->triggerParam = pStream->triggerParam; pTask->triggerParam = pStream->triggerParam;
......
...@@ -426,6 +426,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -426,6 +426,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
ASSERT(0); ASSERT(0);
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
ASSERT(pTask->isDataScan == 0 || pTask->isDataScan == 1);
pTask->execStatus = TASK_EXEC_STATUS__IDLE; pTask->execStatus = TASK_EXEC_STATUS__IDLE;
...@@ -505,7 +506,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) { ...@@ -505,7 +506,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) {
if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) { if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) {
continue; continue;
} }
if (pTask->inputType != STREAM_INPUT__DATA_SUBMIT) continue; if (!pTask->isDataScan) continue;
if (!failed) { if (!failed) {
if (streamTaskInput(pTask, (SStreamQueueItem*)pSubmit) < 0) { if (streamTaskInput(pTask, (SStreamQueueItem*)pSubmit) < 0) {
......
...@@ -345,8 +345,8 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { ...@@ -345,8 +345,8 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
while (1) { while (1) {
pIter = taosHashIterate(pTq->pStreamTasks, pIter); pIter = taosHashIterate(pTq->pStreamTasks, pIter);
if (pIter == NULL) break; if (pIter == NULL) break;
SStreamTask* pTask = (SStreamTask*)pIter; SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->inputType == STREAM_INPUT__DATA_SUBMIT) { if (pTask->isDataScan) {
int32_t code = qUpdateQualifiedTableId(pTask->exec.executor, tbUidList, isAdd); int32_t code = qUpdateQualifiedTableId(pTask->exec.executor, tbUidList, isAdd);
ASSERT(code == 0); ASSERT(code == 0);
} }
......
...@@ -40,7 +40,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu ...@@ -40,7 +40,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
SStreamBlockScanInfo* pInfo = pOperator->info; SStreamBlockScanInfo* pInfo = pOperator->info;
pInfo->assignBlockUid = assignUid; pInfo->assignBlockUid = assignUid;
// the block type can not be changed in the streamscan operators // no need to check
#if 0 #if 0
if (pInfo->blockType == 0) { if (pInfo->blockType == 0) {
pInfo->blockType = type; pInfo->blockType = type;
...@@ -49,10 +49,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu ...@@ -49,10 +49,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
#endif #endif
// rollup sma, the same qTaskInfo is used to insert data by SubmitReq and fetch result by SSDataBlock
if (pInfo->blockType != type) {
pInfo->blockType = type; pInfo->blockType = type;
}
if (type == STREAM_DATA_TYPE_SUBMIT_BLOCK) { if (type == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
if (tqReadHandleSetMsg(pInfo->streamBlockReader, input, 0) < 0) { if (tqReadHandleSetMsg(pInfo->streamBlockReader, input, 0) < 0) {
......
...@@ -916,6 +916,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { ...@@ -916,6 +916,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
} }
size_t total = taosArrayGetSize(pInfo->pBlockLists); size_t total = taosArrayGetSize(pInfo->pBlockLists);
// TODO: refactor
if (pInfo->blockType == STREAM_DATA_TYPE_SSDATA_BLOCK) { if (pInfo->blockType == STREAM_DATA_TYPE_SSDATA_BLOCK) {
if (pInfo->validBlockIndex >= total) { if (pInfo->validBlockIndex >= total) {
/*doClearBufferedBlocks(pInfo);*/ /*doClearBufferedBlocks(pInfo);*/
......
...@@ -24,12 +24,11 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) ...@@ -24,12 +24,11 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
SStreamTrigger* pTrigger = (SStreamTrigger*)data; SStreamTrigger* pTrigger = (SStreamTrigger*)data;
qSetMultiStreamInput(exec, pTrigger->pBlock, 1, STREAM_DATA_TYPE_SSDATA_BLOCK, false); qSetMultiStreamInput(exec, pTrigger->pBlock, 1, STREAM_DATA_TYPE_SSDATA_BLOCK, false);
} else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
ASSERT(pTask->isDataScan);
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data; SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
ASSERT(pTask->inputType == STREAM_INPUT__DATA_SUBMIT);
qSetStreamInput(exec, pSubmit->data, STREAM_DATA_TYPE_SUBMIT_BLOCK, false); qSetStreamInput(exec, pSubmit->data, STREAM_DATA_TYPE_SUBMIT_BLOCK, false);
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK) { } else if (pItem->type == STREAM_INPUT__DATA_BLOCK) {
SStreamDataBlock* pBlock = (SStreamDataBlock*)data; SStreamDataBlock* pBlock = (SStreamDataBlock*)data;
ASSERT(pTask->inputType == STREAM_INPUT__DATA_BLOCK);
SArray* blocks = pBlock->blocks; SArray* blocks = pBlock->blocks;
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK, false); qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK, false);
} else if (pItem->type == STREAM_INPUT__DROP) { } else if (pItem->type == STREAM_INPUT__DROP) {
...@@ -89,17 +88,17 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { ...@@ -89,17 +88,17 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
return NULL; return NULL;
} }
if (((SStreamQueueItem*)data)->type == STREAM_INPUT__TRIGGER) { int8_t type = ((SStreamQueueItem*)data)->type;
if (type == STREAM_INPUT__TRIGGER) {
blockDataDestroy(((SStreamTrigger*)data)->pBlock); blockDataDestroy(((SStreamTrigger*)data)->pBlock);
taosFreeQitem(data); taosFreeQitem(data);
} else { } else if (type == STREAM_INPUT__DATA_BLOCK) {
if (pTask->inputType == STREAM_INPUT__DATA_SUBMIT) {
streamDataSubmitRefDec((SStreamDataSubmit*)data);
taosFreeQitem(data);
} else {
taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock); taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock);
taosFreeQitem(data); taosFreeQitem(data);
} } else if (type == STREAM_INPUT__DATA_SUBMIT) {
ASSERT(pTask->isDataScan);
streamDataSubmitRefDec((SStreamDataSubmit*)data);
taosFreeQitem(data);
} }
streamQueueProcessSuccess(pTask->inputQueue); streamQueueProcessSuccess(pTask->inputQueue);
return taosArrayInit(0, sizeof(SSDataBlock)); return taosArrayInit(0, sizeof(SSDataBlock));
......
...@@ -50,14 +50,14 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { ...@@ -50,14 +50,14 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
/*if (tStartEncode(pEncoder) < 0) return -1;*/ /*if (tStartEncode(pEncoder) < 0) return -1;*/
if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->inputType) < 0) return -1; if (tEncodeI8(pEncoder, pTask->isDataScan) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->taskStatus) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->execStatus) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->execType) < 0) return -1; if (tEncodeI8(pEncoder, pTask->execType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->sinkType) < 0) return -1; if (tEncodeI8(pEncoder, pTask->sinkType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->dispatchType) < 0) return -1; if (tEncodeI8(pEncoder, pTask->dispatchType) < 0) return -1;
if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1; if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->isDataScan) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->taskStatus) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->execStatus) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->selfChildId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->selfChildId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1;
...@@ -106,14 +106,14 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { ...@@ -106,14 +106,14 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
/*if (tStartDecode(pDecoder) < 0) return -1;*/ /*if (tStartDecode(pDecoder) < 0) return -1;*/
if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->inputType) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->isDataScan) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->taskStatus) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->execStatus) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->execType) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->execType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->sinkType) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->sinkType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->dispatchType) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->dispatchType) < 0) return -1;
if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1; if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->isDataScan) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->taskStatus) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->execStatus) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->selfChildId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->selfChildId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册