未验证 提交 26c13882 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #14201 from taosdata/feature/stream

refactor(stream): seperate stream scheduling and persistence
...@@ -27,12 +27,10 @@ void mndCleanupScheduler(SMnode* pMnode); ...@@ -27,12 +27,10 @@ void mndCleanupScheduler(SMnode* pMnode);
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub); int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub);
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream);
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType, int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
int64_t watermark); int64_t watermark);
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream); int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -97,37 +97,7 @@ END: ...@@ -97,37 +97,7 @@ END:
return terrno; return terrno;
} }
int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet, tmsg_t type, int32_t nodeId) { int32_t mndAddSinkToTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) {
SEncoder encoder;
tEncoderInit(&encoder, NULL, 0);
tEncodeSStreamTask(&encoder, pTask);
int32_t size = encoder.pos;
int32_t tlen = sizeof(SMsgHead) + size;
tEncoderClear(&encoder);
void* buf = taosMemoryCalloc(1, tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
((SMsgHead*)buf)->vgId = htonl(nodeId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncoderInit(&encoder, abuf, size);
tEncodeSStreamTask(&encoder, pTask);
tEncoderClear(&encoder);
STransAction action = {0};
memcpy(&action.epSet, pEpSet, sizeof(SEpSet));
action.pCont = buf;
action.contLen = tlen;
action.msgType = type;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(buf);
return -1;
}
return 0;
}
int32_t mndAddSinkToTask(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, SStreamTask* pTask) {
pTask->dispatchType = TASK_DISPATCH__NONE; pTask->dispatchType = TASK_DISPATCH__NONE;
// sink // sink
if (pStream->smaId != 0) { if (pStream->smaId != 0) {
...@@ -142,7 +112,7 @@ int32_t mndAddSinkToTask(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, SS ...@@ -142,7 +112,7 @@ int32_t mndAddSinkToTask(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, SS
return 0; return 0;
} }
int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, SStreamTask* pTask) { int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) {
pTask->sinkType = TASK_SINK__NONE; pTask->sinkType = TASK_SINK__NONE;
if (pStream->fixedSinkVgId == 0) { if (pStream->fixedSinkVgId == 0) {
pTask->dispatchType = TASK_DISPATCH__SHUFFLE; pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
...@@ -187,7 +157,7 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, STrans* pTrans, SStreamObj* ...@@ -187,7 +157,7 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, STrans* pTrans, SStreamObj*
return 0; return 0;
} }
int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) { int32_t mndAssignTaskToVg(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
int32_t msgLen; int32_t msgLen;
pTask->nodeId = pVgroup->vgId; pTask->nodeId = pVgroup->vgId;
pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup); pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
...@@ -196,11 +166,11 @@ int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SS ...@@ -196,11 +166,11 @@ int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SS
plan->execNode.epSet = pTask->epSet; plan->execNode.epSet = pTask->epSet;
if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) { if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
ASSERT(0);
terrno = TSDB_CODE_QRY_INVALID_INPUT; terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1; return -1;
} }
ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE || pTask->sinkType != TASK_SINK__NONE); ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE || pTask->sinkType != TASK_SINK__NONE);
mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_STREAM_TASK_DEPLOY, pVgroup->vgId);
return 0; return 0;
} }
...@@ -212,8 +182,7 @@ SSnodeObj* mndSchedFetchOneSnode(SMnode* pMnode) { ...@@ -212,8 +182,7 @@ SSnodeObj* mndSchedFetchOneSnode(SMnode* pMnode) {
return pObj; return pObj;
} }
int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan, int32_t mndAssignTaskToSnode(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SSnodeObj* pSnode) {
const SSnodeObj* pSnode) {
int32_t msgLen; int32_t msgLen;
pTask->nodeId = SNODE_HANDLE; pTask->nodeId = SNODE_HANDLE;
...@@ -223,10 +192,10 @@ int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, ...@@ -223,10 +192,10 @@ int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask,
plan->execNode.epSet = pTask->epSet; plan->execNode.epSet = pTask->epSet;
if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) { if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
ASSERT(0);
terrno = TSDB_CODE_QRY_INVALID_INPUT; terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1; return -1;
} }
mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_STREAM_TASK_DEPLOY, SNODE_HANDLE);
return 0; return 0;
} }
...@@ -245,7 +214,7 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) { ...@@ -245,7 +214,7 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) {
return pVgroup; return pVgroup;
} }
int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) {
SSdb* pSdb = pMnode->pSdb; SSdb* pSdb = pMnode->pSdb;
void* pIter = NULL; void* pIter = NULL;
SArray* tasks = taosArrayGetP(pStream->tasks, 0); SArray* tasks = taosArrayGetP(pStream->tasks, 0);
...@@ -262,6 +231,7 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, STrans* pTrans, SStreamOb ...@@ -262,6 +231,7 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, STrans* pTrans, SStreamOb
} }
SStreamTask* pTask = tNewSStreamTask(pStream->uid); SStreamTask* pTask = tNewSStreamTask(pStream->uid);
if (pTask == NULL) { if (pTask == NULL) {
sdbRelease(pSdb, pVgroup);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
...@@ -290,13 +260,11 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, STrans* pTrans, SStreamOb ...@@ -290,13 +260,11 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, STrans* pTrans, SStreamOb
// dispatch // dispatch
pTask->dispatchType = TASK_DISPATCH__NONE; pTask->dispatchType = TASK_DISPATCH__NONE;
mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_STREAM_TASK_DEPLOY, pVgroup->vgId);
} }
return 0; return 0;
} }
int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, SStreamObj* pStream) {
ASSERT(pStream->fixedSinkVgId != 0); ASSERT(pStream->fixedSinkVgId != 0);
SArray* tasks = taosArrayGetP(pStream->tasks, 0); SArray* tasks = taosArrayGetP(pStream->tasks, 0);
SStreamTask* pTask = tNewSStreamTask(pStream->uid); SStreamTask* pTask = tNewSStreamTask(pStream->uid);
...@@ -337,13 +305,10 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* ...@@ -337,13 +305,10 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, STrans* pTrans, SStreamObj*
// dispatch // dispatch
pTask->dispatchType = TASK_DISPATCH__NONE; pTask->dispatchType = TASK_DISPATCH__NONE;
mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_STREAM_TASK_DEPLOY, pStream->fixedSinkVg.vgId);
return 0; return 0;
} }
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
SSdb* pSdb = pMnode->pSdb; SSdb* pSdb = pMnode->pSdb;
SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan); SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
if (pPlan == NULL) { if (pPlan == NULL) {
...@@ -368,9 +333,15 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -368,9 +333,15 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
// add extra sink // add extra sink
hasExtraSink = true; hasExtraSink = true;
if (pStream->fixedSinkVgId == 0) { if (pStream->fixedSinkVgId == 0) {
mndAddShuffleSinkTasksToStream(pMnode, pTrans, pStream); if (mndAddShuffleSinkTasksToStream(pMnode, pStream) < 0) {
// TODO free
return -1;
}
} else { } else {
mndAddFixedSinkTaskToStream(pMnode, pTrans, pStream); if (mndAddFixedSinkTaskToStream(pMnode, pStream) < 0) {
// TODO free
return -1;
}
} }
} }
...@@ -386,6 +357,11 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -386,6 +357,11 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
ASSERT(plan->subplanType == SUBPLAN_TYPE_MERGE); ASSERT(plan->subplanType == SUBPLAN_TYPE_MERGE);
pInnerTask = tNewSStreamTask(pStream->uid); pInnerTask = tNewSStreamTask(pStream->uid);
if (pInnerTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
qDestroyQueryPlan(pPlan);
return -1;
}
mndAddTaskToTaskSet(taskInnerLevel, pInnerTask); mndAddTaskToTaskSet(taskInnerLevel, pInnerTask);
pInnerTask->childEpInfo = taosArrayInit(0, sizeof(void*)); pInnerTask->childEpInfo = taosArrayInit(0, sizeof(void*));
...@@ -397,7 +373,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -397,7 +373,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
pInnerTask->triggerParam = pStream->triggerParam; pInnerTask->triggerParam = pStream->triggerParam;
// dispatch // dispatch
if (mndAddDispatcherToInnerTask(pMnode, pTrans, pStream, pInnerTask) < 0) { if (mndAddDispatcherToInnerTask(pMnode, pStream, pInnerTask) < 0) {
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);
return -1; return -1;
} }
...@@ -409,14 +385,13 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -409,14 +385,13 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
SSnodeObj* pSnode = mndSchedFetchOneSnode(pMnode); SSnodeObj* pSnode = mndSchedFetchOneSnode(pMnode);
if (pSnode == NULL) { if (pSnode == NULL) {
SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid); SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid);
if (mndAssignTaskToVg(pMnode, pTrans, pInnerTask, plan, pVgroup) < 0) { if (mndAssignTaskToVg(pMnode, pInnerTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);
return -1; return -1;
} }
} else { } else {
if (mndAssignTaskToSnode(pMnode, pTrans, pInnerTask, plan, pSnode) < 0) { if (mndAssignTaskToSnode(pMnode, pInnerTask, plan, pSnode) < 0) {
ASSERT(0);
sdbRelease(pSdb, pSnode); sdbRelease(pSdb, pSnode);
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);
return -1; return -1;
...@@ -424,7 +399,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -424,7 +399,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
} }
} else { } else {
SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid); SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid);
if (mndAssignTaskToVg(pMnode, pTrans, pInnerTask, plan, pVgroup) < 0) { if (mndAssignTaskToVg(pMnode, pInnerTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);
return -1; return -1;
...@@ -450,6 +425,12 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -450,6 +425,12 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
continue; continue;
} }
SStreamTask* pTask = tNewSStreamTask(pStream->uid); SStreamTask* pTask = tNewSStreamTask(pStream->uid);
if (pInnerTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
return -1;
}
mndAddTaskToTaskSet(taskSourceLevel, pTask); mndAddTaskToTaskSet(taskSourceLevel, pTask);
// source // source
...@@ -466,7 +447,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -466,7 +447,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
// exec // exec
pTask->execType = TASK_EXEC__PIPE; pTask->execType = TASK_EXEC__PIPE;
if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) { if (mndAssignTaskToVg(pMnode, pTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);
return -1; return -1;
...@@ -507,6 +488,11 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -507,6 +488,11 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
continue; continue;
} }
SStreamTask* pTask = tNewSStreamTask(pStream->uid); SStreamTask* pTask = tNewSStreamTask(pStream->uid);
if (pTask == NULL) {
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
return -1;
}
mndAddTaskToTaskSet(taskOneLevel, pTask); mndAddTaskToTaskSet(taskOneLevel, pTask);
// source // source
...@@ -517,14 +503,14 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -517,14 +503,14 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
// sink or dispatch // sink or dispatch
if (hasExtraSink) { if (hasExtraSink) {
mndAddDispatcherToInnerTask(pMnode, pTrans, pStream, pTask); mndAddDispatcherToInnerTask(pMnode, pStream, pTask);
} else { } else {
mndAddSinkToTask(pMnode, pTrans, pStream, pTask); mndAddSinkToTask(pMnode, pStream, pTask);
} }
// exec // exec
pTask->execType = TASK_EXEC__PIPE; pTask->execType = TASK_EXEC__PIPE;
if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) { if (mndAssignTaskToVg(pMnode, pTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);
return -1; return -1;
......
...@@ -533,7 +533,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea ...@@ -533,7 +533,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
#if 0 #if 0
smaObj.timezone = pCreate->timezone; smaObj.timezone = pCreate->timezone;
#endif #endif
smaObj.timezone = tsTimezone; // use timezone of server smaObj.timezone = tsTimezone; // use timezone of server
smaObj.interval = pCreate->interval; smaObj.interval = pCreate->interval;
smaObj.offset = pCreate->offset; smaObj.offset = pCreate->offset;
smaObj.sliding = pCreate->sliding; smaObj.sliding = pCreate->sliding;
...@@ -623,7 +623,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea ...@@ -623,7 +623,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
if (mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER; if (mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
// if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER; // if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg, &smaObj) != 0) goto _OVER;
if (mndScheduleStream(pMnode, pTrans, &streamObj) != 0) goto _OVER; if (mndScheduleStream(pMnode, &streamObj) != 0) goto _OVER;
if (mndPersistStream(pMnode, pTrans, &streamObj) != 0) goto _OVER; if (mndPersistStream(pMnode, pTrans, &streamObj) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
......
...@@ -320,70 +320,72 @@ FAIL: ...@@ -320,70 +320,72 @@ FAIL:
return 0; return 0;
} }
int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { int32_t mndPersistTaskDeployReq(STrans *pTrans, const SStreamTask *pTask) {
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream); SEncoder encoder;
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { tEncoderInit(&encoder, NULL, 0);
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); tEncodeSStreamTask(&encoder, pTask);
mndTransDrop(pTrans); int32_t size = encoder.pos;
int32_t tlen = sizeof(SMsgHead) + size;
tEncoderClear(&encoder);
void *buf = taosMemoryCalloc(1, tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); ((SMsgHead *)buf)->vgId = htonl(pTask->nodeId);
return 0; void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
} tEncoderInit(&encoder, abuf, size);
tEncodeSStreamTask(&encoder, pTask);
tEncoderClear(&encoder);
int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { STransAction action = {0};
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream); memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet));
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { action.pCont = buf;
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); action.contLen = tlen;
mndTransDrop(pTrans); action.msgType = TDMT_STREAM_TASK_DEPLOY;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(buf);
return -1; return -1;
} }
sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
return 0; return 0;
} }
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans) { int32_t mndPersistStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
SNode *pAst = NULL; int32_t level = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < level; i++) {
if (nodesStringToNode(ast, &pAst) < 0) { SArray *pLevel = taosArrayGetP(pStream->tasks, i);
return -1; int32_t sz = taosArrayGetSize(pLevel);
} for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
if (qExtractResultSchema(pAst, (int32_t *)&pStream->outputSchema.nCols, &pStream->outputSchema.pSchema) != 0) { if (mndPersistTaskDeployReq(pTrans, pTask) < 0) {
nodesDestroyNode(pAst); return -1;
return -1; }
} }
// free
nodesDestroyNode(pAst);
#if 0
printf("|");
for (int i = 0; i < pStream->outputSchema.nCols; i++) {
printf(" %15s |", (char *)pStream->outputSchema.pSchema[i].name);
} }
printf("\n=======================================================\n"); return 0;
}
#endif
if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(ast, pStream->trigger, pStream->watermark, &pStream->physicalPlan)) { int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
mError("topic:%s, failed to get plan since %s", pStream->name, terrstr()); if (mndPersistStreamTasks(pMnode, pTrans, pStream) < 0) {
return -1; return -1;
} }
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
if (mndScheduleStream(pMnode, pTrans, pStream) < 0) { if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("stream:%ld, schedule stream since %s", pStream->uid, terrstr()); mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
return -1; return -1;
} }
mDebug("trans:%d, used to create stream:%s", pTrans->id, pStream->name); sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
return 0;
}
int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream); SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return -1;
} }
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
return 0; return 0;
} }
...@@ -503,64 +505,6 @@ static int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pS ...@@ -503,64 +505,6 @@ static int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pS
return 0; return 0;
} }
static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq *pCreate, SDbObj *pDb) {
mDebug("stream:%s to create", pCreate->name);
SStreamObj streamObj = {0};
tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN);
tstrncpy(streamObj.sourceDb, pDb->name, TSDB_DB_FNAME_LEN);
tstrncpy(streamObj.targetSTbName, pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN);
streamObj.createTime = taosGetTimestampMs();
streamObj.updateTime = streamObj.createTime;
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
streamObj.targetStbUid = mndGenerateUid(pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN);
streamObj.sourceDbUid = pDb->uid;
streamObj.version = 1;
streamObj.sql = pCreate->sql;
// TODO
streamObj.fixedSinkVgId = 0;
streamObj.smaId = 0;
streamObj.trigger = pCreate->triggerType;
streamObj.watermark = pCreate->watermark;
streamObj.triggerParam = pCreate->maxDelay;
if (streamObj.targetSTbName[0]) {
pDb = mndAcquireDbByStb(pMnode, streamObj.targetSTbName);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
return -1;
}
tstrncpy(streamObj.targetDb, pDb->name, TSDB_DB_FNAME_LEN);
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
if (pTrans == NULL) {
mError("stream:%s, failed to create since %s", pCreate->name, terrstr());
return -1;
}
mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name);
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) {
mError("trans:%d, failed to add stream since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
if (streamObj.targetSTbName[0] && mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) {
mError("trans:%d, failed to create stb for stream since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
mndTransDrop(pTrans);
return 0;
}
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
int32_t code = -1; int32_t code = -1;
...@@ -631,7 +575,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { ...@@ -631,7 +575,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
} }
// schedule stream task for stream obj // schedule stream task for stream obj
if (mndScheduleStream(pMnode, pTrans, &streamObj) < 0) { if (mndScheduleStream(pMnode, &streamObj) < 0) {
mError("stream:%s, failed to schedule since %s", createStreamReq.name, terrstr()); mError("stream:%s, failed to schedule since %s", createStreamReq.name, terrstr());
mndTransDrop(pTrans); mndTransDrop(pTrans);
goto _OVER; goto _OVER;
...@@ -653,8 +597,6 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { ...@@ -653,8 +597,6 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
mndTransDrop(pTrans); mndTransDrop(pTrans);
/*code = mndCreateStream(pMnode, pReq, &createStreamReq, pDb);*/
/*if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;*/
code = TSDB_CODE_ACTION_IN_PROGRESS; code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER: _OVER:
......
...@@ -146,8 +146,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle, uint64_ ...@@ -146,8 +146,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle, uint64_
int32_t* pNumOfRows) { int32_t* pNumOfRows) {
*pUid = 0; *pUid = 0;
// TODO set to real sversion // TODO: cache multiple schema
/*int32_t sversion = 1;*/
int32_t sversion = htonl(pHandle->pBlock->sversion); int32_t sversion = htonl(pHandle->pBlock->sversion);
if (pHandle->cachedSchemaSuid == 0 || pHandle->cachedSchemaVer != sversion || if (pHandle->cachedSchemaSuid == 0 || pHandle->cachedSchemaVer != sversion ||
pHandle->cachedSchemaSuid != pHandle->msgIter.suid) { pHandle->cachedSchemaSuid != pHandle->msgIter.suid) {
...@@ -161,7 +160,6 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle, uint64_ ...@@ -161,7 +160,6 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle, uint64_
return -1; return -1;
} }
// this interface use suid instead of uid
if (pHandle->pSchemaWrapper) tDeleteSSchemaWrapper(pHandle->pSchemaWrapper); if (pHandle->pSchemaWrapper) tDeleteSSchemaWrapper(pHandle->pSchemaWrapper);
pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion, true); pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion, true);
if (pHandle->pSchemaWrapper == NULL) { if (pHandle->pSchemaWrapper == NULL) {
......
...@@ -40,15 +40,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu ...@@ -40,15 +40,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
SStreamBlockScanInfo* pInfo = pOperator->info; SStreamBlockScanInfo* pInfo = pOperator->info;
pInfo->assignBlockUid = assignUid; pInfo->assignBlockUid = assignUid;
// no need to check // TODO: if a block was set but not consumed,
#if 0 // prevent setting a different type of block
if (pInfo->blockType == 0) {
pInfo->blockType = type;
} else if (pInfo->blockType != type) {
ASSERT(0);
return TSDB_CODE_QRY_APP_ERROR;
}
#endif
pInfo->blockType = type; pInfo->blockType = type;
if (type == STREAM_DATA_TYPE_SUBMIT_BLOCK) { if (type == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
......
...@@ -1321,7 +1321,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) { ...@@ -1321,7 +1321,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
// todo move to the initialization function // todo move to the initialization function
int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0); int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
SFilterColumnParam param1 = {.numOfCols = numOfCols, .pDataBlock = pBlock->pDataBlock}; SFilterColumnParam param1 = {.numOfCols = numOfCols, .pDataBlock = pBlock->pDataBlock};
code = filterSetDataFromSlotId(filter, &param1); code = filterSetDataFromSlotId(filter, &param1);
...@@ -2048,7 +2048,7 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLo ...@@ -2048,7 +2048,7 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLo
SArray* pColList) { SArray* pColList) {
if (pColList == NULL) { // data from other sources if (pColList == NULL) { // data from other sources
blockDataCleanup(pRes); blockDataCleanup(pRes);
// blockDataEnsureCapacity(pRes, numOfRows); // blockDataEnsureCapacity(pRes, numOfRows);
blockCompressDecode(pRes, numOfOutput, numOfRows, pData); blockCompressDecode(pRes, numOfOutput, numOfRows, pData);
} else { // extract data according to pColList } else { // extract data according to pColList
ASSERT(numOfOutput == taosArrayGetSize(pColList)); ASSERT(numOfOutput == taosArrayGetSize(pColList));
...@@ -2402,14 +2402,14 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode ...@@ -2402,14 +2402,14 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
tsem_init(&pInfo->ready, 0, 0); tsem_init(&pInfo->ready, 0, 0);
pInfo->seqLoadData = false; pInfo->seqLoadData = false;
pInfo->pTransporter = pTransporter; pInfo->pTransporter = pTransporter;
pInfo->pResult = createResDataBlock(pExNode->node.pOutputDataBlockDesc); pInfo->pResult = createResDataBlock(pExNode->node.pOutputDataBlockDesc);
pOperator->name = "ExchangeOperator"; pOperator->name = "ExchangeOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pResult->pDataBlock); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pResult->pDataBlock);
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
...@@ -4365,6 +4365,62 @@ _error: ...@@ -4365,6 +4365,62 @@ _error:
return NULL; return NULL;
} }
int32_t extractTableScanNode(SPhysiNode* pNode, STableScanPhysiNode** ppNode) {
if (pNode->pChildren == NULL || LIST_LENGTH(pNode->pChildren) == 0) {
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == pNode->type) {
*ppNode = (STableScanPhysiNode*)pNode;
return 0;
} else {
ASSERT(0);
terrno = TSDB_CODE_QRY_APP_ERROR;
return -1;
}
} else {
if (LIST_LENGTH(pNode->pChildren) != 1) {
ASSERT(0);
terrno = TSDB_CODE_QRY_APP_ERROR;
return -1;
}
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pNode->pChildren, 0);
return extractTableScanNode(pChildNode, ppNode);
}
return -1;
}
int32_t doRebuildReader(SOperatorInfo* pOperator, SSubplan* plan, SReadHandle* pHandle) {
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if (pOperator->numOfDownstream == 0) {
qError("failed to find stream scan operator");
return TSDB_CODE_QRY_APP_ERROR;
}
if (pOperator->numOfDownstream > 1) {
qError("join not supported for stream block scan");
return TSDB_CODE_QRY_APP_ERROR;
}
return doRebuildReader(pOperator->pDownstream[0], plan, pHandle);
} else {
SStreamBlockScanInfo* pInfo = pOperator->info;
ASSERT(pInfo->pSnapshotReadOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
STableScanInfo* pTableScanInfo = pInfo->pSnapshotReadOp->info;
tsdbCleanupReadHandle(pTableScanInfo->dataReader);
STableScanPhysiNode* pNode = NULL;
if (extractTableScanNode(plan->pNode, &pNode) < 0) {
ASSERT(0);
}
STableListInfo info = {0};
pTableScanInfo->dataReader = doCreateDataReader(pNode, pHandle, &info, 0, 0);
if (pTableScanInfo->dataReader == NULL) {
ASSERT(0);
qError("failed to create data reader");
return TSDB_CODE_QRY_APP_ERROR;
}
}
return 0;
}
int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length) { int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length) {
int32_t code = TDB_CODE_SUCCESS; int32_t code = TDB_CODE_SUCCESS;
char* pCurrent = NULL; char* pCurrent = NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册