未验证 提交 74cd04bd 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #11016 from taosdata/feature/tq

Feature/tq
...@@ -138,7 +138,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock); ...@@ -138,7 +138,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock);
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize); size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
void* blockDataDestroy(SSDataBlock* pBlock); void* blockDataDestroy(SSDataBlock* pBlock);
void blockDebugShowData(SArray* dataBlocks); void blockDebugShowData(const SArray* dataBlocks);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -30,16 +30,18 @@ enum { ...@@ -30,16 +30,18 @@ enum {
STREAM_TASK_STATUS__STOP, STREAM_TASK_STATUS__STOP,
}; };
#if 0
// pipe -> fetch/pipe queue // pipe -> fetch/pipe queue
// merge -> merge queue // merge -> merge queue
// write -> write queue // write -> write queue
enum { enum {
TASK_SINK_MSG__SND_PIPE = 1, TASK_DISPATCH_MSG__SND_PIPE = 1,
TASK_SINK_MSG__SND_MERGE, TASK_DISPATCH_MSG__SND_MERGE,
TASK_SINK_MSG__VND_PIPE, TASK_DISPATCH_MSG__VND_PIPE,
TASK_SINK_MSG__VND_MERGE, TASK_DISPATCH_MSG__VND_MERGE,
TASK_SINK_MSG__VND_WRITE, TASK_DISPATCH_MSG__VND_WRITE,
}; };
#endif
typedef struct { typedef struct {
int32_t nodeId; // 0 for snode int32_t nodeId; // 0 for snode
...@@ -93,13 +95,14 @@ typedef struct { ...@@ -93,13 +95,14 @@ typedef struct {
enum { enum {
TASK_SOURCE__SCAN = 1, TASK_SOURCE__SCAN = 1,
TASK_SOURCE__SINGLE, TASK_SOURCE__PIPE,
TASK_SOURCE__MULTI, TASK_SOURCE__MERGE,
}; };
enum { enum {
TASK_EXEC__NONE = 1, TASK_EXEC__NONE = 1,
TASK_EXEC__EXEC, TASK_EXEC__PIPE,
TASK_EXEC__MERGE,
}; };
enum { enum {
...@@ -129,6 +132,9 @@ typedef struct { ...@@ -129,6 +132,9 @@ typedef struct {
int16_t dispatchMsgType; int16_t dispatchMsgType;
int32_t downstreamTaskId; int32_t downstreamTaskId;
int32_t nodeId;
SEpSet epSet;
// source preprocess // source preprocess
// exec // exec
......
...@@ -1380,7 +1380,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) { ...@@ -1380,7 +1380,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
return buf; return buf;
} }
void blockDebugShowData(SArray* dataBlocks) { void blockDebugShowData(const SArray* dataBlocks) {
char pBuf[128]; char pBuf[128];
int32_t sz = taosArrayGetSize(dataBlocks); int32_t sz = taosArrayGetSize(dataBlocks);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
...@@ -1398,13 +1398,17 @@ void blockDebugShowData(SArray* dataBlocks) { ...@@ -1398,13 +1398,17 @@ void blockDebugShowData(SArray* dataBlocks) {
printf(" %25s |", pBuf); printf(" %25s |", pBuf);
break; break;
case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UINT:
printf(" %15d |", *(int32_t*)var); printf(" %15d |", *(int32_t*)var);
break; break;
case TSDB_DATA_TYPE_UINT:
printf(" %15u |", *(uint32_t*)var);
break;
case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT:
printf(" %15ld |", *(int64_t*)var); printf(" %15ld |", *(int64_t*)var);
break; break;
case TSDB_DATA_TYPE_UBIGINT:
printf(" %15lu |", *(uint64_t*)var);
break;
} }
} }
printf("\n"); printf("\n");
......
...@@ -122,7 +122,7 @@ bool tsRetrieveBlockingModel = 0; ...@@ -122,7 +122,7 @@ bool tsRetrieveBlockingModel = 0;
// last_row(*), first(*), last_row(ts, col1, col2) query, the result fields will be the original column name // last_row(*), first(*), last_row(ts, col1, col2) query, the result fields will be the original column name
bool tsKeepOriginalColumnName = 0; bool tsKeepOriginalColumnName = 0;
// long query death-lock // kill long query
bool tsDeadLockKillQuery = 0; bool tsDeadLockKillQuery = 0;
// tsdb config // tsdb config
......
...@@ -66,8 +66,11 @@ int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet ...@@ -66,8 +66,11 @@ int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet
int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) { int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
int32_t msgLen; int32_t msgLen;
pTask->nodeId = pVgroup->vgId;
pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
plan->execNode.nodeId = pVgroup->vgId; plan->execNode.nodeId = pVgroup->vgId;
plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup); plan->execNode.epSet = pTask->epSet;
if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) { if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
terrno = TSDB_CODE_QRY_INVALID_INPUT; terrno = TSDB_CODE_QRY_INVALID_INPUT;
...@@ -86,8 +89,12 @@ SSnodeObj* mndSchedFetchSnode(SMnode* pMnode) { ...@@ -86,8 +89,12 @@ SSnodeObj* mndSchedFetchSnode(SMnode* pMnode) {
int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan, int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan,
const SSnodeObj* pSnode) { const SSnodeObj* pSnode) {
int32_t msgLen; int32_t msgLen;
plan->execNode.nodeId = pSnode->id;
plan->execNode.epSet = mndAcquireEpFromSnode(pMnode, pSnode); pTask->nodeId = 0;
pTask->epSet = mndAcquireEpFromSnode(pMnode, pSnode);
plan->execNode.nodeId = 0;
plan->execNode.epSet = pTask->epSet;
if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) { if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
terrno = TSDB_CODE_QRY_INVALID_INPUT; terrno = TSDB_CODE_QRY_INVALID_INPUT;
...@@ -97,9 +104,23 @@ int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, ...@@ -97,9 +104,23 @@ int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask,
return 0; return 0;
} }
SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) {
void* pIter = NULL;
SVgObj* pVgroup = NULL;
while (1) {
pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid != dbUid) {
sdbRelease(pMnode->pSdb, pVgroup);
continue;
}
return pVgroup;
}
return pVgroup;
}
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
SSdb* pSdb = pMnode->pSdb; SSdb* pSdb = pMnode->pSdb;
SVgObj* pVgroup = NULL;
SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan); SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
if (pPlan == NULL) { if (pPlan == NULL) {
terrno = TSDB_CODE_QRY_INVALID_INPUT; terrno = TSDB_CODE_QRY_INVALID_INPUT;
...@@ -108,80 +129,144 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -108,80 +129,144 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
ASSERT(pStream->vgNum == 0); ASSERT(pStream->vgNum == 0);
int32_t totLevel = LIST_LENGTH(pPlan->pSubplans); int32_t totLevel = LIST_LENGTH(pPlan->pSubplans);
pStream->tasks = taosArrayInit(totLevel, sizeof(SArray)); ASSERT(totLevel <= 2);
int32_t lastUsedVgId = 0; pStream->tasks = taosArrayInit(totLevel, sizeof(void*));
// gather vnodes for (int32_t level = 0; level < totLevel; level++) {
// gather snodes SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
// iterate plan, expand source to vnodes and assign ep to each task SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, level);
// iterate tasks, assign sink type and sink ep to each task ASSERT(LIST_LENGTH(inner->pNodeList) == 1);
for (int32_t revLevel = totLevel - 1; revLevel >= 0; revLevel--) {
int32_t level = totLevel - 1 - revLevel;
SArray* taskOneLevel = taosArrayInit(0, sizeof(SStreamTask));
SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, revLevel);
int32_t opNum = LIST_LENGTH(inner->pNodeList);
ASSERT(opNum == 1);
SSubplan* plan = nodesListGetNode(inner->pNodeList, 0); SSubplan* plan = nodesListGetNode(inner->pNodeList, 0);
if (level == 0) {
// if (level == totLevel - 1 /* or no snode */) {
if (level == totLevel - 1) {
// last level, source, must assign to vnode
// must be scan type
ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN); ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN);
// replicate task to each vnode
void* pIter = NULL; void* pIter = NULL;
while (1) { while (1) {
SVgObj* pVgroup;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
if (pIter == NULL) break; if (pIter == NULL) break;
if (pVgroup->dbUid != pStream->dbUid) { if (pVgroup->dbUid != pStream->dbUid) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
continue; continue;
} }
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
// source part
pTask->sourceType = TASK_SOURCE__SCAN;
lastUsedVgId = pVgroup->vgId; // sink part
pStream->vgNum++; if (level == 0) {
// only for inplace
pTask->sinkType = TASK_SINK__SHOW;
pTask->showSink.reserved = 0;
} else {
pTask->sinkType = TASK_SINK__NONE;
}
SStreamTask* pTask = tNewSStreamTask(pStream->uid); // dispatch part
/*pTask->level = level;*/ if (level == 0) {
// TODO pTask->dispatchType = TASK_DISPATCH__NONE;
/*pTask->sourceType = STREAM_SOURCE__SUPER;*/ // if inplace sink, no dispatcher
/*pTask->sinkType = level == totLevel - 1 ? 1 : 0;*/ // if fixed ep, add fixed ep dispatcher
// if shuffle, add shuffle dispatcher
} else {
// add fixed ep dispatcher
int32_t lastLevel = level - 1;
ASSERT(lastLevel == 0);
SArray* pArray = taosArrayGetP(pStream->tasks, lastLevel);
// one merge only
ASSERT(taosArrayGetSize(pArray) == 1);
SStreamTask* lastLevelTask = taosArrayGetP(pArray, lastLevel);
pTask->dispatchMsgType = TDMT_VND_TASK_MERGE_EXEC;
pTask->dispatchType = TASK_DISPATCH__FIXED;
pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId;
pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet;
}
// exec part
pTask->execType = TASK_EXEC__PIPE;
pTask->exec.parallelizable = 1; pTask->exec.parallelizable = 1;
if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) { if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);
return -1; return -1;
} }
taosArrayPush(taskOneLevel, pTask); sdbRelease(pSdb, pVgroup);
taosArrayPush(taskOneLevel, &pTask);
} }
} else { } else {
// merge plan
// TODO if has snode, assign to snode
// else, assign to vnode
ASSERT(plan->subplanType == SUBPLAN_TYPE_MERGE);
SStreamTask* pTask = tNewSStreamTask(pStream->uid); SStreamTask* pTask = tNewSStreamTask(pStream->uid);
/*pTask->level = level;*/
/*pTask->sourceType = STREAM_SOURCE__NONE;*/ // source part, currently only support multi source
/*pTask->sinkType = level == totLevel - 1 ? 1 : 0;*/ pTask->sourceType = TASK_SOURCE__PIPE;
pTask->exec.parallelizable = plan->subplanType == SUBPLAN_TYPE_SCAN;
// sink part
SSnodeObj* pSnode = mndSchedFetchSnode(pMnode); pTask->sinkType = TASK_SINK__SHOW;
if (pSnode == NULL || tsStreamSchedV) { /*pTask->sinkType = TASK_SINK__NONE;*/
ASSERT(lastUsedVgId != 0);
SVgObj* pVg = mndAcquireVgroup(pMnode, lastUsedVgId); // dispatch part
if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVg) < 0) { pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
sdbRelease(pSdb, pVg); pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;
qDestroyQueryPlan(pPlan);
return -1; // exec part
} pTask->execType = TASK_EXEC__MERGE;
sdbRelease(pSdb, pVg); pTask->exec.parallelizable = 0;
} else { SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->dbUid);
if (mndAssignTaskToSnode(pMnode, pTrans, pTask, plan, pSnode) < 0) { ASSERT(pVgroup);
sdbRelease(pSdb, pSnode); if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) {
qDestroyQueryPlan(pPlan); sdbRelease(pSdb, pVgroup);
return -1; qDestroyQueryPlan(pPlan);
} return -1;
}
sdbRelease(pSdb, pVgroup);
taosArrayPush(taskOneLevel, &pTask);
}
taosArrayPush(pStream->tasks, &taskOneLevel);
}
if (totLevel == 2) {
void* pIter = NULL;
while (1) {
SVgObj* pVgroup;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid != pStream->dbUid) {
sdbRelease(pSdb, pVgroup);
continue;
} }
sdbRelease(pMnode->pSdb, pSnode); SStreamTask* pTask = tNewSStreamTask(pStream->uid);
taosArrayPush(taskOneLevel, pTask); // source part
pTask->sourceType = TASK_SOURCE__MERGE;
// sink part
pTask->sinkType = TASK_SINK__SHOW;
// dispatch part
pTask->dispatchType = TASK_DISPATCH__NONE;
// exec part
pTask->execType = TASK_EXEC__NONE;
pTask->exec.parallelizable = 0;
} }
taosArrayPush(pStream->tasks, taskOneLevel);
} }
// free memory
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);
return 0; return 0;
} }
......
...@@ -14,12 +14,11 @@ ...@@ -14,12 +14,11 @@
*/ */
#include "tcompare.h" #include "tcompare.h"
#include "tdatablock.h"
#include "tqInt.h" #include "tqInt.h"
#include "tqMetaStore.h" #include "tqMetaStore.h"
#include "tstream.h" #include "tstream.h"
void blockDebugShowData(SArray* dataBlocks);
int32_t tqInit() { return tqPushMgrInit(); } int32_t tqInit() { return tqPushMgrInit(); }
void tqCleanUp() { tqPushMgrCleanUp(); } void tqCleanUp() { tqPushMgrCleanUp(); }
...@@ -445,18 +444,19 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) { ...@@ -445,18 +444,19 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
if (pTask->execType == TASK_EXEC__NONE) return 0; if (pTask->execType == TASK_EXEC__NONE) return 0;
pTask->exec.numOfRunners = parallel; pTask->exec.numOfRunners = parallel;
pTask->exec.runners = taosMemoryCalloc(parallel, sizeof(SStreamRunner));
if (pTask->exec.runners == NULL) {
return -1;
}
for (int32_t i = 0; i < parallel; i++) { for (int32_t i = 0; i < parallel; i++) {
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta); STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta);
SReadHandle handle = { SReadHandle handle = {
.reader = pReadHandle, .reader = pReadHandle,
.meta = pTq->pVnodeMeta, .meta = pTq->pVnodeMeta,
}; };
pTask->exec.runners = taosMemoryCalloc(parallel, sizeof(SStreamRunner));
if (pTask->exec.runners == NULL) {
return -1;
}
pTask->exec.runners[i].inputHandle = pReadHandle; pTask->exec.runners[i].inputHandle = pReadHandle;
pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
ASSERT(pTask->exec.runners[i].executor);
} }
return 0; return 0;
} }
...@@ -473,7 +473,10 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -473,7 +473,10 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
} }
tCoderClear(&decoder); tCoderClear(&decoder);
tqExpandTask(pTq, pTask, 8); if (tqExpandTask(pTq, pTask, 4) < 0) {
ASSERT(0);
}
taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask)); taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask));
return 0; return 0;
......
...@@ -22,10 +22,13 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in ...@@ -22,10 +22,13 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK && pTask->sourceType != TASK_SOURCE__SCAN) return 0; if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK && pTask->sourceType != TASK_SOURCE__SCAN) return 0;
// exec // exec
if (pTask->execType == TASK_EXEC__EXEC) { if (pTask->execType != TASK_EXEC__NONE) {
ASSERT(workId < pTask->exec.numOfRunners); ASSERT(workId < pTask->exec.numOfRunners);
void* exec = pTask->exec.runners[workId].executor; void* exec = pTask->exec.runners[workId].executor;
pRes = taosArrayInit(0, sizeof(SSDataBlock)); pRes = taosArrayInit(0, sizeof(SSDataBlock));
if (pRes == NULL) {
return -1;
}
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
qSetStreamInput(exec, input, inputType); qSetStreamInput(exec, input, inputType);
while (1) { while (1) {
...@@ -79,7 +82,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in ...@@ -79,7 +82,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
} }
// dispatch // dispatch
if (pTask->dispatchType != TASK_DISPATCH__NONE) { if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
SStreamTaskExecReq req = { SStreamTaskExecReq req = {
.streamId = pTask->streamId, .streamId = pTask->streamId,
.taskId = pTask->taskId, .taskId = pTask->taskId,
...@@ -101,28 +104,54 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in ...@@ -101,28 +104,54 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
.code = 0, .code = 0,
.msgType = pTask->dispatchMsgType, .msgType = pTask->dispatchMsgType,
}; };
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
int32_t qType; int32_t qType;
if (pTask->dispatchMsgType == TDMT_VND_TASK_PIPE_EXEC || pTask->dispatchMsgType == TDMT_SND_TASK_PIPE_EXEC) { if (pTask->dispatchMsgType == TDMT_VND_TASK_PIPE_EXEC || pTask->dispatchMsgType == TDMT_SND_TASK_PIPE_EXEC) {
qType = FETCH_QUEUE; qType = FETCH_QUEUE;
} else if (pTask->dispatchMsgType == TDMT_VND_TASK_MERGE_EXEC || } else if (pTask->dispatchMsgType == TDMT_VND_TASK_MERGE_EXEC ||
pTask->dispatchMsgType == TDMT_SND_TASK_MERGE_EXEC) { pTask->dispatchMsgType == TDMT_SND_TASK_MERGE_EXEC) {
qType = MERGE_QUEUE; qType = MERGE_QUEUE;
} else if (pTask->dispatchMsgType == TDMT_VND_TASK_WRITE_EXEC) { } else if (pTask->dispatchMsgType == TDMT_VND_TASK_WRITE_EXEC) {
qType = WRITE_QUEUE; qType = WRITE_QUEUE;
} else {
ASSERT(0);
}
tmsgPutToQueue(pMsgCb, qType, &dispatchMsg);
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
((SMsgHead*)buf)->vgId = pTask->fixedEpDispatcher.nodeId;
SEpSet* pEpSet = &pTask->fixedEpDispatcher.epSet;
tmsgSendReq(pMsgCb, pEpSet, &dispatchMsg);
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
// TODO
} else { } else {
ASSERT(0); ASSERT(0);
} }
tmsgPutToQueue(pMsgCb, qType, &dispatchMsg);
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
SStreamTaskExecReq req = {
.streamId = pTask->streamId,
.taskId = pTask->taskId,
.data = pRes,
};
int32_t tlen = sizeof(SMsgHead) + tEncodeSStreamTaskExecReq(NULL, &req);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
return -1;
}
((SMsgHead*)buf)->vgId = htonl(pTask->fixedEpDispatcher.nodeId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncodeSStreamTaskExecReq(&abuf, &req);
SRpcMsg dispatchMsg = {
.pCont = buf,
.contLen = tlen,
.code = 0,
.msgType = pTask->dispatchMsgType,
};
SEpSet* pEpSet = &pTask->fixedEpDispatcher.epSet;
tmsgSendReq(pMsgCb, pEpSet, &dispatchMsg);
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
// TODO
} else {
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
} }
return 0; return 0;
} }
...@@ -168,7 +197,10 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) { ...@@ -168,7 +197,10 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1; if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->downstreamTaskId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->downstreamTaskId) < 0) return -1;
if (pTask->execType == TASK_EXEC__EXEC) { if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) return -1;
if (pTask->execType != TASK_EXEC__NONE) {
if (tEncodeI8(pEncoder, pTask->exec.parallelizable) < 0) return -1; if (tEncodeI8(pEncoder, pTask->exec.parallelizable) < 0) return -1;
if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1; if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1;
} }
...@@ -203,7 +235,10 @@ int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) { ...@@ -203,7 +235,10 @@ int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) {
if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1; if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->downstreamTaskId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->downstreamTaskId) < 0) return -1;
if (pTask->execType == TASK_EXEC__EXEC) { if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1;
if (pTask->execType != TASK_EXEC__NONE) {
if (tDecodeI8(pDecoder, &pTask->exec.parallelizable) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->exec.parallelizable) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册