未验证 提交 8020e6cb 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #10866 from taosdata/feature/tq

assign task to vg
...@@ -1129,10 +1129,10 @@ typedef struct { ...@@ -1129,10 +1129,10 @@ typedef struct {
typedef struct { typedef struct {
char name[TSDB_TOPIC_FNAME_LEN]; char name[TSDB_TOPIC_FNAME_LEN];
char outputTbName[TSDB_TABLE_NAME_LEN];
int8_t igExists; int8_t igExists;
char* sql; char* sql;
char* physicalPlan; char* ast;
char* logicalPlan;
} SCMCreateStreamReq; } SCMCreateStreamReq;
typedef struct { typedef struct {
...@@ -2274,13 +2274,23 @@ enum { ...@@ -2274,13 +2274,23 @@ enum {
STREAM_TASK_STATUS__STOP, STREAM_TASK_STATUS__STOP,
}; };
typedef struct {
void* inputHandle;
void** executor;
} SStreamTaskParRunner;
typedef struct { typedef struct {
int64_t streamId; int64_t streamId;
int32_t taskId; int32_t taskId;
int32_t level; int32_t level;
int8_t status; int8_t status;
int8_t pipeEnd;
int8_t parallel;
SEpSet NextOpEp;
char* qmsg; char* qmsg;
void* executor; // not applied to encoder and decoder
SStreamTaskParRunner runner;
// void* executor;
// void* stateStore; // void* stateStore;
// storage handle // storage handle
} SStreamTask; } SStreamTask;
......
...@@ -456,6 +456,94 @@ _return: ...@@ -456,6 +456,94 @@ _return:
void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) { conf->commit_cb = cb; } void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) { conf->commit_cb = cb; }
TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbName, const char* sql) {
STscObj* pTscObj = (STscObj*)taos;
SRequestObj* pRequest = NULL;
SQuery* pQueryNode = NULL;
char* astStr = NULL;
int32_t sqlLen;
terrno = TSDB_CODE_SUCCESS;
if (taos == NULL || streamName == NULL || sql == NULL) {
tscError("invalid parameters for creating stream, connObj:%p, stream name:%s, sql:%s", taos, streamName, sql);
terrno = TSDB_CODE_TSC_INVALID_INPUT;
goto _return;
}
sqlLen = strlen(sql);
if (strlen(streamName) >= TSDB_TABLE_NAME_LEN) {
tscError("stream name too long, max length:%d", TSDB_TABLE_NAME_LEN - 1);
terrno = TSDB_CODE_TSC_INVALID_INPUT;
goto _return;
}
if (sqlLen > TSDB_MAX_ALLOWED_SQL_LEN) {
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
goto _return;
}
tscDebug("start to create stream: %s", streamName);
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
CHECK_CODE_GOTO(parseSql(pRequest, false, &pQueryNode), _return);
// todo check for invalid sql statement and return with error code
CHECK_CODE_GOTO(nodesNodeToString(pQueryNode->pRoot, false, &astStr, NULL), _return);
/*printf("%s\n", pStr);*/
SName name = {.acctId = pTscObj->acctId, .type = TSDB_TABLE_NAME_T};
strcpy(name.dbname, pRequest->pDb);
strcpy(name.tname, streamName);
SCMCreateStreamReq req = {
.igExists = 1,
.ast = astStr,
.sql = (char*)sql,
};
tNameExtractFullName(&name, req.name);
strcpy(req.outputTbName, tbName);
int tlen = tSerializeSCMCreateStreamReq(NULL, 0, &req);
void* buf = malloc(tlen);
if (buf == NULL) {
goto _return;
}
tSerializeSCMCreateStreamReq(buf, tlen, &req);
/*printf("formatted: %s\n", dagStr);*/
pRequest->body.requestMsg = (SDataBuf){
.pData = buf,
.len = tlen,
.handle = NULL,
};
pRequest->type = TDMT_MND_CREATE_STREAM;
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
SEpSet epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0;
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
tsem_wait(&pRequest->body.rspSem);
_return:
tfree(astStr);
qDestroyQuery(pQueryNode);
/*if (sendInfo != NULL) {*/
/*destroySendMsgInfo(sendInfo);*/
/*}*/
if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
pRequest->code = terrno;
}
return pRequest;
}
TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, int sqlLen) { TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, int sqlLen) {
STscObj* pTscObj = (STscObj*)taos; STscObj* pTscObj = (STscObj*)taos;
SRequestObj* pRequest = NULL; SRequestObj* pRequest = NULL;
...@@ -481,7 +569,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i ...@@ -481,7 +569,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
goto _return; goto _return;
} }
tscDebug("start to create topic, %s", topicName); tscDebug("start to create topic: %s", topicName);
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
CHECK_CODE_GOTO(parseSql(pRequest, true, &pQueryNode), _return); CHECK_CODE_GOTO(parseSql(pRequest, true, &pQueryNode), _return);
...@@ -498,7 +586,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i ...@@ -498,7 +586,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
SCMCreateTopicReq req = { SCMCreateTopicReq req = {
.igExists = 1, .igExists = 1,
.ast = (char*)astStr, .ast = astStr,
.sql = (char*)sql, .sql = (char*)sql,
}; };
tNameExtractFullName(&name, req.name); tNameExtractFullName(&name, req.name);
...@@ -528,6 +616,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i ...@@ -528,6 +616,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
tsem_wait(&pRequest->body.rspSem); tsem_wait(&pRequest->body.rspSem);
_return: _return:
tfree(astStr);
qDestroyQuery(pQueryNode); qDestroyQuery(pQueryNode);
/*if (sendInfo != NULL) {*/ /*if (sendInfo != NULL) {*/
/*destroySendMsgInfo(sendInfo);*/ /*destroySendMsgInfo(sendInfo);*/
......
...@@ -2469,7 +2469,7 @@ int32_t tEncodeSMqCMCommitOffsetReq(SCoder *encoder, const SMqCMCommitOffsetReq ...@@ -2469,7 +2469,7 @@ int32_t tEncodeSMqCMCommitOffsetReq(SCoder *encoder, const SMqCMCommitOffsetReq
int32_t tDecodeSMqCMCommitOffsetReq(SCoder *decoder, SMqCMCommitOffsetReq *pReq) { int32_t tDecodeSMqCMCommitOffsetReq(SCoder *decoder, SMqCMCommitOffsetReq *pReq) {
if (tStartDecode(decoder) < 0) return -1; if (tStartDecode(decoder) < 0) return -1;
if (tDecodeI32(decoder, &pReq->num) < 0) return -1; if (tDecodeI32(decoder, &pReq->num) < 0) return -1;
TCODER_MALLOC(pReq->offsets, SMqOffset*, pReq->num * sizeof(SMqOffset), decoder); TCODER_MALLOC(pReq->offsets, SMqOffset *, pReq->num * sizeof(SMqOffset), decoder);
if (pReq->offsets == NULL) return -1; if (pReq->offsets == NULL) return -1;
for (int32_t i = 0; i < pReq->num; i++) { for (int32_t i = 0; i < pReq->num; i++) {
tDecodeSMqOffset(decoder, &pReq->offsets[i]); tDecodeSMqOffset(decoder, &pReq->offsets[i]);
...@@ -2655,15 +2655,22 @@ void *tDeserializeSVDropTSmaReq(void *buf, SVDropTSmaReq *pReq) { ...@@ -2655,15 +2655,22 @@ void *tDeserializeSVDropTSmaReq(void *buf, SVDropTSmaReq *pReq) {
} }
int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateStreamReq *pReq) { int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateStreamReq *pReq) {
int32_t sqlLen = 0;
int32_t astLen = 0;
if (pReq->sql != NULL) sqlLen = (int32_t)strlen(pReq->sql);
if (pReq->ast != NULL) astLen = (int32_t)strlen(pReq->ast);
SCoder encoder = {0}; SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->outputTbName) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1; if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1; if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->physicalPlan) < 0) return -1; if (sqlLen > 0 && tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->logicalPlan) < 0) return -1; if (astLen > 0 && tEncodeCStr(&encoder, pReq->ast) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
...@@ -2672,15 +2679,30 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS ...@@ -2672,15 +2679,30 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
} }
int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStreamReq *pReq) { int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStreamReq *pReq) {
int32_t sqlLen = 0;
int32_t astLen = 0;
SCoder decoder = {0}; SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1; if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->outputTbName) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1; if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1;
if (tDecodeCStrAlloc(&decoder, &pReq->sql) < 0) return -1; if (tDecodeI32(&decoder, &sqlLen) < 0) return -1;
if (tDecodeCStrAlloc(&decoder, &pReq->physicalPlan) < 0) return -1; if (tDecodeI32(&decoder, &astLen) < 0) return -1;
if (tDecodeCStrAlloc(&decoder, &pReq->logicalPlan) < 0) return -1;
if (sqlLen > 0) {
pReq->sql = calloc(1, sqlLen + 1);
if (pReq->sql == NULL) return -1;
if (tDecodeCStrTo(&decoder, pReq->sql) < 0) return -1;
}
if (astLen > 0) {
pReq->ast = calloc(1, astLen + 1);
if (pReq->ast == NULL) return -1;
if (tDecodeCStrTo(&decoder, pReq->ast) < 0) return -1;
}
tEndDecode(&decoder); tEndDecode(&decoder);
tCoderClear(&decoder); tCoderClear(&decoder);
...@@ -2689,8 +2711,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea ...@@ -2689,8 +2711,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) { void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
tfree(pReq->sql); tfree(pReq->sql);
tfree(pReq->physicalPlan); tfree(pReq->ast);
tfree(pReq->logicalPlan);
} }
int32_t tEncodeSStreamTask(SCoder *pEncoder, const SStreamTask *pTask) { int32_t tEncodeSStreamTask(SCoder *pEncoder, const SStreamTask *pTask) {
...@@ -2699,6 +2720,7 @@ int32_t tEncodeSStreamTask(SCoder *pEncoder, const SStreamTask *pTask) { ...@@ -2699,6 +2720,7 @@ int32_t tEncodeSStreamTask(SCoder *pEncoder, const SStreamTask *pTask) {
if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->level) < 0) return -1; if (tEncodeI32(pEncoder, pTask->level) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->status) < 0) return -1; if (tEncodeI8(pEncoder, pTask->status) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->NextOpEp) < 0) return -1;
if (tEncodeCStr(pEncoder, pTask->qmsg) < 0) return -1; if (tEncodeCStr(pEncoder, pTask->qmsg) < 0) return -1;
tEndEncode(pEncoder); tEndEncode(pEncoder);
return pEncoder->pos; return pEncoder->pos;
...@@ -2710,6 +2732,7 @@ int32_t tDecodeSStreamTask(SCoder *pDecoder, SStreamTask *pTask) { ...@@ -2710,6 +2732,7 @@ int32_t tDecodeSStreamTask(SCoder *pDecoder, SStreamTask *pTask) {
if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->level) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->level) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->NextOpEp) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pTask->qmsg) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pTask->qmsg) < 0) return -1;
tEndDecode(pDecoder); tEndDecode(pDecoder);
return 0; return 0;
......
...@@ -28,7 +28,7 @@ SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId); ...@@ -28,7 +28,7 @@ SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId);
void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup); void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup);
SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup); SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup);
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups); int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups);
SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup); SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup);
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId); int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId);
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen); void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
......
...@@ -31,6 +31,53 @@ ...@@ -31,6 +31,53 @@
#include "tname.h" #include "tname.h"
#include "tuuid.h" #include "tuuid.h"
int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet) {
SCoder encoder;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
tEncodeSStreamTask(&encoder, pTask);
int32_t tlen = sizeof(SMsgHead) + encoder.pos;
tCoderClear(&encoder);
void* buf = malloc(tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
((SMsgHead*)buf)->streamTaskId = pTask->taskId;
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, tlen, TD_ENCODER);
tEncodeSStreamTask(&encoder, pTask);
tCoderClear(&encoder);
STransAction action = {0};
memcpy(&action.epSet, pEpSet, sizeof(SEpSet));
action.pCont = buf;
action.contLen = tlen;
action.msgType = TDMT_SND_TASK_DEPLOY;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
rpcFreeCont(buf);
return -1;
}
return 0;
}
int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
int32_t msgLen;
plan->execNode.nodeId = pVgroup->vgId;
plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup);
if (qSubPlanToString(plan, &pTask->qmsg, &msgLen) < 0) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
}
mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet);
return 0;
}
int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan,
const SSnodeObj* pSnode) {
return 0;
}
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
SSdb* pSdb = pMnode->pSdb; SSdb* pSdb = pMnode->pSdb;
SVgObj* pVgroup = NULL; SVgObj* pVgroup = NULL;
...@@ -44,7 +91,6 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -44,7 +91,6 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
int32_t totLevel = LIST_LENGTH(pPlan->pSubplans); int32_t totLevel = LIST_LENGTH(pPlan->pSubplans);
pStream->tasks = taosArrayInit(totLevel, sizeof(SArray)); pStream->tasks = taosArrayInit(totLevel, sizeof(SArray));
int32_t msgLen;
for (int32_t level = 0; level < totLevel; level++) { for (int32_t level = 0; level < totLevel; level++) {
SArray* taskOneLevel = taosArrayInit(0, sizeof(SStreamTask)); SArray* taskOneLevel = taosArrayInit(0, sizeof(SStreamTask));
SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, level); SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, level);
...@@ -67,43 +113,16 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -67,43 +113,16 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
// send to vnode // send to vnode
SStreamTask* pTask = streamTaskNew(pStream->uid, level); SStreamTask* pTask = streamTaskNew(pStream->uid, level);
// TODO: set to
plan->execNode.nodeId = pVgroup->vgId; pTask->parallel = 4;
plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup); if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) {
if (qSubPlanToString(plan, &pTask->qmsg, &msgLen) < 0) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1; return -1;
} }
taosArrayPush(taskOneLevel, pTask); taosArrayPush(taskOneLevel, pTask);
SCoder encoder;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
tEncodeSStreamTask(&encoder, pTask);
int32_t tlen = sizeof(SMsgHead) + encoder.pos;
tCoderClear(&encoder);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
((SMsgHead*)buf)->streamTaskId = pTask->taskId;
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, tlen, TD_ENCODER);
tEncodeSStreamTask(&encoder, pTask);
tCoderClear(&encoder);
STransAction action = {0};
action.epSet = plan->execNode.epSet;
action.pCont = buf;
action.contLen = tlen;
action.msgType = TDMT_VND_TASK_DEPLOY;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
rpcFreeCont(buf);
return -1;
}
} }
} else if (plan->subplanType == SUBPLAN_TYPE_SCAN) { } else if (plan->subplanType == SUBPLAN_TYPE_SCAN) {
// duplicatable // duplicatable
...@@ -113,88 +132,26 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -113,88 +132,26 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
// if has snode, set to shared thread num in snode // if has snode, set to shared thread num in snode
parallel = SND_SHARED_THREAD_NUM; parallel = SND_SHARED_THREAD_NUM;
for (int32_t i = 0; i < parallel; i++) {
SStreamTask* pTask = streamTaskNew(pStream->uid, level);
// TODO:get snode id and ep
plan->execNode.nodeId = pVgroup->vgId;
plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup);
if (qSubPlanToString(plan, &pTask->qmsg, &msgLen) < 0) {
qDestroyQueryPlan(pPlan);
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
}
taosArrayPush(taskOneLevel, pTask);
SCoder encoder;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
tEncodeSStreamTask(&encoder, pTask);
int32_t tlen = sizeof(SMsgHead) + encoder.pos;
tCoderClear(&encoder);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
((SMsgHead*)buf)->streamTaskId = pTask->taskId;
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, tlen, TD_ENCODER);
tEncodeSStreamTask(&encoder, pTask);
tCoderClear(&encoder);
STransAction action = {0};
action.epSet = plan->execNode.epSet;
action.pCont = buf;
action.contLen = tlen;
action.msgType = TDMT_SND_TASK_DEPLOY;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
rpcFreeCont(buf);
return -1;
}
}
} else {
// not duplicatable
SStreamTask* pTask = streamTaskNew(pStream->uid, level); SStreamTask* pTask = streamTaskNew(pStream->uid, level);
pTask->parallel = parallel;
// TODO:get snode id and ep // TODO:get snode id and ep
plan->execNode.nodeId = pVgroup->vgId; if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) {
plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup);
if (qSubPlanToString(plan, &pTask->qmsg, &msgLen) < 0) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1; return -1;
} }
taosArrayPush(taskOneLevel, pTask); taosArrayPush(taskOneLevel, pTask);
} else {
// not duplicatable
SStreamTask* pTask = streamTaskNew(pStream->uid, level);
SCoder encoder; // TODO: get snode
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) {
tEncodeSStreamTask(&encoder, pTask); sdbRelease(pSdb, pVgroup);
int32_t tlen = sizeof(SMsgHead) + encoder.pos; qDestroyQueryPlan(pPlan);
tCoderClear(&encoder);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
((SMsgHead*)buf)->streamTaskId = pTask->taskId;
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, tlen, TD_ENCODER);
tEncodeSStreamTask(&encoder, pTask);
tCoderClear(&encoder);
STransAction action = {0};
action.epSet = plan->execNode.epSet;
action.pCont = buf;
action.contLen = tlen;
action.msgType = TDMT_SND_TASK_DEPLOY;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
rpcFreeCont(buf);
return -1; return -1;
} }
taosArrayPush(taskOneLevel, pTask);
} }
taosArrayPush(pStream->tasks, taskOneLevel); taosArrayPush(pStream->tasks, taskOneLevel);
} }
......
...@@ -220,8 +220,8 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe ...@@ -220,8 +220,8 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
streamObj.dbUid = pDb->uid; streamObj.dbUid = pDb->uid;
streamObj.version = 1; streamObj.version = 1;
streamObj.sql = pCreate->sql; streamObj.sql = pCreate->sql;
streamObj.physicalPlan = pCreate->physicalPlan; streamObj.physicalPlan = "";
streamObj.logicalPlan = pCreate->logicalPlan; streamObj.logicalPlan = "";
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STREAM, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STREAM, &pReq->rpcMsg);
if (pTrans == NULL) { if (pTrans == NULL) {
......
...@@ -433,12 +433,12 @@ ALLOC_VGROUP_OVER: ...@@ -433,12 +433,12 @@ ALLOC_VGROUP_OVER:
return code; return code;
} }
SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup) { SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup) {
SEpSet epset = {0}; SEpSet epset = {0};
for (int32_t v = 0; v < pVgroup->replica; ++v) { for (int32_t v = 0; v < pVgroup->replica; ++v) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[v]; const SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pDnode == NULL) continue; if (pDnode == NULL) continue;
if (pVgid->role == TAOS_SYNC_STATE_LEADER) { if (pVgid->role == TAOS_SYNC_STATE_LEADER) {
......
...@@ -57,7 +57,7 @@ void sndMetaDelete(SStreamMeta *pMeta) { ...@@ -57,7 +57,7 @@ void sndMetaDelete(SStreamMeta *pMeta) {
} }
int32_t sndMetaDeployTask(SStreamMeta *pMeta, SStreamTask *pTask) { int32_t sndMetaDeployTask(SStreamMeta *pMeta, SStreamTask *pTask) {
pTask->executor = qCreateStreamExecTaskInfo(pTask->qmsg, NULL); pTask->runner.executor = qCreateStreamExecTaskInfo(pTask->qmsg, NULL);
return taosHashPut(pMeta->pHash, &pTask->taskId, sizeof(int32_t), pTask, sizeof(void *)); return taosHashPut(pMeta->pHash, &pTask->taskId, sizeof(int32_t), pTask, sizeof(void *));
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册