未验证 提交 09ab9cdc 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #11070 from taosdata/feature/tq

refact stream shuffle dispatch
......@@ -801,7 +801,10 @@ typedef struct SVgroupInfo {
uint32_t hashBegin;
uint32_t hashEnd;
SEpSet epSet;
int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT
union {
int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT
int32_t taskId; // used in stream
};
} SVgroupInfo;
typedef struct {
......
......@@ -244,44 +244,45 @@ int32_t vmProcessCompactVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
void vmInitMsgHandles(SMgmtWrapper *pWrapper) {
// Requests handled by VNODE
dndSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, vmProcessQueryMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, vmProcessQueryMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_TABLES_META, vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONSUME, vmProcessQueryMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, vmProcessQueryMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_RES_READY, vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB, vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB, vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB, vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA, vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_SMA, vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA, vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES_FETCH, vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_PIPE_EXEC, vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_MERGE_EXEC, vmProcessMergeMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)vmProcessQueryMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)vmProcessQueryMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_TABLES_META, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONSUME, (NodeMsgFp)vmProcessQueryMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, (NodeMsgFp)vmProcessQueryMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_RES_READY, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_SMA, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES_FETCH, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_PIPE_EXEC, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_MERGE_EXEC, (NodeMsgFp)vmProcessMergeMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_WRITE_EXEC, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, vmProcessMgmtMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, vmProcessMgmtMsg, VND_VGID);
......
......@@ -119,6 +119,53 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) {
return pVgroup;
}
int32_t mndAddSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, int64_t smaId) {
SSdb* pSdb = pMnode->pSdb;
void* pIter = NULL;
SArray* tasks = taosArrayGetP(pStream->tasks, 0);
ASSERT(taosArrayGetSize(pStream->tasks) == 1);
while (1) {
SVgObj* pVgroup;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid != pStream->dbUid) {
sdbRelease(pSdb, pVgroup);
continue;
}
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
taosArrayPush(tasks, &pTask);
pTask->nodeId = pVgroup->vgId;
pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
// source
pTask->sourceType = TASK_SOURCE__MERGE;
// exec
pTask->execType = TASK_EXEC__NONE;
// sink
if (smaId != -1) {
pTask->sinkType = TASK_SINK__SMA;
pTask->smaSink.smaId = smaId;
} else {
pTask->sinkType = TASK_SINK__TABLE;
}
// dispatch
pTask->dispatchType = TASK_DISPATCH__NONE;
mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId);
}
return 0;
}
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, int64_t smaId) {
SSdb* pSdb = pMnode->pSdb;
SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
......@@ -132,6 +179,15 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, i
ASSERT(totLevel <= 2);
pStream->tasks = taosArrayInit(totLevel, sizeof(void*));
bool hasExtraSink = false;
if (totLevel == 2) {
SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
taosArrayPush(pStream->tasks, &taskOneLevel);
// add extra sink
hasExtraSink = true;
mndAddSinkToStream(pMnode, pTrans, pStream, smaId);
}
for (int32_t level = 0; level < totLevel; level++) {
SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, level);
......@@ -164,9 +220,13 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, i
// only for inplace
pTask->sinkType = TASK_SINK__SHOW;
pTask->showSink.reserved = 0;
if (smaId != -1) {
pTask->sinkType = TASK_SINK__SMA;
pTask->smaSink.smaId = smaId;
if (!hasExtraSink) {
if (smaId != -1) {
pTask->sinkType = TASK_SINK__SMA;
pTask->smaSink.smaId = smaId;
} else {
pTask->sinkType = TASK_SINK__TABLE;
}
}
} else {
pTask->sinkType = TASK_SINK__NONE;
......@@ -175,17 +235,15 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, i
// dispatch part
if (level == 0) {
pTask->dispatchType = TASK_DISPATCH__NONE;
// if inplace sink, no dispatcher
// if fixed ep, add fixed ep dispatcher
// if shuffle, add shuffle dispatcher
} else {
// add fixed ep dispatcher
int32_t lastLevel = level - 1;
ASSERT(lastLevel == 0);
if (hasExtraSink) lastLevel++;
SArray* pArray = taosArrayGetP(pStream->tasks, lastLevel);
// one merge only
ASSERT(taosArrayGetSize(pArray) == 1);
SStreamTask* lastLevelTask = taosArrayGetP(pArray, lastLevel);
SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0);
pTask->dispatchMsgType = TDMT_VND_TASK_MERGE_EXEC;
pTask->dispatchType = TASK_DISPATCH__FIXED;
......@@ -222,18 +280,43 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, i
/*pTask->sinkType = TASK_SINK__NONE;*/
// dispatch part
pTask->dispatchType = TASK_DISPATCH__NONE;
#if 0
pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;
SDbObj* pDb = mndAcquireDb(pMnode, pStream->db);
ASSERT(pDb);
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
ASSERT(hasExtraSink);
/*pTask->dispatchType = TASK_DISPATCH__NONE;*/
#if 1
if (hasExtraSink) {
// add dispatcher
pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;
SDbObj* pDb = mndAcquireDb(pMnode, pStream->db);
ASSERT(pDb);
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
sdbRelease(pSdb, pDb);
qDestroyQueryPlan(pPlan);
return -1;
}
sdbRelease(pSdb, pDb);
qDestroyQueryPlan(pPlan);
return -1;
// put taskId to useDbRsp
// TODO: optimize
SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t sz = taosArrayGetSize(pVgs);
SArray* sinkLv = taosArrayGetP(pStream->tasks, 0);
int32_t sinkLvSize = taosArrayGetSize(sinkLv);
for (int32_t i = 0; i < sz; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
for (int32_t j = 0; j < sinkLvSize; j++) {
SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j);
/*printf("vgid %d node id %d\n", pVgInfo->vgId, pTask->nodeId);*/
if (pLastLevelTask->nodeId == pVgInfo->vgId) {
pVgInfo->taskId = pLastLevelTask->taskId;
/*printf("taskid %d set to %d\n", pVgInfo->taskId, pTask->taskId);*/
break;
}
}
}
}
sdbRelease(pSdb, pDb);
#endif
// exec part
......
......@@ -508,7 +508,9 @@ int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen) {
SStreamTaskExecReq req;
tDecodeSStreamTaskExecReq(msg, &req);
int32_t taskId = req.taskId;
int32_t taskId = req.taskId;
ASSERT(taskId);
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
ASSERT(pTask);
......
......@@ -31,12 +31,13 @@ static int32_t streamBuildDispatchMsg(SStreamTask* pTask, SArray* data, SRpcMsg*
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
((SMsgHead*)buf)->vgId = 0;
req.taskId = pTask->inplaceDispatcher.taskId;
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
((SMsgHead*)buf)->vgId = htonl(pTask->fixedEpDispatcher.nodeId);
*ppEpSet = &pTask->fixedEpDispatcher.epSet;
req.taskId = pTask->fixedEpDispatcher.taskId;
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
int32_t nodeId = 0;
// TODO fix tbname issue
char ctbName[TSDB_TABLE_FNAME_LEN + 22];
// all groupId must be the same in an array
......@@ -52,10 +53,12 @@ static int32_t streamBuildDispatchMsg(SStreamTask* pTask, SArray* data, SRpcMsg*
// TODO: optimize search process
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t sz = taosArrayGetSize(vgInfo);
int32_t nodeId = 0;
for (int32_t i = 0; i < sz; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
nodeId = pVgInfo->vgId;
req.taskId = pVgInfo->taskId;
*ppEpSet = &pVgInfo->epSet;
break;
}
......@@ -71,6 +74,7 @@ static int32_t streamBuildDispatchMsg(SStreamTask* pTask, SArray* data, SRpcMsg*
pMsg->contLen = tlen;
pMsg->code = 0;
pMsg->msgType = pTask->dispatchMsgType;
/*pMsg->noResp = 1;*/
return 0;
}
......@@ -80,7 +84,7 @@ static int32_t streamShuffleDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SHashOb
while (1) {
pIter = taosHashIterate(data, pIter);
if (pIter == NULL) return 0;
SArray* pData = (SArray*)pIter;
SArray* pData = *(SArray**)pIter;
SRpcMsg dispatchMsg = {0};
SEpSet* pEpSet;
if (streamBuildDispatchMsg(pTask, pData, &dispatchMsg, &pEpSet) < 0) {
......@@ -98,7 +102,6 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK && pTask->sourceType != TASK_SOURCE__SCAN) return 0;
// exec
// TODO: for shuffle dispatcher, merge data by groupId
if (pTask->execType != TASK_EXEC__NONE) {
ASSERT(workId < pTask->exec.numOfRunners);
void* exec = pTask->exec.runners[workId].executor;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册