提交 18f254a0 编写于 作者: L Liu Jicong

task deploy and task exec

上级 da65b721
...@@ -58,7 +58,10 @@ typedef struct SDataBlockInfo { ...@@ -58,7 +58,10 @@ typedef struct SDataBlockInfo {
int32_t rows; int32_t rows;
int32_t rowSize; int32_t rowSize;
int32_t numOfCols; int32_t numOfCols;
union {int64_t uid; int64_t blockId;}; union {
int64_t uid;
int64_t blockId;
};
} SDataBlockInfo; } SDataBlockInfo;
typedef struct SConstantItem { typedef struct SConstantItem {
...@@ -70,10 +73,10 @@ typedef struct SConstantItem { ...@@ -70,10 +73,10 @@ typedef struct SConstantItem {
// info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList); // info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList);
typedef struct SSDataBlock { typedef struct SSDataBlock {
SColumnDataAgg *pBlockAgg; SColumnDataAgg* pBlockAgg;
SArray *pDataBlock; // SArray<SColumnInfoData> SArray* pDataBlock; // SArray<SColumnInfoData>
SArray *pConstantList; // SArray<SConstantItem>, it is a constant/tags value of the corresponding result value. SArray* pConstantList; // SArray<SConstantItem>, it is a constant/tags value of the corresponding result value.
SDataBlockInfo info; SDataBlockInfo info;
} SSDataBlock; } SSDataBlock;
typedef struct SVarColAttr { typedef struct SVarColAttr {
...@@ -244,7 +247,7 @@ typedef struct SGroupbyExpr { ...@@ -244,7 +247,7 @@ typedef struct SGroupbyExpr {
typedef struct SFunctParam { typedef struct SFunctParam {
int32_t type; int32_t type;
SColumn *pCol; SColumn* pCol;
SVariant param; SVariant param;
} SFunctParam; } SFunctParam;
...@@ -262,12 +265,12 @@ typedef struct SResSchame { ...@@ -262,12 +265,12 @@ typedef struct SResSchame {
typedef struct SExprBasicInfo { typedef struct SExprBasicInfo {
SResSchema resSchema; SResSchema resSchema;
int16_t numOfParams; // argument value of each function int16_t numOfParams; // argument value of each function
SFunctParam *pParam; SFunctParam* pParam;
} SExprBasicInfo; } SExprBasicInfo;
typedef struct SExprInfo { typedef struct SExprInfo {
struct SExprBasicInfo base; struct SExprBasicInfo base;
struct tExprNode *pExpr; struct tExprNode* pExpr;
} SExprInfo; } SExprInfo;
typedef struct SStateWindow { typedef struct SStateWindow {
......
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
#include "thash.h" #include "thash.h"
#include "tlist.h" #include "tlist.h"
#include "trow.h" #include "trow.h"
#include "tuuid.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -171,7 +172,7 @@ typedef struct { ...@@ -171,7 +172,7 @@ typedef struct {
char db[TSDB_DB_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
int64_t dbId; int64_t dbId;
int32_t vgVersion; int32_t vgVersion;
int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT
} SBuildUseDBInput; } SBuildUseDBInput;
typedef struct SField { typedef struct SField {
...@@ -427,10 +428,10 @@ typedef struct { ...@@ -427,10 +428,10 @@ typedef struct {
int16_t slotId; int16_t slotId;
}; };
int16_t type; int16_t type;
int32_t bytes; int32_t bytes;
uint8_t precision; uint8_t precision;
uint8_t scale; uint8_t scale;
} SColumnInfo; } SColumnInfo;
typedef struct { typedef struct {
...@@ -526,7 +527,7 @@ typedef struct { ...@@ -526,7 +527,7 @@ typedef struct {
char db[TSDB_DB_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
int64_t dbId; int64_t dbId;
int32_t vgVersion; int32_t vgVersion;
int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT
} SUseDbReq; } SUseDbReq;
int32_t tSerializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq); int32_t tSerializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq);
...@@ -553,15 +554,13 @@ int32_t tSerializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq); ...@@ -553,15 +554,13 @@ int32_t tSerializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq);
int32_t tDeserializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq); int32_t tDeserializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq);
typedef struct { typedef struct {
SArray *epSetList; // SArray<SEpSet> SArray* epSetList; // SArray<SEpSet>
} SQnodeListRsp; } SQnodeListRsp;
int32_t tSerializeSQnodeListRsp(void* buf, int32_t bufLen, SQnodeListRsp* pRsp); int32_t tSerializeSQnodeListRsp(void* buf, int32_t bufLen, SQnodeListRsp* pRsp);
int32_t tDeserializeSQnodeListRsp(void* buf, int32_t bufLen, SQnodeListRsp* pRsp); int32_t tDeserializeSQnodeListRsp(void* buf, int32_t bufLen, SQnodeListRsp* pRsp);
void tFreeSQnodeListRsp(SQnodeListRsp* pRsp); void tFreeSQnodeListRsp(SQnodeListRsp* pRsp);
typedef struct { typedef struct {
SArray* pArray; // Array of SUseDbRsp SArray* pArray; // Array of SUseDbRsp
} SUseDbBatchRsp; } SUseDbBatchRsp;
...@@ -777,7 +776,6 @@ typedef struct SVgroupInfo { ...@@ -777,7 +776,6 @@ typedef struct SVgroupInfo {
int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT
} SVgroupInfo; } SVgroupInfo;
typedef struct { typedef struct {
int32_t numOfVgroups; int32_t numOfVgroups;
SVgroupInfo vgroups[]; SVgroupInfo vgroups[];
...@@ -1062,8 +1060,8 @@ typedef struct { ...@@ -1062,8 +1060,8 @@ typedef struct {
} STaskStatus; } STaskStatus;
typedef struct { typedef struct {
int64_t refId; int64_t refId;
SArray *taskStatus; //SArray<STaskStatus> SArray* taskStatus; // SArray<STaskStatus>
} SSchedulerStatusRsp; } SSchedulerStatusRsp;
typedef struct { typedef struct {
...@@ -1072,35 +1070,31 @@ typedef struct { ...@@ -1072,35 +1070,31 @@ typedef struct {
int8_t action; int8_t action;
} STaskAction; } STaskAction;
typedef struct SQueryNodeEpId { typedef struct SQueryNodeEpId {
int32_t nodeId; // vgId or qnodeId int32_t nodeId; // vgId or qnodeId
SEp ep; SEp ep;
} SQueryNodeEpId; } SQueryNodeEpId;
typedef struct { typedef struct {
SMsgHead header; SMsgHead header;
uint64_t sId; uint64_t sId;
SQueryNodeEpId epId; SQueryNodeEpId epId;
SArray *taskAction; //SArray<STaskAction> SArray* taskAction; // SArray<STaskAction>
} SSchedulerHbReq; } SSchedulerHbReq;
int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq); int32_t tSerializeSSchedulerHbReq(void* buf, int32_t bufLen, SSchedulerHbReq* pReq);
int32_t tDeserializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq); int32_t tDeserializeSSchedulerHbReq(void* buf, int32_t bufLen, SSchedulerHbReq* pReq);
void tFreeSSchedulerHbReq(SSchedulerHbReq *pReq); void tFreeSSchedulerHbReq(SSchedulerHbReq* pReq);
typedef struct { typedef struct {
uint64_t seqId; uint64_t seqId;
SQueryNodeEpId epId; SQueryNodeEpId epId;
SArray *taskStatus; //SArray<STaskStatus> SArray* taskStatus; // SArray<STaskStatus>
} SSchedulerHbRsp; } SSchedulerHbRsp;
int32_t tSerializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp *pRsp); int32_t tSerializeSSchedulerHbRsp(void* buf, int32_t bufLen, SSchedulerHbRsp* pRsp);
int32_t tDeserializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp *pRsp); int32_t tDeserializeSSchedulerHbRsp(void* buf, int32_t bufLen, SSchedulerHbRsp* pRsp);
void tFreeSSchedulerHbRsp(SSchedulerHbRsp *pRsp); void tFreeSSchedulerHbRsp(SSchedulerHbRsp* pRsp);
typedef struct { typedef struct {
SMsgHead header; SMsgHead header;
...@@ -1370,7 +1364,7 @@ typedef struct SVCreateTbReq { ...@@ -1370,7 +1364,7 @@ typedef struct SVCreateTbReq {
} SVCreateTbReq, SVUpdateTbReq; } SVCreateTbReq, SVUpdateTbReq;
typedef struct { typedef struct {
int tmp; // TODO: to avoid compile error int tmp; // TODO: to avoid compile error
} SVCreateTbRsp, SVUpdateTbRsp; } SVCreateTbRsp, SVUpdateTbRsp;
int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq); int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq);
...@@ -1382,7 +1376,7 @@ typedef struct { ...@@ -1382,7 +1376,7 @@ typedef struct {
} SVCreateTbBatchReq; } SVCreateTbBatchReq;
typedef struct { typedef struct {
int tmp; // TODO: to avoid compile error int tmp; // TODO: to avoid compile error
} SVCreateTbBatchRsp; } SVCreateTbBatchRsp;
int32_t tSerializeSVCreateTbBatchReq(void** buf, SVCreateTbBatchReq* pReq); int32_t tSerializeSVCreateTbBatchReq(void** buf, SVCreateTbBatchReq* pReq);
...@@ -1396,7 +1390,7 @@ typedef struct { ...@@ -1396,7 +1390,7 @@ typedef struct {
} SVDropTbReq; } SVDropTbReq;
typedef struct { typedef struct {
int tmp; // TODO: to avoid compile error int tmp; // TODO: to avoid compile error
} SVDropTbRsp; } SVDropTbRsp;
int32_t tSerializeSVDropTbReq(void** buf, SVDropTbReq* pReq); int32_t tSerializeSVDropTbReq(void** buf, SVDropTbReq* pReq);
...@@ -1933,7 +1927,7 @@ typedef struct { ...@@ -1933,7 +1927,7 @@ typedef struct {
} SVCreateTSmaReq; } SVCreateTSmaReq;
typedef struct { typedef struct {
int8_t type; // 0 status report, 1 update data int8_t type; // 0 status report, 1 update data
char indexName[TSDB_INDEX_NAME_LEN]; // char indexName[TSDB_INDEX_NAME_LEN]; //
STimeWindow windows; STimeWindow windows;
} STSmaMsg; } STSmaMsg;
...@@ -1944,7 +1938,7 @@ typedef struct { ...@@ -1944,7 +1938,7 @@ typedef struct {
} SVDropTSmaReq; } SVDropTSmaReq;
typedef struct { typedef struct {
int tmp; // TODO: to avoid compile error int tmp; // TODO: to avoid compile error
} SVCreateTSmaRsp, SVDropTSmaRsp; } SVCreateTSmaRsp, SVDropTSmaRsp;
int32_t tSerializeSVCreateTSmaReq(void** buf, SVCreateTSmaReq* pReq); int32_t tSerializeSVCreateTSmaReq(void** buf, SVCreateTSmaReq* pReq);
...@@ -2029,7 +2023,7 @@ static FORCE_INLINE int32_t tEncodeTSma(void** buf, const STSma* pSma) { ...@@ -2029,7 +2023,7 @@ static FORCE_INLINE int32_t tEncodeTSma(void** buf, const STSma* pSma) {
tlen += taosEncodeFixedI64(buf, pSma->tableUid); tlen += taosEncodeFixedI64(buf, pSma->tableUid);
tlen += taosEncodeFixedI64(buf, pSma->interval); tlen += taosEncodeFixedI64(buf, pSma->interval);
tlen += taosEncodeFixedI64(buf, pSma->sliding); tlen += taosEncodeFixedI64(buf, pSma->sliding);
if (pSma->exprLen > 0) { if (pSma->exprLen > 0) {
tlen += taosEncodeString(buf, pSma->expr); tlen += taosEncodeString(buf, pSma->expr);
} }
...@@ -2064,7 +2058,6 @@ static FORCE_INLINE void* tDecodeTSma(void* buf, STSma* pSma) { ...@@ -2064,7 +2058,6 @@ static FORCE_INLINE void* tDecodeTSma(void* buf, STSma* pSma) {
buf = taosDecodeFixedI64(buf, &pSma->interval); buf = taosDecodeFixedI64(buf, &pSma->interval);
buf = taosDecodeFixedI64(buf, &pSma->sliding); buf = taosDecodeFixedI64(buf, &pSma->sliding);
if (pSma->exprLen > 0) { if (pSma->exprLen > 0) {
pSma->expr = (char*)calloc(pSma->exprLen, 1); pSma->expr = (char*)calloc(pSma->exprLen, 1);
if (pSma->expr != NULL) { if (pSma->expr != NULL) {
...@@ -2265,6 +2258,51 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p ...@@ -2265,6 +2258,51 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p
return buf; return buf;
} }
enum {
STREAM_TASK_STATUS__RUNNING = 1,
STREAM_TASK_STATUS__STOP,
};
typedef struct {
int64_t streamId;
int32_t taskId;
int32_t level;
int8_t status;
char* qmsg;
void* executor;
// void* stateStore;
// storage handle
} SStreamTask;
static FORCE_INLINE SStreamTask* streamTaskNew(int64_t streamId, int32_t level) {
SStreamTask* pTask = (SStreamTask*)calloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
return NULL;
}
pTask->taskId = tGenIdPI32();
pTask->status = STREAM_TASK_STATUS__RUNNING;
pTask->qmsg = NULL;
return pTask;
}
int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask);
void tFreeSStreamTask(SStreamTask* pTask);
typedef struct {
SMsgHead head;
SStreamTask* task;
} SStreamTaskDeployReq;
typedef struct {
int32_t reserved;
} SStreamTaskDeployRsp;
typedef struct {
SMsgHead head;
// TODO: other info needed by task
} SStreamTaskExecReq;
#pragma pack(pop) #pragma pack(pop)
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -199,6 +199,7 @@ enum { ...@@ -199,6 +199,7 @@ enum {
// Requests handled by SNODE // Requests handled by SNODE
TD_NEW_MSG_SEG(TDMT_SND_MSG) TD_NEW_MSG_SEG(TDMT_SND_MSG)
TD_DEF_MSG_TYPE(TDMT_SND_TASK_DEPLOY, "snode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
#if defined(TD_MSG_NUMBER_) #if defined(TD_MSG_NUMBER_)
TDMT_MAX TDMT_MAX
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#ifndef _TD_SNODE_H_ #ifndef _TD_SNODE_H_
#define _TD_SNODE_H_ #define _TD_SNODE_H_
#include "tcommon.h"
#include "tmsg.h" #include "tmsg.h"
#include "trpc.h" #include "trpc.h"
...@@ -78,7 +79,7 @@ int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad); ...@@ -78,7 +79,7 @@ int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad);
* @param pRsp The response message * @param pRsp The response message
* @return int32_t 0 for success, -1 for failure * @return int32_t 0 for success, -1 for failure
*/ */
int32_t sndProcessMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp); // int32_t sndProcessMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg); int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg);
......
...@@ -1467,8 +1467,7 @@ int32_t tDeserializeSUseDbReq(void *buf, int32_t bufLen, SUseDbReq *pReq) { ...@@ -1467,8 +1467,7 @@ int32_t tDeserializeSUseDbReq(void *buf, int32_t bufLen, SUseDbReq *pReq) {
return 0; return 0;
} }
int32_t tSerializeSQnodeListReq(void *buf, int32_t bufLen, SQnodeListReq *pReq) {
int32_t tSerializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq) {
SCoder encoder = {0}; SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
...@@ -1499,7 +1498,7 @@ int32_t tSerializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp) ...@@ -1499,7 +1498,7 @@ int32_t tSerializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp)
if (tStartEncode(&encoder) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
int32_t num = taosArrayGetSize(pRsp->epSetList); int32_t num = taosArrayGetSize(pRsp->epSetList);
if (tEncodeI32(&encoder, num) < 0) return -1; if (tEncodeI32(&encoder, num) < 0) return -1;
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
SEpSet *epSet = taosArrayGet(pRsp->epSetList, i); SEpSet *epSet = taosArrayGet(pRsp->epSetList, i);
if (tEncodeSEpSet(&encoder, epSet) < 0) return -1; if (tEncodeSEpSet(&encoder, epSet) < 0) return -1;
...@@ -2491,27 +2490,27 @@ int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pR ...@@ -2491,27 +2490,27 @@ int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pR
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeU64(&encoder, pReq->sId) < 0) return -1; if (tEncodeU64(&encoder, pReq->sId) < 0) return -1;
if (tEncodeI32(&encoder, pReq->epId.nodeId) < 0) return -1; if (tEncodeI32(&encoder, pReq->epId.nodeId) < 0) return -1;
if (tEncodeU16(&encoder, pReq->epId.ep.port) < 0) return -1; if (tEncodeU16(&encoder, pReq->epId.ep.port) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->epId.ep.fqdn) < 0) return -1; if (tEncodeCStr(&encoder, pReq->epId.ep.fqdn) < 0) return -1;
if (pReq->taskAction) { if (pReq->taskAction) {
int32_t num = taosArrayGetSize(pReq->taskAction); int32_t num = taosArrayGetSize(pReq->taskAction);
if (tEncodeI32(&encoder, num) < 0) return -1; if (tEncodeI32(&encoder, num) < 0) return -1;
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
STaskAction *action = taosArrayGet(pReq->taskAction, i); STaskAction *action = taosArrayGet(pReq->taskAction, i);
if (tEncodeU64(&encoder, action->queryId) < 0) return -1; if (tEncodeU64(&encoder, action->queryId) < 0) return -1;
if (tEncodeU64(&encoder, action->taskId) < 0) return -1; if (tEncodeU64(&encoder, action->taskId) < 0) return -1;
if (tEncodeI8(&encoder, action->action) < 0) return -1; if (tEncodeI8(&encoder, action->action) < 0) return -1;
} }
} else { } else {
if (tEncodeI32(&encoder, 0) < 0) return -1; if (tEncodeI32(&encoder, 0) < 0) return -1;
} }
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
tCoderClear(&encoder); tCoderClear(&encoder);
if (buf != NULL) { if (buf != NULL) {
SMsgHead *pHead = (SMsgHead *)((char *)buf - headLen); SMsgHead *pHead = (SMsgHead *)((char *)buf - headLen);
pHead->vgId = htonl(pReq->header.vgId); pHead->vgId = htonl(pReq->header.vgId);
...@@ -2559,29 +2558,27 @@ int32_t tDeserializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq * ...@@ -2559,29 +2558,27 @@ int32_t tDeserializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *
void tFreeSSchedulerHbReq(SSchedulerHbReq *pReq) { taosArrayDestroy(pReq->taskAction); } void tFreeSSchedulerHbReq(SSchedulerHbReq *pReq) { taosArrayDestroy(pReq->taskAction); }
int32_t tSerializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp *pRsp) { int32_t tSerializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp *pRsp) {
SCoder encoder = {0}; SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeU64(&encoder, pRsp->seqId) < 0) return -1; if (tEncodeU64(&encoder, pRsp->seqId) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->epId.nodeId) < 0) return -1; if (tEncodeI32(&encoder, pRsp->epId.nodeId) < 0) return -1;
if (tEncodeU16(&encoder, pRsp->epId.ep.port) < 0) return -1; if (tEncodeU16(&encoder, pRsp->epId.ep.port) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->epId.ep.fqdn) < 0) return -1; if (tEncodeCStr(&encoder, pRsp->epId.ep.fqdn) < 0) return -1;
if (pRsp->taskStatus) { if (pRsp->taskStatus) {
int32_t num = taosArrayGetSize(pRsp->taskStatus); int32_t num = taosArrayGetSize(pRsp->taskStatus);
if (tEncodeI32(&encoder, num) < 0) return -1; if (tEncodeI32(&encoder, num) < 0) return -1;
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
STaskStatus *status = taosArrayGet(pRsp->taskStatus, i); STaskStatus *status = taosArrayGet(pRsp->taskStatus, i);
if (tEncodeU64(&encoder, status->queryId) < 0) return -1; if (tEncodeU64(&encoder, status->queryId) < 0) return -1;
if (tEncodeU64(&encoder, status->taskId) < 0) return -1; if (tEncodeU64(&encoder, status->taskId) < 0) return -1;
if (tEncodeI64(&encoder, status->refId) < 0) return -1; if (tEncodeI64(&encoder, status->refId) < 0) return -1;
if (tEncodeI8(&encoder, status->status) < 0) return -1; if (tEncodeI8(&encoder, status->status) < 0) return -1;
} }
} else { } else {
if (tEncodeI32(&encoder, 0) < 0) return -1; if (tEncodeI32(&encoder, 0) < 0) return -1;
} }
tEndEncode(&encoder); tEndEncode(&encoder);
...@@ -2694,3 +2691,32 @@ void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) { ...@@ -2694,3 +2691,32 @@ void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
tfree(pReq->physicalPlan); tfree(pReq->physicalPlan);
tfree(pReq->logicalPlan); tfree(pReq->logicalPlan);
} }
int32_t tEncodeSStreamTask(SCoder *pEncoder, const SStreamTask *pTask) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->level) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->status) < 0) return -1;
if (tEncodeCStr(pEncoder, pTask->qmsg) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeSStreamTask(SCoder *pDecoder, SStreamTask *pTask) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->level) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1;
if (tDecodeCStr(pDecoder, (const char **)&pTask->qmsg) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
void tFreeSStreamTask(SStreamTask *pTask) {
// TODO
/*free(pTask->qmsg);*/
/*free(pTask->executor);*/
/*free(pTask);*/
}
...@@ -323,8 +323,8 @@ int32_t dndProcessDropSnodeReq(SDnode *pDnode, SRpcMsg *pReq) { ...@@ -323,8 +323,8 @@ int32_t dndProcessDropSnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
} }
static void dndProcessSnodeUniqueQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs) { static void dndProcessSnodeUniqueQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs) {
SSnodeMgmt *pMgmt = &pDnode->smgmt; /*SSnodeMgmt *pMgmt = &pDnode->smgmt;*/
int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED; int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED;
SSnode *pSnode = dndAcquireSnode(pDnode); SSnode *pSnode = dndAcquireSnode(pDnode);
if (pSnode != NULL) { if (pSnode != NULL) {
...@@ -334,22 +334,35 @@ static void dndProcessSnodeUniqueQueue(SDnode *pDnode, STaosQall *qall, int32_t ...@@ -334,22 +334,35 @@ static void dndProcessSnodeUniqueQueue(SDnode *pDnode, STaosQall *qall, int32_t
sndProcessUMsg(pSnode, pMsg); sndProcessUMsg(pSnode, pMsg);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
dndReleaseSnode(pDnode, pSnode);
} else {
for (int32_t i = 0; i < numOfMsgs; i++) {
SRpcMsg *pMsg = NULL;
taosGetQitem(qall, (void **)&pMsg);
SRpcMsg rpcRsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code};
rpcSendResponse(&rpcRsp);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
} }
dndReleaseSnode(pDnode, pSnode);
} }
static void dndProcessSnodeSharedQueue(SDnode *pDnode, SRpcMsg *pMsg) { static void dndProcessSnodeSharedQueue(SDnode *pDnode, SRpcMsg *pMsg) {
SSnodeMgmt *pMgmt = &pDnode->smgmt; /*SSnodeMgmt *pMgmt = &pDnode->smgmt;*/
int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED; int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED;
SSnode *pSnode = dndAcquireSnode(pDnode); SSnode *pSnode = dndAcquireSnode(pDnode);
if (pSnode != NULL) { if (pSnode != NULL) {
code = sndProcessSMsg(pSnode, pMsg); sndProcessSMsg(pSnode, pMsg);
dndReleaseSnode(pDnode, pSnode);
} else {
SRpcMsg rpcRsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code};
rpcSendResponse(&rpcRsp);
} }
dndReleaseSnode(pDnode, pSnode);
#if 0 #if 0
if (pMsg->msgType & 1u) { if (pMsg->msgType & 1u) {
......
...@@ -85,6 +85,8 @@ typedef enum { ...@@ -85,6 +85,8 @@ typedef enum {
TRN_TYPE_REBALANCE = 1017, TRN_TYPE_REBALANCE = 1017,
TRN_TYPE_COMMIT_OFFSET = 1018, TRN_TYPE_COMMIT_OFFSET = 1018,
TRN_TYPE_CREATE_STREAM = 1019, TRN_TYPE_CREATE_STREAM = 1019,
TRN_TYPE_DROP_STREAM = 1020,
TRN_TYPE_ALTER_STREAM = 1021,
TRN_TYPE_BASIC_SCOPE_END, TRN_TYPE_BASIC_SCOPE_END,
TRN_TYPE_GLOBAL_SCOPE = 2000, TRN_TYPE_GLOBAL_SCOPE = 2000,
TRN_TYPE_CREATE_DNODE = 2001, TRN_TYPE_CREATE_DNODE = 2001,
...@@ -679,12 +681,6 @@ static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pCons ...@@ -679,12 +681,6 @@ static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pCons
return buf; return buf;
} }
typedef struct {
int32_t taskId;
int32_t level;
SSubplan* plan;
} SStreamTaskMeta;
typedef struct { typedef struct {
char name[TSDB_TOPIC_FNAME_LEN]; char name[TSDB_TOPIC_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
...@@ -700,7 +696,7 @@ typedef struct { ...@@ -700,7 +696,7 @@ typedef struct {
char* sql; char* sql;
char* logicalPlan; char* logicalPlan;
char* physicalPlan; char* physicalPlan;
SArray* tasks; // SArray<SArray<SStreamTaskMeta>> SArray* tasks; // SArray<SArray<SStreamTask>>
} SStreamObj; } SStreamObj;
int32_t tEncodeSStreamObj(SCoder* pEncoder, const SStreamObj* pObj); int32_t tEncodeSStreamObj(SCoder* pEncoder, const SStreamObj* pObj);
......
...@@ -27,6 +27,8 @@ void mndCleanupScheduler(SMnode* pMnode); ...@@ -27,6 +27,8 @@ void mndCleanupScheduler(SMnode* pMnode);
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub); int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub);
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -31,7 +31,7 @@ ...@@ -31,7 +31,7 @@
#include "tname.h" #include "tname.h"
#include "tuuid.h" #include "tuuid.h"
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
SSdb* pSdb = pMnode->pSdb; SSdb* pSdb = pMnode->pSdb;
SVgObj* pVgroup = NULL; SVgObj* pVgroup = NULL;
SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan); SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
...@@ -41,17 +41,18 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { ...@@ -41,17 +41,18 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
} }
ASSERT(pStream->vgNum == 0); ASSERT(pStream->vgNum == 0);
int32_t levelNum = LIST_LENGTH(pPlan->pSubplans); int32_t totLevel = LIST_LENGTH(pPlan->pSubplans);
pStream->tasks = taosArrayInit(levelNum, sizeof(SArray)); pStream->tasks = taosArrayInit(totLevel, sizeof(SArray));
for (int32_t i = 0; i < levelNum; i++) { int32_t msgLen;
SArray* taskOneLevel = taosArrayInit(0, sizeof(SStreamTaskMeta)); for (int32_t level = 0; level < totLevel; level++) {
SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, i); SArray* taskOneLevel = taosArrayInit(0, sizeof(SStreamTask));
SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, level);
int32_t opNum = LIST_LENGTH(inner->pNodeList); int32_t opNum = LIST_LENGTH(inner->pNodeList);
ASSERT(opNum == 1); ASSERT(opNum == 1);
SSubplan* plan = nodesListGetNode(inner->pNodeList, 0); SSubplan* plan = nodesListGetNode(inner->pNodeList, level);
if (i == 0) { if (level == 0) {
ASSERT(plan->type == SUBPLAN_TYPE_SCAN); ASSERT(plan->type == SUBPLAN_TYPE_SCAN);
void* pIter = NULL; void* pIter = NULL;
while (1) { while (1) {
...@@ -63,15 +64,19 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { ...@@ -63,15 +64,19 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
} }
pStream->vgNum++; pStream->vgNum++;
// send to vnode
SStreamTask* pTask = streamTaskNew(pStream->uid, level);
plan->execNode.nodeId = pVgroup->vgId; plan->execNode.nodeId = pVgroup->vgId;
plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup); plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup);
SStreamTaskMeta task = { if (qSubPlanToString(plan, &pTask->qmsg, &msgLen) < 0) {
.taskId = tGenIdPI32(), sdbRelease(pSdb, pVgroup);
.level = i, qDestroyQueryPlan(pPlan);
.plan = plan, terrno = TSDB_CODE_QRY_INVALID_INPUT;
}; return -1;
// send to vnode }
taosArrayPush(taskOneLevel, &task); taosArrayPush(taskOneLevel, pTask);
} }
} else if (plan->subplanType == SUBPLAN_TYPE_SCAN) { } else if (plan->subplanType == SUBPLAN_TYPE_SCAN) {
// duplicatable // duplicatable
...@@ -82,22 +87,36 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { ...@@ -82,22 +87,36 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
// if has snode, set to shared thread num in snode // if has snode, set to shared thread num in snode
parallel = SND_SHARED_THREAD_NUM; parallel = SND_SHARED_THREAD_NUM;
for (int32_t j = 0; j < parallel; j++) { for (int32_t i = 0; i < parallel; i++) {
SStreamTaskMeta task = { SStreamTask* pTask = streamTaskNew(pStream->uid, level);
.taskId = tGenIdPI32(),
.level = i, // TODO:get snode id and ep
.plan = plan, plan->execNode.nodeId = pVgroup->vgId;
}; plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup);
taosArrayPush(taskOneLevel, &task);
if (qSubPlanToString(plan, &pTask->qmsg, &msgLen) < 0) {
qDestroyQueryPlan(pPlan);
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
}
taosArrayPush(taskOneLevel, pTask);
} }
} else { } else {
// not duplicatable // not duplicatable
SStreamTaskMeta task = { SStreamTask* pTask = streamTaskNew(pStream->uid, level);
.taskId = tGenIdPI32(),
.level = i, // TODO:get snode id and ep
.plan = plan, plan->execNode.nodeId = pVgroup->vgId;
}; plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup);
taosArrayPush(taskOneLevel, &task);
if (qSubPlanToString(plan, &pTask->qmsg, &msgLen) < 0) {
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
}
taosArrayPush(taskOneLevel, pTask);
} }
taosArrayPush(pStream->tasks, taskOneLevel); taosArrayPush(pStream->tasks, taskOneLevel);
} }
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "mndDb.h" #include "mndDb.h"
#include "mndDnode.h" #include "mndDnode.h"
#include "mndMnode.h" #include "mndMnode.h"
#include "mndScheduler.h"
#include "mndShow.h" #include "mndShow.h"
#include "mndStb.h" #include "mndStb.h"
#include "mndTrans.h" #include "mndTrans.h"
...@@ -237,6 +238,12 @@ static int32_t mndCreateStream(SMnode *pMnode, SMnodeMsg *pReq, SCMCreateStreamR ...@@ -237,6 +238,12 @@ static int32_t mndCreateStream(SMnode *pMnode, SMnodeMsg *pReq, SCMCreateStreamR
} }
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY); sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
if (mndScheduleStream(pMnode, pTrans, &streamObj) < 0) {
mError("stream:%ld, schedule stream since %s", streamObj.uid, terrstr());
mndTransDrop(pTrans);
return -1;
}
if (mndTransPrepare(pMnode, pTrans) != 0) { if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans); mndTransDrop(pTrans);
......
...@@ -7,8 +7,9 @@ target_include_directories( ...@@ -7,8 +7,9 @@ target_include_directories(
) )
target_link_libraries( target_link_libraries(
snode snode
PRIVATE executor
PRIVATE transport PRIVATE transport
PRIVATE os PRIVATE os
PRIVATE common PRIVATE common
PRIVATE util PRIVATE util
) )
\ No newline at end of file
...@@ -38,13 +38,8 @@ enum { ...@@ -38,13 +38,8 @@ enum {
STREAM_STATUS__DELETING, STREAM_STATUS__DELETING,
}; };
enum {
STREAM_TASK_STATUS__RUNNING = 1,
STREAM_TASK_STATUS__STOP,
};
typedef struct { typedef struct {
SHashObj* pHash; // taskId -> streamTask SHashObj* pHash; // taskId -> SStreamTask
} SStreamMeta; } SStreamMeta;
typedef struct SSnode { typedef struct SSnode {
...@@ -52,26 +47,16 @@ typedef struct SSnode { ...@@ -52,26 +47,16 @@ typedef struct SSnode {
SSnodeOpt cfg; SSnodeOpt cfg;
} SSnode; } SSnode;
typedef struct { SStreamMeta* sndMetaNew();
int64_t streamId; void sndMetaDelete(SStreamMeta* pMeta);
int32_t taskId;
int32_t IdxInLevel;
int32_t level;
} SStreamTaskInfo;
typedef struct { int32_t sndMetaDeployTask(SStreamMeta* pMeta, SStreamTask* pTask);
SStreamTaskInfo meta; int32_t sndMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
int8_t status;
void* executor;
void* stateStore;
// storage handle
} SStreamTask;
int32_t sndCreateTask(); int32_t sndDropTaskOfStream(SStreamMeta* pMeta, int64_t streamId);
int32_t sndDropTaskOfStream(int64_t streamId);
int32_t sndStopTaskOfStream(int64_t streamId); int32_t sndStopTaskOfStream(SStreamMeta* pMeta, int64_t streamId);
int32_t sndResumeTaskOfStream(int64_t streamId); int32_t sndResumeTaskOfStream(SStreamMeta* pMeta, int64_t streamId);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -13,40 +13,91 @@ ...@@ -13,40 +13,91 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "executor.h"
#include "sndInt.h" #include "sndInt.h"
#include "tuuid.h" #include "tuuid.h"
SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
SSnode *pSnode = calloc(1, sizeof(SSnode)); SSnode *pSnode = calloc(1, sizeof(SSnode));
if (pSnode == NULL) {
return NULL;
}
memcpy(&pSnode->cfg, pOption, sizeof(SSnodeOpt)); memcpy(&pSnode->cfg, pOption, sizeof(SSnodeOpt));
pSnode->pMeta = sndMetaNew();
if (pSnode->pMeta == NULL) {
free(pSnode);
return NULL;
}
return pSnode; return pSnode;
} }
void sndClose(SSnode *pSnode) { free(pSnode); } void sndClose(SSnode *pSnode) {
sndMetaDelete(pSnode->pMeta);
free(pSnode);
}
int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad) { return 0; } int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad) { return 0; }
int32_t sndProcessMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { /*int32_t sndProcessMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {*/
*pRsp = NULL; /**pRsp = NULL;*/
return 0; /*return 0;*/
} /*}*/
void sndDestroy(const char *path) {} void sndDestroy(const char *path) {}
static int32_t sndDeployTask(SSnode *pSnode, SRpcMsg *pMsg) { SStreamMeta *sndMetaNew() {
SStreamTask *task = malloc(sizeof(SStreamTask)); SStreamMeta *pMeta = calloc(1, sizeof(SStreamMeta));
if (task == NULL) { if (pMeta == NULL) {
return NULL;
}
pMeta->pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
if (pMeta->pHash == NULL) {
free(pMeta);
return NULL;
}
return pMeta;
}
void sndMetaDelete(SStreamMeta *pMeta) {
taosHashCleanup(pMeta->pHash);
free(pMeta);
}
int32_t sndMetaDeployTask(SStreamMeta *pMeta, SStreamTask *pTask) {
pTask->executor = qCreateStreamExecTaskInfo(pTask->qmsg, NULL);
return taosHashPut(pMeta->pHash, &pTask->taskId, sizeof(int32_t), pTask, sizeof(void *));
}
int32_t sndMetaRemoveTask(SStreamMeta *pMeta, int32_t taskId) {
SStreamTask *pTask = taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
if (pTask == NULL) {
return -1; return -1;
} }
task->meta.taskId = tGenIdPI32(); free(pTask->qmsg);
taosHashPut(pSnode->pMeta->pHash, &task->meta.taskId, sizeof(int32_t), &task, sizeof(void *)); // TODO:free executor
return 0; free(pTask);
return taosHashRemove(pMeta->pHash, &taskId, sizeof(int32_t));
} }
int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) { int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) {
// stream deployment // stream deploy
// stream stop/resume // stream stop/resume
// operator exec // operator exec
if (pMsg->msgType == TDMT_SND_TASK_DEPLOY) {
void *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
SStreamTask *pTask = malloc(sizeof(SStreamTask));
if (pTask == NULL) {
return -1;
}
SCoder decoder;
tCoderInit(&decoder, TD_LITTLE_ENDIAN, msg, pMsg->contLen - sizeof(SMsgHead), TD_DECODER);
tDecodeSStreamTask(&decoder, pTask);
tCoderClear(&decoder);
sndMetaDeployTask(pSnode->pMeta, pTask);
} else {
//
}
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册