提交 73b8f02a 编写于 作者: L Liu Jicong

fix(stream): set child id

上级 092a1378
...@@ -141,8 +141,7 @@ void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput); ...@@ -141,8 +141,7 @@ void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput);
#endif #endif
typedef struct { typedef struct {
int8_t parallelizable; char* qmsg;
char* qmsg;
// followings are not applicable to encoder and decoder // followings are not applicable to encoder and decoder
void* inputHandle; void* inputHandle;
void* executor; void* executor;
...@@ -267,7 +266,7 @@ struct SStreamTask { ...@@ -267,7 +266,7 @@ struct SStreamTask {
// void* ahandle; // void* ahandle;
}; };
SStreamTask* tNewSStreamTask(int64_t streamId, int32_t childId); SStreamTask* tNewSStreamTask(int64_t streamId);
int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask); int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void tFreeSStreamTask(SStreamTask* pTask); void tFreeSStreamTask(SStreamTask* pTask);
......
...@@ -197,7 +197,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p ...@@ -197,7 +197,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
continue; continue;
} }
SStreamTask* pTask = tNewSStreamTask(pStream->uid, 0); SStreamTask* pTask = tNewSStreamTask(pStream->uid);
if (pTask == NULL) { if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
...@@ -237,7 +237,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p ...@@ -237,7 +237,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p
int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
ASSERT(pStream->fixedSinkVgId != 0); ASSERT(pStream->fixedSinkVgId != 0);
SArray* tasks = taosArrayGetP(pStream->tasks, 0); SArray* tasks = taosArrayGetP(pStream->tasks, 0);
SStreamTask* pTask = tNewSStreamTask(pStream->uid, 0); SStreamTask* pTask = tNewSStreamTask(pStream->uid);
if (pTask == NULL) { if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
...@@ -329,7 +329,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -329,7 +329,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
continue; continue;
} }
SStreamTask* pTask = tNewSStreamTask(pStream->uid, 0); SStreamTask* pTask = tNewSStreamTask(pStream->uid);
mndAddTaskToTaskSet(taskOneLevel, pTask);
// source part // source part
pTask->sourceType = TASK_SOURCE__SCAN; pTask->sourceType = TASK_SOURCE__SCAN;
pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK; pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK;
...@@ -378,14 +379,12 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -378,14 +379,12 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
// exec part // exec part
pTask->execType = TASK_EXEC__PIPE; pTask->execType = TASK_EXEC__PIPE;
pTask->exec.parallelizable = 1;
if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) { if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);
return -1; return -1;
} }
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
mndAddTaskToTaskSet(taskOneLevel, pTask);
} }
} else { } else {
// merge plan // merge plan
...@@ -394,7 +393,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -394,7 +393,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
// else, assign to vnode // else, assign to vnode
ASSERT(plan->subplanType == SUBPLAN_TYPE_MERGE); ASSERT(plan->subplanType == SUBPLAN_TYPE_MERGE);
SStreamTask* pTask = tNewSStreamTask(pStream->uid, 0); SStreamTask* pTask = tNewSStreamTask(pStream->uid);
mndAddTaskToTaskSet(taskOneLevel, pTask);
// source part, currently only support multi source // source part, currently only support multi source
pTask->sourceType = TASK_SOURCE__PIPE; pTask->sourceType = TASK_SOURCE__PIPE;
...@@ -456,7 +456,6 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -456,7 +456,6 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
// exec part // exec part
pTask->execType = TASK_EXEC__MERGE; pTask->execType = TASK_EXEC__MERGE;
pTask->exec.parallelizable = 0;
SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->dbUid); SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->dbUid);
ASSERT(pVgroup); ASSERT(pVgroup);
if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) { if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) {
...@@ -465,12 +464,12 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -465,12 +464,12 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
return -1; return -1;
} }
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
taosArrayPush(taskOneLevel, &pTask);
} }
taosArrayPush(pStream->tasks, &taskOneLevel); taosArrayPush(pStream->tasks, &taskOneLevel);
} }
#if 0
if (totLevel == 2) { if (totLevel == 2) {
void* pIter = NULL; void* pIter = NULL;
while (1) { while (1) {
...@@ -481,7 +480,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -481,7 +480,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
continue; continue;
} }
SStreamTask* pTask = tNewSStreamTask(pStream->uid, 0); SStreamTask* pTask = tNewSStreamTask(pStream->uid);
// source part // source part
pTask->sourceType = TASK_SOURCE__MERGE; pTask->sourceType = TASK_SOURCE__MERGE;
...@@ -495,9 +494,9 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -495,9 +494,9 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
// exec part // exec part
pTask->execType = TASK_EXEC__NONE; pTask->execType = TASK_EXEC__NONE;
pTask->exec.parallelizable = 0;
} }
} }
#endif
// free memory // free memory
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);
......
...@@ -364,6 +364,7 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -364,6 +364,7 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
tdGetSTSChemaFromSSChema(&pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols); tdGetSTSChemaFromSSChema(&pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols);
ASSERT(pTask->tbSink.pTSchema); ASSERT(pTask->tbSink.pTSchema);
} }
tqInfo("deploy stream task id %d child id %d on vg %d", pTask->taskId, pTask->childId, pTq->pVnode->config.vgId);
taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask)); taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask));
......
...@@ -100,6 +100,7 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM ...@@ -100,6 +100,7 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM
.upstreamNodeId = pTask->nodeId, .upstreamNodeId = pTask->nodeId,
.blockNum = blockNum, .blockNum = blockNum,
}; };
qInfo("dispatch from task %d (child id %d)", pTask->taskId, pTask->childId);
req.data = taosArrayInit(blockNum, sizeof(void*)); req.data = taosArrayInit(blockNum, sizeof(void*));
req.dataLen = taosArrayInit(blockNum, sizeof(int32_t)); req.dataLen = taosArrayInit(blockNum, sizeof(int32_t));
......
...@@ -16,14 +16,13 @@ ...@@ -16,14 +16,13 @@
#include "executor.h" #include "executor.h"
#include "tstream.h" #include "tstream.h"
SStreamTask* tNewSStreamTask(int64_t streamId, int32_t childId) { SStreamTask* tNewSStreamTask(int64_t streamId) {
SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask)); SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) { if (pTask == NULL) {
return NULL; return NULL;
} }
pTask->taskId = tGenIdPI32(); pTask->taskId = tGenIdPI32();
pTask->streamId = streamId; pTask->streamId = streamId;
pTask->childId = childId;
pTask->status = TASK_STATUS__IDLE; pTask->status = TASK_STATUS__IDLE;
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
...@@ -48,7 +47,6 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { ...@@ -48,7 +47,6 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) return -1;
if (pTask->execType != TASK_EXEC__NONE) { if (pTask->execType != TASK_EXEC__NONE) {
if (tEncodeI8(pEncoder, pTask->exec.parallelizable) < 0) return -1;
if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1; if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1;
} }
...@@ -96,7 +94,6 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { ...@@ -96,7 +94,6 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1;
if (pTask->execType != TASK_EXEC__NONE) { if (pTask->execType != TASK_EXEC__NONE) {
if (tDecodeI8(pDecoder, &pTask->exec.parallelizable) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册