Commit 59a4eec1 authored by Liu Jicong

stream extract to module

Parent 78ee5356
@@ -25,7 +25,7 @@ int32_t init_env() {
return -1;
}
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1");
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2");
if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes));
return -1;
......
@@ -54,13 +54,16 @@ typedef struct SColumnDataAgg {
} SColumnDataAgg;
typedef struct SDataBlockInfo {
STimeWindow window;
int32_t rows;
int32_t rowSize;
int16_t numOfCols;
int16_t hasVarCol;
union {int64_t uid; int64_t blockId;};
int64_t groupId; // no need to serialize
STimeWindow window;
int32_t rows;
int32_t rowSize;
int16_t numOfCols;
int16_t hasVarCol;
union {
int64_t uid;
int64_t blockId;
};
int64_t groupId; // no need to serialize
} SDataBlockInfo;
typedef struct SSDataBlock {
@@ -92,7 +95,7 @@ int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock);
void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock);
int32_t tEncodeDataBlocks(void** buf, const SArray* blocks);
void* tDecodeDataBlocks(const void* buf, SArray* blocks);
void* tDecodeDataBlocks(const void* buf, SArray** blocks);
static FORCE_INLINE void blockDestroyInner(SSDataBlock* pBlock) {
// WARNING: do not use info.numOfCols,
......
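Note on the tDecodeDataBlocks change above: with `SArray**` the decoder now allocates the output array itself (its body appears later in this commit). A minimal caller sketch, assuming `buf` was produced by tEncodeDataBlocks:

    SArray* pBlocks = NULL;
    buf = tDecodeDataBlocks(buf, &pBlocks);  // allocates *pBlocks, returns the advanced cursor
    if (pBlocks != NULL) {
      // ... consume the decoded SSDataBlock entries ...
      taosArrayDestroy(pBlocks);
    }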
@@ -102,7 +102,8 @@ static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, u
: ((p1_)->pData + ((r_) * (p1_)->info.bytes)))
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource, uint32_t numOfRow2);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource,
uint32_t numOfRow2);
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows);
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock);
@@ -113,7 +114,8 @@ size_t blockDataGetNumOfCols(const SSDataBlock* pBlock);
size_t blockDataGetNumOfRows(const SSDataBlock* pBlock);
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc);
int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex, int32_t pageSize);
int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex,
int32_t pageSize);
int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock);
int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf);
@@ -136,6 +138,8 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock);
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
void* blockDataDestroy(SSDataBlock* pBlock);
void blockDebugShowData(SArray* dataBlocks);
#ifdef __cplusplus
}
#endif
......
@@ -201,17 +201,9 @@ typedef struct SEp {
typedef struct {
int32_t contLen;
union {
int32_t vgId;
int32_t streamTaskId;
};
int32_t vgId;
} SMsgHead;
typedef struct {
int32_t workerType;
int32_t streamTaskId;
} SStreamExecMsgHead;
// Submit message for one table
typedef struct SSubmitBlk {
int64_t uid; // table unique id
@@ -477,7 +469,8 @@ typedef struct {
int32_t tz; // query client timezone
char intervalUnit;
char slidingUnit;
char offsetUnit;  // TODO: remove it; the offset is a number of precision ticks and must be an immutable duration.
char
    offsetUnit;  // TODO: remove it; the offset is a number of precision ticks and must be an immutable duration.
int8_t precision;
int64_t interval;
int64_t sliding;
@@ -2366,79 +2359,136 @@ enum {
STREAM_TASK_STATUS__STOP,
};
// pipe -> fetch/pipe queue
// merge -> merge queue
// write -> write queue
enum {
STREAM_NEXT_OP_DST__VND = 1,
STREAM_NEXT_OP_DST__SND,
TASK_SINK_MSG__SND_PIPE = 1,
TASK_SINK_MSG__SND_MERGE,
TASK_SINK_MSG__VND_PIPE,
TASK_SINK_MSG__VND_MERGE,
TASK_SINK_MSG__VND_WRITE,
};
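The comment above fixes the queue for each message class. A hypothetical helper, not part of this commit, that spells out that routing (EQueueType gains MERGE_QUEUE later in this change):

    // Hypothetical: map a task sink message class to its target queue.
    static int32_t taskSinkMsgToQueue(int32_t sinkMsgType) {
      switch (sinkMsgType) {
        case TASK_SINK_MSG__SND_PIPE:
        case TASK_SINK_MSG__VND_PIPE:  return FETCH_QUEUE;  // pipe -> fetch/pipe queue
        case TASK_SINK_MSG__SND_MERGE:
        case TASK_SINK_MSG__VND_MERGE: return MERGE_QUEUE;  // merge -> merge queue
        case TASK_SINK_MSG__VND_WRITE: return WRITE_QUEUE;  // write -> write queue
        default:                       return -1;           // unknown class
      }
    }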
typedef struct {
int32_t nodeId; // 0 for snode
SEpSet epSet;
} SStreamTaskEp;
typedef struct {
void* inputHandle;
void* executor;
} SStreamRunner;
typedef struct {
int8_t parallelizable;
char* qmsg;
// followings are not applicable to encoder and decoder
int8_t numOfRunners;
SStreamRunner* runners;
} STaskExec;
typedef struct {
int8_t reserved;
} STaskDispatcherInplace;
typedef struct {
int32_t nodeId;
SEpSet epSet;
} STaskDispatcherFixedEp;
typedef struct {
int8_t hashMethod;
SArray* info;
} STaskDispatcherShuffle;
typedef struct {
int8_t reserved;
// not applicable to encoder and decoder
SHashObj* pHash; // groupId to tbuid
} STaskSinkTb;
typedef struct {
int8_t reserved;
} STaskSinkSma;
typedef struct {
int8_t reserved;
} STaskSinkFetch;
typedef struct {
int8_t reserved;
} STaskSinkShow;
enum {
STREAM_SOURCE_TYPE__NONE = 1,
STREAM_SOURCE_TYPE__SUPER,
STREAM_SOURCE_TYPE__CHILD,
STREAM_SOURCE_TYPE__NORMAL,
TASK_SOURCE__SCAN = 1,
TASK_SOURCE__SINGLE,
TASK_SOURCE__MULTI,
};
enum {
STREAM_SINK_TYPE__NONE = 1,
STREAM_SINK_TYPE__INPLACE,
STREAM_SINK_TYPE__ASSIGNED,
STREAM_SINK_TYPE__MULTIPLE,
STREAM_SINK_TYPE__TEMPORARY,
TASK_EXEC__NONE = 1,
TASK_EXEC__EXEC,
};
enum {
STREAM_TYPE__NORMAL = 1,
STREAM_TYPE__SMA,
TASK_DISPATCH__NONE = 1,
TASK_DISPATCH__INPLACE,
TASK_DISPATCH__FIXED,
TASK_DISPATCH__SHUFFLE,
};
typedef struct {
void* inputHandle;
void* executor;
} SStreamRunner;
enum {
TASK_SINK__NONE = 1,
TASK_SINK__TABLE,
TASK_SINK__SMA,
TASK_SINK__FETCH,
TASK_SINK__SHOW,
};
typedef struct {
int64_t streamId;
int32_t taskId;
int32_t level;
int8_t status;
int8_t parallelizable;
// vnode or snode
int8_t nextOpDst;
int8_t sourceType;
int8_t execType;
int8_t sinkType;
int8_t dispatchType;
int16_t dispatchMsgType;
int32_t downstreamTaskId;
int8_t sourceType;
int8_t sinkType;
// source preprocess
// for sink type assigned
int32_t sinkVgId;
SEpSet NextOpEp;
// exec
STaskExec exec;
// executor meta info
char* qmsg;
// local sink
union {
STaskSinkTb tbSink;
STaskSinkSma smaSink;
STaskSinkFetch fetchSink;
STaskSinkShow showSink;
};
// followings are not applied to encoder and decoder
int8_t numOfRunners;
SStreamRunner runner[8];
} SStreamTask;
// dispatch
union {
STaskDispatcherInplace inplaceDispatcher;
STaskDispatcherFixedEp fixedEpDispatcher;
STaskDispatcherShuffle shuffleDispatcher;
};
static FORCE_INLINE SStreamTask* streamTaskNew(int64_t streamId) {
SStreamTask* pTask = (SStreamTask*)calloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
return NULL;
}
pTask->taskId = tGenIdPI32();
pTask->streamId = streamId;
pTask->status = STREAM_TASK_STATUS__RUNNING;
pTask->qmsg = NULL;
return pTask;
}
// state storage
} SStreamTask;
int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask);
void tFreeSStreamTask(SStreamTask* pTask);
SStreamTask* tNewSStreamTask(int64_t streamId);
int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask);
void tFreeSStreamTask(SStreamTask* pTask);
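A hedged sketch of wiring a task under the refactored layout, for the fixed-endpoint dispatch case; streamId, vgId, and epSet are assumed to be in scope and the values are illustrative, not taken from this commit:

    SStreamTask* pTask = tNewSStreamTask(streamId);
    if (pTask == NULL) return -1;
    pTask->execType = TASK_EXEC__EXEC;                 // run an executor over the input
    pTask->exec.parallelizable = 1;
    pTask->sinkType = TASK_SINK__NONE;                 // no local sink; forward downstream
    pTask->dispatchType = TASK_DISPATCH__FIXED;        // one known downstream endpoint
    pTask->dispatchMsgType = TDMT_VND_TASK_PIPE_EXEC;  // routed to the fetch/pipe queue
    pTask->fixedEpDispatcher.nodeId = vgId;            // assumed vgroup id
    pTask->fixedEpDispatcher.epSet  = epSet;           // assumed endpoint set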
typedef struct {
SMsgHead head;
// SMsgHead head;
SStreamTask* task;
} SStreamTaskDeployReq;
@@ -2447,19 +2497,25 @@ typedef struct {
} SStreamTaskDeployRsp;
typedef struct {
SStreamExecMsgHead head;
SArray* data; // SArray<SSDataBlock>
// SMsgHead head;
int64_t streamId;
int32_t taskId;
SArray* data; // SArray<SSDataBlock>
} SStreamTaskExecReq;
int32_t tEncodeSStreamTaskExecReq(void** buf, const SStreamTaskExecReq* pReq);
void* tDecodeSStreamTaskExecReq(const void* buf, SStreamTaskExecReq* pReq);
void tFreeSStreamTaskExecReq(SStreamTaskExecReq* pReq);
typedef struct {
int32_t reserved;
} SStreamTaskExecRsp;
typedef struct {
SMsgHead head;
int64_t streamId;
int64_t version;
SArray* res; // SArray<SSDataBlock>
// SMsgHead head;
int64_t streamId;
int64_t version;
SArray* res; // SArray<SSDataBlock>
} SStreamSinkReq;
#pragma pack(pop)
......
@@ -25,7 +25,15 @@ extern "C" {
typedef struct SRpcMsg SRpcMsg;
typedef struct SEpSet SEpSet;
typedef struct SMgmtWrapper SMgmtWrapper;
typedef enum { QUERY_QUEUE, FETCH_QUEUE, WRITE_QUEUE, APPLY_QUEUE, SYNC_QUEUE, QUEUE_MAX } EQueueType;
typedef enum {
QUERY_QUEUE,
FETCH_QUEUE,
WRITE_QUEUE,
APPLY_QUEUE,
SYNC_QUEUE,
MERGE_QUEUE,
QUEUE_MAX,
} EQueueType;
typedef int32_t (*PutToQueueFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq);
typedef int32_t (*GetQueueSizeFp)(SMgmtWrapper* pWrapper, int32_t vgId, EQueueType qtype);
......
@@ -193,6 +193,9 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_EXEC, "vnode-task-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_PIPE_EXEC, "vnode-task-pipe-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_MERGE_EXEC, "vnode-task-merge-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_WRITE_EXEC, "vnode-task-write-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL)
@@ -206,6 +209,8 @@ enum {
TD_NEW_MSG_SEG(TDMT_SND_MSG)
TD_DEF_MSG_TYPE(TDMT_SND_TASK_DEPLOY, "snode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
TD_DEF_MSG_TYPE(TDMT_SND_TASK_EXEC, "snode-task-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
TD_DEF_MSG_TYPE(TDMT_SND_TASK_PIPE_EXEC, "snode-task-pipe-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
TD_DEF_MSG_TYPE(TDMT_SND_TASK_MERGE_EXEC, "snode-task-merge-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
#if defined(TD_MSG_NUMBER_)
TDMT_MAX
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdatablock.h"
#include "tmsg.h"
#include "tmsgcb.h"
#include "trpc.h"
#ifndef _TSTREAM_H_
#define _TSTREAM_H_
#ifdef __cplusplus
extern "C" {
#endif
int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, int32_t inputType, int32_t workId);
#ifdef __cplusplus
}
#endif
#endif /* ifndef _TSTREAM_H_ */
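A hedged usage sketch of the new entry point, mirroring the call site in tqProcessStreamTrigger later in this commit; pTask, pMsgCb, and pSubmitReq are assumed to be in scope:

    // worker 0 is chosen here; workId must be < pTask->exec.numOfRunners
    if (streamExecTask(pTask, pMsgCb, pSubmitReq, STREAM_DATA_TYPE_SUBMIT_BLOCK, 0) < 0) {
      // exec or dispatch failed
    }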
@@ -273,7 +273,7 @@ int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* p
return TSDB_CODE_OUT_OF_MEMORY;
}
pColumnInfoData->varmeta.offset = (int32_t*) p;
pColumnInfoData->varmeta.offset = (int32_t*)p;
memcpy(pColumnInfoData->varmeta.offset, pSource->varmeta.offset, sizeof(int32_t) * numOfRows);
if (pColumnInfoData->varmeta.allocLen < pSource->varmeta.length) {
@@ -587,11 +587,11 @@ size_t blockDataGetRowSize(SSDataBlock* pBlock) {
if (pBlock->info.rowSize == 0) {
size_t rowSize = 0;
size_t numOfCols = pBlock->info.numOfCols;
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
rowSize += pColInfo->info.bytes;
}
size_t numOfCols = pBlock->info.numOfCols;
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
rowSize += pColInfo->info.bytes;
}
pBlock->info.rowSize = rowSize;
}
@@ -637,7 +637,7 @@ double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
rowSize += sizeof(int32_t);
} else {
rowSize += 1/8.0; // one bit for each record
rowSize += 1 / 8.0; // one bit for each record
}
}
@@ -1318,12 +1318,97 @@ int32_t tEncodeDataBlocks(void** buf, const SArray* blocks) {
return tlen;
}
void* tDecodeDataBlocks(const void* buf, SArray* blocks) {
void* tDecodeDataBlocks(const void* buf, SArray** blocks) {
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
*blocks = taosArrayInit(sz, sizeof(SSDataBlock));
for (int32_t i = 0; i < sz; i++) {
SSDataBlock pBlock = {0};
buf = tDecodeDataBlock(buf, &pBlock);
taosArrayPush(*blocks, &pBlock);
}
return (void*)buf;
}
static char* formatTimestamp(char* buf, int64_t val, int precision) {
time_t tt;
int32_t ms = 0;
if (precision == TSDB_TIME_PRECISION_NANO) {
tt = (time_t)(val / 1000000000);
ms = val % 1000000000;
} else if (precision == TSDB_TIME_PRECISION_MICRO) {
tt = (time_t)(val / 1000000);
ms = val % 1000000;
} else {
tt = (time_t)(val / 1000);
ms = val % 1000;
}
  /* The tt < 0 clamp below is commented out because it makes test cases like
     select_with_tags.sim fail; on Windows, however, localtime() may crash when
     tt < 0, so a better solution is still needed.
  if (tt < 0) {
    tt = 0;
  }
  */
#ifdef WINDOWS
if (tt < 0) tt = 0;
#endif
if (tt <= 0 && ms < 0) {
tt--;
if (precision == TSDB_TIME_PRECISION_NANO) {
ms += 1000000000;
} else if (precision == TSDB_TIME_PRECISION_MICRO) {
ms += 1000000;
} else {
ms += 1000;
}
}
struct tm* ptm = localtime(&tt);
size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm);
if (precision == TSDB_TIME_PRECISION_NANO) {
sprintf(buf + pos, ".%09d", ms);
} else if (precision == TSDB_TIME_PRECISION_MICRO) {
sprintf(buf + pos, ".%06d", ms);
} else {
sprintf(buf + pos, ".%03d", ms);
}
return buf;
}
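For illustration, a small usage sketch; the printed text depends on the local time zone:

    char tsBuf[64];
    formatTimestamp(tsBuf, 1648000000123LL, TSDB_TIME_PRECISION_MILLI);
    printf("%s\n", tsBuf);  // e.g. "2022-03-23 02:26:40.123" when the zone is UTC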
void blockDebugShowData(SArray* dataBlocks) {
char pBuf[128];
int32_t sz = taosArrayGetSize(dataBlocks);
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(dataBlocks, i);
int32_t colNum = pDataBlock->info.numOfCols;
int32_t rows = pDataBlock->info.rows;
for (int32_t j = 0; j < rows; j++) {
printf("|");
for (int32_t k = 0; k < colNum; k++) {
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
switch (pColInfoData->info.type) {
case TSDB_DATA_TYPE_TIMESTAMP:
formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI);
printf(" %25s |", pBuf);
break;
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UINT:
printf(" %15d |", *(int32_t*)var);
break;
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT:
printf(" %15ld |", *(int64_t*)var);
break;
}
}
printf("\n");
}
}
}
@@ -3095,21 +3095,49 @@ void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
tfree(pReq->ast);
}
SStreamTask *tNewSStreamTask(int64_t streamId) {
SStreamTask *pTask = (SStreamTask *)calloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
return NULL;
}
pTask->taskId = tGenIdPI32();
pTask->streamId = streamId;
pTask->status = STREAM_TASK_STATUS__RUNNING;
/*pTask->qmsg = NULL;*/
return pTask;
}
int32_t tEncodeSStreamTask(SCoder *pEncoder, const SStreamTask *pTask) {
/*if (tStartEncode(pEncoder) < 0) return -1;*/
if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->level) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->status) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->parallelizable) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->nextOpDst) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->sourceType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->execType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->sinkType) < 0) return -1;
if (pTask->sinkType == STREAM_SINK_TYPE__ASSIGNED) {
if (tEncodeI32(pEncoder, pTask->sinkVgId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->NextOpEp) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->dispatchType) < 0) return -1;
if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->downstreamTaskId) < 0) return -1;
if (pTask->execType == TASK_EXEC__EXEC) {
if (tEncodeI8(pEncoder, pTask->exec.parallelizable) < 0) return -1;
if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1;
}
if (tEncodeCStr(pEncoder, pTask->qmsg) < 0) return -1;
if (pTask->sinkType != TASK_SINK__NONE) {
// TODO: wrap
if (tEncodeI8(pEncoder, pTask->tbSink.reserved) < 0) return -1;
}
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
if (tEncodeI8(pEncoder, pTask->inplaceDispatcher.reserved) < 0) return -1;
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
if (tEncodeI8(pEncoder, pTask->shuffleDispatcher.hashMethod) < 0) return -1;
}
/*tEndEncode(pEncoder);*/
return pEncoder->pos;
}
@@ -3118,17 +3146,32 @@ int32_t tDecodeSStreamTask(SCoder *pDecoder, SStreamTask *pTask) {
/*if (tStartDecode(pDecoder) < 0) return -1;*/
if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->level) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->parallelizable) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->nextOpDst) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->sourceType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->execType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->sinkType) < 0) return -1;
if (pTask->sinkType == STREAM_SINK_TYPE__ASSIGNED) {
if (tDecodeI32(pDecoder, &pTask->sinkVgId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->NextOpEp) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->dispatchType) < 0) return -1;
if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->downstreamTaskId) < 0) return -1;
if (pTask->execType == TASK_EXEC__EXEC) {
if (tDecodeI8(pDecoder, &pTask->exec.parallelizable) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1;
}
if (pTask->sinkType != TASK_SINK__NONE) {
if (tDecodeI8(pDecoder, &pTask->tbSink.reserved) < 0) return -1;
}
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
if (tDecodeI8(pDecoder, &pTask->inplaceDispatcher.reserved) < 0) return -1;
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
if (tDecodeI8(pDecoder, &pTask->shuffleDispatcher.hashMethod) < 0) return -1;
}
if (tDecodeCStrAlloc(pDecoder, &pTask->qmsg) < 0) return -1;
/*tEndDecode(pDecoder);*/
return 0;
}
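Since the encoder and decoder must visit the same conditional fields in the same order, a round trip is the natural sanity check. A hedged sketch, with pTask in scope and the buffer size assumed sufficient:

    char   buf[4096];  // assumed large enough for one task
    SCoder encoder, decoder;
    tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, sizeof(buf), TD_ENCODER);
    int32_t len = tEncodeSStreamTask(&encoder, pTask);  // returns the encoder position
    SStreamTask decoded = {0};
    tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, len, TD_DECODER);
    if (tDecodeSStreamTask(&decoder, &decoded) < 0) {
      // decode failed: the two walks diverged
    }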
@@ -3139,3 +3182,18 @@ void tFreeSStreamTask(SStreamTask *pTask) {
/*free(pTask->executor);*/
/*free(pTask);*/
}
#if 0
int32_t tEncodeSStreamTaskExecReq(SCoder* pEncoder, const SStreamTaskExecReq* pReq) {
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
/*if (tEncodeDataBlocks(buf, pReq->streamId) < 0) return -1;*/
return pEncoder->size;
}
int32_t tDecodeSStreamTaskExecReq(SCoder* pDecoder, SStreamTaskExecReq* pReq) {
  return pDecoder->size;
}
void tFreeSStreamTaskExecReq(SStreamTaskExecReq* pReq) {
taosArrayDestroyEx(pReq->data, tDeleteSSDataBlock);
}
#endif
@@ -94,14 +94,15 @@ void smStopWorker(SSnodeMgmt *pMgmt) {
static FORCE_INLINE int32_t smGetSWIdFromMsg(SRpcMsg *pMsg) {
SMsgHead *pHead = pMsg->pCont;
pHead->streamTaskId = htonl(pHead->streamTaskId);
return pHead->streamTaskId % SND_UNIQUE_THREAD_NUM;
pHead->vgId = htonl(pHead->vgId);
return pHead->vgId % SND_UNIQUE_THREAD_NUM;
}
static FORCE_INLINE int32_t smGetSWTypeFromMsg(SRpcMsg *pMsg) {
SStreamExecMsgHead *pHead = pMsg->pCont;
pHead->workerType = htonl(pHead->workerType);
return pHead->workerType;
/*SMsgHead *pHead = pMsg->pCont;*/
/*pHead->workerType = htonl(pHead->workerType);*/
/*return pHead->workerType;*/
return 0;
}
int32_t smProcessMgmtMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
......
@@ -33,6 +33,7 @@ typedef struct SVnodesMgmt {
SQWorkerPool fetchPool;
SWWorkerPool syncPool;
SWWorkerPool writePool;
SWWorkerPool mergePool;
const char *path;
SDnode *pDnode;
SMgmtWrapper *pWrapper;
@@ -63,6 +64,7 @@ typedef struct {
STaosQueue *pApplyQ;
STaosQueue *pQueryQ;
STaosQueue *pFetchQ;
STaosQueue *pMergeQ;
SMgmtWrapper *pWrapper;
} SVnodeObj;
@@ -110,10 +112,11 @@ int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg);
int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg);
int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg);
int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg);
int32_t vmProcessMergeMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg);
int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_VNODES_INT_H_*/
\ No newline at end of file
#endif /*_TD_DND_VNODES_INT_H_*/
@@ -280,6 +280,8 @@ void vmInitMsgHandles(SMgmtWrapper *pWrapper) {
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)vmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_EXEC, (NodeMsgFp)vmProcessFetchMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_PIPE_EXEC, (NodeMsgFp)vmProcessFetchMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_MERGE_EXEC, (NodeMsgFp)vmProcessMergeMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, (NodeMsgFp)vmProcessFetchMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0);
......
@@ -208,6 +208,8 @@ int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNode
int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, FETCH_QUEUE); }
int32_t vmProcessMergeMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, MERGE_QUEUE); }
int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SSingleWorker *pWorker = &pMgmt->mgmtWorker;
dTrace("msg:%p, will be written to vnode-mgmt queue, worker:%s", pMsg, pWorker->name);
@@ -239,6 +241,10 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueT
dTrace("msg:%p, will be put into vnode-apply queue", pMsg);
code = taosWriteQitem(pVnode->pApplyQ, pMsg);
break;
case MERGE_QUEUE:
dTrace("msg:%p, will be put into vnode-merge queue", pMsg);
code = taosWriteQitem(pVnode->pMergeQ, pMsg);
break;
default:
terrno = TSDB_CODE_INVALID_PARA;
break;
@@ -260,6 +266,10 @@ int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
return vmPutRpcMsgToQueue(pWrapper, pRpc, APPLY_QUEUE);
}
int32_t vmPutMsgToMergeQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
return vmPutRpcMsgToQueue(pWrapper, pRpc, MERGE_QUEUE);
}
int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) {
int32_t size = -1;
SVnodeObj *pVnode = vmAcquireVnode(pWrapper->pMgmt, vgId);
@@ -280,6 +290,9 @@ int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) {
case APPLY_QUEUE:
size = taosQueueSize(pVnode->pApplyQ);
break;
case MERGE_QUEUE:
size = taosQueueSize(pVnode->pMergeQ);
break;
default:
break;
}
@@ -291,12 +304,13 @@ int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) {
int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessWriteQueue);
pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessApplyQueue);
  pVnode->pMergeQ = tWWorkerAllocQueue(&pMgmt->mergePool, pVnode, (FItems)vmProcessApplyQueue);
pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue);
pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue);
pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
if (pVnode->pApplyQ == NULL || pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pFetchQ == NULL ||
pVnode->pQueryQ == NULL) {
      pVnode->pQueryQ == NULL || pVnode->pMergeQ == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
@@ -310,12 +324,14 @@ void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
tQWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ);
tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ);
tWWorkerFreeQueue(&pMgmt->mergePool, pVnode->pMergeQ);
tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ);
pVnode->pWriteQ = NULL;
pVnode->pApplyQ = NULL;
pVnode->pSyncQ = NULL;
pVnode->pFetchQ = NULL;
pVnode->pQueryQ = NULL;
pVnode->pMergeQ = NULL;
dDebug("vgId:%d, vnode queue is freed", pVnode->vgId);
}
......
@@ -729,13 +729,15 @@ typedef struct {
int32_t vgNum;
SRWLatch lock;
int8_t status;
int8_t sourceType;
int8_t sinkType;
// int32_t sqlLen;
int32_t sinkVgId; // 0 for automatic
char* sql;
char* logicalPlan;
char* physicalPlan;
SArray* tasks; // SArray<SArray<SStreamTask>>
SArray* ColAlias;
SArray* tasks; // SArray<SArray<SStreamTask>>
SArray* ColAlias; // SArray<char*>
} SStreamObj;
int32_t tEncodeSStreamObj(SCoder* pEncoder, const SStreamObj* pObj);
......
@@ -46,7 +46,7 @@ int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
((SMsgHead*)buf)->streamTaskId = htonl(nodeId);
((SMsgHead*)buf)->vgId = htonl(nodeId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, size, TD_ENCODER);
tEncodeSStreamTask(&encoder, pTask);
@@ -69,7 +69,7 @@ int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SS
plan->execNode.nodeId = pVgroup->vgId;
plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup);
if (qSubPlanToString(plan, &pTask->qmsg, &msgLen) < 0) {
if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
}
@@ -89,7 +89,7 @@ int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask,
plan->execNode.nodeId = pSnode->id;
plan->execNode.epSet = mndAcquireEpFromSnode(pMnode, pSnode);
if (qSubPlanToString(plan, &pTask->qmsg, &msgLen) < 0) {
if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
}
@@ -111,9 +111,15 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
pStream->tasks = taosArrayInit(totLevel, sizeof(SArray));
int32_t lastUsedVgId = 0;
for (int32_t level = 0; level < totLevel; level++) {
// gather vnodes
// gather snodes
// iterate plan, expand source to vnodes and assign ep to each task
// iterate tasks, assign sink type and sink ep to each task
for (int32_t revLevel = totLevel - 1; revLevel >= 0; revLevel--) {
int32_t level = totLevel - 1 - revLevel;
SArray* taskOneLevel = taosArrayInit(0, sizeof(SStreamTask));
SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, level);
SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, revLevel);
int32_t opNum = LIST_LENGTH(inner->pNodeList);
ASSERT(opNum == 1);
@@ -132,11 +138,12 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
lastUsedVgId = pVgroup->vgId;
pStream->vgNum++;
SStreamTask* pTask = streamTaskNew(pStream->uid);
pTask->level = level;
pTask->sourceType = 1;
pTask->sinkType = level == totLevel - 1 ? 1 : 0;
pTask->parallelizable = 1;
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
/*pTask->level = level;*/
// TODO
/*pTask->sourceType = STREAM_SOURCE__SUPER;*/
/*pTask->sinkType = level == totLevel - 1 ? 1 : 0;*/
pTask->exec.parallelizable = 1;
if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
@@ -145,12 +152,11 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
taosArrayPush(taskOneLevel, pTask);
}
} else {
SStreamTask* pTask = streamTaskNew(pStream->uid);
pTask->level = level;
pTask->sourceType = 0;
pTask->sinkType = level == totLevel - 1 ? 1 : 0;
pTask->parallelizable = plan->subplanType == SUBPLAN_TYPE_SCAN;
pTask->nextOpDst = STREAM_NEXT_OP_DST__VND;
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
/*pTask->level = level;*/
/*pTask->sourceType = STREAM_SOURCE__NONE;*/
/*pTask->sinkType = level == totLevel - 1 ? 1 : 0;*/
pTask->exec.parallelizable = plan->subplanType == SUBPLAN_TYPE_SCAN;
SSnodeObj* pSnode = mndSchedFetchSnode(pMnode);
if (pSnode == NULL || tsStreamSchedV) {
......
@@ -57,8 +57,8 @@ void sndMetaDelete(SStreamMeta *pMeta) {
}
int32_t sndMetaDeployTask(SStreamMeta *pMeta, SStreamTask *pTask) {
for (int i = 0; i < pTask->numOfRunners; i++) {
pTask->runner[i].executor = qCreateStreamExecTaskInfo(pTask->qmsg, NULL);
for (int i = 0; i < pTask->exec.numOfRunners; i++) {
pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, NULL);
}
return taosHashPut(pMeta->pHash, &pTask->taskId, sizeof(int32_t), pTask, sizeof(void *));
}
@@ -72,19 +72,19 @@ int32_t sndMetaRemoveTask(SStreamMeta *pMeta, int32_t taskId) {
if (pTask == NULL) {
return -1;
}
free(pTask->qmsg);
free(pTask->exec.qmsg);
// TODO:free executor
free(pTask);
return taosHashRemove(pMeta->pHash, &taskId, sizeof(int32_t));
}
static int32_t sndProcessTaskExecReq(SSnode *pSnode, SRpcMsg *pMsg) {
SStreamExecMsgHead *pHead = pMsg->pCont;
int32_t taskId = pHead->streamTaskId;
SStreamTask *pTask = sndMetaGetTask(pSnode->pMeta, taskId);
if (pTask == NULL) {
return -1;
}
/*SStreamExecMsgHead *pHead = pMsg->pCont;*/
/*int32_t taskId = pHead->streamTaskId;*/
/*SStreamTask *pTask = sndMetaGetTask(pSnode->pMeta, taskId);*/
/*if (pTask == NULL) {*/
/*return -1;*/
/*}*/
return 0;
}
......
@@ -43,6 +43,7 @@ target_link_libraries(
PUBLIC wal
PUBLIC scheduler
PUBLIC executor
PUBLIC stream
PUBLIC qworker
PUBLIC sync
)
......
@@ -16,8 +16,9 @@
#include "tcompare.h"
#include "tqInt.h"
#include "tqMetaStore.h"
#include "tstream.h"
void tqDebugShowSSData(SArray* dataBlocks);
void blockDebugShowData(SArray* dataBlocks);
int32_t tqInit() { return tqPushMgrInit(); }
@@ -441,16 +442,21 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
}
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
ASSERT(parallel <= 8);
pTask->numOfRunners = parallel;
if (pTask->execType == TASK_EXEC__NONE) return 0;
  pTask->exec.numOfRunners = parallel;
  // allocate the runner array once, before the loop; a per-iteration calloc would
  // leak the previous allocation and drop the runners already initialized
  pTask->exec.runners = calloc(parallel, sizeof(SStreamRunner));
  if (pTask->exec.runners == NULL) {
    return -1;
  }
  for (int32_t i = 0; i < parallel; i++) {
    STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta);
    SReadHandle handle = {
        .reader = pReadHandle,
        .meta = pTq->pVnodeMeta,
    };
    pTask->runner[i].inputHandle = pReadHandle;
    pTask->runner[i].executor = qCreateStreamExecTaskInfo(pTask->qmsg, &handle);
    pTask->exec.runners[i].inputHandle = pReadHandle;
    pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
}
return 0;
}
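A hedged usage note: the assertion above caps parallelism at 8, and a negative return means the runner array could not be allocated:

    if (tqExpandTask(pTq, pTask, 4) < 0) {  // 4 runners, illustrative
      return -1;
    }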
@@ -473,87 +479,6 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
return 0;
}
static char* formatTimestamp(char* buf, int64_t val, int precision) {
time_t tt;
int32_t ms = 0;
if (precision == TSDB_TIME_PRECISION_NANO) {
tt = (time_t)(val / 1000000000);
ms = val % 1000000000;
} else if (precision == TSDB_TIME_PRECISION_MICRO) {
tt = (time_t)(val / 1000000);
ms = val % 1000000;
} else {
tt = (time_t)(val / 1000);
ms = val % 1000;
}
/* comment out as it make testcases like select_with_tags.sim fail.
but in windows, this may cause the call to localtime crash if tt < 0,
need to find a better solution.
if (tt < 0) {
tt = 0;
}
*/
#ifdef WINDOWS
if (tt < 0) tt = 0;
#endif
if (tt <= 0 && ms < 0) {
tt--;
if (precision == TSDB_TIME_PRECISION_NANO) {
ms += 1000000000;
} else if (precision == TSDB_TIME_PRECISION_MICRO) {
ms += 1000000;
} else {
ms += 1000;
}
}
struct tm* ptm = localtime(&tt);
size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm);
if (precision == TSDB_TIME_PRECISION_NANO) {
sprintf(buf + pos, ".%09d", ms);
} else if (precision == TSDB_TIME_PRECISION_MICRO) {
sprintf(buf + pos, ".%06d", ms);
} else {
sprintf(buf + pos, ".%03d", ms);
}
return buf;
}
void tqDebugShowSSData(SArray* dataBlocks) {
char pBuf[128];
int32_t sz = taosArrayGetSize(dataBlocks);
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(dataBlocks, i);
int32_t colNum = pDataBlock->info.numOfCols;
int32_t rows = pDataBlock->info.rows;
for (int32_t j = 0; j < rows; j++) {
printf("|");
for (int32_t k = 0; k < colNum; k++) {
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
switch (pColInfoData->info.type) {
case TSDB_DATA_TYPE_TIMESTAMP:
formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI);
printf(" %25s |", pBuf);
break;
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UINT:
printf(" %15d |", *(int32_t*)var);
break;
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT:
printf(" %15ld |", *(int64_t*)var);
break;
}
}
printf("\n");
}
}
}
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen) {
void* pIter = NULL;
@@ -561,50 +486,9 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen) {
pIter = taosHashIterate(pTq->pStreamTasks, pIter);
if (pIter == NULL) break;
SStreamTask* pTask = (SStreamTask*)pIter;
if (!pTask->sourceType) continue;
int32_t workerId = 0;
void* exec = pTask->runner[workerId].executor;
qSetStreamInput(exec, data, STREAM_DATA_TYPE_SUBMIT_BLOCK);
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
while (1) {
SSDataBlock* output;
uint64_t ts;
if (qExecTask(exec, &output, &ts) < 0) {
ASSERT(false);
}
if (output == NULL) {
break;
}
taosArrayPush(pRes, output);
}
if (pTask->sinkType) {
// write back
/*printf("reach end\n");*/
tqDebugShowSSData(pRes);
} else {
int32_t tlen = sizeof(SStreamExecMsgHead) + tEncodeDataBlocks(NULL, pRes);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
return -1;
}
void* abuf = POINTER_SHIFT(buf, sizeof(SStreamExecMsgHead));
tEncodeDataBlocks(abuf, pRes);
tmsg_t type;
if (pTask->nextOpDst == STREAM_NEXT_OP_DST__VND) {
type = TDMT_VND_TASK_EXEC;
} else {
type = TDMT_SND_TASK_EXEC;
}
SRpcMsg reqMsg = {
.pCont = buf,
.contLen = tlen,
.code = 0,
.msgType = type,
};
tmsgSendReq(&pTq->pVnode->msgCb, &pTask->NextOpEp, &reqMsg);
if (streamExecTask(pTask, &pTq->pVnode->msgCb, data, STREAM_DATA_TYPE_SUBMIT_BLOCK, 0) < 0) {
// TODO
}
}
return 0;
@@ -612,33 +496,12 @@ int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg) {
int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg) {
SStreamTaskExecReq* pReq = msg->pCont;
int32_t taskId = pReq->taskId;
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
ASSERT(pTask);
int32_t taskId = pReq->head.streamTaskId;
int32_t workerType = pReq->head.workerType;
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
// assume worker id is 1
int32_t workerId = 1;
void* exec = pTask->runner[workerId].executor;
int32_t sz = taosArrayGetSize(pReq->data);
printf("input data:\n");
tqDebugShowSSData(pReq->data);
SArray* pRes = taosArrayInit(0, sizeof(void*));
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* input = taosArrayGet(pReq->data, i);
SSDataBlock* output;
uint64_t ts;
qSetStreamInput(exec, input, STREAM_DATA_TYPE_SSDATA_BLOCK);
if (qExecTask(exec, &output, &ts) < 0) {
ASSERT(0);
}
if (output == NULL) {
break;
}
taosArrayPush(pRes, &output);
if (streamExecTask(pTask, &pTq->pVnode->msgCb, pReq->data, STREAM_DATA_TYPE_SSDATA_BLOCK, 0) < 0) {
// TODO
}
printf("output data:\n");
tqDebugShowSSData(pRes);
return 0;
}
@@ -8,6 +8,7 @@ add_subdirectory(scheduler)
add_subdirectory(cache)
add_subdirectory(catalog)
add_subdirectory(executor)
add_subdirectory(stream)
add_subdirectory(planner)
add_subdirectory(function)
add_subdirectory(qcom)
......
aux_source_directory(src STREAM_SRC)
add_library(stream STATIC ${STREAM_SRC})
target_include_directories(
stream
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/stream"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
stream
PRIVATE os util transport qcom executor
)
if(${BUILD_TEST})
ADD_SUBDIRECTORY(test)
endif(${BUILD_TEST})
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TSTREAM_H_
#define _TSTREAM_H_
#ifdef __cplusplus
extern "C" {
#endif
#ifdef __cplusplus
}
#endif
#endif /* ifndef _TSTREAM_H_ */
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tstream.h"
#include "executor.h"
int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, int32_t inputType, int32_t workId) {
SArray* pRes = NULL;
// source
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK && pTask->sourceType != TASK_SOURCE__SCAN) return 0;
// exec
if (pTask->execType == TASK_EXEC__EXEC) {
ASSERT(workId < pTask->exec.numOfRunners);
void* exec = pTask->exec.runners[workId].executor;
pRes = taosArrayInit(0, sizeof(SSDataBlock));
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
qSetStreamInput(exec, input, inputType);
while (1) {
SSDataBlock* output;
uint64_t ts;
if (qExecTask(exec, &output, &ts) < 0) {
ASSERT(false);
}
if (output == NULL) {
break;
}
taosArrayPush(pRes, output);
}
} else if (inputType == STREAM_DATA_TYPE_SSDATA_BLOCK) {
const SArray* blocks = (const SArray*)input;
int32_t sz = taosArrayGetSize(blocks);
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pBlock = taosArrayGet(blocks, i);
qSetStreamInput(exec, pBlock, inputType);
while (1) {
SSDataBlock* output;
uint64_t ts;
if (qExecTask(exec, &output, &ts) < 0) {
ASSERT(false);
}
if (output == NULL) {
break;
}
taosArrayPush(pRes, output);
}
}
} else {
ASSERT(0);
}
} else {
ASSERT(inputType == STREAM_DATA_TYPE_SSDATA_BLOCK);
pRes = (SArray*)input;
}
// sink
if (pTask->sinkType == TASK_SINK__TABLE) {
//
} else if (pTask->sinkType == TASK_SINK__SMA) {
//
} else if (pTask->sinkType == TASK_SINK__FETCH) {
//
} else if (pTask->sinkType == TASK_SINK__SHOW) {
blockDebugShowData(pRes);
} else {
ASSERT(pTask->sinkType == TASK_SINK__NONE);
}
// dispatch
if (pTask->dispatchType != TASK_DISPATCH__NONE) {
SStreamTaskExecReq req = {
.streamId = pTask->streamId,
.taskId = pTask->taskId,
.data = pRes,
};
int32_t tlen = sizeof(SMsgHead) + tEncodeSStreamTaskExecReq(NULL, &req);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
return -1;
}
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncodeSStreamTaskExecReq(&abuf, &req);
SRpcMsg dispatchMsg = {
.pCont = buf,
.contLen = tlen,
.code = 0,
.msgType = pTask->dispatchMsgType,
};
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
int32_t qType;
if (pTask->dispatchMsgType == TDMT_VND_TASK_PIPE_EXEC || pTask->dispatchMsgType == TDMT_SND_TASK_PIPE_EXEC) {
qType = FETCH_QUEUE;
} else if (pTask->dispatchMsgType == TDMT_VND_TASK_MERGE_EXEC ||
pTask->dispatchMsgType == TDMT_SND_TASK_MERGE_EXEC) {
qType = MERGE_QUEUE;
} else if (pTask->dispatchMsgType == TDMT_VND_TASK_WRITE_EXEC) {
qType = WRITE_QUEUE;
} else {
ASSERT(0);
}
tmsgPutToQueue(pMsgCb, qType, &dispatchMsg);
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
      ((SMsgHead*)buf)->vgId = htonl(pTask->fixedEpDispatcher.nodeId);  // network byte order, matching the mnode deploy path
SEpSet* pEpSet = &pTask->fixedEpDispatcher.epSet;
tmsgSendReq(pMsgCb, pEpSet, &dispatchMsg);
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
// TODO
} else {
ASSERT(0);
}
}
return 0;
}
int32_t tEncodeSStreamTaskExecReq(void** buf, const SStreamTaskExecReq* pReq) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pReq->streamId);
tlen += taosEncodeFixedI32(buf, pReq->taskId);
tlen += tEncodeDataBlocks(buf, pReq->data);
return tlen;
}
void* tDecodeSStreamTaskExecReq(const void* buf, SStreamTaskExecReq* pReq) {
buf = taosDecodeFixedI64(buf, &pReq->streamId);
buf = taosDecodeFixedI32(buf, &pReq->taskId);
buf = tDecodeDataBlocks(buf, &pReq->data);
return (void*)buf;
}
void tFreeSStreamTaskExecReq(SStreamTaskExecReq* pReq) { taosArrayDestroy(pReq->data); }
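These helpers follow the two-pass convention already used in streamExecTask above: called with buf == NULL they only measure, then the caller allocates and encodes for real. A hedged sketch, with req an SStreamTaskExecReq in scope:

    int32_t tlen = tEncodeSStreamTaskExecReq(NULL, &req);  // pass 1: measure
    void*   buf  = malloc(tlen);
    if (buf != NULL) {
      void* abuf = buf;                        // the encoder advances this cursor
      tEncodeSStreamTaskExecReq(&abuf, &req);  // pass 2: encode
      // ... hand buf/tlen to the transport, then free(buf) ...
    }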