From c2955a807b6c60bd6a6c105ca0477dcddd952971 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 29 Jun 2022 20:57:15 +0800 Subject: [PATCH] feat(tmq): offset support snapshot --- examples/c/tmq.c | 4 +- include/common/tcommon.h | 17 +- include/common/tmsg.h | 70 ++- include/libs/executor/executor.h | 7 - include/libs/stream/tstream.h | 19 +- source/client/inc/clientInt.h | 25 +- source/client/src/clientMain.c | 10 +- source/client/src/tmq.c | 406 ++++++++---------- source/common/src/tmsg.c | 166 +++++-- source/dnode/vnode/inc/vnode.h | 20 +- source/dnode/vnode/src/inc/tq.h | 19 +- source/dnode/vnode/src/sma/smaRollup.c | 14 +- source/dnode/vnode/src/tq/tq.c | 299 +++++++++++-- source/dnode/vnode/src/tq/tqExec.c | 35 +- source/dnode/vnode/src/tq/tqOffset.c | 4 +- source/dnode/vnode/src/tq/tqPush.c | 12 +- source/dnode/vnode/src/tq/tqRead.c | 29 +- source/dnode/vnode/src/vnd/vnodeSvr.c | 2 +- source/libs/executor/src/executor.c | 10 +- source/libs/executor/src/scanoperator.c | 12 +- source/libs/executor/src/timewindowoperator.c | 26 +- source/libs/function/src/builtins.c | 25 +- source/libs/stream/src/streamExec.c | 19 +- source/libs/stream/src/streamTask.c | 8 +- 24 files changed, 805 insertions(+), 453 deletions(-) diff --git a/examples/c/tmq.c b/examples/c/tmq.c index 68b0492ace..4226587d56 100644 --- a/examples/c/tmq.c +++ b/examples/c/tmq.c @@ -138,7 +138,7 @@ int32_t create_topic() { taos_free_result(pRes); /*pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");*/ - pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1 where t1 = 2000"); + pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1"); if (taos_errno(pRes) != 0) { printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); return -1; @@ -225,7 +225,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { } int32_t cnt = 0; while (running) { - TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 0); + TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, -1); if (tmqmessage) { cnt++; msg_process(tmqmessage); diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 928fe0aa0e..54044f7d89 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -25,10 +25,11 @@ extern "C" { #endif +// TODO remove it enum { - TMQ_CONF__RESET_OFFSET__LATEST = -1, - TMQ_CONF__RESET_OFFSET__EARLIEAST = -2, TMQ_CONF__RESET_OFFSET__NONE = -3, + TMQ_CONF__RESET_OFFSET__EARLIEAST = -2, + TMQ_CONF__RESET_OFFSET__LATEST = -1, }; enum { @@ -39,6 +40,16 @@ enum { TMQ_MSG_TYPE__END_RSP, }; +enum { + STREAM_INPUT__DATA_SUBMIT = 1, + STREAM_INPUT__DATA_BLOCK, + STREAM_INPUT__DATA_SCAN, + STREAM_INPUT__DATA_RETRIEVE, + STREAM_INPUT__TRIGGER, + STREAM_INPUT__CHECKPOINT, + STREAM_INPUT__DROP, +}; + typedef enum EStreamType { STREAM_NORMAL = 1, STREAM_INVERT, @@ -48,7 +59,7 @@ typedef enum EStreamType { STREAM_DELETE, STREAM_RETRIEVE, STREAM_PUSH_DATA, - STREAM_PUSH_EMPTY, + STREAM_PUSH_OVER, } EStreamType; typedef struct { diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 0afcaaa50d..71867d9741 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2462,22 +2462,37 @@ int32_t tDecodeSMqCMCommitOffsetReq(SDecoder* decoder, SMqCMCommitOffsetReq* pRe // tqOffset enum { - TMQ_OFFSET__SNAPSHOT = 1, - TMQ_OFFSET__LOG, + TMQ_OFFSET__RESET_NONE = -3, + TMQ_OFFSET__RESET_EARLIEAST = -2, + TMQ_OFFSET__RESET_LATEST = -1, + TMQ_OFFSET__LOG = 1, + TMQ_OFFSET__SNAPSHOT_DATA = 2, + TMQ_OFFSET__SNAPSHOT_META = 3, }; typedef struct { int8_t type; union { + // snapshot data struct { int64_t uid; int64_t ts; }; + // log struct { int64_t version; }; }; - char subKey[TSDB_SUBSCRIBE_KEY_LEN]; +} STqOffsetVal; + +int32_t tEncodeSTqOffsetVal(SEncoder* pEncoder, const STqOffsetVal* pOffsetVal); +int32_t tDecodeSTqOffsetVal(SDecoder* pDecoder, STqOffsetVal* pOffsetVal); +int32_t tFormatOffset(char* buf, int32_t maxLen, const STqOffsetVal* pVal); +bool tOffsetEqual(const STqOffsetVal* pLeft, const STqOffsetVal* pRight); + +typedef struct { + STqOffsetVal val; + char subKey[TSDB_SUBSCRIBE_KEY_LEN]; } STqOffset; int32_t tEncodeSTqOffset(SEncoder* pEncoder, const STqOffset* pOffset); @@ -2710,7 +2725,8 @@ typedef struct { uint64_t reqId; int64_t consumerId; int64_t timeout; - int64_t currentOffset; + // int64_t currentOffset; + STqOffsetVal reqOffset; } SMqPollReq; typedef struct { @@ -2779,12 +2795,14 @@ static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { } typedef struct { - SMqRspHead head; - int64_t reqOffset; - int64_t rspOffset; - int16_t resMsgType; - int32_t metaRspLen; - void* metaRsp; + SMqRspHead head; + int64_t reqOffset; + int64_t rspOffset; + STqOffsetVal reqOffsetNew; + STqOffsetVal rspOffsetNew; + int16_t resMsgType; + int32_t metaRspLen; + void* metaRsp; } SMqMetaRsp; static FORCE_INLINE int32_t tEncodeSMqMetaRsp(void** buf, const SMqMetaRsp* pRsp) { @@ -2806,6 +2824,24 @@ static FORCE_INLINE void* tDecodeSMqMetaRsp(const void* buf, SMqMetaRsp* pRsp) { return (void*)buf; } +typedef struct { + SMqRspHead head; + STqOffsetVal reqOffset; + STqOffsetVal rspOffset; + int32_t skipLogNum; + int32_t blockNum; + int8_t withTbName; + int8_t withSchema; + SArray* blockDataLen; + SArray* blockData; + SArray* blockTbName; + SArray* blockSchema; +} SMqDataRsp; + +int32_t tEncodeSMqDataRsp(SEncoder* pEncoder, const SMqDataRsp* pRsp); +int32_t tDecodeSMqDataRsp(SDecoder* pDecoder, SMqDataRsp* pRsp); + +#if 0 typedef struct { SMqRspHead head; int64_t reqOffset; @@ -2814,13 +2850,10 @@ typedef struct { int32_t blockNum; int8_t withTbName; int8_t withSchema; - int8_t withTag; - SArray* blockDataLen; // SArray - SArray* blockData; // SArray - SArray* blockTbName; // SArray - SArray* blockSchema; // SArray - SArray* blockTags; // SArray - SArray* blockTagSchema; // SArray + SArray* blockDataLen; // SArray + SArray* blockData; // SArray + SArray* blockTbName; // SArray + SArray* blockSchema; // SArray } SMqDataBlkRsp; static FORCE_INLINE int32_t tEncodeSMqDataBlkRsp(void** buf, const SMqDataBlkRsp* pRsp) { @@ -2832,7 +2865,6 @@ static FORCE_INLINE int32_t tEncodeSMqDataBlkRsp(void** buf, const SMqDataBlkRsp if (pRsp->blockNum != 0) { tlen += taosEncodeFixedI8(buf, pRsp->withTbName); tlen += taosEncodeFixedI8(buf, pRsp->withSchema); - tlen += taosEncodeFixedI8(buf, pRsp->withTag); for (int32_t i = 0; i < pRsp->blockNum; i++) { int32_t bLen = *(int32_t*)taosArrayGet(pRsp->blockDataLen, i); @@ -2862,7 +2894,6 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(int32_t)); buf = taosDecodeFixedI8(buf, &pRsp->withTbName); buf = taosDecodeFixedI8(buf, &pRsp->withSchema); - buf = taosDecodeFixedI8(buf, &pRsp->withTag); if (pRsp->withTbName) { pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void*)); } @@ -2891,6 +2922,7 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p } return (void*)buf; } +#endif typedef struct { SMqRspHead head; diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 00acc4741d..564c208c69 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -36,15 +36,8 @@ typedef struct SReadHandle { void* vnode; void* mnd; SMsgCb* pMsgCb; -// int8_t initTsdbReader; } SReadHandle; -enum { - STREAM_DATA_TYPE_SUBMIT_BLOCK = 1, - STREAM_DATA_TYPE_SSDATA_BLOCK = 2, - STREAM_DATA_TYPE_FROM_SNAPSHOT = 3, -}; - typedef enum { OPTR_EXEC_MODEL_BATCH = 0x1, OPTR_EXEC_MODEL_STREAM = 0x2, diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 74c885ebcc..67074c789e 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -55,15 +55,6 @@ enum { TASK_OUTPUT_STATUS__BLOCKED, }; -enum { - STREAM_INPUT__DATA_SUBMIT = 1, - STREAM_INPUT__DATA_BLOCK, - STREAM_INPUT__DATA_RETRIEVE, - STREAM_INPUT__TRIGGER, - STREAM_INPUT__CHECKPOINT, - STREAM_INPUT__DROP, -}; - typedef struct { int8_t type; } SStreamQueueItem; @@ -152,10 +143,6 @@ typedef struct { void* executor; } STaskExec; -typedef struct { - int32_t taskId; -} STaskDispatcherInplace; - typedef struct { int32_t taskId; int32_t nodeId; @@ -208,7 +195,6 @@ enum { enum { TASK_DISPATCH__NONE = 1, - TASK_DISPATCH__INPLACE, TASK_DISPATCH__FIXED, TASK_DISPATCH__SHUFFLE, }; @@ -260,7 +246,7 @@ struct SStreamTask { // exec STaskExec exec; - // TODO: merge sink and dispatch + // TODO: unify sink and dispatch // local sink union { @@ -269,9 +255,8 @@ struct SStreamTask { STaskSinkFetch fetchSink; }; - // dispatch + // remote dispatcher union { - STaskDispatcherInplace inplaceDispatcher; STaskDispatcherFixedEp fixedEpDispatcher; STaskDispatcherShuffle shuffleDispatcher; }; diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index cca79186d0..5ffc9ecae8 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -54,11 +54,10 @@ enum { RES_TYPE__TMQ_META, }; -#define SHOW_VARIABLES_RESULT_COLS 2 +#define SHOW_VARIABLES_RESULT_COLS 2 #define SHOW_VARIABLES_RESULT_FIELD1_LEN (TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE) #define SHOW_VARIABLES_RESULT_FIELD2_LEN (TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE) - #define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY) #define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ) #define TD_RES_TMQ_META(res) (*(int8_t*)res == RES_TYPE__TMQ_META) @@ -194,7 +193,7 @@ typedef struct { int32_t vgId; SSchemaWrapper schema; int32_t resIter; - SMqDataBlkRsp rsp; + SMqDataRsp rsp; SReqResultInfo resInfo; } SMqRspObj; @@ -238,18 +237,18 @@ typedef struct SSyncQueryParam { void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4); void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4); -void doSetOneRowPtr(SReqResultInfo* pResultInfo); -void setResPrecision(SReqResultInfo* pResInfo, int32_t precision); -int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4, - bool freeAfterUse); -void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols); -void doFreeReqResultInfo(SReqResultInfo* pResInfo); -int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq); -void syncCatalogFn(SMetaData* pResult, void* param, int32_t code); +void doSetOneRowPtr(SReqResultInfo* pResultInfo); +void setResPrecision(SReqResultInfo* pResInfo, int32_t precision); +int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4, + bool freeAfterUse); +void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols); +void doFreeReqResultInfo(SReqResultInfo* pResInfo); +int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq); +void syncCatalogFn(SMetaData* pResult, void* param, int32_t code); SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen, bool validateOnly); -TAOS_RES *taosQueryImpl(TAOS *taos, const char *sql, bool validateOnly); -void taosAsyncQueryImpl(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param, bool validateOnly); +TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly); +void taosAsyncQueryImpl(TAOS* taos, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly); static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) { SMqRspObj* msg = (SMqRspObj*)res; diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 52574dcc9f..90df3eb31a 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -81,16 +81,16 @@ void taos_cleanup(void) { taosCloseLog(); } -static setConfRet taos_set_config_imp(const char *config){ +static setConfRet taos_set_config_imp(const char *config) { setConfRet ret = {SET_CONF_RET_SUCC, {0}}; // TODO: need re-implementation return ret; } -setConfRet taos_set_config(const char *config){ -// TODO pthread_mutex_lock(&setConfMutex); +setConfRet taos_set_config(const char *config) { + // TODO pthread_mutex_lock(&setConfMutex); setConfRet ret = taos_set_config_imp(config); -// pthread_mutex_unlock(&setConfMutex); + // pthread_mutex_unlock(&setConfMutex); return ret; } @@ -179,8 +179,6 @@ void taos_free_result(TAOS_RES *res) { SMqRspObj *pRsp = (SMqRspObj *)res; if (pRsp->rsp.blockData) taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree); if (pRsp->rsp.blockDataLen) taosArrayDestroy(pRsp->rsp.blockDataLen); - if (pRsp->rsp.blockTags) taosArrayDestroy(pRsp->rsp.blockTags); - if (pRsp->rsp.blockTagSchema) taosArrayDestroy(pRsp->rsp.blockTagSchema); if (pRsp->rsp.withTbName) taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree); if (pRsp->rsp.withSchema) taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); pRsp->resInfo.pRspMsg = NULL; diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 637a7ee5dd..4cef25b9ce 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -126,8 +126,10 @@ typedef struct { // statistics int64_t pollCnt; // offset - int64_t committedOffset; - int64_t currentOffset; + /*int64_t committedOffset;*/ + /*int64_t currentOffset;*/ + STqOffsetVal committedOffsetNew; + STqOffsetVal currentOffsetNew; // connection info int32_t vgId; int32_t vgStatus; @@ -152,8 +154,8 @@ typedef struct { SMqClientVg* vgHandle; SMqClientTopic* topicHandle; union { - SMqDataBlkRsp dataRsp; - SMqMetaRsp metaRsp; + SMqDataRsp dataRsp; + SMqMetaRsp metaRsp; }; } SMqPollRspWrapper; @@ -179,6 +181,7 @@ typedef struct { tsem_t rspSem; } SMqPollCbParam; +#if 0 typedef struct { tmq_t* tmq; int8_t async; @@ -190,12 +193,13 @@ typedef struct { SArray* offsets; void* userParam; } SMqCommitCbParam; +#endif typedef struct { - tmq_t* tmq; - int8_t automatic; - int8_t async; - int8_t freeOffsets; + tmq_t* tmq; + int8_t automatic; + int8_t async; + /*int8_t freeOffsets;*/ int32_t waitingRspNum; int32_t totalRspNum; int32_t rspErr; @@ -351,6 +355,7 @@ static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) { return sprintf(dst, "%s:%d", topicName, vg); } +#if 0 int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) { SMqCommitCbParam* pParam = (SMqCommitCbParam*)param; pParam->rspErr = code; @@ -371,6 +376,7 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) { } return 0; } +#endif int32_t tmqCommitCb2(void* param, const SDataBuf* pBuf, int32_t code) { SMqCommitCbParam2* pParam = (SMqCommitCbParam2*)param; @@ -413,141 +419,150 @@ int32_t tmqCommitCb2(void* param, const SDataBuf* pBuf, int32_t code) { return 0; } -int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb, - void* userParam) { - int32_t code = -1; +static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pTopic, SMqCommitCbParamSet* pParamSet) { + STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset)); + if (pOffset == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pOffset->val = pVg->currentOffsetNew; - if (msg != NULL) { - char* topic; - int32_t vgId; - if (TD_RES_TMQ(msg)) { - SMqRspObj* pRspObj = (SMqRspObj*)msg; - topic = pRspObj->topic; - vgId = pRspObj->vgId; - } else if (TD_RES_TMQ_META(msg)) { - SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg; - topic = pMetaRspObj->topic; - vgId = pMetaRspObj->vgId; - } else { - return TSDB_CODE_TMQ_INVALID_MSG; - } + int32_t groupLen = strlen(tmq->groupId); + memcpy(pOffset->subKey, tmq->groupId, groupLen); + pOffset->subKey[groupLen] = TMQ_SEPARATOR; + strcpy(pOffset->subKey + groupLen + 1, pTopic->topicName); - SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet)); - if (pParamSet == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - pParamSet->tmq = tmq; - pParamSet->automatic = automatic; - pParamSet->async = async; - pParamSet->freeOffsets = 1; - pParamSet->userCb = userCb; - pParamSet->userParam = userParam; - tsem_init(&pParamSet->rspSem, 0, 0); + int32_t len; + int32_t code; + tEncodeSize(tEncodeSTqOffset, pOffset, len, code); + if (code < 0) { + ASSERT(0); + return -1; + } + void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len); + if (buf == NULL) return -1; + ((SMsgHead*)buf)->vgId = htonl(pVg->vgId); - for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { - SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); - if (strcmp(pTopic->topicName, topic) == 0) { - for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { - SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); - if (pVg->vgId == vgId) { - if (pVg->currentOffset < 0 || pVg->committedOffset == pVg->currentOffset) { - tscDebug("consumer %ld skip commit for topic %s vg %d, current offset is %ld, committed offset is %ld", - tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->currentOffset, pVg->committedOffset); - - return 0; - } - - STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset)); - if (pOffset == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - pOffset->type = TMQ_OFFSET__LOG; - pOffset->version = pVg->currentOffset; - - int32_t groupLen = strlen(tmq->groupId); - memcpy(pOffset->subKey, tmq->groupId, groupLen); - pOffset->subKey[groupLen] = TMQ_SEPARATOR; - strcpy(pOffset->subKey + groupLen + 1, pTopic->topicName); - - int32_t len; - int32_t code; - tEncodeSize(tEncodeSTqOffset, pOffset, len, code); - if (code < 0) { - ASSERT(0); - } - void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len); - ((SMsgHead*)buf)->vgId = htonl(pVg->vgId); - - void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - - SEncoder encoder; - tEncoderInit(&encoder, abuf, len); - tEncodeSTqOffset(&encoder, pOffset); - - // build param - SMqCommitCbParam2* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam2)); - pParam->params = pParamSet; - pParam->pOffset = pOffset; - - // build send info - SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); - if (pMsgSendInfo == NULL) { - // TODO - continue; - } - pMsgSendInfo->msgInfo = (SDataBuf){ - .pData = buf, - .len = sizeof(SMsgHead) + len, - .handle = NULL, - }; - - tscDebug("consumer %ld commit offset of %s on vg %d, offset is %ld", tmq->consumerId, pOffset->subKey, - pVg->vgId, pOffset->version); - - // TODO: put into cb - pVg->committedOffset = pVg->currentOffset; - - pMsgSendInfo->requestId = generateRequestId(); - pMsgSendInfo->requestObjRefId = 0; - pMsgSendInfo->param = pParam; - pMsgSendInfo->fp = tmqCommitCb2; - pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET; - // send msg - - int64_t transporterId = 0; - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo); - pParamSet->waitingRspNum++; - pParamSet->totalRspNum++; - } + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + + SEncoder encoder; + tEncoderInit(&encoder, abuf, len); + tEncodeSTqOffset(&encoder, pOffset); + + // build param + SMqCommitCbParam2* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam2)); + pParam->params = pParamSet; + pParam->pOffset = pOffset; + + // build send info + SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); + if (pMsgSendInfo == NULL) { + return -1; + } + pMsgSendInfo->msgInfo = (SDataBuf){ + .pData = buf, + .len = sizeof(SMsgHead) + len, + .handle = NULL, + }; + + tscDebug("consumer %ld commit offset of %s on vg %d, offset is %ld", tmq->consumerId, pOffset->subKey, pVg->vgId, + pOffset->val.version); + + // TODO: put into cb + pVg->committedOffsetNew = pVg->currentOffsetNew; + + pMsgSendInfo->requestId = generateRequestId(); + pMsgSendInfo->requestObjRefId = 0; + pMsgSendInfo->param = pParam; + pMsgSendInfo->fp = tmqCommitCb2; + pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET; + // send msg + + int64_t transporterId = 0; + asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo); + pParamSet->waitingRspNum++; + pParamSet->totalRspNum++; + return 0; +} + +int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) { + char* topic; + int32_t vgId; + ASSERT(msg != NULL); + if (TD_RES_TMQ(msg)) { + SMqRspObj* pRspObj = (SMqRspObj*)msg; + topic = pRspObj->topic; + vgId = pRspObj->vgId; + } else if (TD_RES_TMQ_META(msg)) { + SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg; + topic = pMetaRspObj->topic; + vgId = pMetaRspObj->vgId; + } else { + return TSDB_CODE_TMQ_INVALID_MSG; + } + + SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet)); + if (pParamSet == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pParamSet->tmq = tmq; + pParamSet->automatic = 0; + pParamSet->async = async; + /*pParamSet->freeOffsets = 1;*/ + pParamSet->userCb = userCb; + pParamSet->userParam = userParam; + tsem_init(&pParamSet->rspSem, 0, 0); + + for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { + SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); + if (strcmp(pTopic->topicName, topic) != 0) continue; + for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { + SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); + if (pVg->vgId != vgId) continue; + + if (pVg->currentOffsetNew.type > 0 && !tOffsetEqual(&pVg->currentOffsetNew, &pVg->committedOffsetNew)) { + if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) { + goto FAIL; } + goto HANDLE_RSP; } } - if (pParamSet->totalRspNum == 0) { - tsem_destroy(&pParamSet->rspSem); - taosMemoryFree(pParamSet); - return 0; - } + } - if (!async) { - tsem_wait(&pParamSet->rspSem); - code = pParamSet->rspErr; - tsem_destroy(&pParamSet->rspSem); - } else { - code = 0; - } + int32_t code = -1; - if (code != 0 && async) { - if (automatic) { - tmq->commitCb(tmq, code, tmq->commitCbUserParam); - } else { - userCb(tmq, code, userParam); - } - } +HANDLE_RSP: + if (pParamSet->totalRspNum == 0) { + tsem_destroy(&pParamSet->rspSem); + taosMemoryFree(pParamSet); return 0; } + if (!async) { + tsem_wait(&pParamSet->rspSem); + code = pParamSet->rspErr; + tsem_destroy(&pParamSet->rspSem); + return code; + } else { + code = 0; + } + +FAIL: + if (code != 0 && async) { + userCb(tmq, code, userParam); + } + return 0; +} + +int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb, + void* userParam) { + int32_t code = -1; + + if (msg != NULL) { + return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam); + } + SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet)); if (pParamSet == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -556,7 +571,7 @@ int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_ pParamSet->tmq = tmq; pParamSet->automatic = automatic; pParamSet->async = async; - pParamSet->freeOffsets = 1; + /*pParamSet->freeOffsets = 1;*/ pParamSet->userCb = userCb; pParamSet->userParam = userParam; tsem_init(&pParamSet->rspSem, 0, 0); @@ -572,75 +587,11 @@ int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_ tscDebug("consumer %ld begin commit for topic %s, vgId %d", tmq->consumerId, pTopic->topicName, pVg->vgId); - /*if (pVg->currentOffset < 0) {*/ - if (pVg->currentOffset < 0 || pVg->committedOffset == pVg->currentOffset) { - tscDebug("consumer %ld skip commit for topic %s vg %d, current offset is %ld, committed offset is %ld", - tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->currentOffset, pVg->committedOffset); - - continue; - } - STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset)); - if (pOffset == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - pOffset->type = TMQ_OFFSET__LOG; - pOffset->version = pVg->currentOffset; - - int32_t groupLen = strlen(tmq->groupId); - memcpy(pOffset->subKey, tmq->groupId, groupLen); - pOffset->subKey[groupLen] = TMQ_SEPARATOR; - strcpy(pOffset->subKey + groupLen + 1, pTopic->topicName); - - int32_t len; - int32_t code; - tEncodeSize(tEncodeSTqOffset, pOffset, len, code); - if (code < 0) { - ASSERT(0); - } - void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len); - ((SMsgHead*)buf)->vgId = htonl(pVg->vgId); - - void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - - SEncoder encoder; - tEncoderInit(&encoder, abuf, len); - tEncodeSTqOffset(&encoder, pOffset); - - // build param - SMqCommitCbParam2* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam2)); - pParam->params = pParamSet; - pParam->pOffset = pOffset; - - // build send info - SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); - if (pMsgSendInfo == NULL) { - // TODO - continue; + if (pVg->currentOffsetNew.type > 0 && !tOffsetEqual(&pVg->currentOffsetNew, &pVg->committedOffsetNew)) { + if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) { + continue; + } } - pMsgSendInfo->msgInfo = (SDataBuf){ - .pData = buf, - .len = sizeof(SMsgHead) + len, - .handle = NULL, - }; - - tscDebug("consumer %ld commit offset of %s on vg %d, offset is %ld", tmq->consumerId, pOffset->subKey, pVg->vgId, - pOffset->version); - - // TODO: put into cb - pVg->committedOffset = pVg->currentOffset; - - pMsgSendInfo->requestId = generateRequestId(); - pMsgSendInfo->requestObjRefId = 0; - pMsgSendInfo->param = pParam; - pMsgSendInfo->fp = tmqCommitCb2; - pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET; - // send msg - - int64_t transporterId = 0; - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo); - pParamSet->waitingRspNum++; - pParamSet->totalRspNum++; } } @@ -1174,7 +1125,10 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { if (rspType == TMQ_MSG_TYPE__POLL_RSP) { memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead)); - tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->dataRsp); + SDecoder decoder; + tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); + tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp); + /*tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->dataRsp);*/ } else { ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP); memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead)); @@ -1226,9 +1180,11 @@ bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { for (int32_t j = 0; j < vgNumCur; j++) { SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j); sprintf(vgKey, "%s:%d", pTopicCur->topicName, pVgCur->vgId); - tscDebug("consumer %ld epoch %d vg %d vgKey is %s, offset is %ld", tmq->consumerId, epoch, pVgCur->vgId, vgKey, - pVgCur->currentOffset); - taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t)); + char buf[50]; + tFormatOffset(buf, 50, &pVgCur->currentOffsetNew); + tscDebug("consumer %ld epoch %d vg %d vgKey is %s, offset is %s", tmq->consumerId, epoch, pVgCur->vgId, vgKey, + buf); + taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffsetNew, sizeof(STqOffsetVal)); } } } @@ -1257,7 +1213,7 @@ bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { pVgEp->vgId, offset, vgKey); SMqClientVg clientVg = { .pollCnt = 0, - .currentOffset = offset, + .currentOffsetNew.type = tmq->resetOffsetCfg, .vgId = pVgEp->vgId, .epSet = pVgEp->epSet, .vgStatus = TMQ_VG_STATUS__IDLE, @@ -1281,7 +1237,7 @@ bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { return set; } -#if 1 +#if 0 bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { /*printf("call update ep %d\n", epoch);*/ bool set = false; @@ -1516,16 +1472,16 @@ int32_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) { #endif SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) { - int64_t reqOffset; - if (pVg->currentOffset >= 0) { - reqOffset = pVg->currentOffset; - } else { - /*if (tmq->resetOffsetCfg == TMQ_CONF__RESET_OFFSET__NONE) {*/ - /*tscError("unable to poll since no committed offset but reset offset is set to none");*/ - /*return NULL;*/ - /*}*/ - reqOffset = tmq->resetOffsetCfg; - } + /*int64_t reqOffset;*/ + /*if (pVg->currentOffset >= 0) {*/ + /*reqOffset = pVg->currentOffset;*/ + /*} else {*/ + /*if (tmq->resetOffsetCfg == TMQ_CONF__RESET_OFFSET__NONE) {*/ + /*tscError("unable to poll since no committed offset but reset offset is set to none");*/ + /*return NULL;*/ + /*}*/ + /*reqOffset = tmq->resetOffsetCfg;*/ + /*}*/ SMqPollReq* pReq = taosMemoryCalloc(1, sizeof(SMqPollReq)); if (pReq == NULL) { @@ -1544,7 +1500,8 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pReq->timeout = timeout; pReq->consumerId = tmq->consumerId; pReq->epoch = tmq->epoch; - pReq->currentOffset = reqOffset; + /*pReq->currentOffset = reqOffset;*/ + pReq->reqOffset = pVg->currentOffsetNew; pReq->reqId = generateRequestId(); pReq->useSnapshot = tmq->useSnapshot; @@ -1572,7 +1529,7 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) { tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN); pRspObj->vgId = pWrapper->vgHandle->vgId; pRspObj->resIter = -1; - memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataBlkRsp)); + memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp)); pRspObj->resInfo.totalRows = 0; pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI; @@ -1645,8 +1602,11 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { int64_t transporterId = 0; /*printf("send poll\n");*/ - tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu", tmq->consumerId, - pTopic->topicName, pVg->vgId, tmq->epoch, pVg->currentOffset, pReq->reqId); + + char offsetFormatBuf[50]; + tFormatOffset(offsetFormatBuf, 50, &pVg->currentOffsetNew); + tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %s, reqId %lu", tmq->consumerId, + pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, pReq->reqId); /*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/ asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); pVg->pollCnt++; @@ -1695,7 +1655,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) { SMqClientVg* pVg = pollRspWrapper->vgHandle; /*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/ - pVg->currentOffset = pollRspWrapper->dataRsp.rspOffset; + pVg->currentOffsetNew = pollRspWrapper->dataRsp.rspOffset; atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); if (pollRspWrapper->dataRsp.blockNum == 0) { taosFreeQitem(pollRspWrapper); @@ -1717,7 +1677,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) { SMqClientVg* pVg = pollRspWrapper->vgHandle; /*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/ - pVg->currentOffset = pollRspWrapper->metaRsp.rspOffset; + pVg->currentOffsetNew = pollRspWrapper->metaRsp.rspOffsetNew; atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); // build rsp SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index e9b5c67d76..8c1daae3fa 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2300,7 +2300,6 @@ int32_t tDeserializeSServerVerRsp(void *buf, int32_t bufLen, SServerVerRsp *pRsp return 0; } - int32_t tSerializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); @@ -2387,7 +2386,6 @@ int32_t tDeserializeSDnodeListRsp(void *buf, int32_t bufLen, SDnodeListRsp *pRsp void tFreeSDnodeListRsp(SDnodeListRsp *pRsp) { taosArrayDestroy(pRsp->dnodeList); } - int32_t tSerializeSCompactDbReq(void *buf, int32_t bufLen, SCompactDbReq *pReq) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); @@ -2909,20 +2907,19 @@ int32_t tDeserializeSShowVariablesReq(void *buf, int32_t bufLen, SShowVariablesR return 0; } -int32_t tEncodeSVariablesInfo(SEncoder* pEncoder, SVariablesInfo* pInfo) { +int32_t tEncodeSVariablesInfo(SEncoder *pEncoder, SVariablesInfo *pInfo) { if (tEncodeCStr(pEncoder, pInfo->name) < 0) return -1; if (tEncodeCStr(pEncoder, pInfo->value) < 0) return -1; return 0; } -int32_t tDecodeSVariablesInfo(SDecoder* pDecoder, SVariablesInfo* pInfo) { +int32_t tDecodeSVariablesInfo(SDecoder *pDecoder, SVariablesInfo *pInfo) { if (tDecodeCStrTo(pDecoder, pInfo->name) < 0) return -1; if (tDecodeCStrTo(pDecoder, pInfo->value) < 0) return -1; return 0; } - -int32_t tSerializeSShowVariablesRsp(void* buf, int32_t bufLen, SShowVariablesRsp* pRsp) { +int32_t tSerializeSShowVariablesRsp(void *buf, int32_t bufLen, SShowVariablesRsp *pRsp) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); @@ -2930,7 +2927,7 @@ int32_t tSerializeSShowVariablesRsp(void* buf, int32_t bufLen, SShowVariablesRsp int32_t varNum = taosArrayGetSize(pRsp->variables); if (tEncodeI32(&encoder, varNum) < 0) return -1; for (int32_t i = 0; i < varNum; ++i) { - SVariablesInfo* pInfo = taosArrayGet(pRsp->variables, i); + SVariablesInfo *pInfo = taosArrayGet(pRsp->variables, i); if (tEncodeSVariablesInfo(&encoder, pInfo) < 0) return -1; } tEndEncode(&encoder); @@ -2940,7 +2937,7 @@ int32_t tSerializeSShowVariablesRsp(void* buf, int32_t bufLen, SShowVariablesRsp return tlen; } -int32_t tDeserializeSShowVariablesRsp(void* buf, int32_t bufLen, SShowVariablesRsp* pRsp) { +int32_t tDeserializeSShowVariablesRsp(void *buf, int32_t bufLen, SShowVariablesRsp *pRsp) { SDecoder decoder = {0}; tDecoderInit(&decoder, buf, bufLen); @@ -2962,11 +2959,11 @@ int32_t tDeserializeSShowVariablesRsp(void* buf, int32_t bufLen, SShowVariablesR return 0; } -void tFreeSShowVariablesRsp(SShowVariablesRsp* pRsp) { +void tFreeSShowVariablesRsp(SShowVariablesRsp *pRsp) { if (NULL == pRsp) { return; } - + taosArrayDestroy(pRsp->variables); } @@ -5360,30 +5357,149 @@ void tFreeSMAlterStbRsp(SMAlterStbRsp *pRsp) { } } -int32_t tEncodeSTqOffset(SEncoder *pEncoder, const STqOffset *pOffset) { - if (tEncodeI8(pEncoder, pOffset->type) < 0) return -1; - if (pOffset->type == TMQ_OFFSET__SNAPSHOT) { - if (tEncodeI64(pEncoder, pOffset->uid) < 0) return -1; - if (tEncodeI64(pEncoder, pOffset->ts) < 0) return -1; - } else if (pOffset->type == TMQ_OFFSET__LOG) { - if (tEncodeI64(pEncoder, pOffset->version) < 0) return -1; +int32_t tEncodeSTqOffsetVal(SEncoder *pEncoder, const STqOffsetVal *pOffsetVal) { + if (tEncodeI8(pEncoder, pOffsetVal->type) < 0) return -1; + if (pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_DATA) { + if (tEncodeI64(pEncoder, pOffsetVal->uid) < 0) return -1; + if (tEncodeI64(pEncoder, pOffsetVal->ts) < 0) return -1; + } else if (pOffsetVal->type == TMQ_OFFSET__LOG) { + if (tEncodeI64(pEncoder, pOffsetVal->version) < 0) return -1; + } else if (pOffsetVal->type < 0) { + // do nothing } else { ASSERT(0); } - if (tEncodeCStr(pEncoder, pOffset->subKey) < 0) return -1; return 0; } -int32_t tDecodeSTqOffset(SDecoder *pDecoder, STqOffset *pOffset) { - if (tDecodeI8(pDecoder, &pOffset->type) < 0) return -1; - if (pOffset->type == TMQ_OFFSET__SNAPSHOT) { - if (tDecodeI64(pDecoder, &pOffset->uid) < 0) return -1; - if (tDecodeI64(pDecoder, &pOffset->ts) < 0) return -1; - } else if (pOffset->type == TMQ_OFFSET__LOG) { - if (tDecodeI64(pDecoder, &pOffset->version) < 0) return -1; +int32_t tDecodeSTqOffsetVal(SDecoder *pDecoder, STqOffsetVal *pOffsetVal) { + if (tDecodeI8(pDecoder, &pOffsetVal->type) < 0) return -1; + if (pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_DATA) { + if (tDecodeI64(pDecoder, &pOffsetVal->uid) < 0) return -1; + if (tDecodeI64(pDecoder, &pOffsetVal->ts) < 0) return -1; + } else if (pOffsetVal->type == TMQ_OFFSET__LOG) { + if (tDecodeI64(pDecoder, &pOffsetVal->version) < 0) return -1; + } else if (pOffsetVal->type < 0) { + // do nothing + } else { + ASSERT(0); + } + return 0; +} + +#if 1 +int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) { + if (pVal->type == TMQ_OFFSET__RESET_NONE) { + snprintf(buf, maxLen, "offset(reset to none)"); + } else if (pVal->type == TMQ_OFFSET__RESET_EARLIEAST) { + snprintf(buf, maxLen, "offset(reset to earlieast)"); + } else if (pVal->type == TMQ_OFFSET__RESET_LATEST) { + snprintf(buf, maxLen, "offset(reset to latest)"); + } else if (pVal->type == TMQ_OFFSET__LOG) { + snprintf(buf, maxLen, "offset(log) ver:%ld", pVal->version); + } else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA) { + snprintf(buf, maxLen, "offset(snapshot data) uid:%ld, ts:%ld", pVal->uid, pVal->ts); + } else if (pVal->type == TMQ_OFFSET__SNAPSHOT_META) { + snprintf(buf, maxLen, "offset(snapshot meta) uid:%ld, ts:%ld", pVal->uid, pVal->ts); } else { ASSERT(0); } + return 0; +} +#endif + +bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) { + if (pLeft->type == pRight->type) { + if (pLeft->type == TMQ_OFFSET__LOG) { + return pLeft->version == pRight->version; + } else if (pLeft->type == TMQ_OFFSET__SNAPSHOT_DATA) { + return pLeft->uid == pRight->uid && pLeft->ts == pRight->ts; + } else if (pLeft->type == TMQ_OFFSET__SNAPSHOT_META) { + ASSERT(0); + // TODO + return pLeft->uid == pRight->uid && pLeft->ts == pRight->ts; + } + } + return false; +} + +int32_t tEncodeSTqOffset(SEncoder *pEncoder, const STqOffset *pOffset) { + if (tEncodeSTqOffsetVal(pEncoder, &pOffset->val) < 0) return -1; + if (tEncodeCStr(pEncoder, pOffset->subKey) < 0) return -1; + return 0; +} + +int32_t tDecodeSTqOffset(SDecoder *pDecoder, STqOffset *pOffset) { + if (tDecodeSTqOffsetVal(pDecoder, &pOffset->val) < 0) return -1; if (tDecodeCStrTo(pDecoder, pOffset->subKey) < 0) return -1; return 0; } + +int32_t tEncodeSMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) { + if (tEncodeSTqOffsetVal(pEncoder, &pRsp->reqOffset) < 0) return -1; + if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->skipLogNum) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->blockNum) < 0) return -1; + if (pRsp->blockNum != 0) { + if (tEncodeI8(pEncoder, pRsp->withTbName) < 0) return -1; + if (tEncodeI8(pEncoder, pRsp->withSchema) < 0) return -1; + + for (int32_t i = 0; i < pRsp->blockNum; i++) { + int32_t bLen = *(int32_t *)taosArrayGet(pRsp->blockDataLen, i); + void *data = taosArrayGetP(pRsp->blockData, i); + if (tEncodeBinary(pEncoder, (const uint8_t *)data, bLen) < 0) return -1; + if (pRsp->withSchema) { + SSchemaWrapper *pSW = (SSchemaWrapper *)taosArrayGetP(pRsp->blockSchema, i); + if (tEncodeSSchemaWrapper(pEncoder, pSW) < 0) return -1; + } + if (pRsp->withTbName) { + char *tbName = (char *)taosArrayGetP(pRsp->blockTbName, i); + if (tEncodeCStr(pEncoder, tbName) < 0) return -1; + } + } + } + return 0; +} + +int32_t tDecodeSMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) { + if (tDecodeSTqOffsetVal(pDecoder, &pRsp->reqOffset) < 0) return -1; + if (tDecodeSTqOffsetVal(pDecoder, &pRsp->rspOffset) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->skipLogNum) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->blockNum) < 0) return -1; + if (pRsp->blockNum != 0) { + pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void *)); + pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(int32_t)); + if (tDecodeI8(pDecoder, &pRsp->withTbName) < 0) return -1; + if (tDecodeI8(pDecoder, &pRsp->withSchema) < 0) return -1; + if (pRsp->withTbName) { + pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void *)); + } + if (pRsp->withSchema) { + pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void *)); + } + + for (int32_t i = 0; i < pRsp->blockNum; i++) { + void *data; + uint64_t bLen; + if (tDecodeBinaryAlloc(pDecoder, &data, &bLen) < 0) return -1; + taosArrayPush(pRsp->blockData, &data); + int32_t len = bLen; + taosArrayPush(pRsp->blockDataLen, &len); + + if (pRsp->withSchema) { + SSchemaWrapper *pSW = (SSchemaWrapper *)taosMemoryCalloc(1, sizeof(SSchemaWrapper)); + if (pSW == NULL) return -1; + if (tDecodeSSchemaWrapper(pDecoder, pSW) < 0) return -1; + taosArrayPush(pRsp->blockSchema, &pSW); + } + + if (pRsp->withTbName) { + char *tbName; + if (tDecodeCStrAlloc(pDecoder, &tbName) < 0) return -1; + taosArrayPush(pRsp->blockTbName, &tbName); + } + } + } + return 0; +} + diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 5c2d2cd712..cef0bbbc01 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -139,19 +139,19 @@ void tsdbCleanupReadHandle(tsdbReaderT queryHandle); // tq -typedef struct STqReadHandle STqReadHandle; +typedef struct STqReadHandle SStreamReader; -STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta); +SStreamReader *tqInitSubmitMsgScanner(SMeta *pMeta); -void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList); -int32_t tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); -int32_t tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); -int32_t tqReadHandleRemoveTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); +void tqReadHandleSetColIdList(SStreamReader *pReadHandle, SArray *pColIdList); +int32_t tqReadHandleSetTbUidList(SStreamReader *pHandle, const SArray *tbUidList); +int32_t tqReadHandleAddTbUidList(SStreamReader *pHandle, const SArray *tbUidList); +int32_t tqReadHandleRemoveTbUidList(SStreamReader *pHandle, const SArray *tbUidList); -int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver); -bool tqNextDataBlock(STqReadHandle *pHandle); -bool tqNextDataBlockFilterOut(STqReadHandle *pHandle, SHashObj *filterOutUids); -int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReadHandle *pHandle); +int32_t tqReadHandleSetMsg(SStreamReader *pHandle, SSubmitReq *pMsg, int64_t ver); +bool tqNextDataBlock(SStreamReader *pHandle); +bool tqNextDataBlockFilterOut(SStreamReader *pHandle, SHashObj *filterOutUids); +int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, SStreamReader *pHandle); // sma int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 59c3e95b9c..38a4f28041 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -39,6 +39,16 @@ extern "C" { #define tqInfo(...) do { if (tqDebugFlag & DEBUG_INFO) { taosPrintLog("TQ ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0) #define tqDebug(...) do { if (tqDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); }} while(0) #define tqTrace(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0) + +#define IS_META_MSG(x) ( \ + x == TDMT_VND_CREATE_STB \ + || x == TDMT_VND_ALTER_STB \ + || x == TDMT_VND_DROP_STB \ + || x == TDMT_VND_CREATE_TABLE \ + || x == TDMT_VND_ALTER_TABLE \ + || x == TDMT_VND_DROP_TABLE \ + || x == TDMT_VND_DROP_TTL_TABLE \ +) // clang-format on typedef struct STqOffsetStore STqOffsetStore; @@ -101,12 +111,13 @@ typedef struct { typedef struct { int8_t subType; - STqReadHandle* pExecReader[5]; + SStreamReader* pExecReader[5]; union { STqExecCol execCol; STqExecTb execTb; STqExecDb execDb; }; + } STqExecHandle; typedef struct { @@ -149,9 +160,9 @@ static STqMgmt tqMgmt = {0}; int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead** pHeadWithCkSum); // tqExec -int32_t tqDataExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataBlkRsp* pRsp, int32_t workerId); -int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataBlkRsp* pRsp, int32_t workerId); -int32_t tqSendPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataBlkRsp* pRsp); +int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp, int32_t workerId); +int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, int32_t workerId); +int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp); // tqMeta int32_t tqMetaOpen(STQ* pTq); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index ed5b6f4055..e1e195f14c 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -306,7 +306,7 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con return TSDB_CODE_FAILED; } - STqReadHandle *pReadHandle = tqInitSubmitMsgScanner(pMeta); + SStreamReader *pReadHandle = tqInitSubmitMsgScanner(pMeta); if (!pReadHandle) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; @@ -590,8 +590,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { tdRefSmaStat(pSma, (SSmaStat *)pStat); SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL}; - qSetStreamInput(pItem->taskInfo, &dataBlock, STREAM_DATA_TYPE_SSDATA_BLOCK, false); - tdFetchAndSubmitRSmaResult(pItem, STREAM_DATA_TYPE_SSDATA_BLOCK); + qSetStreamInput(pItem->taskInfo, &dataBlock, STREAM_INPUT__DATA_BLOCK, false); + tdFetchAndSubmitRSmaResult(pItem, STREAM_INPUT__DATA_BLOCK); tdUnRefSmaStat(pSma, (SSmaStat *)pStat); @@ -611,12 +611,12 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level, pItem->taskInfo, suid); - if (qSetStreamInput(pItem->taskInfo, pMsg, inputType, true) < 0) { // STREAM_DATA_TYPE_SUBMIT_BLOCK + if (qSetStreamInput(pItem->taskInfo, pMsg, inputType, true) < 0) { // INPUT__DATA_SUBMIT smaError("vgId:%d, rsma % " PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno)); return TSDB_CODE_FAILED; } - tdFetchAndSubmitRSmaResult(pItem, STREAM_DATA_TYPE_SUBMIT_BLOCK); + tdFetchAndSubmitRSmaResult(pItem, STREAM_INPUT__DATA_SUBMIT); atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE); smaDebug("vgId:%d, process rsma insert", SMA_VID(pSma)); @@ -654,7 +654,7 @@ static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb return TSDB_CODE_SUCCESS; } - if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { + if (inputType == STREAM_INPUT__DATA_SUBMIT) { tdExecuteRSmaImpl(pSma, pMsg, inputType, &pRSmaInfo->items[0], suid, TSDB_RETENTION_L1); tdExecuteRSmaImpl(pSma, pMsg, inputType, &pRSmaInfo->items[1], suid, TSDB_RETENTION_L2); } @@ -675,7 +675,7 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) { return TSDB_CODE_SUCCESS; } - if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { + if (inputType == STREAM_INPUT__DATA_SUBMIT) { STbUidStore uidStore = {0}; tdFetchSubmitReqSuids(pMsg, &uidStore); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 766cf7af35..f36998d606 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -113,8 +113,23 @@ int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, return 0; } -int32_t tqSendPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataBlkRsp* pRsp) { - int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, pRsp); +int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp) { + ASSERT(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum); + ASSERT(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum); + + if (pRsp->withSchema) { + ASSERT(taosArrayGetSize(pRsp->blockSchema) == pRsp->blockNum); + } else { + ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0); + } + + int32_t len; + int32_t code; + tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code); + if (code < 0) { + return -1; + } + int32_t tlen = sizeof(SMqRspHead) + len; void* buf = rpcMallocCont(tlen); if (buf == NULL) { return -1; @@ -125,18 +140,26 @@ int32_t tqSendPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con ((SMqRspHead*)buf)->consumerId = pReq->consumerId; void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); - tEncodeSMqDataBlkRsp(&abuf, pRsp); - SRpcMsg resp = { + SEncoder encoder; + tEncoderInit(&encoder, abuf, tlen); + tEncodeSMqDataRsp(&encoder, pRsp); + /*tEncodeSMqDataBlkRsp(&abuf, pRsp);*/ + + SRpcMsg rsp = { .info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0, }; - tmsgSendRsp(&resp); + tmsgSendRsp(&rsp); - tqDebug("vg %d from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld", - TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, pRsp->reqOffset, pRsp->rspOffset); + char buf1[50]; + char buf2[50]; + tFormatOffset(buf1, 50, &pRsp->reqOffset); + tFormatOffset(buf2, 50, &pRsp->rspOffset); + tqDebug("vg %d from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %s, rspOffset: %s", + TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2); return 0; } @@ -151,17 +174,17 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) { } tDecoderClear(&decoder); - if (offset.type == TMQ_OFFSET__SNAPSHOT) { + if (offset.val.type == TMQ_OFFSET__SNAPSHOT_DATA) { tqDebug("receive offset commit msg to %s on vg %d, offset(type:snapshot) uid: %ld, ts: %ld", offset.subKey, - TD_VID(pTq->pVnode), offset.uid, offset.ts); - } else if (offset.type == TMQ_OFFSET__LOG) { + TD_VID(pTq->pVnode), offset.val.uid, offset.val.ts); + } else if (offset.val.type == TMQ_OFFSET__LOG) { tqDebug("receive offset commit msg to %s on vg %d, offset(type:log) version: %ld", offset.subKey, - TD_VID(pTq->pVnode), offset.version); + TD_VID(pTq->pVnode), offset.val.version); } else { ASSERT(0); } STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey); - if (pOffset == NULL || pOffset->version < offset.version) { + if (pOffset == NULL || pOffset->val.version < offset.val.version) { if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) { ASSERT(0); return -1; @@ -171,6 +194,237 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) { return 0; } +static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t subType) { + pRsp->reqOffset = pReq->reqOffset; + + pRsp->blockData = taosArrayInit(0, sizeof(void*)); + pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t)); + + if (pRsp->blockData == NULL || pRsp->blockDataLen == NULL) { + return -1; + } + + pRsp->withTbName = pReq->withTbName; + if (pRsp->withTbName) { + pRsp->blockTbName = taosArrayInit(0, sizeof(void*)); + if (pRsp->blockTbName == NULL) { + // TODO free + return -1; + } + } + + if (subType == TOPIC_SUB_TYPE__COLUMN) { + pRsp->withSchema = false; + } else { + pRsp->withSchema = true; + pRsp->blockSchema = taosArrayInit(0, sizeof(void*)); + if (pRsp->blockSchema == NULL) { + // TODO free + return -1; + } + } + return 0; +} + +static int32_t tqInitMetaRsp(SMqMetaRsp* pRsp, const SMqPollReq* pReq) { return 0; } + +static FORCE_INLINE void tqOffsetResetToData(STqOffsetVal* pOffsetVal, int64_t uid, int64_t ts) { + pOffsetVal->type = TMQ_OFFSET__SNAPSHOT_DATA; + pOffsetVal->uid = uid; + pOffsetVal->ts = ts; +} + +static FORCE_INLINE void tqOffsetResetToLog(STqOffsetVal* pOffsetVal, int64_t ver) { + pOffsetVal->type = TMQ_OFFSET__LOG; + pOffsetVal->version = ver; +} + +int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { + SMqPollReq* pReq = pMsg->pCont; + int64_t consumerId = pReq->consumerId; + int64_t timeout = pReq->timeout; + int32_t reqEpoch = pReq->epoch; + int32_t code = 0; + STqOffsetVal reqOffset = pReq->reqOffset; + STqOffsetVal fetchOffsetNew; + + // 1.find handle + char buf[50]; + tFormatOffset(buf, 50, &reqOffset); + tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req offset %s", consumerId, pReq->epoch, + TD_VID(pTq->pVnode), buf); + + STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey)); + /*ASSERT(pHandle);*/ + if (pHandle == NULL) { + tqError("tmq poll: no consumer handle for consumer %ld in vg %d, subkey %s", consumerId, TD_VID(pTq->pVnode), + pReq->subKey); + return -1; + } + + // check rebalance + if (pHandle->consumerId != consumerId) { + tqError("tmq poll: consumer handle mismatch for consumer %ld in vg %d, subkey %s, handle consumer id %ld", + consumerId, TD_VID(pTq->pVnode), pReq->subKey, pHandle->consumerId); + return -1; + } + + // update epoch if need + int32_t consumerEpoch = atomic_load_32(&pHandle->epoch); + while (consumerEpoch < reqEpoch) { + consumerEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, consumerEpoch, reqEpoch); + } + + // 2.reset offset if needed + if (reqOffset.type > 0) { + fetchOffsetNew = reqOffset; + } else { + STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pReq->subKey); + if (pOffset != NULL) { + fetchOffsetNew = pOffset->val; + char formatBuf[50]; + tFormatOffset(formatBuf, 50, &fetchOffsetNew); + tqDebug("tmq poll: consumer %ld, offset reset to %s", consumerId, formatBuf); + } else { + if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) { + if (pReq->useSnapshot) { + if (!pHandle->fetchMeta) { + tqOffsetResetToData(&fetchOffsetNew, 0, 0); + } else { + // reset to meta + ASSERT(0); + } + } else { + tqOffsetResetToLog(&fetchOffsetNew, walGetFirstVer(pTq->pVnode->pWal)); + } + } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { + tqOffsetResetToLog(&fetchOffsetNew, walGetLastVer(pTq->pVnode->pWal)); + } else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) { + tqError("tmq poll: no offset committed for consumer %ld in vg %d, subkey %s, reset none failed", consumerId, + TD_VID(pTq->pVnode), pReq->subKey); + terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET; + return -1; + } + } + } + + // 3.query + SMqDataRsp dataRsp = {0}; + tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType); + + if (fetchOffsetNew.type == TMQ_OFFSET__LOG) { + int64_t fetchVer = fetchOffsetNew.version + 1; + SWalHead* pHeadWithCkSum = taosMemoryMalloc(sizeof(SWalHead) + 2048); + if (pHeadWithCkSum == NULL) { + return -1; + } + + walSetReaderCapacity(pHandle->pWalReader, 2048); + + while (1) { + consumerEpoch = atomic_load_32(&pHandle->epoch); + if (consumerEpoch > reqEpoch) { + tqWarn("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d, discard req epoch %d", + consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch); + break; + } + + if (tqFetchLog(pTq, pHandle, &fetchVer, &pHeadWithCkSum) < 0) { + // TODO add push mgr + + tqOffsetResetToLog(&dataRsp.rspOffset, fetchVer); + if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { + code = -1; + } + goto OVER; + } + + SWalReadHead* pHead = &pHeadWithCkSum->head; + + tqDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, + TD_VID(pTq->pVnode), fetchVer, pHead->msgType); + + if (pHead->msgType == TDMT_VND_SUBMIT) { + SSubmitReq* pCont = (SSubmitReq*)&pHead->body; + + if (tqLogScanExec(pTq, &pHandle->execHandle, pCont, &dataRsp, workerId) < 0) { + ASSERT(0); + } + // TODO batch optimization: + // TODO continue scan until meeting batch requirement + if (dataRsp.blockNum > 0 /* threshold */) { + tqOffsetResetToLog(&dataRsp.rspOffset, fetchVer); + + if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { + code = -1; + } + + } else { + fetchVer++; + } + goto OVER; + + } else { + ASSERT(pHandle->fetchMeta); + ASSERT(IS_META_MSG(pHead->msgType)); + tqInfo("fetch meta msg, ver: %ld, type: %d", pHead->version, pHead->msgType); + SMqMetaRsp metaRsp = {0}; + metaRsp.reqOffset = pReq->reqOffset.version; + /*tqOffsetResetToLog(&metaR)*/ + metaRsp.rspOffset = fetchVer; + metaRsp.resMsgType = pHead->msgType; + metaRsp.metaRspLen = pHead->bodyLen; + metaRsp.metaRsp = pHead->body; + if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) { + code = -1; + goto OVER; + } + code = 0; + goto OVER; + } + } + + taosMemoryFree(pHeadWithCkSum); + } else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_DATA) { + // 1. set uid and ts + // 2. get data (rebuild reader if needed) + // 3. get new uid and ts + + char formatBuf[50]; + tFormatOffset(formatBuf, 50, &dataRsp.reqOffset); + tqInfo("retrieve using snapshot req offset %s", formatBuf); + if (tqScanSnapshot(pTq, &pHandle->execHandle, &dataRsp, workerId) < 0) { + ASSERT(0); + } + + // 4. send rsp + if (dataRsp.blockNum != 0) { + tqOffsetResetToData(&dataRsp.rspOffset, 0, 0); + if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { + code = -1; + } + } + } else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_META) { + ASSERT(0); + } + +OVER: + // TODO wrap in destroy func + taosArrayDestroy(dataRsp.blockDataLen); + taosArrayDestroyP(dataRsp.blockData, (FDelete)taosMemoryFree); + + if (dataRsp.withSchema) { + taosArrayDestroyP(dataRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); + } + + if (dataRsp.withTbName) { + taosArrayDestroyP(dataRsp.blockTbName, (FDelete)taosMemoryFree); + } + + return code; +} + +#if 0 int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { SMqPollReq* pReq = pMsg->pCont; int64_t consumerId = pReq->consumerId; @@ -185,10 +439,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } else { STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pReq->subKey); if (pOffset != NULL) { - ASSERT(pOffset->type == TMQ_OFFSET__LOG); + ASSERT(pOffset->val.type == TMQ_OFFSET__LOG); tqDebug("consumer %ld, restore offset of %s on vg %d, offset(type:log) version: %ld", consumerId, pReq->subKey, - TD_VID(pTq->pVnode), pOffset->version); - fetchOffset = pOffset->version + 1; + TD_VID(pTq->pVnode), pOffset->val.version); + fetchOffset = pOffset->val.version + 1; } else { if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) { fetchOffset = walGetFirstVer(pTq->pWal); @@ -241,12 +495,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { if (rsp.withTbName) { rsp.blockTbName = taosArrayInit(0, sizeof(void*)); } + if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { rsp.withSchema = false; - rsp.withTag = false; } else { rsp.withSchema = true; - rsp.withTag = false; rsp.blockSchema = taosArrayInit(0, sizeof(void*)); } @@ -302,10 +555,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } } else { ASSERT(pHandle->fetchMeta); - ASSERT(pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB || - pHead->msgType == TDMT_VND_DROP_STB || pHead->msgType == TDMT_VND_CREATE_TABLE || - pHead->msgType == TDMT_VND_ALTER_TABLE || pHead->msgType == TDMT_VND_DROP_TABLE || - pHead->msgType == TDMT_VND_DROP_TTL_TABLE); + ASSERT(IS_META_MSG(pHead->msgType)); tqInfo("fetch meta msg, ver: %ld, type: %d", pHead->version, pHead->msgType); SMqMetaRsp metaRsp = {0}; metaRsp.reqOffset = pReq->currentOffset; @@ -341,7 +591,7 @@ SEND_RSP: rsp.rspOffset = fetchOffset; - if (tqSendPollRsp(pTq, pMsg, pReq, &rsp) < 0) { + if (tqSendDataRsp(pTq, pMsg, pReq, &rsp) < 0) { code = -1; } OVER: @@ -359,6 +609,7 @@ OVER: return code; } +#endif int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) { SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; @@ -403,7 +654,6 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { .reader = pHandle->execHandle.pExecReader[i], .meta = pTq->pVnode->pMeta, .vnode = pTq->pVnode, -// .initTsdbReader = 1, }; pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle); ASSERT(pHandle->execHandle.execCol.task[i]); @@ -474,12 +724,11 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) { if (pTask->execType != TASK_EXEC__NONE) { // expand runners if (pTask->isDataScan) { - STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); + SStreamReader* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); SReadHandle handle = { .reader = pStreamReader, .meta = pTq->pVnode->pMeta, .vnode = pTq->pVnode, -// .initTsdbReader = 1, }; /*pTask->exec.inputHandle = pStreamReader;*/ pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index c7d7c787a2..eb09199c70 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -15,7 +15,7 @@ #include "tq.h" -static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataBlkRsp* pRsp) { +static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp) { int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); void* buf = taosMemoryCalloc(1, dataStrLen); if (buf == NULL) return -1; @@ -37,7 +37,7 @@ static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataBlkRsp* pRs return 0; } -static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, int32_t workerId, SMqDataBlkRsp* pRsp) { +static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, int32_t workerId, SMqDataRsp* pRsp) { SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader[workerId]->pSchemaWrapper); if (pSW == NULL) { return -1; @@ -46,10 +46,9 @@ static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, int32_t workerI return 0; } -static int32_t tqAddTbNameToRsp(const STQ* pTq, const STqExecHandle* pExec, SMqDataBlkRsp* pRsp, int32_t workerId) { +static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, int32_t workerId) { SMetaReader mr = {0}; metaReaderInit(&mr, pTq->pVnode->pMeta, 0); - int64_t uid = pExec->pExecReader[workerId]->msgIter.uid; if (metaGetTableEntryByUid(&mr, uid) < 0) { ASSERT(0); return -1; @@ -60,13 +59,13 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, const STqExecHandle* pExec, SMqD return 0; } -int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataBlkRsp* pRsp, int32_t workerId) { +int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, int32_t workerId) { ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN); qTaskInfo_t task = pExec->execCol.task[workerId]; + // TODO set uid and ts if (qStreamScanSnapshot(task) < 0) { ASSERT(0); } - // set version while (1) { SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; @@ -79,17 +78,24 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataBlkRsp* pRsp ASSERT(taosArrayGetSize(pDataBlock->pDataBlock) != 0); tqAddBlockDataToRsp(pDataBlock, pRsp); + + if (pRsp->withTbName) { + // TODO + pRsp->withTbName = 0; + /*int64_t uid = 0;*/ + /*tqAddTbNameToRsp(pTq, uid, pRsp, workerId);*/ + } pRsp->blockNum++; } return 0; } -int32_t tqDataExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataBlkRsp* pRsp, int32_t workerId) { +int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp, int32_t workerId) { if (pExec->subType == TOPIC_SUB_TYPE__COLUMN) { qTaskInfo_t task = pExec->execCol.task[workerId]; ASSERT(task); - qSetStreamInput(task, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK, false); + qSetStreamInput(task, pReq, STREAM_INPUT__DATA_SUBMIT, false); while (1) { SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; @@ -102,13 +108,14 @@ int32_t tqDataExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataBlkR tqAddBlockDataToRsp(pDataBlock, pRsp); if (pRsp->withTbName) { - tqAddTbNameToRsp(pTq, pExec, pRsp, workerId); + int64_t uid = pExec->pExecReader[workerId]->msgIter.uid; + tqAddTbNameToRsp(pTq, uid, pRsp, workerId); } pRsp->blockNum++; } } else if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { pRsp->withSchema = 1; - STqReadHandle* pReader = pExec->pExecReader[workerId]; + SStreamReader* pReader = pExec->pExecReader[workerId]; tqReadHandleSetMsg(pReader, pReq, 0); while (tqNextDataBlock(pReader)) { SSDataBlock block = {0}; @@ -118,14 +125,15 @@ int32_t tqDataExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataBlkR } tqAddBlockDataToRsp(&block, pRsp); if (pRsp->withTbName) { - tqAddTbNameToRsp(pTq, pExec, pRsp, workerId); + int64_t uid = pExec->pExecReader[workerId]->msgIter.uid; + tqAddTbNameToRsp(pTq, uid, pRsp, workerId); } tqAddBlockSchemaToRsp(pExec, workerId, pRsp); pRsp->blockNum++; } } else if (pExec->subType == TOPIC_SUB_TYPE__DB) { pRsp->withSchema = 1; - STqReadHandle* pReader = pExec->pExecReader[workerId]; + SStreamReader* pReader = pExec->pExecReader[workerId]; tqReadHandleSetMsg(pReader, pReq, 0); while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) { SSDataBlock block = {0}; @@ -135,7 +143,8 @@ int32_t tqDataExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataBlkR } tqAddBlockDataToRsp(&block, pRsp); if (pRsp->withTbName) { - tqAddTbNameToRsp(pTq, pExec, pRsp, workerId); + int64_t uid = pExec->pExecReader[workerId]->msgIter.uid; + tqAddTbNameToRsp(pTq, uid, pRsp, workerId); } tqAddBlockSchemaToRsp(pExec, workerId, pRsp); pRsp->blockNum++; diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index e536e87032..16b8872098 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -92,8 +92,8 @@ STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey) { } int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset) { - ASSERT(pOffset->type == TMQ_OFFSET__LOG); - ASSERT(pOffset->version >= 0); + /*ASSERT(pOffset->val.type == TMQ_OFFSET__LOG);*/ + /*ASSERT(pOffset->val.version >= 0);*/ return taosHashPut(pStore->pHash, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset)); } diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 2ec627bd5c..bd7cda4de3 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -15,16 +15,17 @@ #include "tq.h" +#if 0 void tqTmrRspFunc(void* param, void* tmrId) { STqHandle* pHandle = (STqHandle*)param; atomic_store_8(&pHandle->pushHandle.tmrStopped, 1); } -static int32_t tqLoopExecFromQueue(STQ* pTq, STqHandle* pHandle, SStreamDataSubmit** ppSubmit, SMqDataBlkRsp* pRsp) { +static int32_t tqLoopExecFromQueue(STQ* pTq, STqHandle* pHandle, SStreamDataSubmit** ppSubmit, SMqDataRsp* pRsp) { SStreamDataSubmit* pSubmit = *ppSubmit; while (pSubmit != NULL) { ASSERT(pSubmit->ver == pHandle->pushHandle.processedVer + 1); - if (tqDataExec(pTq, &pHandle->execHandle, pSubmit->data, pRsp, 0) < 0) { + if (tqLogScanExec(pTq, &pHandle->execHandle, pSubmit->data, pRsp, 0) < 0) { /*ASSERT(0);*/ } // update processed @@ -43,7 +44,7 @@ static int32_t tqLoopExecFromQueue(STQ* pTq, STqHandle* pHandle, SStreamDataSubm } int32_t tqExecFromInputQ(STQ* pTq, STqHandle* pHandle) { - SMqDataBlkRsp rsp = {0}; + SMqDataRsp rsp = {0}; // 1. guard and set status executing int8_t execStatus = atomic_val_compare_exchange_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__IDLE, TASK_EXEC_STATUS__EXECUTING); @@ -175,13 +176,13 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ taosWLockLatch(&pHandle->pushHandle.lock); - SMqDataBlkRsp rsp = {0}; + SMqDataRsp rsp = {0}; rsp.reqOffset = pHandle->pushHandle.reqOffset; rsp.blockData = taosArrayInit(0, sizeof(void*)); rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t)); if (msgType == TDMT_VND_SUBMIT) { - tqDataExec(pTq, &pHandle->execHandle, pReq, &rsp, workerId); + tqLogScanExec(pTq, &pHandle->execHandle, pReq, &rsp, workerId); } else { // TODO ASSERT(0); @@ -233,6 +234,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ return 0; } +#endif int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { if (msgType == TDMT_VND_SUBMIT) { diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index be2a35c901..7d7b9636df 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -44,10 +44,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead* } else { if (pHandle->fetchMeta) { SWalReadHead* pHead = &((*ppHeadWithCkSum)->head); - if (pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB || - pHead->msgType == TDMT_VND_DROP_STB || pHead->msgType == TDMT_VND_CREATE_TABLE || - pHead->msgType == TDMT_VND_ALTER_TABLE || pHead->msgType == TDMT_VND_DROP_TABLE || - pHead->msgType == TDMT_VND_DROP_TTL_TABLE) { + if (IS_META_MSG(pHead->msgType)) { code = walFetchBody(pHandle->pWalReader, ppHeadWithCkSum); if (code < 0) { @@ -76,8 +73,8 @@ END: return code; } -STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) { - STqReadHandle* pReadHandle = taosMemoryMalloc(sizeof(STqReadHandle)); +SStreamReader* tqInitSubmitMsgScanner(SMeta* pMeta) { + SStreamReader* pReadHandle = taosMemoryMalloc(sizeof(SStreamReader)); if (pReadHandle == NULL) { return NULL; } @@ -85,15 +82,15 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) { pReadHandle->pMsg = NULL; pReadHandle->ver = -1; pReadHandle->pColIdList = NULL; - pReadHandle->cachedSchemaVer = -1; - pReadHandle->cachedSchemaSuid = -1; + pReadHandle->cachedSchemaVer = 0; + pReadHandle->cachedSchemaSuid = 0; pReadHandle->pSchema = NULL; pReadHandle->pSchemaWrapper = NULL; pReadHandle->tbIdHash = NULL; return pReadHandle; } -int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t ver) { +int32_t tqReadHandleSetMsg(SStreamReader* pReadHandle, SSubmitReq* pMsg, int64_t ver) { pReadHandle->pMsg = pMsg; if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1; @@ -108,7 +105,7 @@ int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t return 0; } -bool tqNextDataBlock(STqReadHandle* pHandle) { +bool tqNextDataBlock(SStreamReader* pHandle) { if (pHandle->pMsg == NULL) return false; while (1) { if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) { @@ -130,7 +127,7 @@ bool tqNextDataBlock(STqReadHandle* pHandle) { return false; } -bool tqNextDataBlockFilterOut(STqReadHandle* pHandle, SHashObj* filterOutUids) { +bool tqNextDataBlockFilterOut(SStreamReader* pHandle, SHashObj* filterOutUids) { while (1) { if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) { return false; @@ -146,7 +143,7 @@ bool tqNextDataBlockFilterOut(STqReadHandle* pHandle, SHashObj* filterOutUids) { return false; } -int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle) { +int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, SStreamReader* pHandle) { // TODO: cache multiple schema int32_t sversion = htonl(pHandle->pBlock->sversion); if (pHandle->cachedSchemaSuid == 0 || pHandle->cachedSchemaVer != sversion || @@ -256,9 +253,9 @@ FAIL: return -1; } -void tqReadHandleSetColIdList(STqReadHandle* pReadHandle, SArray* pColIdList) { pReadHandle->pColIdList = pColIdList; } +void tqReadHandleSetColIdList(SStreamReader* pReadHandle, SArray* pColIdList) { pReadHandle->pColIdList = pColIdList; } -int tqReadHandleSetTbUidList(STqReadHandle* pHandle, const SArray* tbUidList) { +int tqReadHandleSetTbUidList(SStreamReader* pHandle, const SArray* tbUidList) { if (pHandle->tbIdHash) { taosHashClear(pHandle->tbIdHash); } @@ -277,7 +274,7 @@ int tqReadHandleSetTbUidList(STqReadHandle* pHandle, const SArray* tbUidList) { return 0; } -int tqReadHandleAddTbUidList(STqReadHandle* pHandle, const SArray* tbUidList) { +int tqReadHandleAddTbUidList(SStreamReader* pHandle, const SArray* tbUidList) { if (pHandle->tbIdHash == NULL) { pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); if (pHandle->tbIdHash == NULL) { @@ -294,7 +291,7 @@ int tqReadHandleAddTbUidList(STqReadHandle* pHandle, const SArray* tbUidList) { return 0; } -int tqReadHandleRemoveTbUidList(STqReadHandle* pHandle, const SArray* tbUidList) { +int tqReadHandleRemoveTbUidList(SStreamReader* pHandle, const SArray* tbUidList) { ASSERT(pHandle->tbIdHash != NULL); for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 72c766e0ae..d27fcb4f03 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -780,7 +780,7 @@ _exit: // TODO: the partial success scenario and the error case // TODO: refactor if ((terrno == TSDB_CODE_SUCCESS) && (pRsp->code == TSDB_CODE_SUCCESS)) { - tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK); + tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT); } return 0; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 6de364e63a..31edc46b4d 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -44,12 +44,12 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu // prevent setting a different type of block pInfo->blockType = type; - if (type == STREAM_DATA_TYPE_SUBMIT_BLOCK) { + if (type == STREAM_INPUT__DATA_SUBMIT) { if (tqReadHandleSetMsg(pInfo->streamBlockReader, input, 0) < 0) { qError("submit msg messed up when initing stream block, %s" PRIx64, id); return TSDB_CODE_QRY_APP_ERROR; } - } else if (type == STREAM_DATA_TYPE_SSDATA_BLOCK) { + } else if (type == STREAM_INPUT__DATA_BLOCK) { for (int32_t i = 0; i < numOfBlocks; ++i) { SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; @@ -60,9 +60,9 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock); taosArrayPush(pInfo->pBlockLists, &p); } - } else if (type == STREAM_DATA_TYPE_FROM_SNAPSHOT) { + } else if (type == STREAM_INPUT__DATA_SCAN) { // do nothing - ASSERT(pInfo->blockType == STREAM_DATA_TYPE_FROM_SNAPSHOT); + ASSERT(pInfo->blockType == STREAM_INPUT__DATA_SCAN); } else { ASSERT(0); } @@ -76,7 +76,7 @@ int32_t qStreamScanSnapshot(qTaskInfo_t tinfo) { return TSDB_CODE_QRY_APP_ERROR; } SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - return doSetStreamBlock(pTaskInfo->pRoot, NULL, 0, STREAM_DATA_TYPE_FROM_SNAPSHOT, 0, NULL); + return doSetStreamBlock(pTaskInfo->pRoot, NULL, 0, STREAM_INPUT__DATA_SCAN, 0, NULL); } int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool assignUid) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index c51ef44154..8cfdb22313 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -962,7 +962,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { size_t total = taosArrayGetSize(pInfo->pBlockLists); // TODO: refactor - if (pInfo->blockType == STREAM_DATA_TYPE_SSDATA_BLOCK) { + if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) { if (pInfo->validBlockIndex >= total) { /*doClearBufferedBlocks(pInfo);*/ pOperator->status = OP_EXEC_DONE; @@ -973,7 +973,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current); blockDataUpdateTsWindow(pBlock, 0); if (pBlock->info.type == STREAM_RETRIEVE) { - pInfo->blockType = STREAM_DATA_TYPE_SUBMIT_BLOCK; + pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE; copyDataBlock(pInfo->pPullDataRes, pBlock); pInfo->pullDataResIndex = 0; @@ -981,7 +981,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo); } return pBlock; - } else if (pInfo->blockType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { + } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) { if (pInfo->scanMode == STREAM_SCAN_FROM_RES) { blockDataDestroy(pInfo->pUpdateRes); pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; @@ -1133,7 +1133,9 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes; - } else if (pInfo->blockType == STREAM_DATA_TYPE_FROM_SNAPSHOT) { + } else if (pInfo->blockType == STREAM_INPUT__DATA_SCAN) { + // check reader last status + // if not match, reset status SSDataBlock* pResult = doTableScan(pInfo->pSnapshotReadOp); return pResult && pResult->info.rows > 0 ? pResult : NULL; @@ -1213,7 +1215,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->tableUid = pScanPhyNode->uid; // set the extract column id to streamHandle - tqReadHandleSetColIdList((STqReadHandle*)pHandle->reader, pColIds); + tqReadHandleSetColIdList((SStreamReader*)pHandle->reader, pColIds); SArray* tableIdList = extractTableIdList(&pTaskInfo->tableqinfoList); int32_t code = tqReadHandleSetTbUidList(pHandle->reader, tableIdList); if (code != 0) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index edd1ec0e75..6d287ef887 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -970,9 +970,8 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { getTableScanInfo(pOperator, &pInfo->order, &scanFlag); if (pInfo->scalarSupp.pExprInfo != NULL) { - SExprSupp* pExprSup =& pInfo->scalarSupp; - projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, - pExprSup->numOfExprs, NULL); + SExprSupp* pExprSup = &pInfo->scalarSupp; + projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL); } // the pDataBlock are always the same one, no need to call this again @@ -1512,23 +1511,24 @@ void increaseTs(SqlFunctionCtx* pCtx) { SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, - STimeWindowAggSupp* pTwAggSupp, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, bool isStream) { + STimeWindowAggSupp* pTwAggSupp, SIntervalPhysiNode* pPhyNode, + SExecTaskInfo* pTaskInfo, bool isStream) { SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } - pInfo->win = pTaskInfo->window; - pInfo->order = TSDB_ORDER_ASC; - pInfo->interval = *pInterval; + pInfo->win = pTaskInfo->window; + pInfo->order = TSDB_ORDER_ASC; + pInfo->interval = *pInterval; pInfo->execModel = pTaskInfo->execModel; - pInfo->twAggSup = *pTwAggSupp; + pInfo->twAggSup = *pTwAggSupp; if (pPhyNode->window.pExprs != NULL) { int32_t numOfScalar = 0; SExprInfo* pScalarExprInfo = createExprInfo(pPhyNode->window.pExprs, NULL, &numOfScalar); - int32_t code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar); + int32_t code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -2498,10 +2498,10 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, pInfo->primaryTsIndex, pOperator->exprSupp.numOfExprs, pBlock, pUpWins); if (IS_FINAL_OP(pInfo)) { - int32_t childIndex = getChildIndex(pBlock); - SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); + int32_t childIndex = getChildIndex(pBlock); + SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); SStreamFinalIntervalOperatorInfo* pChildInfo = pChildOp->info; - SExprSupp* pChildSup = &pChildOp->exprSupp; + SExprSupp* pChildSup = &pChildOp->exprSupp; doClearWindows(&pChildInfo->aggSup, pChildSup, &pChildInfo->interval, pChildInfo->primaryTsIndex, pChildSup->numOfExprs, pBlock, NULL); @@ -2527,7 +2527,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { break; } continue; - } else if (pBlock->info.type == STREAM_PUSH_EMPTY && IS_FINAL_OP(pInfo)) { + } else if (pBlock->info.type == STREAM_PUSH_OVER && IS_FINAL_OP(pInfo)) { processPushEmpty(pBlock, pInfo->pPullDataMap); continue; } diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 699a050aea..1ff1cbe833 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1052,7 +1052,8 @@ static int32_t translateUniqueMode(SFunctionNode* pFunc, char* pErrBuf, int32_t SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0); if (!nodesExprHasColumn(pPara)) { - return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "The parameters of %s must contain columns", isUnique ? "UNIQUE" : "MODE"); + return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "The parameters of %s must contain columns", + isUnique ? "UNIQUE" : "MODE"); } pFunc->node.resType = ((SExprNode*)pPara)->resType; @@ -1228,19 +1229,19 @@ static int32_t translateSubstr(SFunctionNode* pFunc, char* pErrBuf, int32_t len) static int32_t translateCast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { // The number of parameters has been limited by the syntax definition - //uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; + // uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; // The function return type has been set during syntax parsing uint8_t para2Type = pFunc->node.resType.type; - //if (para2Type != TSDB_DATA_TYPE_BIGINT && para2Type != TSDB_DATA_TYPE_UBIGINT && - // para2Type != TSDB_DATA_TYPE_VARCHAR && para2Type != TSDB_DATA_TYPE_NCHAR && - // para2Type != TSDB_DATA_TYPE_TIMESTAMP) { - // return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); - //} - //if ((para2Type == TSDB_DATA_TYPE_TIMESTAMP && IS_VAR_DATA_TYPE(para1Type)) || - // (para2Type == TSDB_DATA_TYPE_BINARY && para1Type == TSDB_DATA_TYPE_NCHAR)) { - // return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); - //} + // if (para2Type != TSDB_DATA_TYPE_BIGINT && para2Type != TSDB_DATA_TYPE_UBIGINT && + // para2Type != TSDB_DATA_TYPE_VARCHAR && para2Type != TSDB_DATA_TYPE_NCHAR && + // para2Type != TSDB_DATA_TYPE_TIMESTAMP) { + // return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + // } + // if ((para2Type == TSDB_DATA_TYPE_TIMESTAMP && IS_VAR_DATA_TYPE(para1Type)) || + // (para2Type == TSDB_DATA_TYPE_BINARY && para1Type == TSDB_DATA_TYPE_NCHAR)) { + // return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + // } int32_t para2Bytes = pFunc->node.resType.bytes; if (IS_VAR_DATA_TYPE(para2Type)) { @@ -1639,7 +1640,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "leastsquares", .type = FUNCTION_TYPE_LEASTSQUARES, - .classification = FUNC_MGT_AGG_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, .translateFunc = translateLeastSQR, .getEnvFunc = getLeastSQRFuncEnv, .initFunc = leastSQRFunctionSetup, diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index b3ceb92505..3aee636ac9 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -17,21 +17,20 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) { void* exec = pTask->exec.executor; - bool hasData = false; // set input SStreamQueueItem* pItem = (SStreamQueueItem*)data; if (pItem->type == STREAM_INPUT__TRIGGER) { SStreamTrigger* pTrigger = (SStreamTrigger*)data; - qSetMultiStreamInput(exec, pTrigger->pBlock, 1, STREAM_DATA_TYPE_SSDATA_BLOCK, false); + qSetMultiStreamInput(exec, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK, false); } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { ASSERT(pTask->isDataScan); SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data; - qSetStreamInput(exec, pSubmit->data, STREAM_DATA_TYPE_SUBMIT_BLOCK, false); + qSetStreamInput(exec, pSubmit->data, STREAM_INPUT__DATA_SUBMIT, false); } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { SStreamDataBlock* pBlock = (SStreamDataBlock*)data; SArray* blocks = pBlock->blocks; - qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK, false); + qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__DATA_BLOCK, false); } else if (pItem->type == STREAM_INPUT__DROP) { // TODO exec drop return 0; @@ -46,20 +45,16 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) } if (output == NULL) { if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) { - SSDataBlock block = {0}; - /*block.info.type = STREAM_PUSH_EMPTY;*/ - // block.info.childId = pTask->selfChildId; + SSDataBlock block = {0}; SStreamDataBlock* pRetrieveBlock = (SStreamDataBlock*)data; ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1); - /*SSDataBlock* pBlock = createOneDataBlock(taosArrayGet(pRetrieveBlock->blocks, 0), true);*/ assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0)); - block.info.type = STREAM_PUSH_EMPTY; + block.info.type = STREAM_PUSH_OVER; block.info.childId = pTask->selfChildId; taosArrayPush(pRes, &block); } break; } - hasData = true; if (output->info.type == STREAM_RETRIEVE) { if (streamBroadcastToChildren(pTask, output) < 0) { @@ -73,9 +68,6 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) assignOneDataBlock(&block, output); block.info.childId = pTask->selfChildId; taosArrayPush(pRes, &block); - /*SSDataBlock* outputCopy = createOneDataBlock(output, true);*/ - /*outputCopy->info.childId = pTask->selfChildId;*/ - /*taosArrayPush(pRes, outputCopy);*/ } return 0; } @@ -164,4 +156,3 @@ FAIL: atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE); return -1; } - diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 1daa9bd50e..97aa182dc9 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -86,9 +86,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { ASSERT(pTask->sinkType == TASK_SINK__NONE); } - if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { - if (tEncodeI32(pEncoder, pTask->inplaceDispatcher.taskId) < 0) return -1; - } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) { + if (pTask->dispatchType == TASK_DISPATCH__FIXED) { if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.taskId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.nodeId) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1; @@ -147,9 +145,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { ASSERT(pTask->sinkType == TASK_SINK__NONE); } - if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { - if (tDecodeI32(pDecoder, &pTask->inplaceDispatcher.taskId) < 0) return -1; - } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) { + if (pTask->dispatchType == TASK_DISPATCH__FIXED) { if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.nodeId) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1; -- GitLab