提交 93347bec 编写于 作者: L Liu Jicong

put next level info into task

上级 997a0a93
...@@ -63,7 +63,7 @@ int32_t init_env() { ...@@ -63,7 +63,7 @@ int32_t init_env() {
} }
int32_t create_stream() { int32_t create_stream() {
printf("create topic\n"); printf("create stream\n");
TAOS_RES* pRes; TAOS_RES* pRes;
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) { if (pConn == NULL) {
...@@ -77,7 +77,7 @@ int32_t create_stream() { ...@@ -77,7 +77,7 @@ int32_t create_stream() {
} }
taos_free_result(pRes); taos_free_result(pRes);
const char* sql = "select ts,k from tu1"; const char* sql = "select ts,sum(k) from tu1";
pRes = tmq_create_stream(pConn, "stream1", "out1", sql); pRes = tmq_create_stream(pConn, "stream1", "out1", sql);
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create stream out1, reason:%s\n", taos_errstr(pRes)); printf("failed to create stream out1, reason:%s\n", taos_errstr(pRes));
......
...@@ -23,8 +23,8 @@ ...@@ -23,8 +23,8 @@
#include "tencode.h" #include "tencode.h"
#include "thash.h" #include "thash.h"
#include "tlist.h" #include "tlist.h"
#include "trow.h"
#include "tname.h" #include "tname.h"
#include "trow.h"
#include "tuuid.h" #include "tuuid.h"
#ifdef __cplusplus #ifdef __cplusplus
...@@ -472,10 +472,9 @@ typedef struct { ...@@ -472,10 +472,9 @@ typedef struct {
int32_t code; int32_t code;
} SQueryTableRsp; } SQueryTableRsp;
int32_t tSerializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp); int32_t tSerializeSQueryTableRsp(void* buf, int32_t bufLen, SQueryTableRsp* pRsp);
int32_t tDeserializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp);
int32_t tDeserializeSQueryTableRsp(void* buf, int32_t bufLen, SQueryTableRsp* pRsp);
typedef struct { typedef struct {
char db[TSDB_DB_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
...@@ -1408,9 +1407,8 @@ typedef struct { ...@@ -1408,9 +1407,8 @@ typedef struct {
SArray* rspList; // SArray<SVCreateTbRsp> SArray* rspList; // SArray<SVCreateTbRsp>
} SVCreateTbBatchRsp; } SVCreateTbBatchRsp;
int32_t tSerializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatchRsp *pRsp); int32_t tSerializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatchRsp* pRsp);
int32_t tDeserializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatchRsp *pRsp); int32_t tDeserializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatchRsp* pRsp);
typedef struct { typedef struct {
int64_t ver; int64_t ver;
...@@ -2292,6 +2290,11 @@ enum { ...@@ -2292,6 +2290,11 @@ enum {
STREAM_TASK_STATUS__STOP, STREAM_TASK_STATUS__STOP,
}; };
enum {
STREAM_NEXT_OP_DST__VND = 1,
STREAM_NEXT_OP_DST__SND,
};
typedef struct { typedef struct {
void* inputHandle; void* inputHandle;
void* executor; void* executor;
...@@ -2306,6 +2309,7 @@ typedef struct { ...@@ -2306,6 +2309,7 @@ typedef struct {
int8_t pipeSink; int8_t pipeSink;
int8_t numOfRunners; int8_t numOfRunners;
int8_t parallelizable; int8_t parallelizable;
int8_t nextOpDst; // vnode or snode
SEpSet NextOpEp; SEpSet NextOpEp;
char* qmsg; char* qmsg;
// not applied to encoder and decoder // not applied to encoder and decoder
......
...@@ -132,6 +132,9 @@ bool tsdbForceKeepFile = false; ...@@ -132,6 +132,9 @@ bool tsdbForceKeepFile = false;
int32_t tsDiskCfgNum = 0; int32_t tsDiskCfgNum = 0;
SDiskCfg tsDiskCfg[TFS_MAX_DISKS] = {0}; SDiskCfg tsDiskCfg[TFS_MAX_DISKS] = {0};
// stream scheduler
bool tsStreamSchedV = true;
/* /*
* minimum scale for whole system, millisecond by default * minimum scale for whole system, millisecond by default
* for TSDB_TIME_PRECISION_MILLI: 86400000L * for TSDB_TIME_PRECISION_MILLI: 86400000L
......
...@@ -2695,7 +2695,6 @@ int32_t tDeserializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatc ...@@ -2695,7 +2695,6 @@ int32_t tDeserializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatc
return 0; return 0;
} }
int32_t tSerializeSVCreateTSmaReq(void **buf, SVCreateTSmaReq *pReq) { int32_t tSerializeSVCreateTSmaReq(void **buf, SVCreateTSmaReq *pReq) {
int32_t tlen = 0; int32_t tlen = 0;
...@@ -2797,7 +2796,10 @@ int32_t tEncodeSStreamTask(SCoder *pEncoder, const SStreamTask *pTask) { ...@@ -2797,7 +2796,10 @@ int32_t tEncodeSStreamTask(SCoder *pEncoder, const SStreamTask *pTask) {
if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->level) < 0) return -1; if (tEncodeI32(pEncoder, pTask->level) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->status) < 0) return -1; if (tEncodeI8(pEncoder, pTask->status) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->pipeSource) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->pipeSink) < 0) return -1; if (tEncodeI8(pEncoder, pTask->pipeSink) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->parallelizable) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->nextOpDst) < 0) return -1;
// if (tEncodeI8(pEncoder, pTask->numOfRunners) < 0) return -1; // if (tEncodeI8(pEncoder, pTask->numOfRunners) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->NextOpEp) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pTask->NextOpEp) < 0) return -1;
if (tEncodeCStr(pEncoder, pTask->qmsg) < 0) return -1; if (tEncodeCStr(pEncoder, pTask->qmsg) < 0) return -1;
...@@ -2811,7 +2813,10 @@ int32_t tDecodeSStreamTask(SCoder *pDecoder, SStreamTask *pTask) { ...@@ -2811,7 +2813,10 @@ int32_t tDecodeSStreamTask(SCoder *pDecoder, SStreamTask *pTask) {
if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->level) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->level) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->pipeSource) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->pipeSink) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->pipeSink) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->parallelizable) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->nextOpDst) < 0) return -1;
// if (tDecodeI8(pDecoder, &pTask->numOfRunners) < 0) return -1; // if (tDecodeI8(pDecoder, &pTask->numOfRunners) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->NextOpEp) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pTask->NextOpEp) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pTask->qmsg) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pTask->qmsg) < 0) return -1;
......
...@@ -32,6 +32,8 @@ ...@@ -32,6 +32,8 @@
#include "tname.h" #include "tname.h"
#include "tuuid.h" #include "tuuid.h"
extern bool tsStreamSchedV;
int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet, tmsg_t type) { int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet, tmsg_t type) {
SCoder encoder; SCoder encoder;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
...@@ -106,6 +108,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -106,6 +108,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
int32_t totLevel = LIST_LENGTH(pPlan->pSubplans); int32_t totLevel = LIST_LENGTH(pPlan->pSubplans);
pStream->tasks = taosArrayInit(totLevel, sizeof(SArray)); pStream->tasks = taosArrayInit(totLevel, sizeof(SArray));
int32_t lastUsedVgId = 0;
for (int32_t level = 0; level < totLevel; level++) { for (int32_t level = 0; level < totLevel; level++) {
SArray* taskOneLevel = taosArrayInit(0, sizeof(SStreamTask)); SArray* taskOneLevel = taosArrayInit(0, sizeof(SStreamTask));
...@@ -125,11 +128,14 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -125,11 +128,14 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
continue; continue;
} }
lastUsedVgId = pVgroup->vgId;
pStream->vgNum++; pStream->vgNum++;
// send to vnode // send to vnode
SStreamTask* pTask = streamTaskNew(pStream->uid, level); SStreamTask* pTask = streamTaskNew(pStream->uid, level);
pTask->pipeSource = 1;
pTask->pipeSink = level == totLevel - 1 ? 1 : 0; pTask->pipeSink = level == totLevel - 1 ? 1 : 0;
pTask->parallelizable = 1;
// TODO: set to // TODO: set to
if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) { if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
...@@ -140,7 +146,21 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -140,7 +146,21 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
} }
} else { } else {
SStreamTask* pTask = streamTaskNew(pStream->uid, level); SStreamTask* pTask = streamTaskNew(pStream->uid, level);
pTask->pipeSource = 0;
pTask->pipeSink = level == totLevel - 1 ? 1 : 0; pTask->pipeSink = level == totLevel - 1 ? 1 : 0;
pTask->parallelizable = plan->type == SUBPLAN_TYPE_SCAN;
pTask->nextOpDst = STREAM_NEXT_OP_DST__VND;
if (tsStreamSchedV) {
ASSERT(lastUsedVgId != 0);
SVgObj* pVg = mndAcquireVgroup(pMnode, lastUsedVgId);
if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVg) < 0) {
sdbRelease(pSdb, pVg);
qDestroyQueryPlan(pPlan);
return -1;
}
sdbRelease(pSdb, pVg);
} else {
SSnodeObj* pSnode = mndSchedFetchSnode(pMnode); SSnodeObj* pSnode = mndSchedFetchSnode(pMnode);
if (pSnode != NULL) { if (pSnode != NULL) {
if (mndAssignTaskToSnode(pMnode, pTrans, pTask, plan, pSnode) < 0) { if (mndAssignTaskToSnode(pMnode, pTrans, pTask, plan, pSnode) < 0) {
...@@ -153,6 +173,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -153,6 +173,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
// TODO: assign to one vg // TODO: assign to one vg
ASSERT(0); ASSERT(0);
} }
}
taosArrayPush(taskOneLevel, pTask); taosArrayPush(taskOneLevel, pTask);
} }
taosArrayPush(pStream->tasks, taskOneLevel); taosArrayPush(pStream->tasks, taskOneLevel);
......
...@@ -167,6 +167,7 @@ struct STQ { ...@@ -167,6 +167,7 @@ struct STQ {
STqMetaStore* tqMeta; STqMetaStore* tqMeta;
STqPushMgr* tqPushMgr; STqPushMgr* tqPushMgr;
SHashObj* pStreamTasks; SHashObj* pStreamTasks;
SVnode* pVnode;
SWal* pWal; SWal* pWal;
SMeta* pVnodeMeta; SMeta* pVnodeMeta;
}; };
......
...@@ -177,7 +177,8 @@ int tqInit(); ...@@ -177,7 +177,8 @@ int tqInit();
void tqCleanUp(); void tqCleanUp();
// open in each vnode // open in each vnode
STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAllocatorFactory* allocFac); STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig,
SMemAllocatorFactory* allocFac);
void tqClose(STQ*); void tqClose(STQ*);
// required by vnode // required by vnode
...@@ -188,7 +189,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); ...@@ -188,7 +189,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessSetConnReq(STQ* pTq, char* msg); int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
int32_t tqProcessRebReq(STQ* pTq, char* msg); int32_t tqProcessRebReq(STQ* pTq, char* msg);
int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg); int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg);
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -21,7 +21,8 @@ int32_t tqInit() { return tqPushMgrInit(); } ...@@ -21,7 +21,8 @@ int32_t tqInit() { return tqPushMgrInit(); }
void tqCleanUp() { tqPushMgrCleanUp(); } void tqCleanUp() { tqPushMgrCleanUp(); }
STQ* tqOpen(const char* path, SWal* pWal, SMeta* pVnodeMeta, STqCfg* tqConfig, SMemAllocatorFactory* allocFac) { STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, STqCfg* tqConfig,
SMemAllocatorFactory* allocFac) {
STQ* pTq = malloc(sizeof(STQ)); STQ* pTq = malloc(sizeof(STQ));
if (pTq == NULL) { if (pTq == NULL) {
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
...@@ -29,6 +30,7 @@ STQ* tqOpen(const char* path, SWal* pWal, SMeta* pVnodeMeta, STqCfg* tqConfig, S ...@@ -29,6 +30,7 @@ STQ* tqOpen(const char* path, SWal* pWal, SMeta* pVnodeMeta, STqCfg* tqConfig, S
} }
pTq->path = strdup(path); pTq->path = strdup(path);
pTq->tqConfig = tqConfig; pTq->tqConfig = tqConfig;
pTq->pVnode = pVnode;
pTq->pWal = pWal; pTq->pWal = pWal;
pTq->pVnodeMeta = pVnodeMeta; pTq->pVnodeMeta = pVnodeMeta;
#if 0 #if 0
...@@ -104,8 +106,21 @@ int tqPushMsg(STQ* pTq, void* msg, tmsg_t msgType, int64_t version) { ...@@ -104,8 +106,21 @@ int tqPushMsg(STQ* pTq, void* msg, tmsg_t msgType, int64_t version) {
} }
void* abuf = POINTER_SHIFT(buf, sizeof(SStreamExecMsgHead)); void* abuf = POINTER_SHIFT(buf, sizeof(SStreamExecMsgHead));
tEncodeDataBlocks(abuf, pRes); tEncodeDataBlocks(abuf, pRes);
// serialize tmsg_t type;
// to next level
if (pTask->nextOpDst == STREAM_NEXT_OP_DST__VND) {
type = TDMT_VND_TASK_EXEC;
} else {
type = TDMT_SND_TASK_EXEC;
}
SRpcMsg msg = {
.pCont = buf,
.contLen = tlen,
.code = 0,
.msgType = type,
};
/*vnodeSendReq(pTq->pVnode, &pTask->NextOpEp, &msg);*/
} }
} }
......
...@@ -115,7 +115,8 @@ static int vnodeOpenImpl(SVnode *pVnode) { ...@@ -115,7 +115,8 @@ static int vnodeOpenImpl(SVnode *pVnode) {
// Open tsdb // Open tsdb
sprintf(dir, "%s/tsdb", pVnode->path); sprintf(dir, "%s/tsdb", pVnode->path);
pVnode->pTsdb = tsdbOpen(dir, pVnode->vgId, &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode), pVnode->pMeta, pVnode->pTfs); pVnode->pTsdb =
tsdbOpen(dir, pVnode->vgId, &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode), pVnode->pMeta, pVnode->pTfs);
if (pVnode->pTsdb == NULL) { if (pVnode->pTsdb == NULL) {
// TODO: handle error // TODO: handle error
return -1; return -1;
...@@ -131,7 +132,7 @@ static int vnodeOpenImpl(SVnode *pVnode) { ...@@ -131,7 +132,7 @@ static int vnodeOpenImpl(SVnode *pVnode) {
// Open TQ // Open TQ
sprintf(dir, "%s/tq", pVnode->path); sprintf(dir, "%s/tq", pVnode->path);
pVnode->pTq = tqOpen(dir, pVnode->pWal, pVnode->pMeta, &(pVnode->config.tqCfg), vBufPoolGetMAF(pVnode)); pVnode->pTq = tqOpen(dir, pVnode, pVnode->pWal, pVnode->pMeta, &(pVnode->config.tqCfg), vBufPoolGetMAF(pVnode));
if (pVnode->pTq == NULL) { if (pVnode->pTq == NULL) {
// TODO: handle error // TODO: handle error
return -1; return -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册