未验证 提交 9b3e34d5 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #21735 from taosdata/refact/fillhistory

refactor: refactor the fill history operation
......@@ -5,8 +5,8 @@ if (${BUILD_CONTRIB})
URL https://github.com/facebook/rocksdb/archive/refs/tags/v8.1.1.tar.gz
URL_HASH MD5=3b4c97ee45df9c8a5517308d31ab008b
DOWNLOAD_NO_PROGRESS 1
DOWNLOAD_DIR "${TD_CONTRIB_DIR}/deps-download"
SOURCE_DIR "${TD_CONTRIB_DIR}/rocksdb"
DOWNLOAD_DIR "${TD_CONTRIB_DIR}/deps-download"
SOURCE_DIR "${TD_CONTRIB_DIR}/rocksdb"
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
......@@ -18,8 +18,8 @@ else()
URL https://github.com/facebook/rocksdb/archive/refs/tags/v8.1.1.tar.gz
URL_HASH MD5=3b4c97ee45df9c8a5517308d31ab008b
DOWNLOAD_NO_PROGRESS 1
DOWNLOAD_DIR "${TD_CONTRIB_DIR}/deps-download"
SOURCE_DIR "${TD_CONTRIB_DIR}/rocksdb"
DOWNLOAD_DIR "${TD_CONTRIB_DIR}/deps-download"
SOURCE_DIR "${TD_CONTRIB_DIR}/rocksdb"
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
......
......@@ -54,6 +54,11 @@ typedef struct SSessionKey {
uint64_t groupId;
} SSessionKey;
typedef struct SVersionRange {
uint64_t minVer;
uint64_t maxVer;
} SVersionRange;
static inline int winKeyCmprImpl(const void* pKey1, const void* pKey2) {
SWinKey* pWin1 = (SWinKey*)pKey1;
SWinKey* pWin2 = (SWinKey*)pKey2;
......
......@@ -177,7 +177,6 @@ static FORCE_INLINE void colDataSetDouble(SColumnInfoData* pColumnInfoData, uint
int32_t getJsonValueLen(const char* data);
int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull);
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull);
int32_t colDataReassignVal(SColumnInfoData* pColumnInfoData, uint32_t dstRowIdx, uint32_t srcRowIdx, const char* pData);
int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, uint32_t numOfRows, bool trimValue);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity,
......@@ -187,6 +186,7 @@ int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* p
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex);
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows);
int32_t colDataGetRowLength(const SColumnInfoData* pColumnInfoData, int32_t rowIdx);
void colDataTrim(SColumnInfoData* pColumnInfoData);
......@@ -208,7 +208,6 @@ double blockDataGetSerialRowSize(const SSDataBlock* pBlock);
size_t blockDataGetSerialMetaSize(uint32_t numOfCols);
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo);
int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst);
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload);
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
......@@ -237,11 +236,10 @@ int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColIn
SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId);
SColumnInfoData* bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index);
int32_t blockGetEncodeSize(const SSDataBlock* pBlock);
int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols);
const char* blockDecode(SSDataBlock* pBlock, const char* pData);
void blockDebugShowDataBlock(SSDataBlock* pBlock, const char* flag);
void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag);
// for debug
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf);
......@@ -251,9 +249,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pData
char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId);
int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf);
static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
return blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock)) + blockDataGetSize(pBlock);
}
void trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList);
#ifdef __cplusplus
}
......
......@@ -252,7 +252,9 @@ enum {
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DISPATCH, "stream-task-dispatch", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_UNUSED1, "stream-unused1", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_RECOVER_FINISH, "stream-recover-finish", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_SCAN_HISTORY, "stream-scan-history", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_SCAN_HISTORY_FINISH, "stream-scan-history-finish", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TRANSFER_STATE, "stream-transfer-state", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECK, "stream-task-check", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECKPOINT, "stream-checkpoint", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_REPORT_CHECKPOINT, "stream-report-checkpoint", NULL, NULL)
......@@ -297,8 +299,7 @@ enum {
TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE, "vnode-stream-recover1", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, "vnode-stream-recover2", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY, "vnode-stream-scan-history", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_CHECK_POINT_SOURCE, "vnode-stream-checkpoint-source", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL)
......
......@@ -55,6 +55,9 @@ typedef struct {
void* pStateBackend;
struct SStorageAPI api;
int8_t fillHistory;
STimeWindow winRange;
} SReadHandle;
// in queue mode, data streams are seperated by msg
......@@ -193,14 +196,6 @@ int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t le
void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order);
void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery);
STimeWindow getAlignQueryTimeWindow(const SInterval* pInterval, int64_t key);
/**
* return the scan info, in the form of tuple of two items, including table uid and current timestamp
* @param tinfo
* @param uid
* @param ts
* @return
*/
int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts);
SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo);
......@@ -220,15 +215,22 @@ void* qExtractReaderFromStreamScanner(void* scanner);
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner);
int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo);
int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver);
int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver);
int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo);
int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow);
int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow);
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo);
int32_t qStreamRestoreParam(qTaskInfo_t tinfo);
int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo);
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo);
void qStreamCloseTsdbReader(void* task);
bool qStreamRecoverScanStep1Finished(qTaskInfo_t tinfo);
bool qStreamRecoverScanStep2Finished(qTaskInfo_t tinfo);
int32_t qStreamRecoverSetAllStepFinished(qTaskInfo_t tinfo);
void resetTaskInfo(qTaskInfo_t tinfo);
void qResetStreamInfoTimeWindow(qTaskInfo_t tinfo);
int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo);
int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo);
#ifdef __cplusplus
}
#endif
......
......@@ -234,29 +234,6 @@ typedef struct SStoreSnapshotFn {
int32_t (*getTableInfoFromSnapshot)(SSnapContext* ctx, void** pBuf, int32_t* contLen, int16_t* type, int64_t* uid);
} SStoreSnapshotFn;
/**
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags);
void metaReaderReleaseLock(SMetaReader *pReader);
void metaReaderClear(SMetaReader *pReader);
int32_t metaReaderGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
int32_t metaReaderGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid);
int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList);
const void *metaGetTableTagVal(void *tag, int16_t type, STagVal *tagVal);
int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName);
int metaGetTableUidByName(void *meta, char *tbName, uint64_t *uid);
int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType);
bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid);
int32_t metaGetCachedTableUidList(SMeta *pMeta, tb_uid_t suid, const uint8_t *key, int32_t keyLen, SArray *pList,
bool *acquired);
int32_t metaUidFilterCachePut(SMeta *pMeta, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload,
int32_t payloadLen, double selectivityRatio);
tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name);
int32_t metaGetCachedTbGroup(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList);
int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, int32_t
payloadLen);
*/
typedef struct SStoreMeta {
SMTbCursor* (*openTableMetaCursor)(void* pVnode); // metaOpenTbCursor
void (*closeTableMetaCursor)(SMTbCursor* pTbCur); // metaCloseTbCursor
......@@ -403,7 +380,7 @@ typedef struct SStateStore {
SStreamStateCur* (*streamStateSessionSeekKeyCurrentNext)(SStreamState* pState, const SSessionKey* key);
struct SStreamFileState* (*streamFileStateInit)(int64_t memSize, uint32_t keySize, uint32_t rowSize,
uint32_t selectRowSize, GetTsFun fp, void* pFile, TSKEY delMark);
uint32_t selectRowSize, GetTsFun fp, void* pFile, TSKEY delMark, const char*id);
void (*streamFileStateDestroy)(struct SStreamFileState* pFileState);
void (*streamFileStateClear)(struct SStreamFileState* pFileState);
......@@ -415,6 +392,7 @@ typedef struct SStateStore {
int32_t (*streamStateCommit)(SStreamState* pState);
void (*streamStateDestroy)(SStreamState* pState, bool remove);
int32_t (*streamStateDeleteCheckPoint)(SStreamState* pState, TSKEY mark);
void (*streamStateReloadInfo)(SStreamState* pState, TSKEY ts);
} SStateStore;
typedef struct SStorageAPI {
......
......@@ -129,30 +129,38 @@ typedef struct SSerializeDataHandle {
} SSerializeDataHandle;
// incremental state storage
typedef struct SBackendCfWrapper {
void *rocksdb;
void **pHandle;
void *writeOpts;
void *readOpts;
void **cfOpts;
void *dbOpt;
void *param;
void *env;
SListNode *pComparNode;
void *pBackend;
void *compactFactory;
TdThreadRwlock rwLock;
bool remove;
int64_t backendId;
char idstr[64];
} SBackendCfWrapper;
typedef struct STdbState {
void *rocksdb;
void **pHandle;
void *writeOpts;
void *readOpts;
void **cfOpts;
void *dbOpt;
SBackendCfWrapper *pBackendCfWrapper;
int64_t backendCfWrapperId;
char idstr[64];
struct SStreamTask *pOwner;
void *param;
void *env;
SListNode *pComparNode;
void *pBackend;
char idstr[64];
void *compactFactory;
TdThreadRwlock rwLock;
void *db;
void *pStateDb;
void *pFuncStateDb;
void *pFillStateDb; // todo refactor
void *pSessionStateDb;
void *pParNameDb;
void *pParTagDb;
void *txn;
void *db;
void *pStateDb;
void *pFuncStateDb;
void *pFillStateDb; // todo refactor
void *pSessionStateDb;
void *pParNameDb;
void *pParTagDb;
void *txn;
} STdbState;
typedef struct {
......
......@@ -138,6 +138,8 @@ int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char* tbname);
int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal);
void streamStateReloadInfo(SStreamState* pState, TSKEY ts);
/***compare func **/
typedef struct SStateChekpoint {
......
......@@ -44,10 +44,8 @@ enum {
TASK_STATUS__DROPPING,
TASK_STATUS__FAIL,
TASK_STATUS__STOP,
TASK_STATUS__WAIT_DOWNSTREAM,
TASK_STATUS__RECOVER_PREPARE,
TASK_STATUS__RECOVER1,
TASK_STATUS__RECOVER2,
TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner
TASK_STATUS__HALT, // stream task will handle all data in the input queue, and then paused
TASK_STATUS__PAUSE,
};
......@@ -133,7 +131,6 @@ typedef struct {
// ref data block, for delete
typedef struct {
int8_t type;
int64_t ver;
SSDataBlock* pBlock;
} SStreamRefDataBlock;
......@@ -203,13 +200,11 @@ static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) {
atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
}
void* streamQueueNextItem(SStreamQueue* queue);
void* streamQueueNextItem(SStreamQueue* pQueue);
SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type);
void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit);
SStreamDataSubmit* streamSubmitBlockClone(SStreamDataSubmit* pSubmit);
typedef struct {
char* qmsg;
void* pExecutor; // not applicable to encoder and decoder
......@@ -251,7 +246,7 @@ typedef struct {
int8_t reserved;
} STaskSinkFetch;
typedef struct {
typedef struct SStreamChildEpInfo {
int32_t nodeId;
int32_t childId;
int32_t taskId;
......@@ -271,31 +266,55 @@ typedef struct SCheckpointInfo {
} SCheckpointInfo;
typedef struct SStreamStatus {
int8_t taskStatus;
int8_t schedStatus;
int8_t keepTaskStatus;
int8_t taskStatus;
int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set
int8_t schedStatus;
int8_t keepTaskStatus;
bool transferState;
int8_t timerActive; // timer is active
} SStreamStatus;
typedef struct SHistDataRange {
SVersionRange range;
STimeWindow window;
} SHistDataRange;
typedef struct SSTaskBasicInfo {
int32_t nodeId; // vgroup id or snode id
SEpSet epSet;
int32_t selfChildId;
int32_t totalLevel;
int8_t taskLevel;
int8_t fillHistory; // is fill history task or not
} SSTaskBasicInfo;
typedef struct SDispatchMsgInfo {
void* pData; // current dispatch data
int16_t msgType; // dispatch msg type
int32_t retryCount; // retry send data count
int64_t blockingTs; // output blocking timestamp
} SDispatchMsgInfo;
typedef struct {
int8_t outputType;
int8_t outputStatus;
SStreamQueue* outputQueue;
} SSTaskOutputInfo;
struct SStreamTask {
SStreamId id;
int32_t totalLevel;
int8_t taskLevel;
int8_t outputType;
int16_t dispatchMsgType;
SStreamStatus status;
int32_t selfChildId;
int32_t nodeId; // vgroup id
SEpSet epSet;
SCheckpointInfo chkInfo;
STaskExec exec;
int8_t fillHistory; // fill history
int64_t ekey; // end ts key
int64_t endVer; // end version
// children info
SArray* childEpInfo; // SArray<SStreamChildEpInfo*>
int32_t nextCheckId;
SArray* checkpointInfo; // SArray<SStreamCheckpointInfo>
SStreamId id;
SSTaskBasicInfo info;
int8_t outputType;
SDispatchMsgInfo msgInfo;
SStreamStatus status;
SCheckpointInfo chkInfo;
STaskExec exec;
SHistDataRange dataRange;
SStreamId historyTaskId;
SStreamId streamTaskId;
SArray* pUpstreamEpInfoList; // SArray<SStreamChildEpInfo*>, // children info
int32_t nextCheckId;
SArray* checkpointInfo; // SArray<SStreamCheckpointInfo>
// output
union {
......@@ -314,13 +333,14 @@ struct SStreamTask {
// trigger
int8_t triggerStatus;
int64_t triggerParam;
void* timer;
void* schedTimer;
void* launchTaskTimer;
SMsgCb* pMsgCb; // msg handle
SStreamState* pState; // state backend
// the followings attributes don't be serialized
int32_t recoverTryingDownstream;
int32_t recoverWaitingUpstream;
int32_t notReadyTasks;
int32_t numOfWaitingUpstream;
int64_t checkReqId;
SArray* checkReqIds; // shuffle
int32_t refCnt;
......@@ -332,21 +352,22 @@ struct SStreamTask {
// meta
typedef struct SStreamMeta {
char* path;
TDB* db;
TTB* pTaskDb;
TTB* pCheckpointDb;
SHashObj* pTasks;
SArray* pTaskList; // SArray<task_id*>
void* ahandle;
TXN* txn;
FTaskExpand* expandFunc;
int32_t vgId;
SRWLatch lock;
int32_t walScanCounter;
void* streamBackend;
int64_t streamBackendRid;
SHashObj* pTaskBackendUnique;
char* path;
TDB* db;
TTB* pTaskDb;
TTB* pCheckpointDb;
SHashObj* pTasks;
SArray* pTaskList; // SArray<task_id*>
void* ahandle;
TXN* txn;
FTaskExpand* expandFunc;
int32_t vgId;
SRWLatch lock;
int32_t walScanCounter;
void* streamBackend;
int64_t streamBackendRid;
SHashObj* pTaskBackendUnique;
TdThreadMutex backendMutex;
} SStreamMeta;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
......@@ -431,16 +452,17 @@ typedef struct {
SMsgHead msgHead;
int64_t streamId;
int32_t taskId;
} SStreamRecoverStep1Req, SStreamRecoverStep2Req;
int8_t igUntreated;
} SStreamScanHistoryReq;
typedef struct {
int64_t streamId;
int32_t taskId;
int32_t childId;
} SStreamRecoverFinishReq;
} SStreamRecoverFinishReq, SStreamTransferReq;
int32_t tEncodeSStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq);
int32_t tDecodeSStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishReq* pReq);
int32_t tEncodeStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq);
int32_t tDecodeStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishReq* pReq);
typedef struct {
int64_t streamId;
......@@ -509,11 +531,11 @@ typedef struct {
SArray* checkpointVer; // SArray<SStreamCheckpointInfo>
} SStreamRecoverDownstreamRsp;
int32_t tEncodeSStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq);
int32_t tDecodeSStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq);
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq);
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq);
int32_t tEncodeSStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp);
int32_t tDecodeSStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp);
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp);
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp);
int32_t tEncodeSStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamRecoverDownstreamReq* pReq);
int32_t tDecodeSStreamTaskRecoverReq(SDecoder* pDecoder, SStreamRecoverDownstreamReq* pReq);
......@@ -525,9 +547,11 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq);
void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq);
int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, int32_t numOfBlocks,
int64_t dstTaskId);
void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq);
int32_t streamSetupTrigger(SStreamTask* pTask);
int32_t streamSetupScheduleTrigger(SStreamTask* pTask);
int32_t streamProcessRunReq(SStreamTask* pTask);
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec);
......@@ -542,30 +566,44 @@ int32_t streamSchedExec(SStreamTask* pTask);
int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock);
bool streamTaskShouldStop(const SStreamStatus* pStatus);
bool streamTaskShouldPause(const SStreamStatus* pStatus);
bool streamTaskIsIdle(const SStreamTask* pTask);
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
// recover and fill history
int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version);
int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version);
void streamPrepareNdoCheckDownstream(SStreamTask* pTask);
int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask);
int32_t streamTaskLaunchScanHistory(SStreamTask* pTask);
int32_t streamTaskCheckStatus(SStreamTask* pTask);
int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version);
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp);
int32_t streamCheckHistoryTaskDownstrem(SStreamTask* pTask);
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated);
bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask);
bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask);
int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask);
// common
int32_t streamSetParamForRecover(SStreamTask* pTask);
int32_t streamRestoreParam(SStreamTask* pTask);
int32_t streamSetStatusNormal(SStreamTask* pTask);
int32_t streamSetParamForScanHistoryData(SStreamTask* pTask);
int32_t streamRestoreParam(SStreamTask* pTask);
int32_t streamSetStatusNormal(SStreamTask* pTask);
const char* streamGetTaskStatusStr(int32_t status);
// source level
int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, int64_t ver);
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req* pReq);
int32_t streamSourceRecoverScanStep1(SStreamTask* pTask);
int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* pReq);
int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver);
int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask);
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
int32_t streamSourceScanHistoryData(SStreamTask* pTask);
// int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver);
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
int32_t streamDispatchTransferStateMsg(SStreamTask* pTask);
// agg level
int32_t streamAggRecoverPrepare(SStreamTask* pTask);
// int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask);
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId);
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t taskId, int32_t childId);
void streamMetaInit();
void streamMetaCleanup();
......@@ -591,6 +629,9 @@ int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask,
int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointReq* pReq);
int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointRsp* pRsp);
int32_t streamTaskReleaseState(SStreamTask* pTask);
int32_t streamTaskReloadState(SStreamTask* pTask);
#ifdef __cplusplus
}
#endif
......
......@@ -28,11 +28,10 @@ extern "C" {
#endif
typedef struct SStreamFileState SStreamFileState;
typedef SList SStreamSnapshot;
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize,
GetTsFun fp, void* pFile, TSKEY delMark);
GetTsFun fp, void* pFile, TSKEY delMark, const char* id);
void streamFileStateDestroy(SStreamFileState* pFileState);
void streamFileStateClear(SStreamFileState* pFileState);
bool needClearDiskBuff(SStreamFileState* pFileState);
......@@ -50,6 +49,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState);
int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list);
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark);
int32_t streamFileStateGeSelectRowSize(SStreamFileState* pFileState);
void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts);
#ifdef __cplusplus
}
......
......@@ -160,7 +160,7 @@ static const SSysDbTableSchema streamSchema[] = {
static const SSysDbTableSchema streamTaskSchema[] = {
{.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "task_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "task_id", .bytes = 32, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "node_type", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "node_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "level", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
......
此差异已折叠。
......@@ -76,6 +76,9 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
code = 0;
_OVER:
......
......@@ -746,9 +746,10 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RECOVER_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRANSFER_STATE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
......@@ -92,7 +92,7 @@ static void vmProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
const STraceId *trace = &pMsg->info.traceId;
dGTrace("vgId:%d, msg:%p get from vnode-stream queue", pVnode->vgId, pMsg);
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
int32_t code = vnodeProcessStreamMsg(pVnode->pImpl, pMsg, pInfo);
if (code != 0) {
if (terrno != 0) code = terrno;
dGError("vgId:%d, msg:%p failed to process stream msg %s since %s", pVnode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
......
......@@ -647,6 +647,14 @@ typedef struct {
// SMqSubActionLogEntry* pLogEntry;
} SMqRebOutputObj;
typedef struct SStreamConf {
int8_t igExpired;
int8_t trigger;
int8_t fillHistory;
int64_t triggerParam;
int64_t watermark;
} SStreamConf;
typedef struct {
char name[TSDB_STREAM_FNAME_LEN];
// ctl
......@@ -660,12 +668,7 @@ typedef struct {
// info
int64_t uid;
int8_t status;
// config
int8_t igExpired;
int8_t trigger;
int8_t fillHistory;
int64_t triggerParam;
int64_t watermark;
SStreamConf conf;
// source and target
int64_t sourceDbUid;
int64_t targetDbUid;
......@@ -675,14 +678,18 @@ typedef struct {
int64_t targetStbUid;
// fixedSinkVg is not applicable for encode and decode
SVgObj fixedSinkVg;
SVgObj fixedSinkVg;
int32_t fixedSinkVgId; // 0 for shuffle
// transformation
char* sql;
char* ast;
char* physicalPlan;
SArray* tasks; // SArray<SArray<SStreamTask>>
SArray* tasks; // SArray<SArray<SStreamTask>>
SArray* pHTasksList; // generate the results for already stored ts data
int64_t hTaskUid; // stream task for history ts data
SSchemaWrapper outputSchema;
SSchemaWrapper tagSchema;
......
......@@ -30,7 +30,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
int64_t watermark, int64_t deleteMark);
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream);
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream, int64_t nextWindowSkey);
#ifdef __cplusplus
}
......
......@@ -30,11 +30,11 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->status) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->igExpired) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->fillHistory) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->triggerParam) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->watermark) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->conf.igExpired) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->conf.trigger) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->conf.fillHistory) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->conf.triggerParam) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->conf.watermark) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->sourceDbUid) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->targetDbUid) < 0) return -1;
......@@ -97,11 +97,11 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->igExpired) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->fillHistory) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->triggerParam) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->watermark) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->conf.igExpired) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->conf.trigger) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->conf.fillHistory) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->conf.triggerParam) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->conf.watermark) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->sourceDbUid) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->targetDbUid) < 0) return -1;
......@@ -154,18 +154,10 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
return 0;
}
void tFreeStreamObj(SStreamObj *pStream) {
taosMemoryFree(pStream->sql);
taosMemoryFree(pStream->ast);
taosMemoryFree(pStream->physicalPlan);
if (pStream->outputSchema.nCols) {
taosMemoryFree(pStream->outputSchema.pSchema);
}
int32_t sz = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < sz; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
static void* freeStreamTasks(SArray* pTaskLevel) {
int32_t numOfLevel = taosArrayGetSize(pTaskLevel);
for (int32_t i = 0; i < numOfLevel; i++) {
SArray *pLevel = taosArrayGetP(pTaskLevel, i);
int32_t taskSz = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < taskSz; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
......@@ -175,7 +167,20 @@ void tFreeStreamObj(SStreamObj *pStream) {
taosArrayDestroy(pLevel);
}
taosArrayDestroy(pStream->tasks);
return taosArrayDestroy(pTaskLevel);
}
void tFreeStreamObj(SStreamObj *pStream) {
taosMemoryFree(pStream->sql);
taosMemoryFree(pStream->ast);
taosMemoryFree(pStream->physicalPlan);
if (pStream->outputSchema.nCols || pStream->outputSchema.pSchema) {
taosMemoryFree(pStream->outputSchema.pSchema);
}
pStream->tasks = freeStreamTasks(pStream->tasks);
pStream->pHTasksList = freeStreamTasks(pStream->pHTasksList);
// tagSchema.pSchema
if (pStream->tagSchema.nCols > 0) {
......
......@@ -367,10 +367,10 @@ void dumpStream(SSdb *pSdb, SJson *json) {
tjsonAddStringToObject(item, "smaId", i642str(pObj->smaId));
tjsonAddStringToObject(item, "uid", i642str(pObj->uid));
tjsonAddStringToObject(item, "status", i642str(pObj->status));
tjsonAddStringToObject(item, "igExpired", i642str(pObj->igExpired));
tjsonAddStringToObject(item, "trigger", i642str(pObj->trigger));
tjsonAddStringToObject(item, "triggerParam", i642str(pObj->triggerParam));
tjsonAddStringToObject(item, "watermark", i642str(pObj->watermark));
tjsonAddStringToObject(item, "igExpired", i642str(pObj->conf.igExpired));
tjsonAddStringToObject(item, "trigger", i642str(pObj->conf.trigger));
tjsonAddStringToObject(item, "triggerParam", i642str(pObj->conf.triggerParam));
tjsonAddStringToObject(item, "watermark", i642str(pObj->conf.watermark));
tjsonAddStringToObject(item, "sourceDbUid", i642str(pObj->sourceDbUid));
tjsonAddStringToObject(item, "targetDbUid", i642str(pObj->targetDbUid));
tjsonAddStringToObject(item, "sourceDb", mndGetDbStr(pObj->sourceDb));
......
......@@ -542,32 +542,32 @@ int32_t mndRetrieveTagIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, i
STR_TO_VARSTR(n3, (char *)tNameGetTableName(&stbName));
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)n1, false);
colDataSetVal(pColInfo, numOfRows, (const char *)n1, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)n2, false);
colDataSetVal(pColInfo, numOfRows, (const char *)n2, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)n3, false);
colDataSetVal(pColInfo, numOfRows, (const char *)n3, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&invalid, false);
colDataSetVal(pColInfo, numOfRows, (const char *)&invalid, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pIdx->createdTime, false);
colDataSetVal(pColInfo, numOfRows, (const char *)&pIdx->createdTime, false);
char col[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(col, (char *)pIdx->colName);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)col, false);
colDataSetVal(pColInfo, numOfRows, (const char *)col, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
char tag[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(tag, (char *)"tag_index");
colDataAppend(pColInfo, numOfRows, (const char *)tag, false);
colDataSetVal(pColInfo, numOfRows, (const char *)tag, false);
numOfRows++;
sdbRelease(pSdb, pIdx);
......
......@@ -555,20 +555,20 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
streamObj.version = 1;
streamObj.sql = taosStrdup(pCreate->sql);
streamObj.smaId = smaObj.uid;
streamObj.watermark = pCreate->watermark;
streamObj.conf.watermark = pCreate->watermark;
streamObj.deleteMark = pCreate->deleteMark;
streamObj.fillHistory = STREAM_FILL_HISTORY_ON;
streamObj.trigger = STREAM_TRIGGER_WINDOW_CLOSE;
streamObj.triggerParam = pCreate->maxDelay;
streamObj.conf.fillHistory = STREAM_FILL_HISTORY_ON;
streamObj.conf.trigger = STREAM_TRIGGER_WINDOW_CLOSE;
streamObj.conf.triggerParam = pCreate->maxDelay;
streamObj.ast = taosStrdup(smaObj.ast);
// check the maxDelay
if (streamObj.triggerParam < TSDB_MIN_ROLLUP_MAX_DELAY) {
if (streamObj.conf.triggerParam < TSDB_MIN_ROLLUP_MAX_DELAY) {
int64_t msInterval = convertTimeFromPrecisionToUnit(pCreate->interval, pDb->cfg.precision, TIME_UNIT_MILLISECOND);
streamObj.triggerParam = msInterval > TSDB_MIN_ROLLUP_MAX_DELAY ? msInterval : TSDB_MIN_ROLLUP_MAX_DELAY;
streamObj.conf.triggerParam = msInterval > TSDB_MIN_ROLLUP_MAX_DELAY ? msInterval : TSDB_MIN_ROLLUP_MAX_DELAY;
}
if (streamObj.triggerParam > TSDB_MAX_ROLLUP_MAX_DELAY) {
streamObj.triggerParam = TSDB_MAX_ROLLUP_MAX_DELAY;
if (streamObj.conf.triggerParam > TSDB_MAX_ROLLUP_MAX_DELAY) {
streamObj.conf.triggerParam = TSDB_MAX_ROLLUP_MAX_DELAY;
}
if (mndAllocSmaVgroup(pMnode, pDb, &streamObj.fixedSinkVg) != 0) {
......@@ -597,8 +597,8 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
.pAstRoot = pAst,
.topicQuery = false,
.streamQuery = true,
.triggerType = streamObj.trigger,
.watermark = streamObj.watermark,
.triggerType = streamObj.conf.trigger,
.watermark = streamObj.conf.watermark,
.deleteMark = streamObj.deleteMark,
};
......@@ -633,7 +633,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
if (mndSetCreateSmaVgroupCommitLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
if (mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg, &smaObj) != 0) goto _OVER;
if (mndScheduleStream(pMnode, &streamObj) != 0) goto _OVER;
if (mndScheduleStream(pMnode, &streamObj, 1685959190000) != 0) goto _OVER;
if (mndPersistStream(pMnode, pTrans, &streamObj) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
......@@ -1278,13 +1278,13 @@ static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
STR_TO_VARSTR(col, (char *)"");
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)col, false);
colDataSetVal(pColInfo, numOfRows, (const char *)col, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
char tag[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(tag, (char *)"sma_index");
colDataAppend(pColInfo, numOfRows, (const char *)tag, false);
colDataSetVal(pColInfo, numOfRows, (const char *)tag, false);
numOfRows++;
sdbRelease(pSdb, pSma);
......
......@@ -239,7 +239,7 @@ static void mndShowStreamStatus(char *dst, SStreamObj *pStream) {
}
static void mndShowStreamTrigger(char *dst, SStreamObj *pStream) {
int8_t trigger = pStream->trigger;
int8_t trigger = pStream->conf.trigger;
if (trigger == STREAM_TRIGGER_AT_ONCE) {
strcpy(dst, "at once");
} else if (trigger == STREAM_TRIGGER_WINDOW_CLOSE) {
......@@ -299,13 +299,18 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
pObj->smaId = 0;
pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name));
char p[TSDB_STREAM_FNAME_LEN + 32] = {0};
snprintf(p, tListLen(p), "%s_%s", pObj->name, "fillhistory");
pObj->hTaskUid = mndGenerateUid(pObj->name, strlen(pObj->name));
pObj->status = 0;
pObj->igExpired = pCreate->igExpired;
pObj->trigger = pCreate->triggerType;
pObj->triggerParam = pCreate->maxDelay;
pObj->watermark = pCreate->watermark;
pObj->fillHistory = pCreate->fillHistory;
pObj->conf.igExpired = pCreate->igExpired;
pObj->conf.trigger = pCreate->triggerType;
pObj->conf.triggerParam = pCreate->maxDelay;
pObj->conf.watermark = pCreate->watermark;
pObj->conf.fillHistory = pCreate->fillHistory;
pObj->deleteMark = pCreate->deleteMark;
pObj->igCheckUpdate = pCreate->igUpdate;
......@@ -387,9 +392,9 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
.pAstRoot = pAst,
.topicQuery = false,
.streamQuery = true,
.triggerType = pObj->trigger == STREAM_TRIGGER_MAX_DELAY ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->trigger,
.watermark = pObj->watermark,
.igExpired = pObj->igExpired,
.triggerType = pObj->conf.trigger == STREAM_TRIGGER_MAX_DELAY ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->conf.trigger,
.watermark = pObj->conf.watermark,
.igExpired = pObj->conf.igExpired,
.deleteMark = pObj->deleteMark,
.igCheckUpdate = pObj->igCheckUpdate,
};
......@@ -428,30 +433,37 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, const SStreamTask *pTask) {
SEncoder encoder;
tEncoderInit(&encoder, NULL, 0);
tEncodeStreamTask(&encoder, pTask);
int32_t size = encoder.pos;
int32_t tlen = sizeof(SMsgHead) + size;
tEncoderClear(&encoder);
void *buf = taosMemoryCalloc(1, tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
((SMsgHead *)buf)->vgId = htonl(pTask->nodeId);
((SMsgHead *)buf)->vgId = htonl(pTask->info.nodeId);
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncoderInit(&encoder, abuf, size);
tEncodeStreamTask(&encoder, pTask);
tEncoderClear(&encoder);
STransAction action = {0};
action.mTraceId = pTrans->mTraceId;
memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet));
memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet));
action.pCont = buf;
action.contLen = tlen;
action.msgType = TDMT_STREAM_TASK_DEPLOY;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(buf);
return -1;
}
return 0;
}
......@@ -459,14 +471,33 @@ int32_t mndPersistStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStrea
int32_t level = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < level; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
int32_t sz = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < sz; j++) {
int32_t numOfTasks = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < numOfTasks; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
if (mndPersistTaskDeployReq(pTrans, pTask) < 0) {
return -1;
}
}
}
// persistent stream task for already stored ts data
if (pStream->conf.fillHistory) {
level = taosArrayGetSize(pStream->pHTasksList);
for (int32_t i = 0; i < level; i++) {
SArray *pLevel = taosArrayGetP(pStream->pHTasksList, i);
int32_t numOfTasks = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < numOfTasks; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
if (mndPersistTaskDeployReq(pTrans, pTask) < 0) {
return -1;
}
}
}
}
return 0;
}
......@@ -474,11 +505,13 @@ int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
if (mndPersistStreamTasks(pMnode, pTrans, pStream) < 0) {
return -1;
}
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
return -1;
}
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
return 0;
}
......@@ -490,6 +523,7 @@ int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStr
mndTransDrop(pTrans);
return -1;
}
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
return 0;
}
......@@ -603,16 +637,17 @@ _OVER:
static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
// vnode
/*if (pTask->nodeId > 0) {*/
/*if (pTask->info.nodeId > 0) {*/
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pReq->head.vgId = htonl(pTask->nodeId);
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
STransAction action = {0};
memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet));
memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet));
action.pCont = pReq;
action.contLen = sizeof(SVDropStreamTaskReq);
action.msgType = TDMT_STREAM_TASK_DROP;
......@@ -732,6 +767,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
goto _OVER;
}
mInfo("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name);
mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetDb);
......@@ -748,7 +784,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
}
// schedule stream task for stream obj
if (mndScheduleStream(pMnode, &streamObj) < 0) {
if (mndScheduleStream(pMnode, &streamObj, createStreamReq.lastTs) < 0) {
mError("stream:%s, failed to schedule since %s", createStreamReq.name, terrstr());
mndTransDrop(pTrans);
goto _OVER;
......@@ -834,7 +870,7 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, con
SMStreamDoCheckpointMsg *pMsg) {
SStreamCheckpointSourceReq req = {0};
req.checkpointId = pMsg->checkpointId;
req.nodeId = pTask->nodeId;
req.nodeId = pTask->info.nodeId;
req.expireTime = -1;
req.streamId = pTask->streamId;
req.taskId = pTask->taskId;
......@@ -863,7 +899,7 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, con
SMsgHead *pMsgHead = (SMsgHead *)buf;
pMsgHead->contLen = htonl(tlen);
pMsgHead->vgId = htonl(pTask->nodeId);
pMsgHead->vgId = htonl(pTask->info.nodeId);
tEncoderClear(&encoder);
......@@ -902,12 +938,12 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
for (int32_t i = 0; i < totLevel; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
SStreamTask *pTask = taosArrayGetP(pLevel, 0);
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
int32_t sz = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
/*A(pTask->nodeId > 0);*/
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->nodeId);
/*A(pTask->info.nodeId > 0);*/
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
if (pVgObj == NULL) {
taosRUnLockLatch(&pStream->lock);
mndReleaseStream(pMnode, pStream);
......@@ -965,8 +1001,6 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
/*SDbObj *pDb = NULL;*/
/*SUserObj *pUser = NULL;*/
SMDropStreamReq dropReq = {0};
if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
......@@ -1157,7 +1191,7 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
}
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->watermark, false);
colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->conf.watermark, false);
char trigger[20 + VARSTR_HEADER_SIZE] = {0};
char trigger2[20] = {0};
......@@ -1187,12 +1221,16 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
while (numOfRows < rowsCapacity) {
pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
if (pShow->pIter == NULL) break;
if (pShow->pIter == NULL) {
break;
}
// lock
taosRLockLatch(&pStream->lock);
// count task num
int32_t sz = taosArrayGetSize(pStream->tasks);
int32_t count = 0;
for (int32_t i = 0; i < sz; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
......@@ -1202,10 +1240,12 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
if (numOfRows + count > rowsCapacity) {
blockDataEnsureCapacity(pBlock, numOfRows + count);
}
// add row for each task
for (int32_t i = 0; i < sz; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
int32_t levelCnt = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < levelCnt; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
......@@ -1215,18 +1255,25 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
// stream name
char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false);
// task id
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pTask->id.taskId, false);
char idstr[128] = {0};
int32_t len = tintToHex(pTask->id.taskId, &idstr[4]);
idstr[2] = '0';
idstr[3] = 'x';
varDataSetLen(idstr, len + 2);
colDataSetVal(pColInfo, numOfRows, idstr, false);
// node type
char nodeType[20 + VARSTR_HEADER_SIZE] = {0};
varDataSetLen(nodeType, 5);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
if (pTask->nodeId > 0) {
if (pTask->info.nodeId > 0) {
memcpy(varDataVal(nodeType), "vnode", 5);
} else {
memcpy(varDataVal(nodeType), "snode", 5);
......@@ -1235,30 +1282,50 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
// node id
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
int64_t nodeId = TMAX(pTask->nodeId, 0);
int64_t nodeId = TMAX(pTask->info.nodeId, 0);
colDataSetVal(pColInfo, numOfRows, (const char *)&nodeId, false);
// level
char level[20 + VARSTR_HEADER_SIZE] = {0};
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
memcpy(varDataVal(level), "source", 6);
varDataSetLen(level, 6);
} else if (pTask->taskLevel == TASK_LEVEL__AGG) {
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
memcpy(varDataVal(level), "agg", 3);
varDataSetLen(level, 3);
} else if (pTask->taskLevel == TASK_LEVEL__SINK) {
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
memcpy(varDataVal(level), "sink", 4);
varDataSetLen(level, 4);
} else if (pTask->taskLevel == TASK_LEVEL__SINK) {
}
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&level, false);
// status
char status[20 + VARSTR_HEADER_SIZE] = {0};
char status2[20] = {0};
strcpy(status, "normal");
STR_WITH_MAXSIZE_TO_VARSTR(status, status2, sizeof(status));
int8_t taskStatus = atomic_load_8(&pTask->status.taskStatus);
if (taskStatus == TASK_STATUS__NORMAL) {
memcpy(varDataVal(status), "normal", 6);
varDataSetLen(status, 6);
} else if (taskStatus == TASK_STATUS__DROPPING) {
memcpy(varDataVal(status), "dropping", 8);
varDataSetLen(status, 8);
} else if (taskStatus == TASK_STATUS__FAIL) {
memcpy(varDataVal(status), "fail", 4);
varDataSetLen(status, 4);
} else if (taskStatus == TASK_STATUS__STOP) {
memcpy(varDataVal(status), "stop", 4);
varDataSetLen(status, 4);
} else if (taskStatus == TASK_STATUS__SCAN_HISTORY) {
memcpy(varDataVal(status), "history", 7);
varDataSetLen(status, 7);
} else if (taskStatus == TASK_STATUS__HALT) {
memcpy(varDataVal(status), "halt", 4);
varDataSetLen(status, 4);
} else if (taskStatus == TASK_STATUS__PAUSE) {
memcpy(varDataVal(status), "pause", 5);
varDataSetLen(status, 5);
}
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&status, false);
......@@ -1287,10 +1354,10 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pReq->head.vgId = htonl(pTask->nodeId);
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
STransAction action = {0};
memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet));
memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet));
action.pCont = pReq;
action.contLen = sizeof(SVPauseStreamTaskReq);
action.msgType = TDMT_STREAM_TASK_PAUSE;
......@@ -1301,21 +1368,36 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) {
return 0;
}
int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) {
int32_t size = taosArrayGetSize(pStream->tasks);
int32_t mndPauseAllStreamTaskImpl(STrans *pTrans, SArray* tasks) {
int32_t size = taosArrayGetSize(tasks);
for (int32_t i = 0; i < size; i++) {
SArray *pTasks = taosArrayGetP(pStream->tasks, i);
SArray *pTasks = taosArrayGetP(tasks, i);
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
if (pTask->taskLevel != TASK_LEVEL__SINK && mndPauseStreamTask(pTrans, pTask) < 0) {
if (pTask->info.taskLevel != TASK_LEVEL__SINK && mndPauseStreamTask(pTrans, pTask) < 0) {
return -1;
}
if (atomic_load_8(&pTask->status.taskStatus) != TASK_STATUS__PAUSE) {
atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
}
}
}
return 0;
}
int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) {
int32_t code = mndPauseAllStreamTaskImpl(pTrans, pStream->tasks);
if (code != 0) {
return code;
}
// pStream->pHTasksList is null
// code = mndPauseAllStreamTaskImpl(pTrans, pStream->pHTasksList);
return code;
}
static int32_t mndPersistStreamLog(STrans *pTrans, const SStreamObj *pStream, int8_t status) {
SStreamObj streamObj = {0};
memcpy(streamObj.name, pStream->name, TSDB_STREAM_FNAME_LEN);
......@@ -1355,6 +1437,10 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
}
}
if (pStream->status == STREAM_STATUS__PAUSE) {
return 0;
}
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) {
sdbRelease(pMnode->pSdb, pStream);
return -1;
......@@ -1410,11 +1496,11 @@ static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t ig
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pReq->head.vgId = htonl(pTask->nodeId);
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
pReq->igUntreated = igUntreated;
STransAction action = {0};
memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet));
memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet));
action.pCont = pReq;
action.contLen = sizeof(SVResumeStreamTaskReq);
action.msgType = TDMT_STREAM_TASK_RESUME;
......@@ -1432,11 +1518,16 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUn
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
if (pTask->taskLevel != TASK_LEVEL__SINK && mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) {
if (pTask->info.taskLevel != TASK_LEVEL__SINK && mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) {
return -1;
}
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__PAUSE) {
atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus);
}
}
}
// pStream->pHTasksList is null
return 0;
}
......@@ -1463,6 +1554,10 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
}
}
if (pStream->status != STREAM_STATUS__PAUSE) {
return 0;
}
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) {
sdbRelease(pMnode->pSdb, pStream);
return -1;
......
......@@ -875,7 +875,7 @@ static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
// if (pDb == NULL || pDb->compactStartTime <= 0) {
// colDataSetNULL(pColInfo, numOfRows);
// } else {
// colDataAppend(pColInfo, numOfRows, (const char *)&pDb->compactStartTime, false);
// colDataSetVal(pColInfo, numOfRows, (const char *)&pDb->compactStartTime, false);
// }
numOfRows++;
......
......@@ -52,23 +52,21 @@ void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) {
FAIL:
if (pMsg->info.handle == NULL) return;
SRpcMsg rsp = {
.code = code,
.info = pMsg->info,
};
SRpcMsg rsp = { .code = code, .info = pMsg->info};
tmsgSendRsp(&rsp);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
ASSERT(pTask->taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->childEpInfo) != 0);
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->pUpstreamEpInfoList) != 0);
pTask->refCnt = 1;
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
pTask->inputQueue = streamQueueOpen(0);
pTask->outputQueue = streamQueueOpen(0);
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->inputQueue = streamQueueOpen(512 << 10);
pTask->outputQueue = streamQueueOpen(512 << 10);
if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
return -1;
......@@ -85,14 +83,18 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
return -1;
}
int32_t numOfChildEp = taosArrayGetSize(pTask->childEpInfo);
SReadHandle handle = { .vnode = NULL, .numOfVgroups = numOfChildEp, .pStateBackend = pTask->pState };
int32_t numOfChildEp = taosArrayGetSize(pTask->pUpstreamEpInfoList);
SReadHandle handle = { .vnode = NULL, .numOfVgroups = numOfChildEp, .pStateBackend = pTask->pState, .fillHistory = pTask->info.fillHistory };
initStreamStateAPI(&handle.api);
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0);
ASSERT(pTask->exec.pExecutor);
streamSetupTrigger(pTask);
streamSetupScheduleTrigger(pTask);
qDebug("snode:%d expand stream task on snode, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", SNODE_HANDLE,
pTask->id.idStr, pTask->chkInfo.version, pTask->info.selfChildId, pTask->info.taskLevel);
return 0;
}
......@@ -149,9 +151,10 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
taosMemoryFree(pTask);
return -1;
}
tDecoderClear(&decoder);
ASSERT(pTask->taskLevel == TASK_LEVEL__AGG);
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG);
// 2.save task
taosWLockLatch(&pSnode->pMeta->lock);
......@@ -161,19 +164,20 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
return -1;
}
int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta);
taosWUnLockLatch(&pSnode->pMeta->lock);
// 3.go through recover steps to fill history
if (pTask->fillHistory) {
streamSetParamForRecover(pTask);
streamAggRecoverPrepare(pTask);
}
streamPrepareNdoCheckDownstream(pTask);
qDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE, pTask->id.idStr,
streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks);
return 0;
}
int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) {
SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg;
qDebug("snode:%d receive msg to drop stream task:0x%x", pSnode->pMeta->vgId, pReq->taskId);
streamMetaRemoveTask(pSnode->pMeta, pReq->taskId);
return 0;
}
......@@ -255,13 +259,15 @@ int32_t sndProcessTaskRetrieveRsp(SSnode *pSnode, SRpcMsg *pMsg) {
}
int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
void *pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
switch (pMsg->msgType) {
case TDMT_STREAM_TASK_DEPLOY:
case TDMT_STREAM_TASK_DEPLOY: {
void *pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
return sndProcessTaskDeployReq(pSnode, pReq, len);
}
case TDMT_STREAM_TASK_DROP:
return sndProcessTaskDropReq(pSnode, pReq, len);
return sndProcessTaskDropReq(pSnode, pMsg->pCont, pMsg->contLen);
default:
ASSERT(0);
}
......@@ -277,7 +283,7 @@ int32_t sndProcessTaskRecoverFinishReq(SSnode *pSnode, SRpcMsg *pMsg) {
SDecoder decoder;
tDecoderInit(&decoder, msg, msgLen);
tDecodeSStreamRecoverFinishReq(&decoder, &req);
tDecodeStreamRecoverFinishReq(&decoder, &req);
tDecoderClear(&decoder);
// find task
......@@ -286,7 +292,7 @@ int32_t sndProcessTaskRecoverFinishReq(SSnode *pSnode, SRpcMsg *pMsg) {
return -1;
}
// do process request
if (streamProcessRecoverFinishReq(pTask, req.childId) < 0) {
if (streamProcessRecoverFinishReq(pTask, req.taskId, req.childId) < 0) {
streamMetaReleaseTask(pSnode->pMeta, pTask);
return -1;
}
......@@ -300,6 +306,102 @@ int32_t sndProcessTaskRecoverFinishRsp(SSnode *pSnode, SRpcMsg *pMsg) {
return 0;
}
int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) {
char *msgStr = pMsg->pCont;
char *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SStreamTaskCheckReq req;
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t *)msgBody, msgLen);
tDecodeStreamTaskCheckReq(&decoder, &req);
tDecoderClear(&decoder);
int32_t taskId = req.downstreamTaskId;
SStreamTaskCheckRsp rsp = {
.reqId = req.reqId,
.streamId = req.streamId,
.childId = req.childId,
.downstreamNodeId = req.downstreamNodeId,
.downstreamTaskId = req.downstreamTaskId,
.upstreamNodeId = req.upstreamNodeId,
.upstreamTaskId = req.upstreamTaskId,
};
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
if (pTask != NULL) {
rsp.status = streamTaskCheckStatus(pTask);
streamMetaReleaseTask(pSnode->pMeta, pTask);
qDebug("s-task:%s recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), status:%s, rsp status %d",
pTask->id.idStr, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId,
streamGetTaskStatusStr(pTask->status.taskStatus), rsp.status);
} else {
rsp.status = 0;
qDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64
") from task:0x%x (vgId:%d), rsp status %d",
taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
}
SEncoder encoder;
int32_t code;
int32_t len;
tEncodeSize(tEncodeStreamTaskCheckRsp, &rsp, len, code);
if (code < 0) {
qError("vgId:%d failed to encode task check rsp, task:0x%x", pSnode->pMeta->vgId, taskId);
return -1;
}
void *buf = rpcMallocCont(sizeof(SMsgHead) + len);
((SMsgHead *)buf)->vgId = htonl(req.upstreamNodeId);
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncoderInit(&encoder, (uint8_t *)abuf, len);
tEncodeStreamTaskCheckRsp(&encoder, &rsp);
tEncoderClear(&encoder);
SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info};
tmsgSendRsp(&rspMsg);
return 0;
}
int32_t sndProcessStreamTaskCheckRsp(SSnode* pSnode, SRpcMsg* pMsg) {
char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
int32_t code;
SStreamTaskCheckRsp rsp;
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)pReq, len);
code = tDecodeStreamTaskCheckRsp(&decoder, &rsp);
if (code < 0) {
tDecoderClear(&decoder);
return -1;
}
tDecoderClear(&decoder);
qDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d",
rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);
SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, rsp.upstreamTaskId);
if (pTask == NULL) {
qError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId,
pSnode->pMeta->vgId);
return -1;
}
code = streamProcessCheckRsp(pTask, &rsp);
streamMetaReleaseTask(pSnode->pMeta, pTask);
return code;
}
int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
switch (pMsg->msgType) {
case TDMT_STREAM_TASK_RUN:
......@@ -312,10 +414,14 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
return sndProcessTaskRetrieveReq(pSnode, pMsg);
case TDMT_STREAM_RETRIEVE_RSP:
return sndProcessTaskRetrieveRsp(pSnode, pMsg);
case TDMT_STREAM_RECOVER_FINISH:
case TDMT_STREAM_SCAN_HISTORY_FINISH:
return sndProcessTaskRecoverFinishReq(pSnode, pMsg);
case TDMT_STREAM_RECOVER_FINISH_RSP:
case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP:
return sndProcessTaskRecoverFinishRsp(pSnode, pMsg);
case TDMT_STREAM_TASK_CHECK:
return sndProcessStreamTaskCheckReq(pSnode, pMsg);
case TDMT_STREAM_TASK_CHECK_RSP:
return sndProcessStreamTaskCheckRsp(pSnode, pMsg);
default:
ASSERT(0);
}
......
......@@ -101,6 +101,7 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateCommit = streamStateCommit;
pStore->streamStateDestroy= streamStateDestroy;
pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint;
pStore->streamStateReloadInfo = streamStateReloadInfo;
}
void initFunctionStateStore(SFunctionStateStore* pStore) {
......
......@@ -96,6 +96,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit);
......@@ -126,8 +127,6 @@ int32_t metaGetCachedTbGroup(void *pVnode, tb_uid_t suid, const uint8_t *pKey,
int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload,
int32_t payloadLen);
int64_t metaGetTbNum(SMeta *pMeta);
int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables);
// tsdb
......
......@@ -45,27 +45,10 @@ extern "C" {
typedef struct STqOffsetStore STqOffsetStore;
// tqPush
// typedef struct {
// // msg info
// int64_t consumerId;
// int64_t reqOffset;
// int64_t processedVer;
// int32_t epoch;
// // rpc info
// int64_t reqId;
// SRpcHandleInfo rpcInfo;
// tmr_h timerId;
// int8_t tmrStopped;
// // exec
// int8_t inputStatus;
// int8_t execStatus;
// SStreamQueue inputQ;
// SRWLatch lock;
// } STqPushHandle;
#define EXTRACT_DATA_FROM_WAL_ID (-1)
#define STREAM_TASK_STATUS_CHECK_ID (-2)
// tqExec
typedef struct {
char* qmsg; // SubPlanToString
} STqExecCol;
......@@ -184,10 +167,10 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
// tqStream
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
int32_t tqStreamTasksScanWal(STQ* pTq);
int32_t tqStreamTasksStatusCheck(STQ* pTq);
// tq util
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem);
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
......
......@@ -64,7 +64,6 @@ typedef struct STsdbReadSnap STsdbReadSnap;
typedef struct SBlockInfo SBlockInfo;
typedef struct SSmaInfo SSmaInfo;
typedef struct SBlockCol SBlockCol;
typedef struct SVersionRange SVersionRange;
typedef struct SLDataIter SLDataIter;
typedef struct SDiskCol SDiskCol;
typedef struct SDiskData SDiskData;
......@@ -383,11 +382,6 @@ struct TSDBKEY {
TSKEY ts;
};
struct SVersionRange {
uint64_t minVer;
uint64_t maxVer;
};
typedef struct SMemSkipListNode SMemSkipListNode;
struct SMemSkipListNode {
int8_t level;
......
......@@ -178,7 +178,8 @@ SArray* metaGetSmaTbUids(SMeta* pMeta);
void* metaGetIdx(SMeta* pMeta);
void* metaGetIvtIdx(SMeta* pMeta);
void metaReaderInit(SMetaReader* pReader, SMeta* pMeta, int32_t flags);
int64_t metaGetTbNum(SMeta *pMeta);
void metaReaderDoInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags);
int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg);
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
......@@ -217,6 +218,7 @@ int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
int tqUnregisterPushHandle(STQ* pTq, void* pHandle);
int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed.
int tqCheckStreamStatus(STQ* pTq);
int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
......@@ -238,14 +240,14 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, SRpcMsg* pMsg);
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec);
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqCheckLogInWal(STQ* pTq, int64_t version);
......
......@@ -17,13 +17,13 @@
#include "osMemory.h"
#include "tencode.h"
void _metaReaderInit(SMetaReader *pReader, void *pVnode, int32_t flags, SStoreMeta *pAPI) {
SMeta *pMeta = ((SVnode *)pVnode)->pMeta;
metaReaderInit(pReader, pMeta, flags);
void _metaReaderInit(SMetaReader* pReader, void* pVnode, int32_t flags, SStoreMeta* pAPI) {
SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
metaReaderDoInit(pReader, pMeta, flags);
pReader->pAPI = pAPI;
}
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags) {
void metaReaderDoInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags) {
memset(pReader, 0, sizeof(*pReader));
pReader->pMeta = pMeta;
pReader->flags = flags;
......@@ -143,7 +143,7 @@ tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) {
int metaGetTableNameByUid(void *pVnode, uint64_t uid, char *tbName) {
int code = 0;
SMetaReader mr = {0};
metaReaderInit(&mr, ((SVnode *)pVnode)->pMeta, 0);
metaReaderDoInit(&mr, ((SVnode*)pVnode)->pMeta, 0);
code = metaReaderGetTableEntryByUid(&mr, uid);
if (code < 0) {
metaReaderClear(&mr);
......@@ -159,7 +159,7 @@ int metaGetTableNameByUid(void *pVnode, uint64_t uid, char *tbName) {
int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName) {
int code = 0;
SMetaReader mr = {0};
metaReaderInit(&mr, (SMeta *)meta, 0);
metaReaderDoInit(&mr, (SMeta *)meta, 0);
code = metaReaderGetTableEntryByUid(&mr, uid);
if (code < 0) {
metaReaderClear(&mr);
......@@ -174,7 +174,7 @@ int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName) {
int metaGetTableUidByName(void *pVnode, char *tbName, uint64_t *uid) {
int code = 0;
SMetaReader mr = {0};
metaReaderInit(&mr, ((SVnode *)pVnode)->pMeta, 0);
metaReaderDoInit(&mr, ((SVnode *)pVnode)->pMeta, 0);
SMetaReader *pReader = &mr;
......@@ -195,7 +195,7 @@ int metaGetTableUidByName(void *pVnode, char *tbName, uint64_t *uid) {
int metaGetTableTypeByName(void *pVnode, char *tbName, ETableType *tbType) {
int code = 0;
SMetaReader mr = {0};
metaReaderInit(&mr, ((SVnode *)pVnode)->pMeta, 0);
metaReaderDoInit(&mr, ((SVnode*)pVnode)->pMeta, 0);
code = metaGetTableEntryByName(&mr, tbName);
if (code == 0) *tbType = mr.me.type;
......@@ -215,7 +215,7 @@ int metaReadNext(SMetaReader *pReader) {
int metaGetTableTtlByUid(void *meta, uint64_t uid, int64_t *ttlDays) {
int code = -1;
SMetaReader mr = {0};
metaReaderInit(&mr, (SMeta *)meta, 0);
metaReaderDoInit(&mr, (SMeta *)meta, 0);
code = metaReaderGetTableEntryByUid(&mr, uid);
if (code < 0) {
goto _exit;
......@@ -244,9 +244,7 @@ SMTbCursor *metaOpenTbCursor(void *pVnode) {
return NULL;
}
SVnode *pVnodeObj = pVnode;
// metaReaderInit(&pTbCur->mr, pVnodeObj->pMeta, 0);
SVnode* pVnodeObj = pVnode;
// tdbTbcMoveToFirst((TBC *)pTbCur->pDbc);
pTbCur->pMeta = pVnodeObj->pMeta;
pTbCur->paused = 1;
......@@ -277,7 +275,7 @@ void metaPauseTbCursor(SMTbCursor *pTbCur) {
}
void metaResumeTbCursor(SMTbCursor *pTbCur, int8_t first) {
if (pTbCur->paused) {
metaReaderInit(&pTbCur->mr, pTbCur->pMeta, 0);
metaReaderDoInit(&pTbCur->mr, pTbCur->pMeta, 0);
tdbTbcOpen(((SMeta *)pTbCur->pMeta)->pUidIdx, (TBC **)&pTbCur->pDbc, NULL);
......@@ -784,7 +782,7 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
}
SMetaReader mr = {0};
metaReaderInit(&mr, pMeta, 0);
metaReaderDoInit(&mr, pMeta, 0);
int64_t smaId;
int smaIdx = 0;
STSma *pTSma = NULL;
......@@ -839,7 +837,7 @@ _err:
STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) {
STSma *pTSma = NULL;
SMetaReader mr = {0};
metaReaderInit(&mr, pMeta, 0);
metaReaderDoInit(&mr, pMeta, 0);
if (metaReaderGetTableEntryByUid(&mr, indexUid) < 0) {
metaWarn("vgId:%d, failed to get table entry for smaId:%" PRIi64, TD_VID(pMeta->pVnode), indexUid);
metaReaderClear(&mr);
......
......@@ -37,7 +37,7 @@ int32_t metaCreateTSma(SMeta *pMeta, int64_t version, SSmaCfg *pCfg) {
// validate req
// save smaIndex
metaReaderInit(&mr, pMeta, 0);
metaReaderDoInit(&mr, pMeta, 0);
if (metaReaderGetTableEntryByUidCache(&mr, pCfg->indexUid) == 0) {
#if 1
terrno = TSDB_CODE_TSMA_ALREADY_EXIST;
......
......@@ -709,7 +709,7 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs
}
// validate req
metaReaderInit(&mr, pMeta, 0);
metaReaderDoInit(&mr, pMeta, 0);
if (metaGetTableEntryByName(&mr, pReq->name) == 0) {
if (pReq->type == TSDB_CHILD_TABLE && pReq->ctb.suid != mr.me.ctbEntry.suid) {
terrno = TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE;
......
......@@ -896,7 +896,7 @@ static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo) {
return TSDB_CODE_SUCCESS;
}
metaReaderInit(&mr, SMA_META(pSma), 0);
metaReaderDoInit(&mr, SMA_META(pSma), 0);
smaDebug("vgId:%d, rsma clone qTaskInfo for suid:%" PRIi64, SMA_VID(pSma), pInfo->suid);
if (metaReaderGetTableEntryByUidCache(&mr, pInfo->suid) < 0) {
code = terrno;
......@@ -1116,7 +1116,7 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) {
}
int64_t nRsmaTables = 0;
metaReaderInit(&mr, SMA_META(pSma), 0);
metaReaderDoInit(&mr, SMA_META(pSma), 0);
if (!(uidStore.tbUids = taosArrayInit(1024, sizeof(tb_uid_t)))) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
......
此差异已折叠。
......@@ -114,7 +114,7 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
}
SMetaReader mr = {0};
metaReaderInit(&mr, pHandle->execHandle.pTqReader->pVnodeMeta, 0);
metaReaderDoInit(&mr, pHandle->execHandle.pTqReader->pVnodeMeta, 0);
if (metaGetTableEntryByName(&mr, req.tbName) < 0) {
metaReaderClear(&mr);
......@@ -1109,7 +1109,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
}
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
int32_t code = qUpdateTableListForStreamScanner(pTask->exec.pExecutor, tbUidList, isAdd);
if (code != 0) {
tqError("vgId:%d, s-task:%s update qualified table error for stream task", vgId, pTask->id.idStr);
......
......@@ -16,6 +16,7 @@
#include "tq.h"
static int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle);
static int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId);
// this function should be executed by stream threads.
// extract submit block from WAL, and add them into the input queue for the sources tasks.
......@@ -57,7 +58,111 @@ int32_t tqStreamTasksScanWal(STQ* pTq) {
return 0;
}
static int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) {
int32_t tqStreamTasksStatusCheck(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
tqDebug("vgId:%d start to check all (%d) stream tasks downstream status", vgId, numOfTasks);
if (numOfTasks == 0) {
return TSDB_CODE_SUCCESS;
}
SArray* pTaskList = NULL;
taosWLockLatch(&pMeta->lock);
pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
taosWUnLockLatch(&pMeta->lock);
for (int32_t i = 0; i < numOfTasks; ++i) {
int32_t* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, *pTaskId);
if (pTask == NULL) {
continue;
}
streamTaskCheckDownstreamTasks(pTask);
streamMetaReleaseTask(pMeta, pTask);
}
taosArrayDestroy(pTaskList);
return 0;
}
int32_t tqCheckStreamStatus(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
taosWLockLatch(&pMeta->lock);
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) {
tqInfo("vgId:%d no stream tasks exist", vgId);
taosWUnLockLatch(&pMeta->lock);
return 0;
}
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
taosWUnLockLatch(&pMeta->lock);
return -1;
}
tqDebug("vgId:%d check for stream tasks status, numOfTasks:%d", vgId, numOfTasks);
pRunReq->head.vgId = vgId;
pRunReq->streamId = 0;
pRunReq->taskId = STREAM_TASK_STATUS_CHECK_ID;
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
taosWUnLockLatch(&pMeta->lock);
return 0;
}
int32_t tqStartStreamTasks(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
taosWLockLatch(&pMeta->lock);
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) {
tqInfo("vgId:%d no stream tasks exist", vgId);
taosWUnLockLatch(&pMeta->lock);
return 0;
}
pMeta->walScanCounter += 1;
if (pMeta->walScanCounter > 1) {
tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScanCounter);
taosWUnLockLatch(&pMeta->lock);
return 0;
}
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
taosWUnLockLatch(&pMeta->lock);
return -1;
}
tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d", vgId, numOfTasks);
pRunReq->head.vgId = vgId;
pRunReq->streamId = 0;
pRunReq->taskId = EXTRACT_DATA_FROM_WAL_ID;
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
taosWUnLockLatch(&pMeta->lock);
return 0;
}
int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) {
// seek the stored version and extract data from WAL
int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader);
if (pTask->chkInfo.currentVer < firstVer) {
......@@ -102,7 +207,7 @@ static int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) {
int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
*pScanIdle = true;
bool noNewDataInWal = true;
bool noDataInWal = true;
int32_t vgId = pStreamMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pStreamMeta->pTaskList);
......@@ -129,15 +234,13 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
}
int32_t status = pTask->status.taskStatus;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
// tqTrace("s-task:%s level:%d not source task, no need to start", pTask->id.idStr, pTask->taskLevel);
if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
if (streamTaskShouldStop(&pTask->status) || status == TASK_STATUS__RECOVER_PREPARE ||
status == TASK_STATUS__WAIT_DOWNSTREAM || streamTaskShouldPause(&pTask->status)) {
tqDebug("s-task:%s not ready for new submit block from wal, status:%d", pTask->id.idStr, status);
if (status != TASK_STATUS__NORMAL) {
tqDebug("s-task:%s not ready for new submit block from wal, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
......@@ -157,39 +260,47 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
continue;
}
int32_t numOfItemsInQ = taosQueueItemSize(pTask->inputQueue->queue);
// append the data for the stream
SStreamQueueItem* pItem = NULL;
code = extractMsgFromWal(pTask->exec.pWalReader, (void**) &pItem, pTask->id.idStr);
if (code != TSDB_CODE_SUCCESS) { // failed, continue
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
// delete ignore
if (pItem == NULL) {
if ((code != TSDB_CODE_SUCCESS || pItem == NULL) && (numOfItemsInQ == 0)) { // failed, continue
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
noNewDataInWal = false;
if (pItem != NULL) {
noDataInWal = false;
code = tAppendDataToInputQueue(pTask, pItem);
if (code == TSDB_CODE_SUCCESS) {
pTask->chkInfo.currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr,
pTask->chkInfo.currentVer);
} else {
tqError("s-task:%s append input queue failed, too many in inputQ, ver:%" PRId64, pTask->id.idStr,
pTask->chkInfo.currentVer);
}
}
code = tqAddInputBlockNLaunchTask(pTask, pItem);
if (code == TSDB_CODE_SUCCESS) {
pTask->chkInfo.currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr,
pTask->chkInfo.currentVer);
} else {
tqError("s-task:%s append input queue failed, ver:%" PRId64, pTask->id.idStr, pTask->chkInfo.currentVer);
if ((code == TSDB_CODE_SUCCESS) || (numOfItemsInQ > 0)) {
code = streamSchedExec(pTask);
if (code != TSDB_CODE_SUCCESS) {
streamMetaReleaseTask(pStreamMeta, pTask);
return -1;
}
}
streamMetaReleaseTask(pStreamMeta, pTask);
}
// all wal are checked, and no new data available in wal.
if (noNewDataInWal) {
if (noDataInWal) {
*pScanIdle = true;
}
taosArrayDestroy(pTaskList);
return 0;
}
......@@ -48,7 +48,7 @@ static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, STaosxRsp* pRsp
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, int32_t n) {
SMetaReader mr = {0};
metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
metaReaderDoInit(&mr, pTq->pVnode->pMeta, 0);
// TODO add reference to gurantee success
if (metaReaderGetTableEntryByUidCache(&mr, uid) < 0) {
......
......@@ -309,7 +309,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
tbData.uid = pTableSinkInfo->uid;
} else {
SMetaReader mr = {0};
metaReaderInit(&mr, pVnode->pMeta, 0);
metaReaderDoInit(&mr, pVnode->pMeta, 0);
if (metaGetTableEntryByName(&mr, ctbName) < 0) {
metaReaderClear(&mr);
taosMemoryFree(pTableSinkInfo);
......@@ -412,7 +412,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
if (k == 0) {
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
void* colData = colDataGetData(pColData, j);
tqDebug("tq sink pipe2, row %d, col %d ts %" PRId64, j, k, *(int64_t*)colData);
tqTrace("tq sink pipe2, row %d, col %d ts %" PRId64, j, k, *(int64_t*)colData);
}
if (IS_SET_NULL(pCol)) {
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
......
......@@ -20,12 +20,6 @@
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
const SMqMetaRsp* pRsp, int32_t vgId);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) {
char buf[128] = {0};
sprintf(buf, "0x%" PRIx64 "-0x%x", streamId, taskId);
return taosStrdup(buf);
}
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem) {
int32_t code = tAppendDataToInputQueue(pTask, pQueueItem);
if (code < 0) {
......
......@@ -1610,7 +1610,7 @@ static tb_uid_t getTableSuidByUid(tb_uid_t uid, STsdb *pTsdb) {
tb_uid_t suid = 0;
SMetaReader mr = {0};
metaReaderInit(&mr, pTsdb->pVnode->pMeta, 0);
metaReaderDoInit(&mr, pTsdb->pVnode->pMeta, 0);
if (metaReaderGetTableEntryByUidCache(&mr, uid) < 0) {
metaReaderClear(&mr); // table not esist
return 0;
......
......@@ -248,7 +248,7 @@ static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdb
STbData* piMemTbData);
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
int8_t* pLevel);
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, const char* id);
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static int32_t doBuildDataBlock(STsdbReader* pReader);
static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
......@@ -775,7 +775,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void
pReader->order = pCond->order;
pReader->idStr = (idstr != NULL) ? taosStrdup(idstr) : NULL;
pReader->verRange = getQueryVerRange(pVnode, pCond, level);
pReader->verRange = getQueryVerRange(pVnode, pCond, idstr);
pReader->type = pCond->type;
pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket
......@@ -3721,7 +3721,7 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* ret
return VND_TSDB(pVnode);
}
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, const char* id) {
int64_t startVer = (pCond->startVersion == -1) ? 0 : pCond->startVersion;
int64_t endVer = 0;
......@@ -3732,6 +3732,9 @@ SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_
endVer = (pCond->endVersion > pVnode->state.applied) ? pVnode->state.applied : pCond->endVersion;
}
tsdbDebug("queried verRange:%"PRId64"-%"PRId64", revised query verRange:%"PRId64"-%"PRId64", %s", pCond->startVersion,
pCond->endVersion, startVer, endVer, id);
return (SVersionRange){.minVer = startVer, .maxVer = endVer};
}
......@@ -5452,7 +5455,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
int32_t tsdbGetTableSchema(void* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
SMetaReader mr = {0};
metaReaderInit(&mr, ((SVnode*)pVnode)->pMeta, 0);
metaReaderDoInit(&mr, ((SVnode*)pVnode)->pMeta, 0);
int32_t code = metaReaderGetTableEntryByUidCache(&mr, uid);
if (code != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
......@@ -5584,57 +5587,3 @@ void tsdbReaderSetId(STsdbReader* pReader, const char* idstr) {
void tsdbReaderSetCloseFlag(STsdbReader* pReader) { pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED; }
/*-------------todo:refactor the implementation of those APIs in this file to seperate the API into two files------*/
// opt perf, do NOT create so many readers
int64_t tsdbGetLastTimestamp(SVnode* pVnode, void* pTableList, int32_t numOfTables, const char* pIdStr) {
SQueryTableDataCond cond = {.type = TIMEWINDOW_RANGE_CONTAINED, .numOfCols = 1, .order = TSDB_ORDER_DESC,
.startVersion = -1, .endVersion = -1};
cond.twindows.skey = INT64_MIN;
cond.twindows.ekey = INT64_MAX;
cond.colList = taosMemoryCalloc(1, sizeof(SColumnInfo));
cond.pSlotList = taosMemoryMalloc(sizeof(int32_t) * cond.numOfCols);
if (cond.colList == NULL || cond.pSlotList == NULL) {
// todo
}
cond.colList[0].colId = 1;
cond.colList[0].slotId = 0;
cond.colList[0].type = TSDB_DATA_TYPE_TIMESTAMP;
cond.pSlotList[0] = 0;
STableKeyInfo* pTableKeyInfo = pTableList;
STsdbReader* pReader = NULL;
SSDataBlock* pBlock = createDataBlock();
SColumnInfoData data = {0};
data.info = (SColumnInfo) {.type = TSDB_DATA_TYPE_TIMESTAMP, .colId = 1, .bytes = TSDB_KEYSIZE};
blockDataAppendColInfo(pBlock, &data);
int64_t key = INT64_MIN;
for(int32_t i = 0; i < numOfTables; ++i) {
int32_t code = tsdbReaderOpen(pVnode, &cond, &pTableKeyInfo[i], 1, pBlock, (void**)&pReader, pIdStr, false, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
bool hasData = false;
code = tsdbNextDataBlock(pReader, &hasData);
if (!hasData || code != TSDB_CODE_SUCCESS) {
continue;
}
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, 0);
int64_t k = *(int64_t*)pCol->pData;
if (key < k) {
key = k;
}
tsdbReaderClose(pReader);
}
return 0;
}
......@@ -528,25 +528,25 @@ void tsdbFidKeyRange(int32_t fid, int32_t minutes, int8_t precision, TSKEY *minK
*maxKey = *minKey + tsTickPerMin[precision] * minutes - 1;
}
int32_t tsdbFidLevel(int32_t fid, STsdbKeepCfg *pKeepCfg, int64_t now) {
int32_t tsdbFidLevel(int32_t fid, STsdbKeepCfg *pKeepCfg, int64_t nowSec) {
int32_t aFid[3];
TSKEY key;
if (pKeepCfg->precision == TSDB_TIME_PRECISION_MILLI) {
now = now * 1000;
nowSec = nowSec * 1000;
} else if (pKeepCfg->precision == TSDB_TIME_PRECISION_MICRO) {
now = now * 1000000l;
nowSec = nowSec * 1000000l;
} else if (pKeepCfg->precision == TSDB_TIME_PRECISION_NANO) {
now = now * 1000000000l;
nowSec = nowSec * 1000000000l;
} else {
ASSERT(0);
}
key = now - pKeepCfg->keep0 * tsTickPerMin[pKeepCfg->precision];
key = nowSec - pKeepCfg->keep0 * tsTickPerMin[pKeepCfg->precision];
aFid[0] = tsdbKeyFid(key, pKeepCfg->days, pKeepCfg->precision);
key = now - pKeepCfg->keep1 * tsTickPerMin[pKeepCfg->precision];
key = nowSec - pKeepCfg->keep1 * tsTickPerMin[pKeepCfg->precision];
aFid[1] = tsdbKeyFid(key, pKeepCfg->days, pKeepCfg->precision);
key = now - pKeepCfg->keep2 * tsTickPerMin[pKeepCfg->precision];
key = nowSec - pKeepCfg->keep2 * tsTickPerMin[pKeepCfg->precision];
aFid[2] = tsdbKeyFid(key, pKeepCfg->days, pKeepCfg->precision);
if (fid >= aFid[0]) {
......@@ -640,7 +640,7 @@ SColVal *tsdbRowIterNext(STSDBRowIter *pIter) {
int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
int32_t code = 0;
TSDBKEY key = TSDBROW_KEY(pRow);
SColVal * pColVal = &(SColVal){0};
SColVal *pColVal = &(SColVal){0};
STColumn *pTColumn;
int32_t iCol, jCol = 1;
......@@ -764,7 +764,7 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema)
}
}
int32_t tsdbRowMergerInit(SRowMerger* pMerger, STSchema *pSchema) {
int32_t tsdbRowMergerInit(SRowMerger *pMerger, STSchema *pSchema) {
pMerger->pTSchema = pSchema;
pMerger->pArray = taosArrayInit(pSchema->numOfCols, sizeof(SColVal));
if (pMerger->pArray == NULL) {
......@@ -774,7 +774,7 @@ int32_t tsdbRowMergerInit(SRowMerger* pMerger, STSchema *pSchema) {
}
}
void tsdbRowMergerClear(SRowMerger* pMerger) {
void tsdbRowMergerClear(SRowMerger *pMerger) {
for (int32_t iCol = 1; iCol < pMerger->pTSchema->numOfCols; iCol++) {
SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
if (IS_VAR_DATA_TYPE(pTColVal->type)) {
......@@ -785,7 +785,7 @@ void tsdbRowMergerClear(SRowMerger* pMerger) {
taosArrayClear(pMerger->pArray);
}
void tsdbRowMergerCleanup(SRowMerger* pMerger) {
void tsdbRowMergerCleanup(SRowMerger *pMerger) {
int32_t numOfCols = taosArrayGetSize(pMerger->pArray);
for (int32_t iCol = 1; iCol < numOfCols; iCol++) {
SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
......@@ -1041,8 +1041,6 @@ int32_t tsdbBuildDeleteSkyline2(SArray *aDelData, int32_t sidx, int32_t eidx, SA
// SBlockData ======================================================
int32_t tBlockDataCreate(SBlockData *pBlockData) {
int32_t code = 0;
pBlockData->suid = 0;
pBlockData->uid = 0;
pBlockData->nRow = 0;
......@@ -1051,7 +1049,7 @@ int32_t tBlockDataCreate(SBlockData *pBlockData) {
pBlockData->aTSKEY = NULL;
pBlockData->nColData = 0;
pBlockData->aColData = NULL;
return code;
return 0;
}
void tBlockDataDestroy(SBlockData *pBlockData) {
......@@ -1107,8 +1105,8 @@ int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema,
int32_t iColumn = 1;
STColumn *pTColumn = &pTSchema->columns[iColumn];
for (int32_t iCid = 0; iCid < nCid; iCid++) {
// aCid array (from taos client catalog) contains columns that does not exist in the pTSchema. the pTSchema is newer
// aCid array (from taos client catalog) contains columns that does not exist in the pTSchema. the pTSchema is
// newer
if (pTColumn == NULL) {
continue;
}
......@@ -1239,7 +1237,7 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
_exit:
return code;
}
static int32_t tBlockDataUpdateRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema) {
int32_t tBlockDataUpdateRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema) {
int32_t code = 0;
// version
......
......@@ -203,6 +203,7 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateCommit = streamStateCommit;
pStore->streamStateDestroy = streamStateDestroy;
pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint;
pStore->streamStateReloadInfo = streamStateReloadInfo;
}
void initMetaReaderAPI(SStoreMetaReader* pMetaReader) {
......
......@@ -62,7 +62,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
}
// query meta
metaReaderInit(&mer1, pVnode->pMeta, 0);
metaReaderDoInit(&mer1, pVnode->pMeta, 0);
if (metaGetTableEntryByName(&mer1, infoReq.tbName) < 0) {
code = terrno;
......@@ -79,7 +79,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
schemaTag = mer1.me.stbEntry.schemaTag;
metaRsp.suid = mer1.me.uid;
} else if (mer1.me.type == TSDB_CHILD_TABLE) {
metaReaderInit(&mer2, pVnode->pMeta, META_READER_NOLOCK);
metaReaderDoInit(&mer2, pVnode->pMeta, META_READER_NOLOCK);
if (metaReaderGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit;
strcpy(metaRsp.stbName, mer2.me.name);
......@@ -175,7 +175,7 @@ int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
}
// query meta
metaReaderInit(&mer1, pVnode->pMeta, 0);
metaReaderDoInit(&mer1, pVnode->pMeta, 0);
if (metaGetTableEntryByName(&mer1, cfgReq.tbName) < 0) {
code = terrno;
......@@ -188,7 +188,7 @@ int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
code = TSDB_CODE_VND_HASH_MISMATCH;
goto _exit;
} else if (mer1.me.type == TSDB_CHILD_TABLE) {
metaReaderInit(&mer2, pVnode->pMeta, 0);
metaReaderDoInit(&mer2, pVnode->pMeta, 0);
if (metaReaderGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit;
strcpy(cfgRsp.stbName, mer2.me.name);
......
......@@ -402,10 +402,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
if (!syncUtilUserCommit(pMsg->msgType)) goto _exit;
if (pMsg->msgType == TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE || pMsg->msgType == TDMT_STREAM_TASK_CHECK_RSP) {
if (tqCheckLogInWal(pVnode->pTq, ver)) return 0;
}
// skip header
pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
len = pMsg->contLen - sizeof(SMsgHead);
......@@ -501,16 +497,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
goto _err;
}
} break;
case TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE: {
if (tqProcessTaskRecover2Req(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
goto _err;
}
} break;
case TDMT_STREAM_TASK_CHECK_RSP: {
if (tqProcessStreamTaskCheckRsp(pVnode->pTq, ver, pReq, len) < 0) {
goto _err;
}
} break;
case TDMT_VND_ALTER_CONFIRM:
needCommit = pVnode->config.hashChange;
if (vnodeProcessAlterConfirmReq(pVnode, ver, pReq, len, pRsp) < 0) {
......@@ -641,26 +627,49 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
// return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_VND_TMQ_VG_WALINFO:
return tqProcessVgWalInfoReq(pVnode->pTq, pMsg);
default:
vError("unknown msg type:%d in fetch queue", pMsg->msgType);
return TSDB_CODE_APP_ERROR;
}
}
int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg);
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
pMsg->msgType == TDMT_VND_BATCH_META) &&
!syncIsReadyForRead(pVnode->sync)) {
vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
return 0;
}
switch (pMsg->msgType) {
case TDMT_STREAM_TASK_RUN:
return tqProcessTaskRunReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_DISPATCH:
return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true);
case TDMT_STREAM_TASK_CHECK:
return tqProcessStreamTaskCheckReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_DISPATCH_RSP:
return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_CHECK:
return tqProcessStreamTaskCheckReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_CHECK_RSP:
return tqProcessStreamTaskCheckRsp(pVnode->pTq, 0, pMsg);
case TDMT_STREAM_RETRIEVE:
return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
case TDMT_STREAM_RETRIEVE_RSP:
return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
case TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE:
return tqProcessTaskRecover1Req(pVnode->pTq, pMsg);
case TDMT_STREAM_RECOVER_FINISH:
case TDMT_VND_STREAM_SCAN_HISTORY:
return tqProcessTaskScanHistory(pVnode->pTq, pMsg);
case TDMT_STREAM_TRANSFER_STATE: {
char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
return tqProcessTaskTransferStateReq(pVnode->pTq, 0, pReq, len);
}
case TDMT_STREAM_SCAN_HISTORY_FINISH:
return tqProcessTaskRecoverFinishReq(pVnode->pTq, pMsg);
case TDMT_STREAM_RECOVER_FINISH_RSP:
case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP:
return tqProcessTaskRecoverFinishRsp(pVnode->pTq, pMsg);
default:
vError("unknown msg type:%d in fetch queue", pMsg->msgType);
vError("unknown msg type:%d in stream queue", pMsg->msgType);
return TSDB_CODE_APP_ERROR;
}
}
......@@ -1695,7 +1704,7 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pRe
tDecodeSBatchDeleteReq(&decoder, &deleteReq);
SMetaReader mr = {0};
metaReaderInit(&mr, pVnode->pMeta, META_READER_NOLOCK);
metaReaderDoInit(&mr, pVnode->pMeta, META_READER_NOLOCK);
int32_t sz = taosArrayGetSize(deleteReq.deleteReqs);
for (int32_t i = 0; i < sz; i++) {
......
......@@ -554,7 +554,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
vInfo("vgId:%d, not launch stream tasks, since stream tasks are disabled", pVnode->config.vgId);
} else {
vInfo("vgId:%d start to launch stream tasks", pVnode->config.vgId);
tqStartStreamTasks(pVnode->pTq);
tqCheckStreamStatus(pVnode->pTq);
}
}
......
......@@ -392,7 +392,7 @@ static int32_t setAliveResultIntoDataBlock(int64_t* pConnId, SSDataBlock* pBlock
int32_t status = 0;
int32_t code = getAliveStatusFromApi(pConnId, dbName, &status);
if (code == TSDB_CODE_SUCCESS) {
colDataAppend(pCol1, 0, (const char*)&status, false);
colDataSetVal(pCol1, 0, (const char*)&status, false);
}
return code;
}
......
......@@ -285,6 +285,8 @@ typedef struct SStreamAggSupporter {
int16_t stateKeyType;
SDiskbasedBuf* pResultBuf;
SStateStore stateStore;
STimeWindow winRange;
SStorageAPI* pSessionAPI;
} SStreamAggSupporter;
typedef struct SWindowSupporter {
......@@ -503,6 +505,8 @@ typedef struct SStreamSessionAggOperatorInfo {
SArray* pUpdated;
SSHashObj* pStUpdated;
int64_t dataVersion;
SArray* historyWins;
bool isHistoryOp;
} SStreamSessionAggOperatorInfo;
typedef struct SStreamStateAggOperatorInfo {
......@@ -522,6 +526,8 @@ typedef struct SStreamStateAggOperatorInfo {
SArray* pUpdated;
SSHashObj* pSeUpdated;
int64_t dataVersion;
bool isHistoryOp;
SArray* historyWins;
} SStreamStateAggOperatorInfo;
typedef struct SStreamPartitionOperatorInfo {
......@@ -678,6 +684,8 @@ void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExpr
void doClearBufferedBlocks(SStreamScanInfo* pInfo);
uint64_t calcGroupId(char* pData, int32_t len);
void streamOpReleaseState(struct SOperatorInfo* pOperator);
void streamOpReloadState(struct SOperatorInfo* pOperator);
#ifdef __cplusplus
}
......
......@@ -35,6 +35,7 @@ typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr);
typedef void (*__optr_close_fn_t)(void* param);
typedef int32_t (*__optr_explain_fn_t)(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len);
typedef int32_t (*__optr_reqBuf_fn_t)(struct SOperatorInfo* pOptr);
typedef void (*__optr_state_fn_t)(struct SOperatorInfo* pOptr);
typedef struct SOperatorFpSet {
__optr_open_fn_t _openFn; // DO NOT invoke this function directly
......@@ -45,6 +46,8 @@ typedef struct SOperatorFpSet {
__optr_encode_fn_t encodeResultRow;
__optr_decode_fn_t decodeResultRow;
__optr_explain_fn_t getExplainFn;
__optr_state_fn_t releaseStreamStateFn;
__optr_state_fn_t reloadStreamStateFn;
} SOperatorFpSet;
enum {
......@@ -126,13 +129,13 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle);
SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild);
SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild, SReadHandle* pHandle);
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle);
SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo);
......@@ -143,6 +146,7 @@ SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNo
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,
__optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn, __optr_explain_fn_t explain);
void setOperatorStreamStateFn(SOperatorInfo* pOperator, __optr_state_fn_t relaseFn, __optr_state_fn_t reloadFn);
int32_t optrDummyOpenFn(SOperatorInfo* pOperator);
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num);
void setOperatorCompleted(SOperatorInfo* pOperator);
......
......@@ -62,10 +62,12 @@ typedef struct {
SSchemaWrapper* schema;
char tbName[TSDB_TABLE_NAME_LEN]; // this is the current scan table: todo refactor
int8_t recoverStep;
bool recoverStep1Finished;
bool recoverStep2Finished;
int8_t recoverScanFinished;
SQueryTableDataCond tableCond;
int64_t fillHistoryVer1;
int64_t fillHistoryVer2;
SVersionRange fillHistoryVer;
STimeWindow fillHistoryWindow;
SStreamState* pState;
int64_t dataVersion;
int64_t checkPointId;
......
......@@ -92,6 +92,7 @@ static int32_t doSetStreamOpOpen(SOperatorInfo* pOperator, char* id) {
qError("join not supported for stream block scan, %s" PRIx64, id);
return TSDB_CODE_APP_ERROR;
}
pOperator->status = OP_NOT_OPENED;
return doSetStreamOpOpen(pOperator->pDownstream[0], id);
}
......@@ -115,6 +116,16 @@ void resetTaskInfo(qTaskInfo_t tinfo) {
clearStreamBlock(pTaskInfo->pRoot);
}
void qResetStreamInfoTimeWindow(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tinfo;
if (pTaskInfo == NULL) {
return;
}
qDebug("%s set fill history start key:%"PRId64, GET_TASKID(pTaskInfo), INT64_MIN);
pTaskInfo->streamInfo.fillHistoryWindow.skey = INT64_MIN;
}
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, const char* id) {
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if (pOperator->numOfDownstream == 0) {
......@@ -130,10 +141,9 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
} else {
pOperator->status = OP_NOT_OPENED;
SStreamScanInfo* pInfo = pOperator->info;
qDebug("s-task:%s in this batch, all %d blocks need to be processed and dump results", id, (int32_t)numOfBlocks);
qDebug("s-task:%s in this batch, %d blocks need to be processed", id, (int32_t)numOfBlocks);
ASSERT(pInfo->validBlockIndex == 0 && taosArrayGetSize(pInfo->pBlockLists) == 0);
if (type == STREAM_INPUT__MERGED_SUBMIT) {
......@@ -265,6 +275,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pTaskInfo->pRoot = createRawScanOperatorInfo(pReaderHandle, pTaskInfo);
if (NULL == pTaskInfo->pRoot) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -314,8 +325,8 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v
return NULL;
}
struct SSubplan* pPlan = NULL;
int32_t code = qStringToSubplan(msg, &pPlan);
SSubplan* pPlan = NULL;
int32_t code = qStringToSubplan(msg, &pPlan);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
......@@ -869,19 +880,41 @@ int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
}
}
int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver) {
int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
pTaskInfo->streamInfo.fillHistoryVer1 = ver;
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE1;
SStreamTaskInfo* pStreamInfo = &pTaskInfo->streamInfo;
pStreamInfo->fillHistoryVer = *pVerRange;
pStreamInfo->fillHistoryWindow = *pWindow;
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE1;
pStreamInfo->recoverStep1Finished = false;
pStreamInfo->recoverStep2Finished = false;
qDebug("%s step 1. set param for stream scanner for scan history data, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64
" - %" PRId64,
GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey,
pWindow->ekey);
return 0;
}
int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver) {
int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
pTaskInfo->streamInfo.fillHistoryVer2 = ver;
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE2;
SStreamTaskInfo* pStreamInfo = &pTaskInfo->streamInfo;
pStreamInfo->fillHistoryVer = *pVerRange;
pStreamInfo->fillHistoryWindow = *pWindow;
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE2;
pStreamInfo->recoverStep1Finished = true;
pStreamInfo->recoverStep2Finished = false;
qDebug("%s step 2. set param for stream scanner for scan history data, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64
" - %" PRId64,
GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey,
pWindow->ekey);
return 0;
}
......@@ -892,55 +925,58 @@ int32_t qStreamRecoverFinish(qTaskInfo_t tinfo) {
return 0;
}
int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) {
int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot;
while (1) {
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
int32_t type = pOperator->operatorType;
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
ASSERT(pInfo->twAggSup.calTriggerSaved == 0 && pInfo->twAggSup.deleteMarkSaved == 0);
STimeWindowAggSupp* pSup = &pInfo->twAggSup;
ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0);
qInfo("save stream param for interval: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
qInfo("save stream param for interval: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark);
pInfo->twAggSup.calTriggerSaved = pInfo->twAggSup.calTrigger;
pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark;
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
pInfo->twAggSup.deleteMark = INT64_MAX;
pSup->calTriggerSaved = pSup->calTrigger;
pSup->deleteMarkSaved = pSup->deleteMark;
pSup->calTrigger = STREAM_TRIGGER_AT_ONCE;
pSup->deleteMark = INT64_MAX;
pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData;
pInfo->ignoreExpiredData = false;
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
} else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
STimeWindowAggSupp* pSup = &pInfo->twAggSup;
ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0);
ASSERT(pInfo->twAggSup.calTriggerSaved == 0 && pInfo->twAggSup.deleteMarkSaved == 0);
qInfo("save stream param for session: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
qInfo("save stream param for session: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark);
pInfo->twAggSup.calTriggerSaved = pInfo->twAggSup.calTrigger;
pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark;
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
pInfo->twAggSup.deleteMark = INT64_MAX;
pSup->calTriggerSaved = pSup->calTrigger;
pSup->deleteMarkSaved = pSup->deleteMark;
pSup->calTrigger = STREAM_TRIGGER_AT_ONCE;
pSup->deleteMark = INT64_MAX;
pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData;
pInfo->ignoreExpiredData = false;
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
} else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
ASSERT(pInfo->twAggSup.calTriggerSaved == 0 && pInfo->twAggSup.deleteMarkSaved == 0);
STimeWindowAggSupp* pSup = &pInfo->twAggSup;
qInfo("save stream param for state: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0);
pInfo->twAggSup.calTriggerSaved = pInfo->twAggSup.calTrigger;
pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark;
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
pInfo->twAggSup.deleteMark = INT64_MAX;
qInfo("save stream param for state: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark);
pSup->calTriggerSaved = pSup->calTrigger;
pSup->deleteMarkSaved = pSup->deleteMark;
pSup->calTrigger = STREAM_TRIGGER_AT_ONCE;
pSup->deleteMark = INT64_MAX;
pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData;
pInfo->ignoreExpiredData = false;
}
......@@ -961,7 +997,7 @@ int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) {
return 0;
}
int32_t qStreamRestoreParam(qTaskInfo_t tinfo) {
int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot;
......@@ -1009,6 +1045,26 @@ bool qStreamRecoverScanFinished(qTaskInfo_t tinfo) {
return pTaskInfo->streamInfo.recoverScanFinished;
}
bool qStreamRecoverScanStep1Finished(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
return pTaskInfo->streamInfo.recoverStep1Finished;
}
bool qStreamRecoverScanStep2Finished(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
return pTaskInfo->streamInfo.recoverStep2Finished;
}
int32_t qStreamRecoverSetAllStepFinished(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
pTaskInfo->streamInfo.recoverStep1Finished = true;
pTaskInfo->streamInfo.recoverStep2Finished = true;
// reset the time window
pTaskInfo->streamInfo.fillHistoryWindow.skey = INT64_MIN;
return 0;
}
void* qExtractReaderFromStreamScanner(void* scanner) {
SStreamScanInfo* pInfo = scanner;
return (void*)pInfo->tqReader;
......@@ -1323,4 +1379,16 @@ SArray* getTableListInfo(const SExecTaskInfo* pTaskInfo) {
SOperatorInfo* pOperator = pTaskInfo->pRoot;
extractTableList(pArray, pOperator);
return pArray;
}
\ No newline at end of file
}
int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tInfo;
pTaskInfo->pRoot->fpSet.releaseStreamStateFn(pTaskInfo->pRoot);
return 0;
}
int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tInfo;
pTaskInfo->pRoot->fpSet.reloadStreamStateFn(pTaskInfo->pRoot);
return 0;
}
......@@ -1562,6 +1562,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
pTaskInfo);
pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, doStreamFill, NULL, destroyStreamFillOperatorInfo, optrDefaultBufFn, NULL);
setOperatorStreamStateFn(pOperator, streamOpReleaseState, streamOpReloadState);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
......
......@@ -1036,7 +1036,7 @@ void appendCreateTableRow(void* pState, SExprSupp* pTableSup, SExprSupp* pTagSup
}
void* pGpIdCol = taosArrayGet(pDestBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
colDataAppend(pGpIdCol, pDestBlock->info.rows, (const char*)&groupId, false);
colDataSetVal(pGpIdCol, pDestBlock->info.rows, (const char*)&groupId, false);
pDestBlock->info.rows++;
blockDataDestroy(pTmpBlock);
} else {
......@@ -1324,6 +1324,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamHashPartition, NULL,
destroyStreamPartitionOperatorInfo, optrDefaultBufFn, NULL);
setOperatorStreamStateFn(pOperator, streamOpReleaseState, streamOpReloadState);
initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup);
code = appendDownstream(pOperator, &downstream, 1);
......
......@@ -38,11 +38,18 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn,
.closeFn = closeFn,
.reqBufFn = reqBufFn,
.getExplainFn = explain,
.releaseStreamStateFn = NULL,
.reloadStreamStateFn = NULL,
};
return fpSet;
}
void setOperatorStreamStateFn(SOperatorInfo* pOperator, __optr_state_fn_t relaseFn, __optr_state_fn_t reloadFn) {
pOperator->fpSet.releaseStreamStateFn = relaseFn;
pOperator->fpSet.reloadStreamStateFn = reloadFn;
}
int32_t optrDummyOpenFn(SOperatorInfo* pOperator) {
OPTR_SET_OPENED(pOperator);
pOperator->cost.openCost = 0;
......@@ -485,13 +492,13 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
pOptr = createSessionAggOperatorInfo(ops[0], pSessionNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) {
pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) {
int32_t children = 0;
pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children, pHandle);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION == type) {
int32_t children = pHandle->numOfVgroups;
pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children, pHandle);
} else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION == type) {
......@@ -500,7 +507,7 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode;
pOptr = createStatewindowOperatorInfo(ops[0], pStateNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) {
pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
pOptr = createMergeJoinOperatorInfo(ops, size, (SSortMergeJoinPhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) {
......
......@@ -73,6 +73,20 @@ static void destroyIndefinitOperatorInfo(void* param) {
taosMemoryFreeClear(param);
}
void streamOperatorReleaseState(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.releaseStreamStateFn) {
downstream->fpSet.releaseStreamStateFn(downstream);
}
}
void streamOperatorReloadState(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.reloadStreamStateFn) {
downstream->fpSet.reloadStreamStateFn(downstream);
}
}
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode,
SExecTaskInfo* pTaskInfo) {
int32_t code = TSDB_CODE_SUCCESS;
......@@ -136,6 +150,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doProjectOperation, NULL, destroyProjectOperatorInfo,
optrDefaultBufFn, NULL);
setOperatorStreamStateFn(pOperator, streamOperatorReleaseState, streamOperatorReloadState);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
......
......@@ -272,7 +272,7 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
continue;
} else if (isIsfilledPseudoColumn(pExprInfo)) {
bool isFilled = true;
colDataAppend(pDst, pResBlock->info.rows, (char*)&isFilled, false);
colDataSetVal(pDst, pResBlock->info.rows, (char*)&isFilled, false);
continue;
} else if (!isInterpFunc(pExprInfo)) {
if (isGroupKeyFunc(pExprInfo)) {
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册