提交 03b21623 编写于 作者: L Liu Jicong

refactor stream

上级 ddc3b100
......@@ -207,7 +207,7 @@ typedef struct {
// Submit message for one table
typedef struct SSubmitBlk {
int64_t uid; // table unique id
int64_t suid; // stable id
int64_t suid; // stable id
int32_t padding; // TODO just for padding here
int32_t sversion; // data schema version
int32_t dataLen; // data part length, not including the SSubmitBlk head
......@@ -2358,171 +2358,6 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p
}
return buf;
}
enum {
STREAM_TASK_STATUS__RUNNING = 1,
STREAM_TASK_STATUS__STOP,
};
// pipe -> fetch/pipe queue
// merge -> merge queue
// write -> write queue
enum {
TASK_SINK_MSG__SND_PIPE = 1,
TASK_SINK_MSG__SND_MERGE,
TASK_SINK_MSG__VND_PIPE,
TASK_SINK_MSG__VND_MERGE,
TASK_SINK_MSG__VND_WRITE,
};
typedef struct {
int32_t nodeId; // 0 for snode
SEpSet epSet;
} SStreamTaskEp;
typedef struct {
void* inputHandle;
void* executor;
} SStreamRunner;
typedef struct {
int8_t parallelizable;
char* qmsg;
// followings are not applicable to encoder and decoder
int8_t numOfRunners;
SStreamRunner* runners;
} STaskExec;
typedef struct {
int8_t reserved;
} STaskDispatcherInplace;
typedef struct {
int32_t nodeId;
SEpSet epSet;
} STaskDispatcherFixedEp;
typedef struct {
int8_t hashMethod;
SArray* info;
} STaskDispatcherShuffle;
typedef struct {
int8_t reserved;
// not applicable to encoder and decoder
SHashObj* pHash; // groupId to tbuid
} STaskSinkTb;
typedef struct {
int8_t reserved;
} STaskSinkSma;
typedef struct {
int8_t reserved;
} STaskSinkFetch;
typedef struct {
int8_t reserved;
} STaskSinkShow;
enum {
TASK_SOURCE__SCAN = 1,
TASK_SOURCE__SINGLE,
TASK_SOURCE__MULTI,
};
enum {
TASK_EXEC__NONE = 1,
TASK_EXEC__EXEC,
};
enum {
TASK_DISPATCH__NONE = 1,
TASK_DISPATCH__INPLACE,
TASK_DISPATCH__FIXED,
TASK_DISPATCH__SHUFFLE,
};
enum {
TASK_SINK__NONE = 1,
TASK_SINK__TABLE,
TASK_SINK__SMA,
TASK_SINK__FETCH,
TASK_SINK__SHOW,
};
typedef struct {
int64_t streamId;
int32_t taskId;
int8_t status;
int8_t sourceType;
int8_t execType;
int8_t sinkType;
int8_t dispatchType;
int16_t dispatchMsgType;
int32_t downstreamTaskId;
// source preprocess
// exec
STaskExec exec;
// local sink
union {
STaskSinkTb tbSink;
STaskSinkSma smaSink;
STaskSinkFetch fetchSink;
STaskSinkShow showSink;
};
// dispatch
union {
STaskDispatcherInplace inplaceDispatcher;
STaskDispatcherFixedEp fixedEpDispatcher;
STaskDispatcherShuffle shuffleDispatcher;
};
// state storage
} SStreamTask;
SStreamTask* tNewSStreamTask(int64_t streamId);
int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask);
void tFreeSStreamTask(SStreamTask* pTask);
typedef struct {
// SMsgHead head;
SStreamTask* task;
} SStreamTaskDeployReq;
typedef struct {
int32_t reserved;
} SStreamTaskDeployRsp;
typedef struct {
// SMsgHead head;
int64_t streamId;
int32_t taskId;
SArray* data; // SArray<SSDataBlock>
} SStreamTaskExecReq;
int32_t tEncodeSStreamTaskExecReq(void** buf, const SStreamTaskExecReq* pReq);
void* tDecodeSStreamTaskExecReq(const void* buf, SStreamTaskExecReq* pReq);
void tFreeSStreamTaskExecReq(SStreamTaskExecReq* pReq);
typedef struct {
int32_t reserved;
} SStreamTaskExecRsp;
typedef struct {
// SMsgHead head;
int64_t streamId;
int64_t version;
SArray* res; // SArray<SSDataBlock>
} SStreamSinkReq;
#pragma pack(pop)
#ifdef __cplusplus
......
......@@ -25,6 +25,170 @@ extern "C" {
#ifndef _TSTREAM_H_
#define _TSTREAM_H_
enum {
STREAM_TASK_STATUS__RUNNING = 1,
STREAM_TASK_STATUS__STOP,
};
// pipe -> fetch/pipe queue
// merge -> merge queue
// write -> write queue
enum {
TASK_SINK_MSG__SND_PIPE = 1,
TASK_SINK_MSG__SND_MERGE,
TASK_SINK_MSG__VND_PIPE,
TASK_SINK_MSG__VND_MERGE,
TASK_SINK_MSG__VND_WRITE,
};
typedef struct {
int32_t nodeId; // 0 for snode
SEpSet epSet;
} SStreamTaskEp;
typedef struct {
void* inputHandle;
void* executor;
} SStreamRunner;
typedef struct {
int8_t parallelizable;
char* qmsg;
// followings are not applicable to encoder and decoder
int8_t numOfRunners;
SStreamRunner* runners;
} STaskExec;
typedef struct {
int8_t reserved;
} STaskDispatcherInplace;
typedef struct {
int32_t nodeId;
SEpSet epSet;
} STaskDispatcherFixedEp;
typedef struct {
int8_t hashMethod;
SArray* info;
} STaskDispatcherShuffle;
typedef struct {
int8_t reserved;
// not applicable to encoder and decoder
SHashObj* pHash; // groupId to tbuid
} STaskSinkTb;
typedef struct {
int8_t reserved;
} STaskSinkSma;
typedef struct {
int8_t reserved;
} STaskSinkFetch;
typedef struct {
int8_t reserved;
} STaskSinkShow;
enum {
TASK_SOURCE__SCAN = 1,
TASK_SOURCE__SINGLE,
TASK_SOURCE__MULTI,
};
enum {
TASK_EXEC__NONE = 1,
TASK_EXEC__EXEC,
};
enum {
TASK_DISPATCH__NONE = 1,
TASK_DISPATCH__INPLACE,
TASK_DISPATCH__FIXED,
TASK_DISPATCH__SHUFFLE,
};
enum {
TASK_SINK__NONE = 1,
TASK_SINK__TABLE,
TASK_SINK__SMA,
TASK_SINK__FETCH,
TASK_SINK__SHOW,
};
typedef struct {
int64_t streamId;
int32_t taskId;
int8_t status;
int8_t sourceType;
int8_t execType;
int8_t sinkType;
int8_t dispatchType;
int16_t dispatchMsgType;
int32_t downstreamTaskId;
// source preprocess
// exec
STaskExec exec;
// local sink
union {
STaskSinkTb tbSink;
STaskSinkSma smaSink;
STaskSinkFetch fetchSink;
STaskSinkShow showSink;
};
// dispatch
union {
STaskDispatcherInplace inplaceDispatcher;
STaskDispatcherFixedEp fixedEpDispatcher;
STaskDispatcherShuffle shuffleDispatcher;
};
// state storage
} SStreamTask;
SStreamTask* tNewSStreamTask(int64_t streamId);
int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask);
void tFreeSStreamTask(SStreamTask* pTask);
typedef struct {
// SMsgHead head;
SStreamTask* task;
} SStreamTaskDeployReq;
typedef struct {
int32_t reserved;
} SStreamTaskDeployRsp;
typedef struct {
// SMsgHead head;
int64_t streamId;
int32_t taskId;
SArray* data; // SArray<SSDataBlock>
} SStreamTaskExecReq;
int32_t tEncodeSStreamTaskExecReq(void** buf, const SStreamTaskExecReq* pReq);
void* tDecodeSStreamTaskExecReq(const void* buf, SStreamTaskExecReq* pReq);
void tFreeSStreamTaskExecReq(SStreamTaskExecReq* pReq);
typedef struct {
int32_t reserved;
} SStreamTaskExecRsp;
typedef struct {
// SMsgHead head;
int64_t streamId;
int64_t version;
SArray* res; // SArray<SSDataBlock>
} SStreamSinkReq;
int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, int32_t inputType, int32_t workId);
#ifdef __cplusplus
......
......@@ -3094,106 +3094,3 @@ void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
tfree(pReq->sql);
tfree(pReq->ast);
}
SStreamTask *tNewSStreamTask(int64_t streamId) {
SStreamTask *pTask = (SStreamTask *)calloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
return NULL;
}
pTask->taskId = tGenIdPI32();
pTask->streamId = streamId;
pTask->status = STREAM_TASK_STATUS__RUNNING;
/*pTask->qmsg = NULL;*/
return pTask;
}
int32_t tEncodeSStreamTask(SCoder *pEncoder, const SStreamTask *pTask) {
/*if (tStartEncode(pEncoder) < 0) return -1;*/
if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->status) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->sourceType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->execType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->sinkType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->dispatchType) < 0) return -1;
if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->downstreamTaskId) < 0) return -1;
if (pTask->execType == TASK_EXEC__EXEC) {
if (tEncodeI8(pEncoder, pTask->exec.parallelizable) < 0) return -1;
if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1;
}
if (pTask->sinkType != TASK_SINK__NONE) {
// TODO: wrap
if (tEncodeI8(pEncoder, pTask->tbSink.reserved) < 0) return -1;
}
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
if (tEncodeI8(pEncoder, pTask->inplaceDispatcher.reserved) < 0) return -1;
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
if (tEncodeI8(pEncoder, pTask->shuffleDispatcher.hashMethod) < 0) return -1;
}
/*tEndEncode(pEncoder);*/
return pEncoder->pos;
}
int32_t tDecodeSStreamTask(SCoder *pDecoder, SStreamTask *pTask) {
/*if (tStartDecode(pDecoder) < 0) return -1;*/
if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->sourceType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->execType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->sinkType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->dispatchType) < 0) return -1;
if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->downstreamTaskId) < 0) return -1;
if (pTask->execType == TASK_EXEC__EXEC) {
if (tDecodeI8(pDecoder, &pTask->exec.parallelizable) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1;
}
if (pTask->sinkType != TASK_SINK__NONE) {
if (tDecodeI8(pDecoder, &pTask->tbSink.reserved) < 0) return -1;
}
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
if (tDecodeI8(pDecoder, &pTask->inplaceDispatcher.reserved) < 0) return -1;
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
if (tDecodeI8(pDecoder, &pTask->shuffleDispatcher.hashMethod) < 0) return -1;
}
/*tEndDecode(pDecoder);*/
return 0;
}
void tFreeSStreamTask(SStreamTask *pTask) {
// TODO
/*free(pTask->qmsg);*/
/*free(pTask->executor);*/
/*free(pTask);*/
}
#if 0
int32_t tEncodeSStreamTaskExecReq(SCoder* pEncoder, const SStreamTaskExecReq* pReq) {
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
/*if (tEncodeDataBlocks(buf, pReq->streamId) < 0) return -1;*/
return pEncoder->size;
}
int32_t tDecodeSStreamTaskExecReq(SCoder* pDecoder, SStreamTaskExecReq* pReq) {
return pEncoder->size;
}
void tFreeSStreamTaskExecReq(SStreamTaskExecReq* pReq) {
taosArrayDestroyEx(pReq->data, tDeleteSSDataBlock);
}
#endif
......@@ -6,7 +6,7 @@ target_include_directories(
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
mnode scheduler sdb wal transport cjson sync monitor parser
mnode scheduler sdb wal transport cjson sync monitor stream parser
)
if(${BUILD_TEST})
......
......@@ -26,6 +26,7 @@
#include "tlog.h"
#include "tmsg.h"
#include "trpc.h"
#include "tstream.h"
#include "ttimer.h"
#include "mnode.h"
......
......@@ -13,4 +13,5 @@ target_link_libraries(
PRIVATE common
PRIVATE util
PRIVATE qcom
PRIVATE stream
)
......@@ -22,6 +22,7 @@
#include "tmsg.h"
#include "tqueue.h"
#include "trpc.h"
#include "tstream.h"
#include "snode.h"
......
......@@ -143,3 +143,106 @@ void* tDecodeSStreamTaskExecReq(const void* buf, SStreamTaskExecReq* pReq) {
}
void tFreeSStreamTaskExecReq(SStreamTaskExecReq* pReq) { taosArrayDestroy(pReq->data); }
SStreamTask* tNewSStreamTask(int64_t streamId) {
SStreamTask* pTask = (SStreamTask*)calloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
return NULL;
}
pTask->taskId = tGenIdPI32();
pTask->streamId = streamId;
pTask->status = STREAM_TASK_STATUS__RUNNING;
/*pTask->qmsg = NULL;*/
return pTask;
}
int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) {
/*if (tStartEncode(pEncoder) < 0) return -1;*/
if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->status) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->sourceType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->execType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->sinkType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->dispatchType) < 0) return -1;
if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->downstreamTaskId) < 0) return -1;
if (pTask->execType == TASK_EXEC__EXEC) {
if (tEncodeI8(pEncoder, pTask->exec.parallelizable) < 0) return -1;
if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1;
}
if (pTask->sinkType != TASK_SINK__NONE) {
// TODO: wrap
if (tEncodeI8(pEncoder, pTask->tbSink.reserved) < 0) return -1;
}
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
if (tEncodeI8(pEncoder, pTask->inplaceDispatcher.reserved) < 0) return -1;
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
if (tEncodeI8(pEncoder, pTask->shuffleDispatcher.hashMethod) < 0) return -1;
}
/*tEndEncode(pEncoder);*/
return pEncoder->pos;
}
int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) {
/*if (tStartDecode(pDecoder) < 0) return -1;*/
if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->sourceType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->execType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->sinkType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->dispatchType) < 0) return -1;
if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->downstreamTaskId) < 0) return -1;
if (pTask->execType == TASK_EXEC__EXEC) {
if (tDecodeI8(pDecoder, &pTask->exec.parallelizable) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1;
}
if (pTask->sinkType != TASK_SINK__NONE) {
if (tDecodeI8(pDecoder, &pTask->tbSink.reserved) < 0) return -1;
}
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
if (tDecodeI8(pDecoder, &pTask->inplaceDispatcher.reserved) < 0) return -1;
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
if (tDecodeI8(pDecoder, &pTask->shuffleDispatcher.hashMethod) < 0) return -1;
}
/*tEndDecode(pDecoder);*/
return 0;
}
void tFreeSStreamTask(SStreamTask* pTask) {
// TODO
/*free(pTask->qmsg);*/
/*free(pTask->executor);*/
/*free(pTask);*/
}
#if 0
int32_t tEncodeSStreamTaskExecReq(SCoder* pEncoder, const SStreamTaskExecReq* pReq) {
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
/*if (tEncodeDataBlocks(buf, pReq->streamId) < 0) return -1;*/
return pEncoder->size;
}
int32_t tDecodeSStreamTaskExecReq(SCoder* pDecoder, SStreamTaskExecReq* pReq) {
return pEncoder->size;
}
void tFreeSStreamTaskExecReq(SStreamTaskExecReq* pReq) {
taosArrayDestroyEx(pReq->data, tDeleteSSDataBlock);
}
#endif
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册