diff --git a/example/src/tstream.c b/example/src/tstream.c index 62d94b041d5b304e2d4f449436fcf9896a97e395..6976d9e398726b5fa44917476cd11e601c01f1d0 100644 --- a/example/src/tstream.c +++ b/example/src/tstream.c @@ -63,7 +63,7 @@ int32_t init_env() { } int32_t create_stream() { - printf("create topic\n"); + printf("create stream\n"); TAOS_RES* pRes; TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); if (pConn == NULL) { @@ -77,7 +77,9 @@ int32_t create_stream() { } taos_free_result(pRes); - const char* sql = "select ts,k from tu1"; + /*const char* sql = "select min(k), max(k), sum(k) from tu1";*/ + const char* sql = "select min(k), max(k), sum(k) from st1"; + /*const char* sql = "select sum(k) from tu1 interval(10m)";*/ pRes = tmq_create_stream(pConn, "stream1", "out1", sql); if (taos_errno(pRes) != 0) { printf("failed to create stream out1, reason:%s\n", taos_errstr(pRes)); diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 583496a4c673b5364f462d65d644d7a5524c9b1d..812025e8e46a4ed747b270d12ca3020558270e03 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -23,8 +23,8 @@ #include "tencode.h" #include "thash.h" #include "tlist.h" -#include "trow.h" #include "tname.h" +#include "trow.h" #include "tuuid.h" #ifdef __cplusplus @@ -472,10 +472,9 @@ typedef struct { int32_t code; } SQueryTableRsp; -int32_t tSerializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp); - -int32_t tDeserializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp); +int32_t tSerializeSQueryTableRsp(void* buf, int32_t bufLen, SQueryTableRsp* pRsp); +int32_t tDeserializeSQueryTableRsp(void* buf, int32_t bufLen, SQueryTableRsp* pRsp); typedef struct { char db[TSDB_DB_FNAME_LEN]; @@ -888,14 +887,14 @@ typedef struct { } SRetrieveTableRsp; typedef struct { - int64_t handle; - int64_t useconds; - int8_t completed; // all results are returned to client - int8_t precision; - int8_t compressed; - int32_t compLen; - int32_t numOfRows; - char data[]; + int64_t handle; + int64_t useconds; + int8_t completed; // all results are returned to client + int8_t precision; + int8_t compressed; + int32_t compLen; + int32_t numOfRows; + char data[]; } SRetrieveMetaTableRsp; typedef struct { @@ -1425,12 +1424,11 @@ int32_t tSerializeSVCreateTbBatchReq(void** buf, SVCreateTbBatchReq* pReq); void* tDeserializeSVCreateTbBatchReq(void* buf, SVCreateTbBatchReq* pReq); typedef struct { - SArray* rspList; // SArray + SArray* rspList; // SArray } SVCreateTbBatchRsp; -int32_t tSerializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatchRsp *pRsp); -int32_t tDeserializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatchRsp *pRsp); - +int32_t tSerializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatchRsp* pRsp); +int32_t tDeserializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatchRsp* pRsp); typedef struct { int64_t ver; @@ -2312,6 +2310,11 @@ enum { STREAM_TASK_STATUS__STOP, }; +enum { + STREAM_NEXT_OP_DST__VND = 1, + STREAM_NEXT_OP_DST__SND, +}; + typedef struct { void* inputHandle; void* executor; @@ -2326,6 +2329,7 @@ typedef struct { int8_t pipeSink; int8_t numOfRunners; int8_t parallelizable; + int8_t nextOpDst; // vnode or snode SEpSet NextOpEp; char* qmsg; // not applied to encoder and decoder @@ -2341,6 +2345,8 @@ static FORCE_INLINE SStreamTask* streamTaskNew(int64_t streamId, int32_t level) return NULL; } pTask->taskId = tGenIdPI32(); + pTask->streamId = streamId; + pTask->level = level; pTask->status = STREAM_TASK_STATUS__RUNNING; pTask->qmsg = NULL; return pTask; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 73a78131dc1cb6d81e85c3278328866d05ecd62a..ad8a83909592e2e655b17a61dd0359c6055dba2b 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -191,6 +191,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp) TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) TD_DEF_MSG_TYPE(TDMT_VND_TASK_EXEC, "vnode-task-exec", SStreamTaskExecReq, SStreamTaskExecRsp) + TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 841824a7c75c4d5c9f2fb858e7690f29d266ca6b..506e017deb1b21f6a89632d67de319c1cf56323e 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -132,6 +132,9 @@ bool tsdbForceKeepFile = false; int32_t tsDiskCfgNum = 0; SDiskCfg tsDiskCfg[TFS_MAX_DISKS] = {0}; +// stream scheduler +bool tsStreamSchedV = true; + /* * minimum scale for whole system, millisecond by default * for TSDB_TIME_PRECISION_MILLI: 86400000L @@ -585,4 +588,4 @@ void taosCfgDynamicOptions(const char *option, const char *value) { taosResetLog(); cfgDumpCfg(tsCfg, 1, false); } -} \ No newline at end of file +} diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 1ea00566f483e87340d8ff2a9816af1b4901e2fb..b2fb92600b947a13263c19c338956bfb24eb1712 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -313,12 +313,12 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) { tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pBSmaCols[i]); } - if(pReq->rollup && NULL != pReq->stbCfg.pRSmaParam) { + if (pReq->rollup && NULL != pReq->stbCfg.pRSmaParam) { SRSmaParam *param = pReq->stbCfg.pRSmaParam; tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor); tlen += taosEncodeFixedI8(buf, param->delayUnit); tlen += taosEncodeFixedI8(buf, param->nFuncIds); - for(int8_t i=0; i< param->nFuncIds; ++i) { + for (int8_t i = 0; i < param->nFuncIds; ++i) { tlen += taosEncodeFixedI32(buf, param->pFuncIds[i]); } tlen += taosEncodeFixedI64(buf, param->delay); @@ -340,12 +340,12 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) { tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pBSmaCols[i]); } - if(pReq->rollup && NULL != pReq->stbCfg.pRSmaParam) { + if (pReq->rollup && NULL != pReq->stbCfg.pRSmaParam) { SRSmaParam *param = pReq->stbCfg.pRSmaParam; tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor); tlen += taosEncodeFixedI8(buf, param->delayUnit); tlen += taosEncodeFixedI8(buf, param->nFuncIds); - for(int8_t i=0; i< param->nFuncIds; ++i) { + for (int8_t i = 0; i < param->nFuncIds; ++i) { tlen += taosEncodeFixedI32(buf, param->pFuncIds[i]); } tlen += taosEncodeFixedI64(buf, param->delay); @@ -385,7 +385,7 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { buf = taosDecodeStringTo(buf, pReq->stbCfg.pTagSchema[i].name); } buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.nBSmaCols)); - if(pReq->stbCfg.nBSmaCols > 0) { + if (pReq->stbCfg.nBSmaCols > 0) { pReq->stbCfg.pBSmaCols = (col_id_t *)malloc(pReq->stbCfg.nBSmaCols * sizeof(col_id_t)); for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) { buf = taosDecodeFixedI16(buf, pReq->stbCfg.pBSmaCols + i); @@ -393,14 +393,14 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { } else { pReq->stbCfg.pBSmaCols = NULL; } - if(pReq->rollup) { + if (pReq->rollup) { pReq->stbCfg.pRSmaParam = (SRSmaParam *)malloc(sizeof(SRSmaParam)); SRSmaParam *param = pReq->stbCfg.pRSmaParam; - buf = taosDecodeFixedU32(buf, (uint32_t*)¶m->xFilesFactor); + buf = taosDecodeFixedU32(buf, (uint32_t *)¶m->xFilesFactor); buf = taosDecodeFixedI8(buf, ¶m->delayUnit); buf = taosDecodeFixedI8(buf, ¶m->nFuncIds); - if(param->nFuncIds > 0) { - for (int8_t i = 0; i< param->nFuncIds; ++i) { + if (param->nFuncIds > 0) { + for (int8_t i = 0; i < param->nFuncIds; ++i) { buf = taosDecodeFixedI32(buf, param->pFuncIds + i); } } else { @@ -425,7 +425,7 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { buf = taosDecodeStringTo(buf, pReq->ntbCfg.pSchema[i].name); } buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.nBSmaCols)); - if(pReq->stbCfg.nBSmaCols > 0) { + if (pReq->stbCfg.nBSmaCols > 0) { pReq->stbCfg.pBSmaCols = (col_id_t *)malloc(pReq->stbCfg.nBSmaCols * sizeof(col_id_t)); for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) { buf = taosDecodeFixedI16(buf, pReq->stbCfg.pBSmaCols + i); @@ -433,14 +433,14 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { } else { pReq->stbCfg.pBSmaCols = NULL; } - if(pReq->rollup) { + if (pReq->rollup) { pReq->stbCfg.pRSmaParam = (SRSmaParam *)malloc(sizeof(SRSmaParam)); SRSmaParam *param = pReq->stbCfg.pRSmaParam; - buf = taosDecodeFixedU32(buf, (uint32_t*)¶m->xFilesFactor); + buf = taosDecodeFixedU32(buf, (uint32_t *)¶m->xFilesFactor); buf = taosDecodeFixedI8(buf, ¶m->delayUnit); buf = taosDecodeFixedI8(buf, ¶m->nFuncIds); - if(param->nFuncIds > 0) { - for (int8_t i = 0; i< param->nFuncIds; ++i) { + if (param->nFuncIds > 0) { + for (int8_t i = 0; i < param->nFuncIds; ++i) { buf = taosDecodeFixedI32(buf, param->pFuncIds + i); } } else { @@ -2709,7 +2709,7 @@ int32_t tSerializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); if (tStartEncode(&encoder) < 0) return -1; - if (tEncodeI32(&encoder, pRsp->code) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->code) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -2736,13 +2736,13 @@ int32_t tSerializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatchR if (tStartEncode(&encoder) < 0) return -1; if (pRsp->rspList) { int32_t num = taosArrayGetSize(pRsp->rspList); - if (tEncodeI32(&encoder, num) < 0) return -1; + if (tEncodeI32(&encoder, num) < 0) return -1; for (int32_t i = 0; i < num; ++i) { SVCreateTbRsp *rsp = taosArrayGet(pRsp->rspList, i); - if (tEncodeI32(&encoder, rsp->code) < 0) return -1; + if (tEncodeI32(&encoder, rsp->code) < 0) return -1; } } else { - if (tEncodeI32(&encoder, 0) < 0) return -1; + if (tEncodeI32(&encoder, 0) < 0) return -1; } tEndEncode(&encoder); @@ -2752,7 +2752,7 @@ int32_t tSerializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatchR } int32_t tDeserializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatchRsp *pRsp) { - SCoder decoder = {0}; + SCoder decoder = {0}; int32_t num = 0; tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); @@ -2775,7 +2775,6 @@ int32_t tDeserializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatc return 0; } - int32_t tSerializeSVCreateTSmaReq(void **buf, SVCreateTSmaReq *pReq) { int32_t tlen = 0; @@ -2824,7 +2823,8 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; if (tEncodeCStr(&encoder, pReq->outputTbName) < 0) return -1; if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1; - if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1; + if (tEncodeI32(&encoder, sqlLen) < 0) return -1; + if (tEncodeI32(&encoder, astLen) < 0) return -1; if (sqlLen > 0 && tEncodeCStr(&encoder, pReq->sql) < 0) return -1; if (astLen > 0 && tEncodeCStr(&encoder, pReq->ast) < 0) return -1; @@ -2872,30 +2872,36 @@ void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) { } int32_t tEncodeSStreamTask(SCoder *pEncoder, const SStreamTask *pTask) { - if (tStartEncode(pEncoder) < 0) return -1; + /*if (tStartEncode(pEncoder) < 0) return -1;*/ if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->level) < 0) return -1; if (tEncodeI8(pEncoder, pTask->status) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->pipeSource) < 0) return -1; if (tEncodeI8(pEncoder, pTask->pipeSink) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->parallelizable) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->nextOpDst) < 0) return -1; // if (tEncodeI8(pEncoder, pTask->numOfRunners) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pTask->NextOpEp) < 0) return -1; if (tEncodeCStr(pEncoder, pTask->qmsg) < 0) return -1; - tEndEncode(pEncoder); + /*tEndEncode(pEncoder);*/ return pEncoder->pos; } int32_t tDecodeSStreamTask(SCoder *pDecoder, SStreamTask *pTask) { - if (tStartDecode(pDecoder) < 0) return -1; + /*if (tStartDecode(pDecoder) < 0) return -1;*/ if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->level) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->pipeSource) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->pipeSink) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->parallelizable) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->nextOpDst) < 0) return -1; // if (tDecodeI8(pDecoder, &pTask->numOfRunners) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pTask->NextOpEp) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pTask->qmsg) < 0) return -1; - tEndDecode(pDecoder); + /*tEndDecode(pDecoder);*/ return 0; } diff --git a/source/dnode/mgmt/mnode/src/mmMsg.c b/source/dnode/mgmt/mnode/src/mmMsg.c index 56a580ed939df45a625a4bcb54bcca611fb9d797..fee568d20b0d439367d2953a6dca84ccdc9fb997 100644 --- a/source/dnode/mgmt/mnode/src/mmMsg.c +++ b/source/dnode/mgmt/mnode/src/mmMsg.c @@ -142,6 +142,8 @@ void mmInitMsgHandles(SMgmtWrapper *pWrapper) { dndSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, (NodeMsgFp)mmProcessWriteMsg, 0); dndSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, (NodeMsgFp)mmProcessWriteMsg, 0); dndSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, (NodeMsgFp)mmProcessReadMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STREAM, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); // Requests handled by VNODE dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); diff --git a/source/dnode/mgmt/vnode/src/vmMsg.c b/source/dnode/mgmt/vnode/src/vmMsg.c index d4af82b382fb5c1ee42d0b6534961f40eb7232dd..7bbf730744d09438b3a72a12ee23bb4255543fd4 100644 --- a/source/dnode/mgmt/vnode/src/vmMsg.c +++ b/source/dnode/mgmt/vnode/src/vmMsg.c @@ -277,6 +277,7 @@ void vmInitMsgHandles(SMgmtWrapper *pWrapper) { dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)vmProcessWriteMsg, 0); dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg, 0); dndSetMsgHandle(pWrapper, TDMT_VND_TASK_EXEC, (NodeMsgFp)vmProcessFetchMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, (NodeMsgFp)vmProcessFetchMsg, 0); dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0); dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0); diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index b95574ea4173837bfd0ad9e68b489255c6ec425d..8ccfd6be5d4cf170d69ffacec46574f4c5abd5cf 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -32,20 +32,23 @@ #include "tname.h" #include "tuuid.h" -int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet, tmsg_t type) { +extern bool tsStreamSchedV; + +int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet, tmsg_t type, int32_t nodeId) { SCoder encoder; tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); tEncodeSStreamTask(&encoder, pTask); - int32_t tlen = sizeof(SMsgHead) + encoder.pos; + int32_t size = encoder.pos; + int32_t tlen = sizeof(SMsgHead) + size; tCoderClear(&encoder); void* buf = malloc(tlen); if (buf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - ((SMsgHead*)buf)->streamTaskId = pTask->taskId; + ((SMsgHead*)buf)->streamTaskId = htonl(nodeId); void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, tlen, TD_ENCODER); + tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, size, TD_ENCODER); tEncodeSStreamTask(&encoder, pTask); tCoderClear(&encoder); @@ -70,7 +73,7 @@ int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SS terrno = TSDB_CODE_QRY_INVALID_INPUT; return -1; } - mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_VND_TASK_DEPLOY); + mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId); return 0; } @@ -90,7 +93,7 @@ int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, terrno = TSDB_CODE_QRY_INVALID_INPUT; return -1; } - mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_SND_TASK_DEPLOY); + mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_SND_TASK_DEPLOY, 0); return 0; } @@ -106,6 +109,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { int32_t totLevel = LIST_LENGTH(pPlan->pSubplans); pStream->tasks = taosArrayInit(totLevel, sizeof(SArray)); + int32_t lastUsedVgId = 0; for (int32_t level = 0; level < totLevel; level++) { SArray* taskOneLevel = taosArrayInit(0, sizeof(SStreamTask)); @@ -113,9 +117,9 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { int32_t opNum = LIST_LENGTH(inner->pNodeList); ASSERT(opNum == 1); - SSubplan* plan = nodesListGetNode(inner->pNodeList, level); + SSubplan* plan = nodesListGetNode(inner->pNodeList, 0); if (level == 0) { - ASSERT(plan->type == SUBPLAN_TYPE_SCAN); + ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN); void* pIter = NULL; while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); @@ -125,11 +129,14 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { continue; } + lastUsedVgId = pVgroup->vgId; pStream->vgNum++; // send to vnode SStreamTask* pTask = streamTaskNew(pStream->uid, level); + pTask->pipeSource = 1; pTask->pipeSink = level == totLevel - 1 ? 1 : 0; + pTask->parallelizable = 1; // TODO: set to if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) { sdbRelease(pSdb, pVgroup); @@ -140,19 +147,35 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { } } else { SStreamTask* pTask = streamTaskNew(pStream->uid, level); + pTask->pipeSource = 0; pTask->pipeSink = level == totLevel - 1 ? 1 : 0; - SSnodeObj* pSnode = mndSchedFetchSnode(pMnode); - if (pSnode != NULL) { - if (mndAssignTaskToSnode(pMnode, pTrans, pTask, plan, pSnode) < 0) { - sdbRelease(pSdb, pSnode); + pTask->parallelizable = plan->subplanType == SUBPLAN_TYPE_SCAN; + pTask->nextOpDst = STREAM_NEXT_OP_DST__VND; + + if (tsStreamSchedV) { + ASSERT(lastUsedVgId != 0); + SVgObj* pVg = mndAcquireVgroup(pMnode, lastUsedVgId); + if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVg) < 0) { + sdbRelease(pSdb, pVg); qDestroyQueryPlan(pPlan); return -1; } - sdbRelease(pMnode->pSdb, pSnode); + sdbRelease(pSdb, pVg); } else { - // TODO: assign to one vg - ASSERT(0); + SSnodeObj* pSnode = mndSchedFetchSnode(pMnode); + if (pSnode != NULL) { + if (mndAssignTaskToSnode(pMnode, pTrans, pTask, plan, pSnode) < 0) { + sdbRelease(pSdb, pSnode); + qDestroyQueryPlan(pPlan); + return -1; + } + sdbRelease(pMnode->pSdb, pSnode); + } else { + // TODO: assign to one vg + ASSERT(0); + } } + taosArrayPush(taskOneLevel, pTask); } taosArrayPush(pStream->tasks, taskOneLevel); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 99aded0292e973b4c457eae5c65dd27bc13db626..2ca4e10f6845f265391d6d0f180feb65ce4a36be 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -21,6 +21,7 @@ #include "mndScheduler.h" #include "mndShow.h" #include "mndStb.h" +#include "mndTopic.h" #include "mndTrans.h" #include "mndUser.h" #include "mndVgroup.h" @@ -33,6 +34,7 @@ static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pStream, SStreamObj *pNewStream); static int32_t mndProcessCreateStreamReq(SNodeMsg *pReq); +static int32_t mndProcessTaskDeployInternalRsp(SNodeMsg *pRsp); /*static int32_t mndProcessDropStreamReq(SNodeMsg *pReq);*/ /*static int32_t mndProcessDropStreamInRsp(SNodeMsg *pRsp);*/ static int32_t mndProcessStreamMetaReq(SNodeMsg *pReq); @@ -50,6 +52,8 @@ int32_t mndInitStream(SMnode *pMnode) { .deleteFp = (SdbDeleteFp)mndStreamActionDelete}; mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq); + mndSetMsgHandle(pMnode, TDMT_VND_TASK_DEPLOY_RSP, mndProcessTaskDeployInternalRsp); + mndSetMsgHandle(pMnode, TDMT_SND_TASK_DEPLOY_RSP, mndProcessTaskDeployInternalRsp); /*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);*/ /*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/ @@ -68,7 +72,7 @@ SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) { SCoder encoder; tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); - if (tEncodeSStreamObj(NULL, pStream) < 0) { + if (tEncodeSStreamObj(&encoder, pStream) < 0) { tCoderClear(&encoder); goto STREAM_ENCODE_OVER; } @@ -83,7 +87,7 @@ SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) { if (buf == NULL) goto STREAM_ENCODE_OVER; tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, tlen, TD_ENCODER); - if (tEncodeSStreamObj(NULL, pStream) < 0) { + if (tEncodeSStreamObj(&encoder, pStream) < 0) { tCoderClear(&encoder); goto STREAM_ENCODE_OVER; } @@ -135,7 +139,7 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) { SDB_GET_BINARY(pRaw, dataPos, buf, tlen, STREAM_DECODE_OVER); SCoder decoder; - tCoderInit(&decoder, TD_LITTLE_ENDIAN, NULL, 0, TD_DECODER); + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, tlen + 1, TD_DECODER); if (tDecodeSStreamObj(&decoder, pStream) < 0) { goto STREAM_DECODE_OVER; } @@ -191,6 +195,11 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) { sdbRelease(pSdb, pStream); } +static int32_t mndProcessTaskDeployInternalRsp(SNodeMsg *pRsp) { + mndTransProcessRsp(pRsp); + return 0; +} + static SDbObj *mndAcquireDbByStream(SMnode *pMnode, char *streamName) { SName name = {0}; tNameFromString(&name, streamName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); @@ -209,6 +218,33 @@ static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) { return 0; } +static int32_t mndStreamGetPlanString(const SCMCreateStreamReq *pCreate, char **pStr) { + if (NULL == pCreate->ast) { + return TSDB_CODE_SUCCESS; + } + + SNode *pAst = NULL; + int32_t code = nodesStringToNode(pCreate->ast, &pAst); + + SQueryPlan *pPlan = NULL; + if (TSDB_CODE_SUCCESS == code) { + SPlanContext cxt = { + .pAstRoot = pAst, + .topicQuery = false, + .streamQuery = true, + }; + code = qCreateQueryPlan(&cxt, &pPlan, NULL); + } + + if (TSDB_CODE_SUCCESS == code) { + code = nodesNodeToString(pPlan, false, pStr, NULL); + } + nodesDestroyNode(pAst); + nodesDestroyNode(pPlan); + terrno = code; + return code; +} + static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamReq *pCreate, SDbObj *pDb) { mDebug("stream:%s to create", pCreate->name); SStreamObj streamObj = {0}; @@ -220,8 +256,13 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe streamObj.dbUid = pDb->uid; streamObj.version = 1; streamObj.sql = pCreate->sql; - streamObj.physicalPlan = ""; - streamObj.logicalPlan = ""; + /*streamObj.physicalPlan = "";*/ + streamObj.logicalPlan = "not implemented"; + + if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(pCreate, &streamObj.physicalPlan)) { + mError("topic:%s, failed to get plan since %s", pCreate->name, terrstr()); + return -1; + } STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STREAM, &pReq->rpcMsg); if (pTrans == NULL) { diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index bd0fd0e612affb98f3620b3f86b595ea633c0863..95dce5d8f1dc0e7da2e36db5839000fdff1409aa 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -236,7 +236,7 @@ static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) { return 0; } -static int32_t mndGetPlanString(SCMCreateTopicReq *pCreate, char **pStr) { +static int32_t mndGetPlanString(const SCMCreateTopicReq *pCreate, char **pStr) { if (NULL == pCreate->ast) { return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/vnode/src/inc/tqInt.h b/source/dnode/vnode/src/inc/tqInt.h index 4d4bb12a21fdbed28c6dbc7380bd9ae491a77595..deb3cae617e049032c0a45dd679ac2dfe5e689a0 100644 --- a/source/dnode/vnode/src/inc/tqInt.h +++ b/source/dnode/vnode/src/inc/tqInt.h @@ -167,6 +167,7 @@ struct STQ { STqMetaStore* tqMeta; STqPushMgr* tqPushMgr; SHashObj* pStreamTasks; + SVnode* pVnode; SWal* pWal; SMeta* pVnodeMeta; }; diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index ed9aad927771d482c519aac8cc9218f67518c607..78ff9f1062b6300beed773f415c6707d0c249acb 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -82,12 +82,6 @@ struct SVnode { int vnodeScheduleTask(SVnodeTask* task); -int32_t vnodePutToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq); -int32_t vnodePutToVFetchQ(SVnode* pVnode, struct SRpcMsg* pReq); -int32_t vnodeSendReq(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq); -int32_t vnodeSendMnodeReq(SVnode* pVnode, struct SRpcMsg* pReq); -void vnodeSendRsp(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pRsp); - #define vFatal(...) \ do { \ if (vDebugFlag & DEBUG_FATAL) { \ @@ -177,19 +171,20 @@ int tqInit(); void tqCleanUp(); // open in each vnode -STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAllocatorFactory* allocFac); +STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, + SMemAllocatorFactory* allocFac); void tqClose(STQ*); // required by vnode -int tqPushMsg(STQ*, void* msg, tmsg_t msgType, int64_t version); +int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version); int tqCommit(STQ*); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessSetConnReq(STQ* pTq, char* msg); int32_t tqProcessRebReq(STQ* pTq, char* msg); int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg); - int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); +int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 3af79ca4619b0445ffab23ee9245d88e1c16dfb7..25fe9637991f07e416f545247a6f4f2a220d5d8c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -17,11 +17,14 @@ #include "tqInt.h" #include "tqMetaStore.h" +void tqDebugShowSSData(SArray* dataBlocks); + int32_t tqInit() { return tqPushMgrInit(); } void tqCleanUp() { tqPushMgrCleanUp(); } -STQ* tqOpen(const char* path, SWal* pWal, SMeta* pVnodeMeta, STqCfg* tqConfig, SMemAllocatorFactory* allocFac) { +STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, STqCfg* tqConfig, + SMemAllocatorFactory* allocFac) { STQ* pTq = malloc(sizeof(STQ)); if (pTq == NULL) { terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; @@ -29,6 +32,7 @@ STQ* tqOpen(const char* path, SWal* pWal, SMeta* pVnodeMeta, STqCfg* tqConfig, S } pTq->path = strdup(path); pTq->tqConfig = tqConfig; + pTq->pVnode = pVnode; pTq->pWal = pWal; pTq->pVnodeMeta = pVnodeMeta; #if 0 @@ -68,46 +72,19 @@ void tqClose(STQ* pTq) { // TODO } -int tqPushMsg(STQ* pTq, void* msg, tmsg_t msgType, int64_t version) { +int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version) { if (msgType != TDMT_VND_SUBMIT) return 0; - - void* pIter = NULL; - - while (1) { - pIter = taosHashIterate(pTq->pStreamTasks, pIter); - if (pIter == NULL) break; - SStreamTask* pTask = (SStreamTask*)pIter; - if (!pTask->pipeSource) continue; - - int32_t workerId = 0; - void* exec = pTask->runner[workerId].executor; - qSetStreamInput(exec, msg, STREAM_DATA_TYPE_SUBMIT_BLOCK); - SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); - while (1) { - SSDataBlock* output; - uint64_t ts; - if (qExecTask(exec, &output, &ts) < 0) { - ASSERT(false); - } - if (output == NULL) { - break; - } - taosArrayPush(pRes, output); - } - if (pTask->pipeSink) { - // write back - } else { - int32_t tlen = sizeof(SStreamExecMsgHead) + tEncodeDataBlocks(NULL, pRes); - void* buf = rpcMallocCont(tlen); - if (buf == NULL) { - return -1; - } - void* abuf = POINTER_SHIFT(buf, sizeof(SStreamExecMsgHead)); - tEncodeDataBlocks(abuf, pRes); - // serialize - // to next level - } + void* data = malloc(msgLen); + if (data == NULL) { + return -1; } + memcpy(data, msg, msgLen); + SRpcMsg req = { + .msgType = TDMT_VND_STREAM_TRIGGER, + .pCont = data, + .contLen = msgLen, + }; + tmsgPutToQueue(&pTq->pVnode->msgCb, FETCH_QUEUE, &req); #if 0 void* pIter = taosHashIterate(pTq->tqPushMgr->pHash, NULL); @@ -483,7 +460,9 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { } SCoder decoder; tCoderInit(&decoder, TD_LITTLE_ENDIAN, (uint8_t*)msg, msgLen, TD_DECODER); - tDecodeSStreamTask(&decoder, pTask); + if (tDecodeSStreamTask(&decoder, pTask) < 0) { + ASSERT(0); + } tCoderClear(&decoder); tqExpandTask(pTq, pTask, 8); @@ -560,7 +539,11 @@ void tqDebugShowSSData(SArray* dataBlocks) { break; case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_UINT: - printf(" %15u |", *(uint32_t*)var); + printf(" %15d |", *(int32_t*)var); + break; + case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_UBIGINT: + printf(" %15ld |", *(int64_t*)var); break; } } @@ -569,6 +552,62 @@ void tqDebugShowSSData(SArray* dataBlocks) { } } +int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen) { + void* pIter = NULL; + + while (1) { + pIter = taosHashIterate(pTq->pStreamTasks, pIter); + if (pIter == NULL) break; + SStreamTask* pTask = (SStreamTask*)pIter; + if (!pTask->pipeSource) continue; + + int32_t workerId = 0; + void* exec = pTask->runner[workerId].executor; + qSetStreamInput(exec, data, STREAM_DATA_TYPE_SUBMIT_BLOCK); + SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); + while (1) { + SSDataBlock* output; + uint64_t ts; + if (qExecTask(exec, &output, &ts) < 0) { + ASSERT(false); + } + if (output == NULL) { + break; + } + taosArrayPush(pRes, output); + } + if (pTask->pipeSink) { + // write back + /*printf("reach end\n");*/ + tqDebugShowSSData(pRes); + } else { + int32_t tlen = sizeof(SStreamExecMsgHead) + tEncodeDataBlocks(NULL, pRes); + void* buf = rpcMallocCont(tlen); + if (buf == NULL) { + return -1; + } + void* abuf = POINTER_SHIFT(buf, sizeof(SStreamExecMsgHead)); + tEncodeDataBlocks(abuf, pRes); + tmsg_t type; + + if (pTask->nextOpDst == STREAM_NEXT_OP_DST__VND) { + type = TDMT_VND_TASK_EXEC; + } else { + type = TDMT_SND_TASK_EXEC; + } + + SRpcMsg reqMsg = { + .pCont = buf, + .contLen = tlen, + .code = 0, + .msgType = type, + }; + tmsgSendReq(&pTq->pVnode->msgCb, &pTask->NextOpEp, &reqMsg); + } + } + return 0; +} + int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg) { SStreamTaskExecReq* pReq = msg->pCont; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 0c4b933c198bf47aa9e8590afb186d66b28c2fc8..8c58b5ce07dea7968c2f646c416359d58d2924ea 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -126,6 +126,36 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { if (pArray == NULL) { return NULL; } + int32_t colMeta = 0; + int32_t colNeed = 0; + while (colMeta < pSchemaWrapper->nCols && colNeed < colNumNeed) { + SSchema* pColSchema = &pSchemaWrapper->pSchema[colMeta]; + int16_t colIdSchema = pColSchema->colId; + int16_t colIdNeed = *(int16_t*)taosArrayGet(pHandle->pColIdList, colNeed); + if (colIdSchema < colIdNeed) { + colMeta++; + } else if (colIdSchema > colIdNeed) { + colNeed++; + } else { + SColumnInfoData colInfo = {0}; + int sz = numOfRows * pColSchema->bytes; + colInfo.info.bytes = pColSchema->bytes; + colInfo.info.colId = pColSchema->colId; + colInfo.info.type = pColSchema->type; + + colInfo.pData = calloc(1, sz); + if (colInfo.pData == NULL) { + // TODO free + taosArrayDestroy(pArray); + return NULL; + } + + blockDataEnsureColumnCapacity(&colInfo, numOfRows); + taosArrayPush(pArray, &colInfo); + colMeta++; + colNeed++; + } + } int j = 0; for (int32_t i = 0; i < colNumNeed; i++) { @@ -163,11 +193,23 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) { tdSTSRowIterReset(&iter, row); // get all wanted col of that block + int32_t colTot = taosArrayGetSize(pArray); + for (int32_t i = 0; i < colTot; i++) { + SColumnInfoData* pColData = taosArrayGet(pArray, i); + SCellVal sVal = {0}; + if (!tdSTSRowIterNext(&iter, pColData->info.colId, pColData->info.type, &sVal)) { + break; + } + memcpy(POINTER_SHIFT(pColData->pData, curRow * pColData->info.bytes), sVal.val, pColData->info.bytes); + } +#if 0 for (int32_t i = 0; i < colNumNeed; i++) { SColumnInfoData* pColData = taosArrayGet(pArray, i); STColumn* pCol = schemaColAt(pTschema, i); // TODO - ASSERT(pCol->colId == pColData->info.colId); + if(pCol->colId != pColData->info.colId) { + continue; + } // void* val = tdGetMemRowDataOfColEx(row, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx); SCellVal sVal = {0}; if (!tdSTSRowIterNext(&iter, pCol->colId, pCol->type, &sVal)) { @@ -176,6 +218,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { } memcpy(POINTER_SHIFT(pColData->pData, curRow * pCol->bytes), sVal.val, pCol->bytes); } +#endif curRow++; } return pArray; diff --git a/source/dnode/vnode/src/vnd/vnodeMain.c b/source/dnode/vnode/src/vnd/vnodeMain.c index 86e670d533999baab19b332ff5617982df65a2ba..70f41179762ef808d04d1d2a4f0e48c142e61ea7 100644 --- a/source/dnode/vnode/src/vnd/vnodeMain.c +++ b/source/dnode/vnode/src/vnd/vnodeMain.c @@ -115,7 +115,8 @@ static int vnodeOpenImpl(SVnode *pVnode) { // Open tsdb sprintf(dir, "%s/tsdb", pVnode->path); - pVnode->pTsdb = tsdbOpen(dir, pVnode->vgId, &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode), pVnode->pMeta, pVnode->pTfs); + pVnode->pTsdb = + tsdbOpen(dir, pVnode->vgId, &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode), pVnode->pMeta, pVnode->pTfs); if (pVnode->pTsdb == NULL) { // TODO: handle error return -1; @@ -131,7 +132,7 @@ static int vnodeOpenImpl(SVnode *pVnode) { // Open TQ sprintf(dir, "%s/tq", pVnode->path); - pVnode->pTq = tqOpen(dir, pVnode->pWal, pVnode->pMeta, &(pVnode->config.tqCfg), vBufPoolGetMAF(pVnode)); + pVnode->pTq = tqOpen(dir, pVnode, pVnode->pWal, pVnode->pMeta, &(pVnode->config.tqCfg), vBufPoolGetMAF(pVnode)); if (pVnode->pTq == NULL) { // TODO: handle error return -1; diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 2f5f94b55f9756b8fc802f4782743e035114728e..a3258044c84465117bb2d676c4d7240b825d2bfb 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -68,6 +68,8 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) { return tqProcessPollReq(pVnode->pTq, pMsg); case TDMT_VND_TASK_EXEC: return tqProcessTaskExec(pVnode->pTq, pMsg); + case TDMT_VND_STREAM_TRIGGER: + return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen); case TDMT_VND_QUERY_HEARTBEAT: return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg); default: @@ -163,7 +165,6 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { memcpy(POINTER_SHIFT(metaRsp.pSchemas, sizeof(SSchema) * pSW->nCols), pTagSchema, sizeof(SSchema) * nTagCols); } - _exit: rspLen = tSerializeSTableMetaRsp(NULL, 0, &metaRsp); @@ -179,7 +180,6 @@ _exit: } tSerializeSTableMetaRsp(pRsp, rspLen, &metaRsp); - tFreeSTableMetaRsp(&metaRsp); if (pSW != NULL) { tfree(pSW->pSchema); diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index ade9adb1e13f3a566e297421f3a6df49918f7d6d..5eeecc5d2473bd571d2c26306dac4887374dfe62 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -59,7 +59,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { // todo: change the interface here int64_t ver; taosDecodeFixedI64(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &ver); - if (tqPushMsg(pVnode->pTq, ptr, pMsg->msgType, ver) < 0) { + if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, ver) < 0) { // TODO: handle error } @@ -85,10 +85,10 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { for (int i = 0; i < reqNum; i++) { SVCreateTbReq *pCreateTbReq = taosArrayGet(vCreateTbBatchReq.pArray, i); - char tableFName[TSDB_TABLE_FNAME_LEN]; + char tableFName[TSDB_TABLE_FNAME_LEN]; SMsgHead *pHead = (SMsgHead *)pMsg->pCont; sprintf(tableFName, "%s.%s", pCreateTbReq->dbFName, pCreateTbReq->name); - + int32_t code = vnodeValidateTableHash(&pVnode->config, tableFName); if (code) { SVCreateTbRsp rsp; @@ -96,7 +96,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { taosArrayPush(vCreateTbBatchRsp.rspList, &rsp); } - + if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) { // TODO: handle error vError("vgId:%d, failed to create table: %s", pVnode->vgId, pCreateTbReq->name); @@ -116,10 +116,10 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { taosArrayDestroy(vCreateTbBatchReq.pArray); if (vCreateTbBatchRsp.rspList) { int32_t contLen = tSerializeSVCreateTbBatchRsp(NULL, 0, &vCreateTbBatchRsp); - void *msg = rpcMallocCont(contLen); + void *msg = rpcMallocCont(contLen); tSerializeSVCreateTbBatchRsp(msg, contLen, &vCreateTbBatchRsp); taosArrayDestroy(vCreateTbBatchRsp.rspList); - + *pRsp = calloc(1, sizeof(SRpcMsg)); (*pRsp)->msgType = TDMT_VND_CREATE_TABLE_RSP; (*pRsp)->pCont = msg; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 19100d7560b6eb3df1519f479b827294891865b9..e89bc5df0e52940e19bfe69c3d17b46bd79edb02 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -30,7 +30,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t t qError("join not supported for stream block scan, %s" PRIx64, id); return TSDB_CODE_QRY_APP_ERROR; } - + pOperator->status = OP_NOT_OPENED; return doSetStreamBlock(pOperator->pDownstream[0], input, type, id); } else { SStreamBlockScanInfo* pInfo = pOperator->info;