未验证 提交 f65e2974 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #10941 from taosdata/feature/tq

Feature/tq
...@@ -63,7 +63,7 @@ int32_t init_env() { ...@@ -63,7 +63,7 @@ int32_t init_env() {
} }
int32_t create_stream() { int32_t create_stream() {
printf("create topic\n"); printf("create stream\n");
TAOS_RES* pRes; TAOS_RES* pRes;
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) { if (pConn == NULL) {
...@@ -77,7 +77,9 @@ int32_t create_stream() { ...@@ -77,7 +77,9 @@ int32_t create_stream() {
} }
taos_free_result(pRes); taos_free_result(pRes);
const char* sql = "select ts,k from tu1"; /*const char* sql = "select min(k), max(k), sum(k) from tu1";*/
const char* sql = "select min(k), max(k), sum(k) from st1";
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
pRes = tmq_create_stream(pConn, "stream1", "out1", sql); pRes = tmq_create_stream(pConn, "stream1", "out1", sql);
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create stream out1, reason:%s\n", taos_errstr(pRes)); printf("failed to create stream out1, reason:%s\n", taos_errstr(pRes));
......
...@@ -23,8 +23,8 @@ ...@@ -23,8 +23,8 @@
#include "tencode.h" #include "tencode.h"
#include "thash.h" #include "thash.h"
#include "tlist.h" #include "tlist.h"
#include "trow.h"
#include "tname.h" #include "tname.h"
#include "trow.h"
#include "tuuid.h" #include "tuuid.h"
#ifdef __cplusplus #ifdef __cplusplus
...@@ -472,10 +472,9 @@ typedef struct { ...@@ -472,10 +472,9 @@ typedef struct {
int32_t code; int32_t code;
} SQueryTableRsp; } SQueryTableRsp;
int32_t tSerializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp); int32_t tSerializeSQueryTableRsp(void* buf, int32_t bufLen, SQueryTableRsp* pRsp);
int32_t tDeserializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp);
int32_t tDeserializeSQueryTableRsp(void* buf, int32_t bufLen, SQueryTableRsp* pRsp);
typedef struct { typedef struct {
char db[TSDB_DB_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
...@@ -1428,9 +1427,8 @@ typedef struct { ...@@ -1428,9 +1427,8 @@ typedef struct {
SArray* rspList; // SArray<SVCreateTbRsp> SArray* rspList; // SArray<SVCreateTbRsp>
} SVCreateTbBatchRsp; } SVCreateTbBatchRsp;
int32_t tSerializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatchRsp *pRsp); int32_t tSerializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatchRsp* pRsp);
int32_t tDeserializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatchRsp *pRsp); int32_t tDeserializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatchRsp* pRsp);
typedef struct { typedef struct {
int64_t ver; int64_t ver;
...@@ -2312,6 +2310,11 @@ enum { ...@@ -2312,6 +2310,11 @@ enum {
STREAM_TASK_STATUS__STOP, STREAM_TASK_STATUS__STOP,
}; };
enum {
STREAM_NEXT_OP_DST__VND = 1,
STREAM_NEXT_OP_DST__SND,
};
typedef struct { typedef struct {
void* inputHandle; void* inputHandle;
void* executor; void* executor;
...@@ -2326,6 +2329,7 @@ typedef struct { ...@@ -2326,6 +2329,7 @@ typedef struct {
int8_t pipeSink; int8_t pipeSink;
int8_t numOfRunners; int8_t numOfRunners;
int8_t parallelizable; int8_t parallelizable;
int8_t nextOpDst; // vnode or snode
SEpSet NextOpEp; SEpSet NextOpEp;
char* qmsg; char* qmsg;
// not applied to encoder and decoder // not applied to encoder and decoder
...@@ -2341,6 +2345,8 @@ static FORCE_INLINE SStreamTask* streamTaskNew(int64_t streamId, int32_t level) ...@@ -2341,6 +2345,8 @@ static FORCE_INLINE SStreamTask* streamTaskNew(int64_t streamId, int32_t level)
return NULL; return NULL;
} }
pTask->taskId = tGenIdPI32(); pTask->taskId = tGenIdPI32();
pTask->streamId = streamId;
pTask->level = level;
pTask->status = STREAM_TASK_STATUS__RUNNING; pTask->status = STREAM_TASK_STATUS__RUNNING;
pTask->qmsg = NULL; pTask->qmsg = NULL;
return pTask; return pTask;
......
...@@ -191,6 +191,7 @@ enum { ...@@ -191,6 +191,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp) TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_EXEC, "vnode-task-exec", SStreamTaskExecReq, SStreamTaskExecRsp) TD_DEF_MSG_TYPE(TDMT_VND_TASK_EXEC, "vnode-task-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
......
...@@ -132,6 +132,9 @@ bool tsdbForceKeepFile = false; ...@@ -132,6 +132,9 @@ bool tsdbForceKeepFile = false;
int32_t tsDiskCfgNum = 0; int32_t tsDiskCfgNum = 0;
SDiskCfg tsDiskCfg[TFS_MAX_DISKS] = {0}; SDiskCfg tsDiskCfg[TFS_MAX_DISKS] = {0};
// stream scheduler
bool tsStreamSchedV = true;
/* /*
* minimum scale for whole system, millisecond by default * minimum scale for whole system, millisecond by default
* for TSDB_TIME_PRECISION_MILLI: 86400000L * for TSDB_TIME_PRECISION_MILLI: 86400000L
......
...@@ -313,12 +313,12 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { ...@@ -313,12 +313,12 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) { for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) {
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pBSmaCols[i]); tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pBSmaCols[i]);
} }
if(pReq->rollup && NULL != pReq->stbCfg.pRSmaParam) { if (pReq->rollup && NULL != pReq->stbCfg.pRSmaParam) {
SRSmaParam *param = pReq->stbCfg.pRSmaParam; SRSmaParam *param = pReq->stbCfg.pRSmaParam;
tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor); tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor);
tlen += taosEncodeFixedI8(buf, param->delayUnit); tlen += taosEncodeFixedI8(buf, param->delayUnit);
tlen += taosEncodeFixedI8(buf, param->nFuncIds); tlen += taosEncodeFixedI8(buf, param->nFuncIds);
for(int8_t i=0; i< param->nFuncIds; ++i) { for (int8_t i = 0; i < param->nFuncIds; ++i) {
tlen += taosEncodeFixedI32(buf, param->pFuncIds[i]); tlen += taosEncodeFixedI32(buf, param->pFuncIds[i]);
} }
tlen += taosEncodeFixedI64(buf, param->delay); tlen += taosEncodeFixedI64(buf, param->delay);
...@@ -340,12 +340,12 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { ...@@ -340,12 +340,12 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) { for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) {
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pBSmaCols[i]); tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pBSmaCols[i]);
} }
if(pReq->rollup && NULL != pReq->stbCfg.pRSmaParam) { if (pReq->rollup && NULL != pReq->stbCfg.pRSmaParam) {
SRSmaParam *param = pReq->stbCfg.pRSmaParam; SRSmaParam *param = pReq->stbCfg.pRSmaParam;
tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor); tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor);
tlen += taosEncodeFixedI8(buf, param->delayUnit); tlen += taosEncodeFixedI8(buf, param->delayUnit);
tlen += taosEncodeFixedI8(buf, param->nFuncIds); tlen += taosEncodeFixedI8(buf, param->nFuncIds);
for(int8_t i=0; i< param->nFuncIds; ++i) { for (int8_t i = 0; i < param->nFuncIds; ++i) {
tlen += taosEncodeFixedI32(buf, param->pFuncIds[i]); tlen += taosEncodeFixedI32(buf, param->pFuncIds[i]);
} }
tlen += taosEncodeFixedI64(buf, param->delay); tlen += taosEncodeFixedI64(buf, param->delay);
...@@ -385,7 +385,7 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { ...@@ -385,7 +385,7 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
buf = taosDecodeStringTo(buf, pReq->stbCfg.pTagSchema[i].name); buf = taosDecodeStringTo(buf, pReq->stbCfg.pTagSchema[i].name);
} }
buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.nBSmaCols)); buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.nBSmaCols));
if(pReq->stbCfg.nBSmaCols > 0) { if (pReq->stbCfg.nBSmaCols > 0) {
pReq->stbCfg.pBSmaCols = (col_id_t *)malloc(pReq->stbCfg.nBSmaCols * sizeof(col_id_t)); pReq->stbCfg.pBSmaCols = (col_id_t *)malloc(pReq->stbCfg.nBSmaCols * sizeof(col_id_t));
for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) { for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) {
buf = taosDecodeFixedI16(buf, pReq->stbCfg.pBSmaCols + i); buf = taosDecodeFixedI16(buf, pReq->stbCfg.pBSmaCols + i);
...@@ -393,14 +393,14 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { ...@@ -393,14 +393,14 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
} else { } else {
pReq->stbCfg.pBSmaCols = NULL; pReq->stbCfg.pBSmaCols = NULL;
} }
if(pReq->rollup) { if (pReq->rollup) {
pReq->stbCfg.pRSmaParam = (SRSmaParam *)malloc(sizeof(SRSmaParam)); pReq->stbCfg.pRSmaParam = (SRSmaParam *)malloc(sizeof(SRSmaParam));
SRSmaParam *param = pReq->stbCfg.pRSmaParam; SRSmaParam *param = pReq->stbCfg.pRSmaParam;
buf = taosDecodeFixedU32(buf, (uint32_t*)&param->xFilesFactor); buf = taosDecodeFixedU32(buf, (uint32_t *)&param->xFilesFactor);
buf = taosDecodeFixedI8(buf, &param->delayUnit); buf = taosDecodeFixedI8(buf, &param->delayUnit);
buf = taosDecodeFixedI8(buf, &param->nFuncIds); buf = taosDecodeFixedI8(buf, &param->nFuncIds);
if(param->nFuncIds > 0) { if (param->nFuncIds > 0) {
for (int8_t i = 0; i< param->nFuncIds; ++i) { for (int8_t i = 0; i < param->nFuncIds; ++i) {
buf = taosDecodeFixedI32(buf, param->pFuncIds + i); buf = taosDecodeFixedI32(buf, param->pFuncIds + i);
} }
} else { } else {
...@@ -425,7 +425,7 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { ...@@ -425,7 +425,7 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
buf = taosDecodeStringTo(buf, pReq->ntbCfg.pSchema[i].name); buf = taosDecodeStringTo(buf, pReq->ntbCfg.pSchema[i].name);
} }
buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.nBSmaCols)); buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.nBSmaCols));
if(pReq->stbCfg.nBSmaCols > 0) { if (pReq->stbCfg.nBSmaCols > 0) {
pReq->stbCfg.pBSmaCols = (col_id_t *)malloc(pReq->stbCfg.nBSmaCols * sizeof(col_id_t)); pReq->stbCfg.pBSmaCols = (col_id_t *)malloc(pReq->stbCfg.nBSmaCols * sizeof(col_id_t));
for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) { for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) {
buf = taosDecodeFixedI16(buf, pReq->stbCfg.pBSmaCols + i); buf = taosDecodeFixedI16(buf, pReq->stbCfg.pBSmaCols + i);
...@@ -433,14 +433,14 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { ...@@ -433,14 +433,14 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
} else { } else {
pReq->stbCfg.pBSmaCols = NULL; pReq->stbCfg.pBSmaCols = NULL;
} }
if(pReq->rollup) { if (pReq->rollup) {
pReq->stbCfg.pRSmaParam = (SRSmaParam *)malloc(sizeof(SRSmaParam)); pReq->stbCfg.pRSmaParam = (SRSmaParam *)malloc(sizeof(SRSmaParam));
SRSmaParam *param = pReq->stbCfg.pRSmaParam; SRSmaParam *param = pReq->stbCfg.pRSmaParam;
buf = taosDecodeFixedU32(buf, (uint32_t*)&param->xFilesFactor); buf = taosDecodeFixedU32(buf, (uint32_t *)&param->xFilesFactor);
buf = taosDecodeFixedI8(buf, &param->delayUnit); buf = taosDecodeFixedI8(buf, &param->delayUnit);
buf = taosDecodeFixedI8(buf, &param->nFuncIds); buf = taosDecodeFixedI8(buf, &param->nFuncIds);
if(param->nFuncIds > 0) { if (param->nFuncIds > 0) {
for (int8_t i = 0; i< param->nFuncIds; ++i) { for (int8_t i = 0; i < param->nFuncIds; ++i) {
buf = taosDecodeFixedI32(buf, param->pFuncIds + i); buf = taosDecodeFixedI32(buf, param->pFuncIds + i);
} }
} else { } else {
...@@ -2775,7 +2775,6 @@ int32_t tDeserializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatc ...@@ -2775,7 +2775,6 @@ int32_t tDeserializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatc
return 0; return 0;
} }
int32_t tSerializeSVCreateTSmaReq(void **buf, SVCreateTSmaReq *pReq) { int32_t tSerializeSVCreateTSmaReq(void **buf, SVCreateTSmaReq *pReq) {
int32_t tlen = 0; int32_t tlen = 0;
...@@ -2824,7 +2823,8 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS ...@@ -2824,7 +2823,8 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->outputTbName) < 0) return -1; if (tEncodeCStr(&encoder, pReq->outputTbName) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1; if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1; if (tEncodeI32(&encoder, sqlLen) < 0) return -1;
if (tEncodeI32(&encoder, astLen) < 0) return -1;
if (sqlLen > 0 && tEncodeCStr(&encoder, pReq->sql) < 0) return -1; if (sqlLen > 0 && tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
if (astLen > 0 && tEncodeCStr(&encoder, pReq->ast) < 0) return -1; if (astLen > 0 && tEncodeCStr(&encoder, pReq->ast) < 0) return -1;
...@@ -2872,30 +2872,36 @@ void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) { ...@@ -2872,30 +2872,36 @@ void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
} }
int32_t tEncodeSStreamTask(SCoder *pEncoder, const SStreamTask *pTask) { int32_t tEncodeSStreamTask(SCoder *pEncoder, const SStreamTask *pTask) {
if (tStartEncode(pEncoder) < 0) return -1; /*if (tStartEncode(pEncoder) < 0) return -1;*/
if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->level) < 0) return -1; if (tEncodeI32(pEncoder, pTask->level) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->status) < 0) return -1; if (tEncodeI8(pEncoder, pTask->status) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->pipeSource) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->pipeSink) < 0) return -1; if (tEncodeI8(pEncoder, pTask->pipeSink) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->parallelizable) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->nextOpDst) < 0) return -1;
// if (tEncodeI8(pEncoder, pTask->numOfRunners) < 0) return -1; // if (tEncodeI8(pEncoder, pTask->numOfRunners) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->NextOpEp) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pTask->NextOpEp) < 0) return -1;
if (tEncodeCStr(pEncoder, pTask->qmsg) < 0) return -1; if (tEncodeCStr(pEncoder, pTask->qmsg) < 0) return -1;
tEndEncode(pEncoder); /*tEndEncode(pEncoder);*/
return pEncoder->pos; return pEncoder->pos;
} }
int32_t tDecodeSStreamTask(SCoder *pDecoder, SStreamTask *pTask) { int32_t tDecodeSStreamTask(SCoder *pDecoder, SStreamTask *pTask) {
if (tStartDecode(pDecoder) < 0) return -1; /*if (tStartDecode(pDecoder) < 0) return -1;*/
if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->level) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->level) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->pipeSource) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->pipeSink) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->pipeSink) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->parallelizable) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->nextOpDst) < 0) return -1;
// if (tDecodeI8(pDecoder, &pTask->numOfRunners) < 0) return -1; // if (tDecodeI8(pDecoder, &pTask->numOfRunners) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->NextOpEp) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pTask->NextOpEp) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pTask->qmsg) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pTask->qmsg) < 0) return -1;
tEndDecode(pDecoder); /*tEndDecode(pDecoder);*/
return 0; return 0;
} }
......
...@@ -142,6 +142,8 @@ void mmInitMsgHandles(SMgmtWrapper *pWrapper) { ...@@ -142,6 +142,8 @@ void mmInitMsgHandles(SMgmtWrapper *pWrapper) {
dndSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, (NodeMsgFp)mmProcessWriteMsg, 0); dndSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, (NodeMsgFp)mmProcessWriteMsg, 0); dndSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, (NodeMsgFp)mmProcessReadMsg, 0); dndSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, (NodeMsgFp)mmProcessReadMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STREAM, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
// Requests handled by VNODE // Requests handled by VNODE
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
......
...@@ -277,6 +277,7 @@ void vmInitMsgHandles(SMgmtWrapper *pWrapper) { ...@@ -277,6 +277,7 @@ void vmInitMsgHandles(SMgmtWrapper *pWrapper) {
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)vmProcessWriteMsg, 0); dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)vmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg, 0); dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_EXEC, (NodeMsgFp)vmProcessFetchMsg, 0); dndSetMsgHandle(pWrapper, TDMT_VND_TASK_EXEC, (NodeMsgFp)vmProcessFetchMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, (NodeMsgFp)vmProcessFetchMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0); dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0); dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0);
......
...@@ -32,20 +32,23 @@ ...@@ -32,20 +32,23 @@
#include "tname.h" #include "tname.h"
#include "tuuid.h" #include "tuuid.h"
int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet, tmsg_t type) { extern bool tsStreamSchedV;
int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet, tmsg_t type, int32_t nodeId) {
SCoder encoder; SCoder encoder;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
tEncodeSStreamTask(&encoder, pTask); tEncodeSStreamTask(&encoder, pTask);
int32_t tlen = sizeof(SMsgHead) + encoder.pos; int32_t size = encoder.pos;
int32_t tlen = sizeof(SMsgHead) + size;
tCoderClear(&encoder); tCoderClear(&encoder);
void* buf = malloc(tlen); void* buf = malloc(tlen);
if (buf == NULL) { if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
((SMsgHead*)buf)->streamTaskId = pTask->taskId; ((SMsgHead*)buf)->streamTaskId = htonl(nodeId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, tlen, TD_ENCODER); tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, size, TD_ENCODER);
tEncodeSStreamTask(&encoder, pTask); tEncodeSStreamTask(&encoder, pTask);
tCoderClear(&encoder); tCoderClear(&encoder);
...@@ -70,7 +73,7 @@ int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SS ...@@ -70,7 +73,7 @@ int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SS
terrno = TSDB_CODE_QRY_INVALID_INPUT; terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1; return -1;
} }
mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_VND_TASK_DEPLOY); mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId);
return 0; return 0;
} }
...@@ -90,7 +93,7 @@ int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, ...@@ -90,7 +93,7 @@ int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask,
terrno = TSDB_CODE_QRY_INVALID_INPUT; terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1; return -1;
} }
mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_SND_TASK_DEPLOY); mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_SND_TASK_DEPLOY, 0);
return 0; return 0;
} }
...@@ -106,6 +109,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -106,6 +109,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
int32_t totLevel = LIST_LENGTH(pPlan->pSubplans); int32_t totLevel = LIST_LENGTH(pPlan->pSubplans);
pStream->tasks = taosArrayInit(totLevel, sizeof(SArray)); pStream->tasks = taosArrayInit(totLevel, sizeof(SArray));
int32_t lastUsedVgId = 0;
for (int32_t level = 0; level < totLevel; level++) { for (int32_t level = 0; level < totLevel; level++) {
SArray* taskOneLevel = taosArrayInit(0, sizeof(SStreamTask)); SArray* taskOneLevel = taosArrayInit(0, sizeof(SStreamTask));
...@@ -113,9 +117,9 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -113,9 +117,9 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
int32_t opNum = LIST_LENGTH(inner->pNodeList); int32_t opNum = LIST_LENGTH(inner->pNodeList);
ASSERT(opNum == 1); ASSERT(opNum == 1);
SSubplan* plan = nodesListGetNode(inner->pNodeList, level); SSubplan* plan = nodesListGetNode(inner->pNodeList, 0);
if (level == 0) { if (level == 0) {
ASSERT(plan->type == SUBPLAN_TYPE_SCAN); ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN);
void* pIter = NULL; void* pIter = NULL;
while (1) { while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
...@@ -125,11 +129,14 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -125,11 +129,14 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
continue; continue;
} }
lastUsedVgId = pVgroup->vgId;
pStream->vgNum++; pStream->vgNum++;
// send to vnode // send to vnode
SStreamTask* pTask = streamTaskNew(pStream->uid, level); SStreamTask* pTask = streamTaskNew(pStream->uid, level);
pTask->pipeSource = 1;
pTask->pipeSink = level == totLevel - 1 ? 1 : 0; pTask->pipeSink = level == totLevel - 1 ? 1 : 0;
pTask->parallelizable = 1;
// TODO: set to // TODO: set to
if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) { if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
...@@ -140,7 +147,21 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -140,7 +147,21 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
} }
} else { } else {
SStreamTask* pTask = streamTaskNew(pStream->uid, level); SStreamTask* pTask = streamTaskNew(pStream->uid, level);
pTask->pipeSource = 0;
pTask->pipeSink = level == totLevel - 1 ? 1 : 0; pTask->pipeSink = level == totLevel - 1 ? 1 : 0;
pTask->parallelizable = plan->subplanType == SUBPLAN_TYPE_SCAN;
pTask->nextOpDst = STREAM_NEXT_OP_DST__VND;
if (tsStreamSchedV) {
ASSERT(lastUsedVgId != 0);
SVgObj* pVg = mndAcquireVgroup(pMnode, lastUsedVgId);
if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVg) < 0) {
sdbRelease(pSdb, pVg);
qDestroyQueryPlan(pPlan);
return -1;
}
sdbRelease(pSdb, pVg);
} else {
SSnodeObj* pSnode = mndSchedFetchSnode(pMnode); SSnodeObj* pSnode = mndSchedFetchSnode(pMnode);
if (pSnode != NULL) { if (pSnode != NULL) {
if (mndAssignTaskToSnode(pMnode, pTrans, pTask, plan, pSnode) < 0) { if (mndAssignTaskToSnode(pMnode, pTrans, pTask, plan, pSnode) < 0) {
...@@ -153,6 +174,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -153,6 +174,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
// TODO: assign to one vg // TODO: assign to one vg
ASSERT(0); ASSERT(0);
} }
}
taosArrayPush(taskOneLevel, pTask); taosArrayPush(taskOneLevel, pTask);
} }
taosArrayPush(pStream->tasks, taskOneLevel); taosArrayPush(pStream->tasks, taskOneLevel);
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include "mndScheduler.h" #include "mndScheduler.h"
#include "mndShow.h" #include "mndShow.h"
#include "mndStb.h" #include "mndStb.h"
#include "mndTopic.h"
#include "mndTrans.h" #include "mndTrans.h"
#include "mndUser.h" #include "mndUser.h"
#include "mndVgroup.h" #include "mndVgroup.h"
...@@ -33,6 +34,7 @@ static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream); ...@@ -33,6 +34,7 @@ static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pStream, SStreamObj *pNewStream); static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pStream, SStreamObj *pNewStream);
static int32_t mndProcessCreateStreamReq(SNodeMsg *pReq); static int32_t mndProcessCreateStreamReq(SNodeMsg *pReq);
static int32_t mndProcessTaskDeployInternalRsp(SNodeMsg *pRsp);
/*static int32_t mndProcessDropStreamReq(SNodeMsg *pReq);*/ /*static int32_t mndProcessDropStreamReq(SNodeMsg *pReq);*/
/*static int32_t mndProcessDropStreamInRsp(SNodeMsg *pRsp);*/ /*static int32_t mndProcessDropStreamInRsp(SNodeMsg *pRsp);*/
static int32_t mndProcessStreamMetaReq(SNodeMsg *pReq); static int32_t mndProcessStreamMetaReq(SNodeMsg *pReq);
...@@ -50,6 +52,8 @@ int32_t mndInitStream(SMnode *pMnode) { ...@@ -50,6 +52,8 @@ int32_t mndInitStream(SMnode *pMnode) {
.deleteFp = (SdbDeleteFp)mndStreamActionDelete}; .deleteFp = (SdbDeleteFp)mndStreamActionDelete};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq); mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
mndSetMsgHandle(pMnode, TDMT_VND_TASK_DEPLOY_RSP, mndProcessTaskDeployInternalRsp);
mndSetMsgHandle(pMnode, TDMT_SND_TASK_DEPLOY_RSP, mndProcessTaskDeployInternalRsp);
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);*/ /*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);*/
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/ /*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/
...@@ -68,7 +72,7 @@ SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) { ...@@ -68,7 +72,7 @@ SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
SCoder encoder; SCoder encoder;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
if (tEncodeSStreamObj(NULL, pStream) < 0) { if (tEncodeSStreamObj(&encoder, pStream) < 0) {
tCoderClear(&encoder); tCoderClear(&encoder);
goto STREAM_ENCODE_OVER; goto STREAM_ENCODE_OVER;
} }
...@@ -83,7 +87,7 @@ SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) { ...@@ -83,7 +87,7 @@ SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
if (buf == NULL) goto STREAM_ENCODE_OVER; if (buf == NULL) goto STREAM_ENCODE_OVER;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, tlen, TD_ENCODER); tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, tlen, TD_ENCODER);
if (tEncodeSStreamObj(NULL, pStream) < 0) { if (tEncodeSStreamObj(&encoder, pStream) < 0) {
tCoderClear(&encoder); tCoderClear(&encoder);
goto STREAM_ENCODE_OVER; goto STREAM_ENCODE_OVER;
} }
...@@ -135,7 +139,7 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) { ...@@ -135,7 +139,7 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, STREAM_DECODE_OVER); SDB_GET_BINARY(pRaw, dataPos, buf, tlen, STREAM_DECODE_OVER);
SCoder decoder; SCoder decoder;
tCoderInit(&decoder, TD_LITTLE_ENDIAN, NULL, 0, TD_DECODER); tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, tlen + 1, TD_DECODER);
if (tDecodeSStreamObj(&decoder, pStream) < 0) { if (tDecodeSStreamObj(&decoder, pStream) < 0) {
goto STREAM_DECODE_OVER; goto STREAM_DECODE_OVER;
} }
...@@ -191,6 +195,11 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) { ...@@ -191,6 +195,11 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) {
sdbRelease(pSdb, pStream); sdbRelease(pSdb, pStream);
} }
static int32_t mndProcessTaskDeployInternalRsp(SNodeMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static SDbObj *mndAcquireDbByStream(SMnode *pMnode, char *streamName) { static SDbObj *mndAcquireDbByStream(SMnode *pMnode, char *streamName) {
SName name = {0}; SName name = {0};
tNameFromString(&name, streamName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); tNameFromString(&name, streamName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
...@@ -209,6 +218,33 @@ static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) { ...@@ -209,6 +218,33 @@ static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
return 0; return 0;
} }
static int32_t mndStreamGetPlanString(const SCMCreateStreamReq *pCreate, char **pStr) {
if (NULL == pCreate->ast) {
return TSDB_CODE_SUCCESS;
}
SNode *pAst = NULL;
int32_t code = nodesStringToNode(pCreate->ast, &pAst);
SQueryPlan *pPlan = NULL;
if (TSDB_CODE_SUCCESS == code) {
SPlanContext cxt = {
.pAstRoot = pAst,
.topicQuery = false,
.streamQuery = true,
};
code = qCreateQueryPlan(&cxt, &pPlan, NULL);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesNodeToString(pPlan, false, pStr, NULL);
}
nodesDestroyNode(pAst);
nodesDestroyNode(pPlan);
terrno = code;
return code;
}
static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamReq *pCreate, SDbObj *pDb) { static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamReq *pCreate, SDbObj *pDb) {
mDebug("stream:%s to create", pCreate->name); mDebug("stream:%s to create", pCreate->name);
SStreamObj streamObj = {0}; SStreamObj streamObj = {0};
...@@ -220,8 +256,13 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe ...@@ -220,8 +256,13 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
streamObj.dbUid = pDb->uid; streamObj.dbUid = pDb->uid;
streamObj.version = 1; streamObj.version = 1;
streamObj.sql = pCreate->sql; streamObj.sql = pCreate->sql;
streamObj.physicalPlan = ""; /*streamObj.physicalPlan = "";*/
streamObj.logicalPlan = ""; streamObj.logicalPlan = "not implemented";
if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(pCreate, &streamObj.physicalPlan)) {
mError("topic:%s, failed to get plan since %s", pCreate->name, terrstr());
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STREAM, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STREAM, &pReq->rpcMsg);
if (pTrans == NULL) { if (pTrans == NULL) {
......
...@@ -236,7 +236,7 @@ static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) { ...@@ -236,7 +236,7 @@ static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) {
return 0; return 0;
} }
static int32_t mndGetPlanString(SCMCreateTopicReq *pCreate, char **pStr) { static int32_t mndGetPlanString(const SCMCreateTopicReq *pCreate, char **pStr) {
if (NULL == pCreate->ast) { if (NULL == pCreate->ast) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -167,6 +167,7 @@ struct STQ { ...@@ -167,6 +167,7 @@ struct STQ {
STqMetaStore* tqMeta; STqMetaStore* tqMeta;
STqPushMgr* tqPushMgr; STqPushMgr* tqPushMgr;
SHashObj* pStreamTasks; SHashObj* pStreamTasks;
SVnode* pVnode;
SWal* pWal; SWal* pWal;
SMeta* pVnodeMeta; SMeta* pVnodeMeta;
}; };
......
...@@ -82,12 +82,6 @@ struct SVnode { ...@@ -82,12 +82,6 @@ struct SVnode {
int vnodeScheduleTask(SVnodeTask* task); int vnodeScheduleTask(SVnodeTask* task);
int32_t vnodePutToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq);
int32_t vnodePutToVFetchQ(SVnode* pVnode, struct SRpcMsg* pReq);
int32_t vnodeSendReq(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq);
int32_t vnodeSendMnodeReq(SVnode* pVnode, struct SRpcMsg* pReq);
void vnodeSendRsp(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pRsp);
#define vFatal(...) \ #define vFatal(...) \
do { \ do { \
if (vDebugFlag & DEBUG_FATAL) { \ if (vDebugFlag & DEBUG_FATAL) { \
...@@ -177,19 +171,20 @@ int tqInit(); ...@@ -177,19 +171,20 @@ int tqInit();
void tqCleanUp(); void tqCleanUp();
// open in each vnode // open in each vnode
STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAllocatorFactory* allocFac); STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig,
SMemAllocatorFactory* allocFac);
void tqClose(STQ*); void tqClose(STQ*);
// required by vnode // required by vnode
int tqPushMsg(STQ*, void* msg, tmsg_t msgType, int64_t version); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version);
int tqCommit(STQ*); int tqCommit(STQ*);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessSetConnReq(STQ* pTq, char* msg); int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
int32_t tqProcessRebReq(STQ* pTq, char* msg); int32_t tqProcessRebReq(STQ* pTq, char* msg);
int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg); int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg);
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -17,11 +17,14 @@ ...@@ -17,11 +17,14 @@
#include "tqInt.h" #include "tqInt.h"
#include "tqMetaStore.h" #include "tqMetaStore.h"
void tqDebugShowSSData(SArray* dataBlocks);
int32_t tqInit() { return tqPushMgrInit(); } int32_t tqInit() { return tqPushMgrInit(); }
void tqCleanUp() { tqPushMgrCleanUp(); } void tqCleanUp() { tqPushMgrCleanUp(); }
STQ* tqOpen(const char* path, SWal* pWal, SMeta* pVnodeMeta, STqCfg* tqConfig, SMemAllocatorFactory* allocFac) { STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, STqCfg* tqConfig,
SMemAllocatorFactory* allocFac) {
STQ* pTq = malloc(sizeof(STQ)); STQ* pTq = malloc(sizeof(STQ));
if (pTq == NULL) { if (pTq == NULL) {
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
...@@ -29,6 +32,7 @@ STQ* tqOpen(const char* path, SWal* pWal, SMeta* pVnodeMeta, STqCfg* tqConfig, S ...@@ -29,6 +32,7 @@ STQ* tqOpen(const char* path, SWal* pWal, SMeta* pVnodeMeta, STqCfg* tqConfig, S
} }
pTq->path = strdup(path); pTq->path = strdup(path);
pTq->tqConfig = tqConfig; pTq->tqConfig = tqConfig;
pTq->pVnode = pVnode;
pTq->pWal = pWal; pTq->pWal = pWal;
pTq->pVnodeMeta = pVnodeMeta; pTq->pVnodeMeta = pVnodeMeta;
#if 0 #if 0
...@@ -68,46 +72,19 @@ void tqClose(STQ* pTq) { ...@@ -68,46 +72,19 @@ void tqClose(STQ* pTq) {
// TODO // TODO
} }
int tqPushMsg(STQ* pTq, void* msg, tmsg_t msgType, int64_t version) { int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version) {
if (msgType != TDMT_VND_SUBMIT) return 0; if (msgType != TDMT_VND_SUBMIT) return 0;
void* data = malloc(msgLen);
void* pIter = NULL; if (data == NULL) {
while (1) {
pIter = taosHashIterate(pTq->pStreamTasks, pIter);
if (pIter == NULL) break;
SStreamTask* pTask = (SStreamTask*)pIter;
if (!pTask->pipeSource) continue;
int32_t workerId = 0;
void* exec = pTask->runner[workerId].executor;
qSetStreamInput(exec, msg, STREAM_DATA_TYPE_SUBMIT_BLOCK);
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
while (1) {
SSDataBlock* output;
uint64_t ts;
if (qExecTask(exec, &output, &ts) < 0) {
ASSERT(false);
}
if (output == NULL) {
break;
}
taosArrayPush(pRes, output);
}
if (pTask->pipeSink) {
// write back
} else {
int32_t tlen = sizeof(SStreamExecMsgHead) + tEncodeDataBlocks(NULL, pRes);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
return -1; return -1;
} }
void* abuf = POINTER_SHIFT(buf, sizeof(SStreamExecMsgHead)); memcpy(data, msg, msgLen);
tEncodeDataBlocks(abuf, pRes); SRpcMsg req = {
// serialize .msgType = TDMT_VND_STREAM_TRIGGER,
// to next level .pCont = data,
} .contLen = msgLen,
} };
tmsgPutToQueue(&pTq->pVnode->msgCb, FETCH_QUEUE, &req);
#if 0 #if 0
void* pIter = taosHashIterate(pTq->tqPushMgr->pHash, NULL); void* pIter = taosHashIterate(pTq->tqPushMgr->pHash, NULL);
...@@ -483,7 +460,9 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -483,7 +460,9 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
} }
SCoder decoder; SCoder decoder;
tCoderInit(&decoder, TD_LITTLE_ENDIAN, (uint8_t*)msg, msgLen, TD_DECODER); tCoderInit(&decoder, TD_LITTLE_ENDIAN, (uint8_t*)msg, msgLen, TD_DECODER);
tDecodeSStreamTask(&decoder, pTask); if (tDecodeSStreamTask(&decoder, pTask) < 0) {
ASSERT(0);
}
tCoderClear(&decoder); tCoderClear(&decoder);
tqExpandTask(pTq, pTask, 8); tqExpandTask(pTq, pTask, 8);
...@@ -560,7 +539,11 @@ void tqDebugShowSSData(SArray* dataBlocks) { ...@@ -560,7 +539,11 @@ void tqDebugShowSSData(SArray* dataBlocks) {
break; break;
case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UINT: case TSDB_DATA_TYPE_UINT:
printf(" %15u |", *(uint32_t*)var); printf(" %15d |", *(int32_t*)var);
break;
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT:
printf(" %15ld |", *(int64_t*)var);
break; break;
} }
} }
...@@ -569,6 +552,62 @@ void tqDebugShowSSData(SArray* dataBlocks) { ...@@ -569,6 +552,62 @@ void tqDebugShowSSData(SArray* dataBlocks) {
} }
} }
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen) {
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pTq->pStreamTasks, pIter);
if (pIter == NULL) break;
SStreamTask* pTask = (SStreamTask*)pIter;
if (!pTask->pipeSource) continue;
int32_t workerId = 0;
void* exec = pTask->runner[workerId].executor;
qSetStreamInput(exec, data, STREAM_DATA_TYPE_SUBMIT_BLOCK);
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
while (1) {
SSDataBlock* output;
uint64_t ts;
if (qExecTask(exec, &output, &ts) < 0) {
ASSERT(false);
}
if (output == NULL) {
break;
}
taosArrayPush(pRes, output);
}
if (pTask->pipeSink) {
// write back
/*printf("reach end\n");*/
tqDebugShowSSData(pRes);
} else {
int32_t tlen = sizeof(SStreamExecMsgHead) + tEncodeDataBlocks(NULL, pRes);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
return -1;
}
void* abuf = POINTER_SHIFT(buf, sizeof(SStreamExecMsgHead));
tEncodeDataBlocks(abuf, pRes);
tmsg_t type;
if (pTask->nextOpDst == STREAM_NEXT_OP_DST__VND) {
type = TDMT_VND_TASK_EXEC;
} else {
type = TDMT_SND_TASK_EXEC;
}
SRpcMsg reqMsg = {
.pCont = buf,
.contLen = tlen,
.code = 0,
.msgType = type,
};
tmsgSendReq(&pTq->pVnode->msgCb, &pTask->NextOpEp, &reqMsg);
}
}
return 0;
}
int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg) { int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg) {
SStreamTaskExecReq* pReq = msg->pCont; SStreamTaskExecReq* pReq = msg->pCont;
......
...@@ -126,6 +126,36 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { ...@@ -126,6 +126,36 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
if (pArray == NULL) { if (pArray == NULL) {
return NULL; return NULL;
} }
int32_t colMeta = 0;
int32_t colNeed = 0;
while (colMeta < pSchemaWrapper->nCols && colNeed < colNumNeed) {
SSchema* pColSchema = &pSchemaWrapper->pSchema[colMeta];
int16_t colIdSchema = pColSchema->colId;
int16_t colIdNeed = *(int16_t*)taosArrayGet(pHandle->pColIdList, colNeed);
if (colIdSchema < colIdNeed) {
colMeta++;
} else if (colIdSchema > colIdNeed) {
colNeed++;
} else {
SColumnInfoData colInfo = {0};
int sz = numOfRows * pColSchema->bytes;
colInfo.info.bytes = pColSchema->bytes;
colInfo.info.colId = pColSchema->colId;
colInfo.info.type = pColSchema->type;
colInfo.pData = calloc(1, sz);
if (colInfo.pData == NULL) {
// TODO free
taosArrayDestroy(pArray);
return NULL;
}
blockDataEnsureColumnCapacity(&colInfo, numOfRows);
taosArrayPush(pArray, &colInfo);
colMeta++;
colNeed++;
}
}
int j = 0; int j = 0;
for (int32_t i = 0; i < colNumNeed; i++) { for (int32_t i = 0; i < colNumNeed; i++) {
...@@ -163,11 +193,23 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { ...@@ -163,11 +193,23 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) { while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
tdSTSRowIterReset(&iter, row); tdSTSRowIterReset(&iter, row);
// get all wanted col of that block // get all wanted col of that block
int32_t colTot = taosArrayGetSize(pArray);
for (int32_t i = 0; i < colTot; i++) {
SColumnInfoData* pColData = taosArrayGet(pArray, i);
SCellVal sVal = {0};
if (!tdSTSRowIterNext(&iter, pColData->info.colId, pColData->info.type, &sVal)) {
break;
}
memcpy(POINTER_SHIFT(pColData->pData, curRow * pColData->info.bytes), sVal.val, pColData->info.bytes);
}
#if 0
for (int32_t i = 0; i < colNumNeed; i++) { for (int32_t i = 0; i < colNumNeed; i++) {
SColumnInfoData* pColData = taosArrayGet(pArray, i); SColumnInfoData* pColData = taosArrayGet(pArray, i);
STColumn* pCol = schemaColAt(pTschema, i); STColumn* pCol = schemaColAt(pTschema, i);
// TODO // TODO
ASSERT(pCol->colId == pColData->info.colId); if(pCol->colId != pColData->info.colId) {
continue;
}
// void* val = tdGetMemRowDataOfColEx(row, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx); // void* val = tdGetMemRowDataOfColEx(row, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx);
SCellVal sVal = {0}; SCellVal sVal = {0};
if (!tdSTSRowIterNext(&iter, pCol->colId, pCol->type, &sVal)) { if (!tdSTSRowIterNext(&iter, pCol->colId, pCol->type, &sVal)) {
...@@ -176,6 +218,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { ...@@ -176,6 +218,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
} }
memcpy(POINTER_SHIFT(pColData->pData, curRow * pCol->bytes), sVal.val, pCol->bytes); memcpy(POINTER_SHIFT(pColData->pData, curRow * pCol->bytes), sVal.val, pCol->bytes);
} }
#endif
curRow++; curRow++;
} }
return pArray; return pArray;
......
...@@ -115,7 +115,8 @@ static int vnodeOpenImpl(SVnode *pVnode) { ...@@ -115,7 +115,8 @@ static int vnodeOpenImpl(SVnode *pVnode) {
// Open tsdb // Open tsdb
sprintf(dir, "%s/tsdb", pVnode->path); sprintf(dir, "%s/tsdb", pVnode->path);
pVnode->pTsdb = tsdbOpen(dir, pVnode->vgId, &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode), pVnode->pMeta, pVnode->pTfs); pVnode->pTsdb =
tsdbOpen(dir, pVnode->vgId, &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode), pVnode->pMeta, pVnode->pTfs);
if (pVnode->pTsdb == NULL) { if (pVnode->pTsdb == NULL) {
// TODO: handle error // TODO: handle error
return -1; return -1;
...@@ -131,7 +132,7 @@ static int vnodeOpenImpl(SVnode *pVnode) { ...@@ -131,7 +132,7 @@ static int vnodeOpenImpl(SVnode *pVnode) {
// Open TQ // Open TQ
sprintf(dir, "%s/tq", pVnode->path); sprintf(dir, "%s/tq", pVnode->path);
pVnode->pTq = tqOpen(dir, pVnode->pWal, pVnode->pMeta, &(pVnode->config.tqCfg), vBufPoolGetMAF(pVnode)); pVnode->pTq = tqOpen(dir, pVnode, pVnode->pWal, pVnode->pMeta, &(pVnode->config.tqCfg), vBufPoolGetMAF(pVnode));
if (pVnode->pTq == NULL) { if (pVnode->pTq == NULL) {
// TODO: handle error // TODO: handle error
return -1; return -1;
......
...@@ -68,6 +68,8 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -68,6 +68,8 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) {
return tqProcessPollReq(pVnode->pTq, pMsg); return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_VND_TASK_EXEC: case TDMT_VND_TASK_EXEC:
return tqProcessTaskExec(pVnode->pTq, pMsg); return tqProcessTaskExec(pVnode->pTq, pMsg);
case TDMT_VND_STREAM_TRIGGER:
return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen);
case TDMT_VND_QUERY_HEARTBEAT: case TDMT_VND_QUERY_HEARTBEAT:
return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg); return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg);
default: default:
...@@ -163,7 +165,6 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -163,7 +165,6 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
memcpy(POINTER_SHIFT(metaRsp.pSchemas, sizeof(SSchema) * pSW->nCols), pTagSchema, sizeof(SSchema) * nTagCols); memcpy(POINTER_SHIFT(metaRsp.pSchemas, sizeof(SSchema) * pSW->nCols), pTagSchema, sizeof(SSchema) * nTagCols);
} }
_exit: _exit:
rspLen = tSerializeSTableMetaRsp(NULL, 0, &metaRsp); rspLen = tSerializeSTableMetaRsp(NULL, 0, &metaRsp);
...@@ -179,7 +180,6 @@ _exit: ...@@ -179,7 +180,6 @@ _exit:
} }
tSerializeSTableMetaRsp(pRsp, rspLen, &metaRsp); tSerializeSTableMetaRsp(pRsp, rspLen, &metaRsp);
tFreeSTableMetaRsp(&metaRsp); tFreeSTableMetaRsp(&metaRsp);
if (pSW != NULL) { if (pSW != NULL) {
tfree(pSW->pSchema); tfree(pSW->pSchema);
......
...@@ -59,7 +59,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -59,7 +59,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
// todo: change the interface here // todo: change the interface here
int64_t ver; int64_t ver;
taosDecodeFixedI64(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &ver); taosDecodeFixedI64(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &ver);
if (tqPushMsg(pVnode->pTq, ptr, pMsg->msgType, ver) < 0) { if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, ver) < 0) {
// TODO: handle error // TODO: handle error
} }
......
...@@ -30,7 +30,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t t ...@@ -30,7 +30,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t t
qError("join not supported for stream block scan, %s" PRIx64, id); qError("join not supported for stream block scan, %s" PRIx64, id);
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
pOperator->status = OP_NOT_OPENED;
return doSetStreamBlock(pOperator->pDownstream[0], input, type, id); return doSetStreamBlock(pOperator->pDownstream[0], input, type, id);
} else { } else {
SStreamBlockScanInfo* pInfo = pOperator->info; SStreamBlockScanInfo* pInfo = pOperator->info;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册