diff --git a/examples/c/tmq.c b/examples/c/tmq.c index 7870d7d9a188030806fc113fbadcb2f76172ef66..6fb7e7a1fce8677870abd3f1489dce2bd8b678b2 100644 --- a/examples/c/tmq.c +++ b/examples/c/tmq.c @@ -26,6 +26,14 @@ static void msg_process(TAOS_RES* msg) { printf("topic: %s\n", tmq_get_topic_name(msg)); printf("db: %s\n", tmq_get_db_name(msg)); printf("vg: %d\n", tmq_get_vgroup_id(msg)); + if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META) { + void* meta; + int32_t metaLen; + tmq_get_raw_meta(msg, &meta, &metaLen); + + printf("meta, len is %d\n", metaLen); + return; + } while (1) { TAOS_ROW row = taos_fetch_row(msg); if (row == NULL) break; @@ -76,19 +84,41 @@ int32_t init_env() { } taos_free_result(pRes); + pRes = taos_query(pConn, "insert into ct0 values(now, 1, 2, 'a')"); + if (taos_errno(pRes) != 0) { + printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + pRes = taos_query(pConn, "create table if not exists ct1 using st1 tags(2000)"); if (taos_errno(pRes) != 0) { - printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes)); + printf("failed to create child table ct1, reason:%s\n", taos_errstr(pRes)); return -1; } + taos_free_result(pRes); + + pRes = taos_query(pConn, "insert into ct1 values(now, 3, 4, 'b')"); + if (taos_errno(pRes) != 0) { + printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); pRes = taos_query(pConn, "create table if not exists ct3 using st1 tags(3000)"); if (taos_errno(pRes) != 0) { - printf("failed to create child table tu3, reason:%s\n", taos_errstr(pRes)); + printf("failed to create child table ct3, reason:%s\n", taos_errstr(pRes)); return -1; } + taos_free_result(pRes); + pRes = taos_query(pConn, "insert into ct3 values(now, 5, 6, 'c')"); + if (taos_errno(pRes) != 0) { + printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); + return -1; + } taos_free_result(pRes); + return 0; } @@ -107,8 +137,8 @@ int32_t create_topic() { } taos_free_result(pRes); - /*pRes = taos_query(pConn, "create topic topic_ctb_column as database abc1");*/ - pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1"); + pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1"); + /*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");*/ if (taos_errno(pRes) != 0) { printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); return -1; @@ -168,6 +198,9 @@ tmq_t* build_consumer() { tmq_conf_set(conf, "td.connect.pass", "taosdata"); tmq_conf_set(conf, "msg.with.table.name", "true"); tmq_conf_set(conf, "enable.auto.commit", "true"); + + tmq_conf_set(conf, "experiment.use.snapshot", "false"); + tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); assert(tmq); diff --git a/include/client/taos.h b/include/client/taos.h index 9d4da221f4dbcf4ee5b2c6c36b497a350cb60f5c..d31d5c582cd128db3b5217698a27dd92d8a3d108 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -261,7 +261,7 @@ enum tmq_res_t { typedef enum tmq_res_t tmq_res_t; DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res); -DLL_EXPORT int32_t tmq_get_raw_meta(TAOS_RES *res, const void **raw_meta, int32_t *raw_meta_len); +DLL_EXPORT int32_t tmq_get_raw_meta(TAOS_RES *res, void **raw_meta, int32_t *raw_meta_len); DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res); DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res); DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 5dac79551af2a687283d0023c4932daecba20391..460f7b3b585017c3b748e89838810d0d969d5ece 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -54,6 +54,7 @@ typedef struct { SHashObj* map; // speedup acquire the tableQueryInfo by table uid void* pTagCond; void* pTagIndexCond; + uint64_t suid; } STableListInfo; typedef struct SColumnDataAgg { @@ -115,8 +116,8 @@ typedef struct SQueryTableDataCond { int32_t type; // data block load type: int32_t numOfTWindows; STimeWindow* twindows; - int32_t startVersion; - int32_t endVersion; + int64_t startVersion; + int64_t endVersion; } SQueryTableDataCond; void* blockDataDestroy(SSDataBlock* pBlock); diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 0ef9dffa2e98cc48efa0fd78fe5f516217278baa..dd0a81af0389a0ab6e3027b548498d032c8a021e 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -696,12 +696,12 @@ typedef struct { typedef STableCfg STableCfgRsp; -int32_t tSerializeSTableCfgReq(void *buf, int32_t bufLen, STableCfgReq *pReq); -int32_t tDeserializeSTableCfgReq(void *buf, int32_t bufLen, STableCfgReq *pReq); +int32_t tSerializeSTableCfgReq(void* buf, int32_t bufLen, STableCfgReq* pReq); +int32_t tDeserializeSTableCfgReq(void* buf, int32_t bufLen, STableCfgReq* pReq); -int32_t tSerializeSTableCfgRsp(void *buf, int32_t bufLen, STableCfgRsp *pRsp); -int32_t tDeserializeSTableCfgRsp(void *buf, int32_t bufLen, STableCfgRsp *pRsp); -void tFreeSTableCfgRsp(STableCfgRsp *pRsp); +int32_t tSerializeSTableCfgRsp(void* buf, int32_t bufLen, STableCfgRsp* pRsp); +int32_t tDeserializeSTableCfgRsp(void* buf, int32_t bufLen, STableCfgRsp* pRsp); +void tFreeSTableCfgRsp(STableCfgRsp* pRsp); typedef struct { char db[TSDB_DB_FNAME_LEN]; @@ -2669,6 +2669,7 @@ typedef struct { SMsgHead head; char subKey[TSDB_SUBSCRIBE_KEY_LEN]; int8_t withTbName; + int8_t useSnapshot; int32_t epoch; uint64_t reqId; int64_t consumerId; diff --git a/include/libs/executor/dataSinkMgt.h b/include/libs/executor/dataSinkMgt.h index c23cf162aa6e1ca35e3750aa54e91659c45d6b08..957c40f21e455a91cb916229565760782bd15d31 100644 --- a/include/libs/executor/dataSinkMgt.h +++ b/include/libs/executor/dataSinkMgt.h @@ -33,7 +33,7 @@ struct SDataSink; struct SSDataBlock; typedef struct SDeleterRes { - uint64_t uid; + uint64_t suid; SArray* uidList; int64_t skey; int64_t ekey; @@ -41,7 +41,8 @@ typedef struct SDeleterRes { } SDeleterRes; typedef struct SDeleterParam { - SArray* pUidList; + uint64_t suid; + SArray* pUidList; } SDeleterParam; typedef struct SDataSinkStat { diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 083f6ae1b07d4db9e7f4d339b63b8b89c0a8becc..6738fc23bc474270119a78fa67c4c777c8c696cc 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -36,11 +36,13 @@ typedef struct SReadHandle { void* vnode; void* mnd; SMsgCb* pMsgCb; + int8_t initTsdbReader; } SReadHandle; enum { STREAM_DATA_TYPE_SUBMIT_BLOCK = 1, STREAM_DATA_TYPE_SSDATA_BLOCK = 2, + STREAM_DATA_TYPE_FROM_SNAPSHOT = 3, }; typedef enum { @@ -56,6 +58,13 @@ typedef enum { */ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle); +/** + * Switch the stream scan to snapshot mode + * @param tinfo + * @return + */ +int32_t qStreamScanSnapshot(qTaskInfo_t tinfo); + /** * Set the input data block for the stream scan. * @param tinfo diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index 3b8a37f4208386eb06ff4f3153fab271e3ddb011..36e9b3309c6dd1c2827b9e03ddbbeb80c721f797 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -32,7 +32,7 @@ enum { }; typedef struct SDeleteRes { - uint64_t uid; + uint64_t suid; SArray* uidList; int64_t skey; int64_t ekey; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index eb68f52a40366057a76acc11c17a4f661c2a95cc..8af594530058500dcb9e51bc7146ce96af55bab6 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -71,6 +71,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE TAOS_DEF_ERROR_CODE(0, 0x0029) #define TSDB_CODE_INVALID_TIMESTAMP TAOS_DEF_ERROR_CODE(0, 0x0030) #define TSDB_CODE_MSG_DECODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0031) +#define TSDB_CODE_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0032) #define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0040) #define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0041) diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index ab4d14b1cc4c6b17ebec20fb0c5c234b4ffcaea6..22747425f078ed16df9e76a0e1eb0a647f56241c 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -290,6 +290,8 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { pResultInfo->current += 1; return pResultInfo->row; } + } else if (TD_RES_TMQ_META(res)) { + return NULL; } else { // assert to avoid un-initialization error ASSERT(0); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 3c349f61a19a8a429078994448943231fe41d3be..637a7ee5dd1073c69de38bcba2f2e973eb49d987 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -54,6 +54,7 @@ struct tmq_conf_t { int8_t autoCommit; int8_t resetOffset; int8_t withTbName; + int8_t useSnapshot; uint16_t port; int32_t autoCommitInterval; char* ip; @@ -69,6 +70,7 @@ struct tmq_t { char groupId[TSDB_CGROUP_LEN]; char clientId[256]; int8_t withTbName; + int8_t useSnapshot; int8_t autoCommit; int32_t autoCommitInterval; int32_t resetOffsetCfg; @@ -282,6 +284,18 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value } } + if (strcmp(key, "experiment.use.snapshot") == 0) { + if (strcmp(value, "true") == 0) { + conf->useSnapshot = true; + return TMQ_CONF_OK; + } else if (strcmp(value, "false") == 0) { + conf->useSnapshot = false; + return TMQ_CONF_OK; + } else { + return TMQ_CONF_INVALID; + } + } + if (strcmp(key, "td.connect.ip") == 0) { conf->ip = strdup(value); return TMQ_CONF_OK; @@ -953,6 +967,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { strcpy(pTmq->clientId, conf->clientId); strcpy(pTmq->groupId, conf->groupId); pTmq->withTbName = conf->withTbName; + pTmq->useSnapshot = conf->useSnapshot; pTmq->autoCommit = conf->autoCommit; pTmq->autoCommitInterval = conf->autoCommitInterval; pTmq->commitCb = conf->commitCb; @@ -1145,8 +1160,6 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { // handle meta rsp int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType; - if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) { - } SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM); if (pRspWrapper == NULL) { @@ -1159,19 +1172,19 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { pRspWrapper->vgHandle = pVg; pRspWrapper->topicHandle = pTopic; - memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead)); - if (rspType == TMQ_MSG_TYPE__POLL_RSP) { + memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead)); tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->dataRsp); } else { ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP); + memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead)); tDecodeSMqMetaRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->metaRsp); } taosMemoryFree(pMsg->pData); - tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pVg->vgId, - pRspWrapper->dataRsp.reqOffset, pRspWrapper->dataRsp.rspOffset); + tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld, type %d", tmq->consumerId, pVg->vgId, + pRspWrapper->dataRsp.reqOffset, pRspWrapper->dataRsp.rspOffset, rspType); taosWriteQitem(tmq->mqueue, pRspWrapper); tsem_post(&tmq->rspSem); @@ -1534,6 +1547,8 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pReq->currentOffset = reqOffset; pReq->reqId = generateRequestId(); + pReq->useSnapshot = tmq->useSnapshot; + pReq->head.vgId = htonl(pVg->vgId); pReq->head.contLen = htonl(sizeof(SMqPollReq)); return pReq; @@ -1541,7 +1556,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) { SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj)); - pRspObj->resType = RES_TYPE__TMQ; + pRspObj->resType = RES_TYPE__TMQ_META; tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN); tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN); pRspObj->vgId = pWrapper->vgHandle->vgId; @@ -1659,7 +1674,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) return 0; } -SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { +void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { while (1) { SMqRspWrapper* rspWrapper = NULL; taosGetQitem(tmq->qall, (void**)&rspWrapper); @@ -1699,18 +1714,18 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) { SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper; int32_t consumerEpoch = atomic_load_32(&tmq->epoch); - if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) { + if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) { SMqClientVg* pVg = pollRspWrapper->vgHandle; /*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/ - pVg->currentOffset = pollRspWrapper->dataRsp.rspOffset; + pVg->currentOffset = pollRspWrapper->metaRsp.rspOffset; atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); // build rsp - SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper); + SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper); taosFreeQitem(pollRspWrapper); return pRsp; } else { tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n", - pollRspWrapper->dataRsp.head.epoch, consumerEpoch); + pollRspWrapper->metaRsp.head.epoch, consumerEpoch); taosFreeQitem(pollRspWrapper); } } else { @@ -1727,8 +1742,8 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { - SMqRspObj* rspObj; - int64_t startTime = taosGetTimestampMs(); + void* rspObj; + int64_t startTime = taosGetTimestampMs(); #if 0 tmqHandleAllDelayedTask(tmq); @@ -1856,7 +1871,7 @@ const char* tmq_get_table_name(TAOS_RES* res) { return NULL; } -int32_t tmq_get_raw_meta(TAOS_RES* res, const void** raw_meta, int32_t* raw_meta_len) { +int32_t tmq_get_raw_meta(TAOS_RES* res, void** raw_meta, int32_t* raw_meta_len) { if (TD_RES_TMQ_META(res)) { SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; *raw_meta = pMetaRspObj->metaRsp.metaRsp; diff --git a/source/common/src/ttime.c b/source/common/src/ttime.c index 0b59e9b6cc611a9da3362fbef3da410376c0ea58..8fbdeb06545d9bdd31db2e630b8a7b7c52281587 100644 --- a/source/common/src/ttime.c +++ b/source/common/src/ttime.c @@ -76,22 +76,28 @@ void deltaToUtcInitOnce() { static int64_t parseFraction(char* str, char** end, int32_t timePrec); static int32_t parseTimeWithTz(const char* timestr, int64_t* time, int32_t timePrec, char delim); -static int32_t parseLocaltime(char* timestr, int32_t len, int64_t* utime, int32_t timePrec); -static int32_t parseLocaltimeDst(char* timestr, int32_t len, int64_t* utime, int32_t timePrec); +static int32_t parseLocaltime(char* timestr, int32_t len, int64_t* utime, int32_t timePrec, char delim); +static int32_t parseLocaltimeDst(char* timestr, int32_t len, int64_t* utime, int32_t timePrec, char delim); static char* forwardToTimeStringEnd(char* str); static bool checkTzPresent(const char* str, int32_t len); -static int32_t (*parseLocaltimeFp[])(char* timestr, int32_t len, int64_t* utime, int32_t timePrec) = {parseLocaltime, - parseLocaltimeDst}; +static int32_t (*parseLocaltimeFp[])(char* timestr, int32_t len, int64_t* utime, int32_t timePrec, char delim) = {parseLocaltime, + parseLocaltimeDst}; int32_t taosParseTime(const char* timestr, int64_t* utime, int32_t len, int32_t timePrec, int8_t day_light) { /* parse datatime string in with tz */ if (strnchr(timestr, 'T', len, false) != NULL) { - return parseTimeWithTz(timestr, utime, timePrec, 'T'); - } else if (checkTzPresent(timestr, len)) { - return parseTimeWithTz(timestr, utime, timePrec, 0); + if (checkTzPresent(timestr, len)) { + return parseTimeWithTz(timestr, utime, timePrec, 'T'); + } else { + return (*parseLocaltimeFp[day_light])((char*)timestr, len, utime, timePrec, 'T'); + } } else { - return (*parseLocaltimeFp[day_light])((char*)timestr, len, utime, timePrec); + if (checkTzPresent(timestr, len)) { + return parseTimeWithTz(timestr, utime, timePrec, 0); + } else { + return (*parseLocaltimeFp[day_light])((char*)timestr, len, utime, timePrec, 0); + } } } @@ -333,13 +339,25 @@ static FORCE_INLINE bool validateTm(struct tm* pTm) { return true; } -int32_t parseLocaltime(char* timestr, int32_t len, int64_t* time, int32_t timePrec) { +int32_t parseLocaltime(char* timestr, int32_t len, int64_t* time, int32_t timePrec, char delim) { *time = 0; struct tm tm = {0}; - char* str = taosStrpTime(timestr, "%Y-%m-%d %H:%M:%S", &tm); + char *str; + if (delim == 'T') { + str = taosStrpTime(timestr, "%Y-%m-%dT%H:%M:%S", &tm); + } else if (delim == 0) { + str = taosStrpTime(timestr, "%Y-%m-%d %H:%M:%S", &tm); + } else { + str = NULL; + } + if (str == NULL || (((str - timestr) < len) && (*str != '.')) || !validateTm(&tm)) { - return -1; + //if parse failed, try "%Y-%m-%d" format + str = taosStrpTime(timestr, "%Y-%m-%d", &tm); + if (str == NULL || (((str - timestr) < len) && (*str != '.')) || !validateTm(&tm)) { + return -1; + } } #ifdef _MSC_VER @@ -367,14 +385,26 @@ int32_t parseLocaltime(char* timestr, int32_t len, int64_t* time, int32_t timePr return 0; } -int32_t parseLocaltimeDst(char* timestr, int32_t len, int64_t* time, int32_t timePrec) { +int32_t parseLocaltimeDst(char* timestr, int32_t len, int64_t* time, int32_t timePrec, char delim) { *time = 0; struct tm tm = {0}; tm.tm_isdst = -1; - char* str = taosStrpTime(timestr, "%Y-%m-%d %H:%M:%S", &tm); + char *str; + if (delim == 'T') { + str = taosStrpTime(timestr, "%Y-%m-%dT%H:%M:%S", &tm); + } else if (delim == 0) { + str = taosStrpTime(timestr, "%Y-%m-%d %H:%M:%S", &tm); + } else { + str = NULL; + } + if (str == NULL || (((str - timestr) < len) && (*str != '.')) || !validateTm(&tm)) { - return -1; + //if parse failed, try "%Y-%m-%d" format + str = taosStrpTime(timestr, "%Y-%m-%d", &tm); + if (str == NULL || (((str - timestr) < len) && (*str != '.')) || !validateTm(&tm)) { + return -1; + } } /* mktime will be affected by TZ, set by using taos_options */ diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 78c4ea794fd09b65855a6eddf33115e82c5c57f3..b159061ce331570c9afcbbe56c0e4a6eefad330c 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -648,30 +648,30 @@ _OVER: } static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; - int32_t code = -1; - SDnodeObj *pDnode = NULL; - SMnodeObj *pMObj = NULL; - SQnodeObj *pQObj = NULL; - SSnodeObj *pSObj = NULL; - SMDropMnodeReq dropReq = {0}; - - if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { + SMnode *pMnode = pReq->info.node; + int32_t code = -1; + SDnodeObj *pDnode = NULL; + SMnodeObj *pMObj = NULL; + SQnodeObj *pQObj = NULL; + SSnodeObj *pSObj = NULL; + SDropDnodeReq dropReq = {0}; + + if (tDeserializeSDropDnodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; goto _OVER; } - mInfo("dnode:%d, start to drop", dropReq.dnodeId); - - if (dropReq.dnodeId <= 0) { - terrno = TSDB_CODE_MND_INVALID_DNODE_ID; - goto _OVER; - } + mInfo("dnode:%d, start to drop, ep:%s:%d", dropReq.dnodeId, dropReq.fqdn, dropReq.port); pDnode = mndAcquireDnode(pMnode, dropReq.dnodeId); if (pDnode == NULL) { - terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; - goto _OVER; + char ep[TSDB_EP_LEN + 1] = {0}; + snprintf(ep, sizeof(ep), dropReq.fqdn, dropReq.port); + pDnode = mndAcquireDnodeByEp(pMnode, ep); + if (pDnode == NULL) { + terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; + goto _OVER; + } } pQObj = mndAcquireQnode(pMnode, dropReq.dnodeId); @@ -726,6 +726,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) { return -1; } + mInfo("dnode:%d, start to config, option:%s, value:%s", cfgReq.dnodeId, cfgReq.config, cfgReq.value); SDnodeObj *pDnode = mndAcquireDnode(pMnode, cfgReq.dnodeId); if (pDnode == NULL) { mError("dnode:%d, failed to config since %s ", cfgReq.dnodeId, terrstr()); @@ -742,7 +743,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) { tSerializeSMCfgDnodeReq(pBuf, bufLen, &cfgReq); mDebug("dnode:%d, send config req to dnode, app:%p", cfgReq.dnodeId, pReq->info.ahandle); - SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen, .info = pReq->info}; + SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen}; return tmsgSendReq(&epSet, &rpcMsg); } diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index b1729c4047563d561b5800d3b83a4a5673cae638..9d8ed3701ee9a7001c20540b3e6c97f87db40a29 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -509,7 +509,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { SStreamTask* pTask = tNewSStreamTask(pStream->uid); mndAddTaskToTaskSet(taskOneLevel, pTask); - // input + // source pTask->isDataScan = 1; // trigger diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 15eb35c700f42edeb041b69b48385ab5df6851f6..b09ab3422429876e5ecd41d79318de07a05f2739 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -29,6 +29,7 @@ target_sources( # sma "src/sma/sma.c" "src/sma/smaEnv.c" + "src/sma/smaUtil.c" "src/sma/smaOpen.c" "src/sma/smaRollup.c" "src/sma/smaTimeRange.c" diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index 902e5e1bcc291fbe4826b41d8702f716fd6b6a58..1489b02492575791547e873d6414a932bcc2d3cf 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -34,7 +34,8 @@ extern "C" { typedef struct SSmaEnv SSmaEnv; typedef struct SSmaStat SSmaStat; -typedef struct SSmaStatItem SSmaStatItem; +typedef struct STSmaStat STSmaStat; +typedef struct SRSmaStat SRSmaStat; typedef struct SSmaKey SSmaKey; typedef struct SRSmaInfo SRSmaInfo; typedef struct SRSmaInfoItem SRSmaInfoItem; @@ -45,26 +46,38 @@ struct SSmaEnv { SSmaStat *pStat; }; -#define SMA_ENV_LOCK(env) ((env)->lock) -#define SMA_ENV_TYPE(env) ((env)->type) -#define SMA_ENV_STAT(env) ((env)->pStat) -#define SMA_ENV_STAT_ITEM(env) ((env)->pStat->tsmaStatItem) +#define SMA_ENV_LOCK(env) ((env)->lock) +#define SMA_ENV_TYPE(env) ((env)->type) +#define SMA_ENV_STAT(env) ((env)->pStat) -struct SSmaStatItem { +struct STSmaStat { int8_t state; // ETsdbSmaStat STSma *pTSma; // cache schema STSchema *pTSchema; }; +struct SRSmaStat { + SSma *pSma; + void *tmrHandle; + tmr_h tmrId; + int8_t tmrStat; + int32_t tmrSeconds; + SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo; +}; + struct SSmaStat { union { - SSmaStatItem tsmaStatItem; - SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo; + STSmaStat tsmaStat; // time-range-wise sma + SRSmaStat rsmaStat; // rollup sma }; T_REF_DECLARE() }; -#define SMA_STAT_ITEM(s) ((s)->tsmaStatItem) -#define SMA_STAT_INFO_HASH(s) ((s)->rsmaInfoHash) +#define SMA_TSMA_STAT(s) (&(s)->tsmaStat) +#define SMA_RSMA_STAT(s) (&(s)->rsmaStat) +#define SMA_RSMA_INFO_HASH(s) ((s)->rsmaStat.rsmaInfoHash) +#define SMA_RSMA_TMR_HANDLE(s) ((s)->rsmaStat.tmrHandle) +#define SMA_RSMA_TMR_STAT(s) ((s)->rsmaStat.tmrStat) +#define RSMA_INFO_HASH(r) ((r)->rsmaInfoHash) void tdDestroySmaEnv(SSmaEnv *pSmaEnv); void *tdFreeSmaEnv(SSmaEnv *pSmaEnv); @@ -107,53 +120,51 @@ static FORCE_INLINE int32_t tdUnLockSmaEnv(SSmaEnv *pEnv) { return 0; } -static FORCE_INLINE int8_t tdSmaStat(SSmaStatItem *pStatItem) { - if (pStatItem) { - return atomic_load_8(&pStatItem->state); +static FORCE_INLINE int8_t tdSmaStat(STSmaStat *pTStat) { + if (pTStat) { + return atomic_load_8(&pTStat->state); } return TSDB_SMA_STAT_UNKNOWN; } -static FORCE_INLINE bool tdSmaStatIsOK(SSmaStatItem *pStatItem, int8_t *state) { - if (!pStatItem) { +static FORCE_INLINE bool tdSmaStatIsOK(STSmaStat *pTStat, int8_t *state) { + if (!pTStat) { return false; } if (state) { - *state = atomic_load_8(&pStatItem->state); + *state = atomic_load_8(&pTStat->state); return *state == TSDB_SMA_STAT_OK; } - return atomic_load_8(&pStatItem->state) == TSDB_SMA_STAT_OK; + return atomic_load_8(&pTStat->state) == TSDB_SMA_STAT_OK; } -static FORCE_INLINE bool tdSmaStatIsExpired(SSmaStatItem *pStatItem) { - return pStatItem ? (atomic_load_8(&pStatItem->state) & TSDB_SMA_STAT_EXPIRED) : true; +static FORCE_INLINE bool tdSmaStatIsExpired(STSmaStat *pTStat) { + return pTStat ? (atomic_load_8(&pTStat->state) & TSDB_SMA_STAT_EXPIRED) : true; } -static FORCE_INLINE bool tdSmaStatIsDropped(SSmaStatItem *pStatItem) { - return pStatItem ? (atomic_load_8(&pStatItem->state) & TSDB_SMA_STAT_DROPPED) : true; +static FORCE_INLINE bool tdSmaStatIsDropped(STSmaStat *pTStat) { + return pTStat ? (atomic_load_8(&pTStat->state) & TSDB_SMA_STAT_DROPPED) : true; } -static FORCE_INLINE void tdSmaStatSetOK(SSmaStatItem *pStatItem) { - if (pStatItem) { - atomic_store_8(&pStatItem->state, TSDB_SMA_STAT_OK); +static FORCE_INLINE void tdSmaStatSetOK(STSmaStat *pTStat) { + if (pTStat) { + atomic_store_8(&pTStat->state, TSDB_SMA_STAT_OK); } } -static FORCE_INLINE void tdSmaStatSetExpired(SSmaStatItem *pStatItem) { - if (pStatItem) { - atomic_or_fetch_8(&pStatItem->state, TSDB_SMA_STAT_EXPIRED); +static FORCE_INLINE void tdSmaStatSetExpired(STSmaStat *pTStat) { + if (pTStat) { + atomic_or_fetch_8(&pTStat->state, TSDB_SMA_STAT_EXPIRED); } } -static FORCE_INLINE void tdSmaStatSetDropped(SSmaStatItem *pStatItem) { - if (pStatItem) { - atomic_or_fetch_8(&pStatItem->state, TSDB_SMA_STAT_DROPPED); +static FORCE_INLINE void tdSmaStatSetDropped(STSmaStat *pTStat) { + if (pTStat) { + atomic_or_fetch_8(&pTStat->state, TSDB_SMA_STAT_DROPPED); } } -static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType); -void *tdFreeSmaStatItem(SSmaStatItem *pSmaStatItem); static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType); void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType); @@ -163,6 +174,51 @@ int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg); int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg); int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days); +typedef struct STFInfo STFInfo; +typedef struct STFile STFile; + +struct STFInfo { + uint32_t magic; + uint32_t ftype; + uint32_t fver; + uint64_t fsize; +}; + +struct STFile { + STFInfo info; + STfsFile f; + TdFilePtr pFile; + uint8_t state; +}; + +#define TD_FILE_F(tf) (&((tf)->f)) +#define TD_FILE_PFILE(tf) ((tf)->pFile) +#define TD_FILE_OPENED(tf) (TD_FILE_PFILE(tf) != NULL) +#define TD_FILE_FULL_NAME(tf) (TD_FILE_F(tf)->aname) +#define TD_FILE_REL_NAME(tf) (TD_FILE_F(tf)->rname) +#define TD_FILE_OPENED(tf) (TD_FILE_PFILE(tf) != NULL) +#define TD_FILE_CLOSED(tf) (!TD_FILE_OPENED(tf)) +#define TD_FILE_SET_CLOSED(f) (TD_FILE_PFILE(f) = NULL) +#define TD_FILE_STATE(tf) ((tf)->state) +#define TD_FILE_SET_STATE(tf, s) ((tf)->state = (s)) +#define TD_FILE_DID(tf) (TD_FILE_F(tf)->did) +#define TD_FILE_IS_OK(tf) (TD_FILE_STATE(tf) == TD_FILE_STATE_OK) +#define TD_FILE_IS_BAD(tf) (TD_FILE_STATE(tf) == TD_FILE_STATE_BAD) + +int32_t tdInitTFile(STFile *pTFile, STfs *pTfs, const char *fname); +int32_t tdCreateTFile(STFile *pTFile, STfs *pTfs, bool updateHeader, int8_t fType); +int32_t tdOpenTFile(STFile *pTFile, int flags); +int64_t tdReadTFile(STFile *pTFile, void *buf, int64_t nbyte); +int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence); +int64_t tdWriteTFile(STFile *pTFile, void *buf, int64_t nbyte); +int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset); +int32_t tdRemoveTFile(STFile *pTFile); +int32_t tdLoadTFileHeader(STFile *pTFile, STFInfo *pInfo); +int32_t tdUpdateTFileHeader(STFile *pTFile); +void tdUpdateTFileMagic(STFile *pTFile, void *pCksm); +void tdCloseTFile(STFile *pTFile); +void tdGetVndFileName(int32_t vid, const char *dname, const char *fname, char *outputName); + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 8f6077e996a343c5daa532cde85557efd81fee91..59c3e95b9cf1b3f207e53a767b1f6e8877bc7976 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -150,6 +150,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead* // tqExec int32_t tqDataExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataBlkRsp* pRsp, int32_t workerId); +int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataBlkRsp* pRsp, int32_t workerId); int32_t tqSendPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataBlkRsp* pRsp); // tqMeta diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index f8bdb63c861e23f483ebceaa4ad7459d23b134b6..f7965f090240844230aeeef7a98e3b0e4cc6900e 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -247,7 +247,6 @@ struct SVnode { struct STbUidStore { tb_uid_t suid; - tb_uid_t uid; // TODO: just for debugging, remove when uid provided in SSDataBlock SArray* tbUids; SHashObj* uidHash; }; diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index d655883a762b743c9fd0f2cf6a695aeea43f8b66..910f4bba518ddf3ea192f731eba998684f5626f0 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -219,11 +219,9 @@ _err: } int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList){ - metaRLock(pMeta); TBC * pCur; int ret = tdbTbcOpen(pMeta->pTtlIdx, &pCur, NULL); if (ret < 0) { - metaULock(pMeta); return ret; } @@ -249,6 +247,7 @@ int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList){ tdbTbcClose(pCur); tdbFree(pKey); + return 0; } @@ -265,8 +264,8 @@ struct SMCtbCursor { SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) { SMCtbCursor *pCtbCur = NULL; SCtbIdxKey ctbIdxKey; - int ret; - int c; + int ret = 0; + int c = 0; pCtbCur = (SMCtbCursor *)taosMemoryCalloc(1, sizeof(*pCtbCur)); if (pCtbCur == NULL) { diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 2bbea593faa032d385fc97bfac23bceaaf2e1828..ab512f7774f066245c2837a19087d09471b98253 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -375,6 +375,7 @@ int metaTtlDropTable(SMeta *pMeta, int64_t ttl, SArray *tbUids) { metaWLock(pMeta); int ret = metaTtlSmaller(pMeta, ttl, tbUids); if(ret != 0){ + metaULock(pMeta); return ret; } for (int i = 0; i < taosArrayGetSize(tbUids); ++i) { diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index a80af2b20255fa51d114d9568afadac9318765cf..4e907e93ca05d99fae85a91f97be15e195f927c1 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -21,9 +21,10 @@ typedef struct SSmaStat SSmaStat; // declaration of static functions -static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType); +static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma); static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path); static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SSmaEnv **pEnv); +static void *tdFreeTSmaStat(STSmaStat *pStat); // implementation @@ -45,7 +46,7 @@ static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path) return NULL; } - if (tdInitSmaStat(&SMA_ENV_STAT(pEnv), smaType) != TSDB_CODE_SUCCESS) { + if (tdInitSmaStat(&SMA_ENV_STAT(pEnv), smaType, pSma) != TSDB_CODE_SUCCESS) { tdFreeSmaEnv(pEnv); return NULL; } @@ -105,7 +106,7 @@ int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) { return 0; } -static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType) { +static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma) { ASSERT(pSmaStat != NULL); if (*pSmaStat) { // no lock @@ -125,10 +126,23 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType) { } if (smaType == TSDB_SMA_TYPE_ROLLUP) { - SMA_STAT_INFO_HASH(*pSmaStat) = taosHashInit( - RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); + SMA_RSMA_STAT(*pSmaStat)->pSma = (SSma*)pSma; + // init timer + SMA_RSMA_TMR_HANDLE(*pSmaStat) = taosTmrInit(10000, 100, 10000, "RSMA_G"); + if (!SMA_RSMA_TMR_HANDLE(*pSmaStat)) { + taosMemoryFreeClear(*pSmaStat); + return TSDB_CODE_FAILED; + } + + atomic_store_8(&SMA_RSMA_TMR_STAT(*pSmaStat), TASK_TRIGGER_STATUS__ACTIVE); - if (!SMA_STAT_INFO_HASH(*pSmaStat)) { + // init hash + SMA_RSMA_INFO_HASH(*pSmaStat) = taosHashInit( + RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); + if (!SMA_RSMA_INFO_HASH(*pSmaStat)) { + if (SMA_RSMA_TMR_HANDLE(*pSmaStat)) { + taosTmrCleanUp(SMA_RSMA_TMR_HANDLE(*pSmaStat)); + } taosMemoryFreeClear(*pSmaStat); return TSDB_CODE_FAILED; } @@ -141,16 +155,16 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType) { return TSDB_CODE_SUCCESS; } -void *tdFreeSmaStatItem(SSmaStatItem *pSmaStatItem) { - if (pSmaStatItem) { - tDestroyTSma(pSmaStatItem->pTSma); - taosMemoryFreeClear(pSmaStatItem->pTSma); - taosMemoryFreeClear(pSmaStatItem); +static void *tdFreeTSmaStat(STSmaStat *pStat) { + if (pStat) { + tDestroyTSma(pStat->pTSma); + taosMemoryFreeClear(pStat->pTSma); + taosMemoryFreeClear(pStat); } return NULL; } -void* tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) { +void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) { tdDestroySmaState(pSmaStat, smaType); taosMemoryFreeClear(pSmaStat); return NULL; @@ -165,16 +179,19 @@ void* tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) { int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) { if (pSmaStat) { if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { - tdFreeSmaStatItem(&pSmaStat->tsmaStatItem); + tdFreeTSmaStat(&pSmaStat->tsmaStat); } else if (smaType == TSDB_SMA_TYPE_ROLLUP) { + if (SMA_RSMA_TMR_HANDLE(pSmaStat)) { + taosTmrCleanUp(SMA_RSMA_TMR_HANDLE(pSmaStat)); + } // TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready. - void *infoHash = taosHashIterate(SMA_STAT_INFO_HASH(pSmaStat), NULL); + void *infoHash = taosHashIterate(SMA_RSMA_INFO_HASH(pSmaStat), NULL); while (infoHash) { SRSmaInfo *pInfoHash = *(SRSmaInfo **)infoHash; tdFreeRSmaInfo(pInfoHash); - infoHash = taosHashIterate(SMA_STAT_INFO_HASH(pSmaStat), infoHash); + infoHash = taosHashIterate(SMA_RSMA_INFO_HASH(pSmaStat), infoHash); } - taosHashCleanup(SMA_STAT_INFO_HASH(pSmaStat)); + taosHashCleanup(SMA_RSMA_INFO_HASH(pSmaStat)); } else { ASSERT(0); } @@ -273,4 +290,4 @@ void smaTimerCleanUp(void *timer, int8_t *initFlag) { taosTmrCleanUp(timer); atomic_store_8(initFlag, 0); } -} +} \ No newline at end of file diff --git a/source/dnode/vnode/src/sma/smaOpen.c b/source/dnode/vnode/src/sma/smaOpen.c index a1c47a96c0d2d12a6652dc66019a109fc698a5af..383c264e7b755870708955b96cfbb88006c85184 100644 --- a/source/dnode/vnode/src/sma/smaOpen.c +++ b/source/dnode/vnode/src/sma/smaOpen.c @@ -137,4 +137,17 @@ int32_t smaClose(SSma *pSma) { taosMemoryFreeClear(pSma); } return 0; +} + +/** + * @brief rsma env restore + * + * @param pSma + * @return int32_t + */ +int32_t smaRestore(SSma *pSma) { + if (!pSma) return 0; + // iterate all stables to restore the rsma env + + return TSDB_CODE_SUCCESS; } \ No newline at end of file diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 1f18f6cb8777452dde659bbdec8a22d9f84e30de..3c2f710fa720fc91c1d94309bad2d27d5da68ca4 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -16,35 +16,17 @@ #include "sma.h" #include "tstream.h" -static FORCE_INLINE int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid); -static FORCE_INLINE int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids); -static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *rsmaItem, - tb_uid_t suid, int8_t level); - -#define SET_RSMA_INFO_ITEM_PARAMS(__idx, __level) \ - if (param->qmsg[__idx]) { \ - pRSmaInfo->items[__idx].pRsmaInfo = pRSmaInfo; \ - pRSmaInfo->items[__idx].taskInfo = qCreateStreamExecTaskInfo(param->qmsg[0], &handle); \ - if (!pRSmaInfo->items[__idx].taskInfo) { \ - goto _err; \ - } \ - pRSmaInfo->items[__idx].triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE; \ - if (param->maxdelay[__idx] < 1) { \ - int64_t msInterval = \ - convertTimeFromPrecisionToUnit(pRetention[__level].freq, pTsdbCfg->precision, TIME_UNIT_MILLISECOND); \ - pRSmaInfo->items[__idx].maxDelay = msInterval; \ - } else { \ - pRSmaInfo->items[__idx].maxDelay = param->maxdelay[__idx]; \ - } \ - if (pRSmaInfo->items[__idx].maxDelay > TSDB_MAX_ROLLUP_MAX_DELAY) { \ - pRSmaInfo->items[__idx].maxDelay = TSDB_MAX_ROLLUP_MAX_DELAY; \ - } \ - pRSmaInfo->items[__idx].level = TSDB_RETENTION_L##__level; \ - pRSmaInfo->items[__idx].tmrHandle = taosTmrInit(10000, 100, 10000, "RSMA"); \ - if (!pRSmaInfo->items[__idx].tmrHandle) { \ - goto _err; \ - } \ - } +typedef enum { TD_QTASK_TMP_FILE = 0, TD_QTASK_CUR_FILE } TD_QTASK_FILE_T; +static const char *tdQTaskInfoFname[] = {"qtaskinfo.t", "qtaskinfo"}; + +static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid); +static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids); +static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaInfo *pRSmaInfo, SReadHandle *handle, + int8_t idx); +static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *rsmaItem, + tb_uid_t suid, int8_t level); +static void tdRSmaFetchTrigger(void *param, void *tmrId); +static void tdRSmaPersistTrigger(void *param, void *tmrId); struct SRSmaInfoItem { SRSmaInfo *pRsmaInfo; @@ -56,14 +38,6 @@ struct SRSmaInfoItem { int8_t triggerStatus; // TASK_TRIGGER_STATUS__IN_ACTIVE/TASK_TRIGGER_STATUS__ACTIVE int32_t maxDelay; }; - -typedef struct { - int64_t suid; - SRSmaInfoItem *pItem; - SSma *pSma; - STSchema *pTSchema; -} SRSmaTriggerParam; - struct SRSmaInfo { STSchema *pTSchema; SSma *pSma; @@ -81,7 +55,7 @@ static FORCE_INLINE void tdFreeTaskHandle(qTaskInfo_t *taskHandle) { void *tdFreeRSmaInfo(SRSmaInfo *pInfo) { if (pInfo) { - for (int32_t i = 0; i < TSDB_RETENTION_MAX; ++i) { + for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { SRSmaInfoItem *pItem = &pInfo->items[i]; if (pItem->taskInfo) { tdFreeTaskHandle(pItem->taskInfo); @@ -118,7 +92,7 @@ static FORCE_INLINE int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SA return TSDB_CODE_FAILED; } - pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), suid, sizeof(tb_uid_t)); + pRSmaInfo = taosHashGet(SMA_RSMA_INFO_HASH(pStat), suid, sizeof(tb_uid_t)); if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) { smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64, SMA_VID(pSma), *suid); terrno = TSDB_CODE_RSMA_INVALID_STAT; @@ -187,7 +161,7 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui SSmaStat *pStat = SMA_ENV_STAT(pEnv); SHashObj *infoHash = NULL; - if (!pStat || !(infoHash = SMA_STAT_INFO_HASH(pStat))) { + if (!pStat || !(infoHash = SMA_RSMA_INFO_HASH(pStat))) { terrno = TSDB_CODE_RSMA_INVALID_STAT; return TSDB_CODE_FAILED; } @@ -213,6 +187,40 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui return TSDB_CODE_SUCCESS; } +static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaInfo *pRSmaInfo, SReadHandle *pReadHandle, + int8_t idx) { + SRetention *pRetention = SMA_RETENTION(pSma); + STsdbCfg *pTsdbCfg = SMA_TSDB_CFG(pSma); + + if (param->qmsg[idx]) { + SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]); + pItem->pRsmaInfo = pRSmaInfo; + pItem->taskInfo = qCreateStreamExecTaskInfo(param->qmsg[0], pReadHandle); + if (!pItem->taskInfo) { + goto _err; + } + pItem->triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE; + if (param->maxdelay[idx] < TSDB_MIN_ROLLUP_MAX_DELAY) { + int64_t msInterval = + convertTimeFromPrecisionToUnit(pRetention[idx + 1].freq, pTsdbCfg->precision, TIME_UNIT_MILLISECOND); + pItem->maxDelay = (int32_t)msInterval; + } else { + pItem->maxDelay = (int32_t)param->maxdelay[idx]; + } + if (pItem->maxDelay > TSDB_MAX_ROLLUP_MAX_DELAY) { + pItem->maxDelay = TSDB_MAX_ROLLUP_MAX_DELAY; + } + pItem->level = (idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2); + pItem->tmrHandle = taosTmrInit(10000, 100, 10000, "RSMA"); + if (!pItem->tmrHandle) { + goto _err; + } + } + return TSDB_CODE_SUCCESS; +_err: + return TSDB_CODE_FAILED; +} + /** * @brief Check and init qTaskInfo_t, only applicable to stable with SRSmaParam. * @@ -246,7 +254,7 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) { SSmaStat *pStat = SMA_ENV_STAT(pEnv); SRSmaInfo *pRSmaInfo = NULL; - pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t)); + pRSmaInfo = taosHashGet(SMA_RSMA_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t)); if (pRSmaInfo) { ASSERT(0); // TODO: free original pRSmaInfo is exists abnormally smaWarn("vgId:%d, rsma info already exists for stb: %s, %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid); @@ -282,14 +290,14 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) { pRSmaInfo->pSma = pSma; pRSmaInfo->suid = pReq->suid; - SRetention *pRetention = SMA_RETENTION(pSma); - STsdbCfg *pTsdbCfg = SMA_TSDB_CFG(pSma); - - SET_RSMA_INFO_ITEM_PARAMS(0, 1); - SET_RSMA_INFO_ITEM_PARAMS(1, 2); + if (tdSetRSmaInfoItemParams(pSma, param, pRSmaInfo, &handle, 0) < 0) { + goto _err; + } + if (tdSetRSmaInfoItemParams(pSma, param, pRSmaInfo, &handle, 1) < 0) { + goto _err; + } - if (taosHashPut(SMA_STAT_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) != - TSDB_CODE_SUCCESS) { + if (taosHashPut(SMA_RSMA_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) < 0) { goto _err; } else { smaDebug("vgId:%d, register rsma info succeed for suid:%" PRIi64, SMA_VID(pSma), pReq->suid); @@ -418,7 +426,6 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) { if (!pBlock) break; tdUidStorePut(pStore, msgIter.suid, NULL); - pStore->uid = msgIter.uid; // TODO: remove, just for debugging } if (terrno != TSDB_CODE_SUCCESS) return -1; @@ -439,8 +446,9 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType) if (!output) { break; } + if (!pResult) { - pResult = taosArrayInit(0, sizeof(SSDataBlock)); + pResult = taosArrayInit(1, sizeof(SSDataBlock)); if (!pResult) { terrno = TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_FAILED; @@ -451,7 +459,7 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType) } if (taosArrayGetSize(pResult) > 0) { -#if 0 +#if 1 char flag[10] = {0}; snprintf(flag, 10, "level %" PRIi8, pItem->level); blockDebugShowData(pResult, flag); @@ -459,14 +467,12 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType) STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb1 : pSma->pRSmaTsdb2); SSubmitReq *pReq = NULL; if (buildSubmitReqFromDataBlock(&pReq, pResult, pRSmaInfo->pTSchema, SMA_VID(pSma), pRSmaInfo->suid) < 0) { - taosArrayDestroy(pResult); - return TSDB_CODE_FAILED; + goto _err; } if (pReq && tdProcessSubmitReq(sinkTsdb, INT64_MAX, pReq) < 0) { - taosArrayDestroy(pResult); taosMemoryFreeClear(pReq); - return TSDB_CODE_FAILED; + goto _err; } taosMemoryFreeClear(pReq); @@ -479,7 +485,10 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType) } taosArrayDestroy(pResult); - return 0; + return TSDB_CODE_SUCCESS; +_err: + taosArrayDestroy(pResult); + return TSDB_CODE_FAILED; } /** @@ -488,13 +497,12 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType) * @param param * @param tmrId */ -static void rsmaTriggerByTimer(void *param, void *tmrId) { - // SRSmaTriggerParam *pParam = (SRSmaTriggerParam *)param; - // SRSmaInfoItem *pItem = pParam->pItem; +static void tdRSmaFetchTrigger(void *param, void *tmrId) { SRSmaInfoItem *pItem = param; if (atomic_load_8(&pItem->triggerStatus) == TASK_TRIGGER_STATUS__ACTIVE) { - smaTrace("level %" PRIi8 " status is active for tb suid:%" PRIi64, pItem->level, pItem->pRsmaInfo->suid); + smaWarn("%s:%d THREAD:%" PRIi64 " level %" PRIi8 " status is active for tb suid:%" PRIi64, __func__, __LINE__, + taosGetSelfPthreadId(), pItem->level, pItem->pRsmaInfo->suid); SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL}; atomic_store_8(&pItem->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE); @@ -502,10 +510,11 @@ static void rsmaTriggerByTimer(void *param, void *tmrId) { tdFetchAndSubmitRSmaResult(pItem, STREAM_DATA_TYPE_SSDATA_BLOCK); } else { - smaTrace("level %" PRIi8 " status is inactive for tb suid:%" PRIi64, pItem->level, pItem->pRsmaInfo->suid); + smaWarn("%s:%d THREAD:%" PRIi64 " level %" PRIi8 " status is inactive for tb suid:%" PRIi64, __func__, __LINE__, + taosGetSelfPthreadId(), pItem->level, pItem->pRsmaInfo->suid); } - // taosTmrReset(rsmaTriggerByTimer, pItem->maxDelay, pItem, pItem->tmrHandle, &pItem->tmrId); + // taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, pItem->tmrHandle, &pItem->tmrId); } static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *pItem, @@ -518,16 +527,20 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3 smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level, pItem->taskInfo, suid); - // inputType = STREAM_DATA_TYPE_SUBMIT_BLOCK(1) - if (qSetStreamInput(pItem->taskInfo, pMsg, inputType, true) < 0) { + if (qSetStreamInput(pItem->taskInfo, pMsg, inputType, true) < 0) { // STREAM_DATA_TYPE_SUBMIT_BLOCK smaError("vgId:%d, rsma % " PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno)); return TSDB_CODE_FAILED; } - // SRSmaTriggerParam triggerParam = {.suid = suid, .pItem = pItem, .pSma = pSma, .pTSchema = pTSchema}; tdFetchAndSubmitRSmaResult(pItem, STREAM_DATA_TYPE_SUBMIT_BLOCK); atomic_store_8(&pItem->triggerStatus, TASK_TRIGGER_STATUS__ACTIVE); - taosTmrReset(rsmaTriggerByTimer, pItem->maxDelay, pItem, pItem->tmrHandle, &pItem->tmrId); + smaWarn("%s:%d THREAD:%" PRIi64 " process rsma insert", __func__, __LINE__, taosGetSelfPthreadId()); + + SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); + SRSmaStat *pStat = SMA_RSMA_STAT(pEnv->pStat); + + taosTmrStart(tdRSmaPersistTrigger, 5000, pStat, pStat->tmrHandle); + taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, pItem->tmrHandle, &pItem->tmrId); return TSDB_CODE_SUCCESS; } @@ -542,7 +555,7 @@ static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb SSmaStat *pStat = SMA_ENV_STAT(pEnv); SRSmaInfo *pRSmaInfo = NULL; - pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)); + pRSmaInfo = taosHashGet(SMA_RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)); if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) { smaDebug("vgId:%d, return as no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid); @@ -594,3 +607,106 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) { } return TSDB_CODE_SUCCESS; } + +void tdRSmaQTaskGetFName(int32_t vid, int8_t ftype, char* outputName) { + tdGetVndFileName(vid, "rsma", tdQTaskInfoFname[ftype], outputName); +} + +static void *tdRSmaPersistExec(void *param) { + setThreadName("rsma-task-persist"); + SRSmaStat *pRSmaStat = param; + SSma *pSma = pRSmaStat->pSma; + STfs *pTfs = pSma->pVnode->pTfs; + int64_t toffset = 0; + + void *infoHash = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL); + if (!infoHash) { + goto _end; + } + + STFile tFile = {0}; + int32_t vid = 2; + char qTaskInfoFName[TSDB_FILENAME_LEN]; + tdRSmaQTaskGetFName(vid, TD_QTASK_TMP_FILE, qTaskInfoFName); + tdInitTFile(&tFile, pTfs, qTaskInfoFName); + tdCreateTFile(&tFile, pTfs, true, -1); + + while (infoHash) { + SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash; + char *pOutput = NULL; + int32_t len = 0; + if (qSerializeTaskStatus(pRSmaInfo->items[0].taskInfo, &pOutput, &len) < 0) { + smaError("serialize rsma task for table %" PRIi64 " failed since %s", pRSmaInfo->items[0].pRsmaInfo->suid, + terrstr(terrno)); + } else { + smaWarn("serialize rsma task for table %" PRIi64 " success and len is %d", pRSmaInfo->items[0].pRsmaInfo->suid, + len); + } + tdAppendTFile(&tFile, &len, sizeof(len), &toffset); + tdAppendTFile(&tFile, pOutput, len, &toffset); + + taosMemoryFree(pOutput); + infoHash = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), infoHash); + } +_end: + + if (tdUpdateTFileHeader(&tFile) < 0) { + smaError("vgId:%d, failed to update tfile %s header since %s", vid, TD_FILE_FULL_NAME(&tFile), tstrerror(terrno)); + tdCloseTFile(&tFile); + tdRemoveTFile(&tFile); + return NULL; + } + + tdCloseTFile(&tFile); + + char newFName[TSDB_FILENAME_LEN]; + strncpy(newFName, TD_FILE_FULL_NAME(&tFile), TSDB_FILENAME_LEN); + char *pos = strstr(newFName, tdQTaskInfoFname[TD_QTASK_TMP_FILE]); + strncpy(pos, tdQTaskInfoFname[TD_QTASK_CUR_FILE], TSDB_FILENAME_LEN - POINTER_DISTANCE(pos, newFName)); + taosRenameFile(TD_FILE_FULL_NAME(&tFile), newFName); + + atomic_store_8(&pRSmaStat->tmrStat, TASK_TRIGGER_STATUS__ACTIVE); + return NULL; +_err: + atomic_store_8(&pRSmaStat->tmrStat, TASK_TRIGGER_STATUS__ACTIVE); + // remove the .tmp file + return NULL; +} + +static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) { + smaWarn("%s:%d entry ", __func__, __LINE__); + TdThread threadId; + TdThreadAttr thAttr; + taosThreadAttrInit(&thAttr); + taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_DETACHED); + + if (taosThreadCreate(&threadId, &thAttr, tdRSmaPersistExec, pRSmaStat) != 0) { + smaError("failed to create thread to persist rsma qTaskInfo since %s", strerror(errno)); + } + + taosThreadAttrDestroy(&thAttr); + smaWarn("%s:%d end ", __func__, __LINE__); +} + +/** + * @brief trigger to persist rsma qTaskInfo + * + * @param param + * @param tmrId + */ +static void tdRSmaPersistTrigger(void *param, void *tmrId) { + SRSmaStat *pRSmaStat = param; + + if (atomic_load_8(&pRSmaStat->tmrStat) == TASK_TRIGGER_STATUS__ACTIVE) { + smaWarn("%s:%d THREAD:%" PRIi64 " rsma persistence start since active", __func__, __LINE__, taosGetSelfPthreadId()); + atomic_store_8(&pRSmaStat->tmrStat, TASK_TRIGGER_STATUS__IN_ACTIVE); + + // execution + tdRSmaPersistTask(pRSmaStat); + } else { + smaWarn("%s:%d THREAD:%" PRIi64 " rsma persistence not start since inactive", __func__, __LINE__, + taosGetSelfPthreadId()); + } + + taosTmrReset(tdRSmaPersistTrigger, 3600000, pRSmaStat, pRSmaStat->tmrHandle, &pRSmaStat->tmrId); +} \ No newline at end of file diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index 4352c466c5526cc4a00f10c629e2e99b1015cf7d..2244b91c28136b7d4f076259888ef8906fab011b 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -129,7 +129,7 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { SSmaEnv *pEnv = SMA_TSMA_ENV(pSma); SSmaStat *pStat = NULL; - SSmaStatItem *pItem = NULL; + STSmaStat *pItem = NULL; if (!pEnv || !(pStat = SMA_ENV_STAT(pEnv))) { terrno = TSDB_CODE_TSMA_INVALID_STAT; @@ -137,7 +137,7 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { } tdRefSmaStat(pSma, pStat); - pItem = &pStat->tsmaStatItem; + pItem = &pStat->tsmaStat; ASSERT(pItem); diff --git a/source/dnode/vnode/src/sma/smaUtil.c b/source/dnode/vnode/src/sma/smaUtil.c new file mode 100644 index 0000000000000000000000000000000000000000..76690a3bde0215fcc8ffc16a992abad8375ceb11 --- /dev/null +++ b/source/dnode/vnode/src/sma/smaUtil.c @@ -0,0 +1,238 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "sma.h" + +#define TD_FILE_HEAD_SIZE 512 + +#define TD_FILE_STATE_OK 0 +#define TD_FILE_STATE_BAD 1 + +#define TD_FILE_INIT_MAGIC 0xFFFFFFFF + + +static int32_t tdEncodeTFInfo(void **buf, STFInfo *pInfo); +static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo); + +static int32_t tdEncodeTFInfo(void **buf, STFInfo *pInfo) { + int32_t tlen = 0; + + tlen += taosEncodeFixedU32(buf, pInfo->magic); + tlen += taosEncodeFixedU32(buf, pInfo->ftype); + tlen += taosEncodeFixedU32(buf, pInfo->fver); + tlen += taosEncodeFixedU64(buf, pInfo->fsize); + + return tlen; +} + +static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo) { + buf = taosDecodeFixedU32(buf, &(pInfo->magic)); + buf = taosDecodeFixedU32(buf, &(pInfo->ftype)); + buf = taosDecodeFixedU32(buf, &(pInfo->fver)); + buf = taosDecodeFixedU64(buf, &(pInfo->fsize)); + return buf; +} + +int64_t tdWriteTFile(STFile *pTFile, void *buf, int64_t nbyte) { + ASSERT(TD_FILE_OPENED(pTFile)); + + int64_t nwrite = taosWriteFile(pTFile->pFile, buf, nbyte); + if (nwrite < nbyte) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + return nwrite; +} + +int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence) { + ASSERT(TD_FILE_OPENED(pTFile)); + + int64_t loffset = taosLSeekFile(TD_FILE_PFILE(pTFile), offset, whence); + if (loffset < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + return loffset; +} + +int64_t tdReadTFile(STFile *pTFile, void *buf, int64_t nbyte) { + ASSERT(TD_FILE_OPENED(pTFile)); + + int64_t nread = taosReadFile(pTFile->pFile, buf, nbyte); + if (nread < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + return nread; +} + +int32_t tdUpdateTFileHeader(STFile *pTFile) { + char buf[TD_FILE_HEAD_SIZE] = "\0"; + + if (tdSeekTFile(pTFile, 0, SEEK_SET) < 0) { + return -1; + } + + void *ptr = buf; + tdEncodeTFInfo(&ptr, &(pTFile->info)); + + taosCalcChecksumAppend(0, (uint8_t *)buf, TD_FILE_HEAD_SIZE); + if (tdWriteTFile(pTFile, buf, TD_FILE_HEAD_SIZE) < 0) { + return -1; + } + + return 0; +} + +int32_t tdLoadTFileHeader(STFile *pTFile, STFInfo *pInfo) { + char buf[TD_FILE_HEAD_SIZE] = "\0"; + uint32_t _version; + + ASSERT(TD_FILE_OPENED(pTFile)); + + if (tdSeekTFile(pTFile, 0, SEEK_SET) < 0) { + return -1; + } + + if (tdReadTFile(pTFile, buf, TD_FILE_HEAD_SIZE) < 0) { + return -1; + } + + if (!taosCheckChecksumWhole((uint8_t *)buf, TD_FILE_HEAD_SIZE)) { + terrno = TSDB_CODE_FILE_CORRUPTED; + return -1; + } + + void *pBuf = buf; + pBuf = tdDecodeTFInfo(pBuf, pInfo); + return 0; +} + +void tdUpdateTFileMagic(STFile *pTFile, void *pCksm) { + pTFile->info.magic = taosCalcChecksum(pTFile->info.magic, (uint8_t *)(pCksm), sizeof(TSCKSUM)); +} + +int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset) { + ASSERT(TD_FILE_OPENED(pTFile)); + + int64_t toffset; + + if ((toffset = tdSeekTFile(pTFile, 0, SEEK_END)) < 0) { + return -1; + } + + ASSERT(pTFile->info.fsize == toffset); + + if (offset) { + *offset = toffset; + } + + if (tdWriteTFile(pTFile, buf, nbyte) < 0) { + return -1; + } + + pTFile->info.fsize += nbyte; + + return nbyte; +} + +int32_t tdOpenTFile(STFile *pTFile, int flags) { + ASSERT(!TD_FILE_OPENED(pTFile)); + + pTFile->pFile = taosOpenFile(TD_FILE_FULL_NAME(pTFile), flags); + if (pTFile->pFile == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + return 0; +} + +void tdCloseTFile(STFile *pTFile) { + if (TD_FILE_OPENED(pTFile)) { + taosCloseFile(&pTFile->pFile); + TD_FILE_SET_CLOSED(pTFile); + } +} + +void tdGetVndFileName(int32_t vid, const char *dname, const char *fname, char *outputName) { + snprintf(outputName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s/%s", vid, dname, fname); +} + +int32_t tdInitTFile(STFile *pTFile, STfs *pTfs, const char *fname) { + char fullname[TSDB_FILENAME_LEN]; + SDiskID did = {0}; + + TD_FILE_SET_STATE(pTFile, TD_FILE_STATE_OK); + TD_FILE_SET_CLOSED(pTFile); + + memset(&(pTFile->info), 0, sizeof(pTFile->info)); + pTFile->info.magic = TD_FILE_INIT_MAGIC; + + if (tfsAllocDisk(pTfs, 0, &did) < 0) { + terrno = TSDB_CODE_NO_AVAIL_DISK; + return -1; + } + + tfsInitFile(pTfs, &(pTFile->f), did, fname); + + return 0; +} + +int32_t tdCreateTFile(STFile *pTFile, STfs *pTfs, bool updateHeader, int8_t fType) { + ASSERT(pTFile->info.fsize == 0 && pTFile->info.magic == TD_FILE_INIT_MAGIC); + + pTFile->pFile = taosOpenFile(TD_FILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); + if (pTFile->pFile == NULL) { + if (errno == ENOENT) { + // Try to create directory recursively + char *s = strdup(TD_FILE_REL_NAME(pTFile)); + if (tfsMkdirRecurAt(pTfs, taosDirName(s), TD_FILE_DID(pTFile)) < 0) { + taosMemoryFreeClear(s); + return -1; + } + taosMemoryFreeClear(s); + + pTFile->pFile = taosOpenFile(TD_FILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); + if (pTFile->pFile == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + } else { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + } + + if (!updateHeader) { + return 0; + } + + pTFile->info.fsize += TD_FILE_HEAD_SIZE; + pTFile->info.fver = 0; + + if (tdUpdateTFileHeader(pTFile) < 0) { + tdCloseTFile(pTFile); + tdRemoveTFile(pTFile); + return -1; + } + + return 0; +} + +int32_t tdRemoveTFile(STFile *pTFile) { return tfsRemoveFile(TD_FILE_F(pTFile)); } \ No newline at end of file diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 046ee6487830969de92dc6c629ccedab3ccea403..cca3a5858814e54d32fb8965326b3786e8f3f642 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -227,19 +227,16 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { consumerEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, consumerEpoch, reqEpoch); } - SWalHead* pHeadWithCkSum = taosMemoryMalloc(sizeof(SWalHead) + 2048); - if (pHeadWithCkSum == NULL) { - return -1; - } - - walSetReaderCapacity(pHandle->pWalReader, 2048); - SMqDataBlkRsp rsp = {0}; rsp.reqOffset = pReq->currentOffset; rsp.blockData = taosArrayInit(0, sizeof(void*)); rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t)); + if (rsp.blockData == NULL || rsp.blockDataLen == NULL) { + return -1; + } + rsp.withTbName = pReq->withTbName; if (rsp.withTbName) { rsp.blockTbName = taosArrayInit(0, sizeof(void*)); @@ -253,6 +250,32 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { rsp.blockSchema = taosArrayInit(0, sizeof(void*)); } +#if 1 + if (pReq->useSnapshot) { + // TODO set ver into snapshot + int64_t lastVer = walGetCommittedVer(pTq->pWal); + if (rsp.reqOffset < lastVer) { + tqInfo("retrieve using snapshot req offset %ld last ver %ld", rsp.reqOffset, lastVer); + tqScanSnapshot(pTq, &pHandle->execHandle, &rsp, workerId); + + if (rsp.blockNum != 0) { + rsp.withTbName = false; + rsp.rspOffset = lastVer; + tqInfo("direct send by snapshot req offset %ld rsp offset %ld", rsp.reqOffset, rsp.rspOffset); + fetchOffset = lastVer; + goto SEND_RSP; + } + } + } +#endif + + SWalHead* pHeadWithCkSum = taosMemoryMalloc(sizeof(SWalHead) + 2048); + if (pHeadWithCkSum == NULL) { + return -1; + } + + walSetReaderCapacity(pHandle->pWalReader, 2048); + while (1) { consumerEpoch = atomic_load_32(&pHandle->epoch); if (consumerEpoch > reqEpoch) { @@ -283,7 +306,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { pHead->msgType == TDMT_VND_DROP_STB || pHead->msgType == TDMT_VND_CREATE_TABLE || pHead->msgType == TDMT_VND_ALTER_TABLE || pHead->msgType == TDMT_VND_DROP_TABLE || pHead->msgType == TDMT_VND_DROP_TTL_TABLE); - // return + tqInfo("fetch meta msg, ver: %ld, type: %d", pHead->version, pHead->msgType); SMqMetaRsp metaRsp = {0}; metaRsp.reqOffset = pReq->currentOffset; metaRsp.rspOffset = fetchOffset; @@ -292,6 +315,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { metaRsp.metaRsp = pHead->body; if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) { code = -1; + goto OVER; } code = 0; goto OVER; @@ -308,6 +332,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { taosMemoryFree(pHeadWithCkSum); +SEND_RSP: ASSERT(taosArrayGetSize(rsp.blockData) == rsp.blockNum); ASSERT(taosArrayGetSize(rsp.blockDataLen) == rsp.blockNum); if (rsp.withSchema) { @@ -364,6 +389,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { pHandle->epoch = -1; pHandle->execHandle.subType = req.subType; + pHandle->fetchMeta = req.withMeta; pHandle->pWalReader = walOpenReadHandle(pTq->pVnode->pWal); for (int32_t i = 0; i < 5; i++) { @@ -376,6 +402,8 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { SReadHandle handle = { .reader = pHandle->execHandle.pExecReader[i], .meta = pTq->pVnode->pMeta, + .vnode = pTq->pVnode, + .initTsdbReader = 1, }; pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle); ASSERT(pHandle->execHandle.execCol.task[i]); @@ -448,6 +476,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) { .reader = pStreamReader, .meta = pTq->pVnode->pMeta, .vnode = pTq->pVnode, + .initTsdbReader = 1, }; /*pTask->exec.inputHandle = pStreamReader;*/ pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index 7c75d88a83d5813ceb5c34eb6f1e0cb779335d5f..e6f960c8c12817a5b36a7b4995c0a534736c5535 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -60,6 +60,30 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, const STqExecHandle* pExec, SMqD return 0; } +int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataBlkRsp* pRsp, int32_t workerId) { + ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN); + qTaskInfo_t task = pExec->execCol.task[workerId]; + if (qStreamScanSnapshot(task) < 0) { + ASSERT(0); + } + while (1) { + SSDataBlock* pDataBlock = NULL; + uint64_t ts = 0; + if (qExecTask(task, &pDataBlock, &ts) < 0) { + ASSERT(0); + } + if (pDataBlock == NULL) break; + + ASSERT(pDataBlock->info.rows != 0); + ASSERT(pDataBlock->info.numOfCols != 0); + + tqAddBlockDataToRsp(pDataBlock, pRsp); + pRsp->blockNum++; + } + + return 0; +} + int32_t tqDataExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataBlkRsp* pRsp, int32_t workerId) { if (pExec->subType == TOPIC_SUB_TYPE__COLUMN) { qTaskInfo_t task = pExec->execCol.task[workerId]; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 286bcea820b61ae146dca47a1512928be3f331dd..91931c2fd81e4895eac2c9f134eb44efb648edb6 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -340,7 +340,7 @@ typedef struct SStreamBlockScanInfo { SReadHandle readHandle; uint64_t tableUid; // queried super table uid EStreamScanMode scanMode; - SOperatorInfo* pOperatorDumy; + SOperatorInfo* pSnapshotReadOp; SInterval interval; // if the upstream is an interval operator, the interval info is also kept here. SArray* childIds; SessionWindowSupporter sessionSup; diff --git a/source/libs/executor/src/dataDeleter.c b/source/libs/executor/src/dataDeleter.c index 33b7811e6c0c26032d1bca8cc8643dce2b0ea984..9db2783ccdc6999ec484a01a91d9111b02ffce18 100644 --- a/source/libs/executor/src/dataDeleter.c +++ b/source/libs/executor/src/dataDeleter.c @@ -86,7 +86,7 @@ static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInp SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 0); SDeleterRes* pRes = (SDeleterRes*)pEntry->data; - pRes->uid = pHandle->pDeleter->tableId; + pRes->suid = pHandle->pParam->suid; pRes->uidList = pHandle->pParam->pUidList; pRes->skey = pHandle->pDeleter->deleteTimeRange.skey; pRes->ekey = pHandle->pDeleter->deleteTimeRange.ekey; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 0282d9cccb55bf252c412ddb83b16d77038f1741..ac38fd8de936e67b487cf789734b9c843542e31d 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -300,6 +300,8 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo uint64_t tableUid = pScanNode->uid; + pListInfo->suid = pScanNode->suid; + SNode* pTagCond = (SNode*)pListInfo->pTagCond; SNode* pTagIndexCond = (SNode*)pListInfo->pTagIndexCond; if (pScanNode->tableType == TSDB_SUPER_TABLE) { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 6b93119987c8e608389b6fa7d84d9966c4f66dd4..1bd1ce14b1d3f6102f3a909ba53462a1f6e5e538 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -67,6 +67,9 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock); taosArrayPush(pInfo->pBlockLists, &p); } + } else if (type == STREAM_DATA_TYPE_FROM_SNAPSHOT) { + // do nothing + ASSERT(pInfo->blockType == STREAM_DATA_TYPE_FROM_SNAPSHOT); } else { ASSERT(0); } @@ -75,6 +78,14 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu } } +int32_t qStreamScanSnapshot(qTaskInfo_t tinfo) { + if (tinfo == NULL) { + return TSDB_CODE_QRY_APP_ERROR; + } + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + return doSetStreamBlock(pTaskInfo->pRoot, NULL, 0, STREAM_DATA_TYPE_FROM_SNAPSHOT, 0, NULL); +} + int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool assignUid) { return qSetMultiStreamInput(tinfo, input, 1, type, assignUid); } @@ -106,14 +117,6 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) { return NULL; } - // print those info into log -#if 0 - pMsg->sId = pMsg->sId; - pMsg->queryId = pMsg->queryId; - pMsg->taskId = pMsg->taskId; - pMsg->contentLen = pMsg->contentLen; -#endif - /*qDebugL("stream task string %s", (const char*)msg);*/ struct SSubplan* plan = NULL; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 30ed44b872f871b63a3326d44ce9c41153232624..a3862bec39bdafdf82aadfc94162aefe01df6d33 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4041,16 +4041,17 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo tsdbReaderT pDataReader = NULL; if (pHandle) { - if (pHandle->vnode) { - // for stram + if (pHandle->initTsdbReader) { + // for stream + ASSERT(pHandle->vnode); pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId); } else { // for tq + ASSERT(pHandle->meta); getTableList(pHandle->meta, pScanPhyNode, pTableListInfo); } } - if (pDataReader == NULL && terrno != 0) { qDebug("%s pDataReader is NULL", GET_TASKID(pTaskInfo)); // return NULL; @@ -4464,6 +4465,7 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pT return TSDB_CODE_OUT_OF_MEMORY; } int32_t tbNum = taosArrayGetSize(pTask->tableqinfoList.pTableList); + pDeleterParam->suid = pTask->tableqinfoList.suid; pDeleterParam->pUidList = taosArrayInit(tbNum, sizeof(uint64_t)); if (NULL == pDeleterParam->pUidList) { taosMemoryFree(pDeleterParam); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index df9dfd48cdfc5660c4b27078ee1d0d4275a92ec1..80dd01cf4c95c4b48055d59722bf388a6c08ca59 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -337,7 +337,8 @@ void addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_ } for (int32_t i = 0; i < pBlock->info.rows; ++i) { - colDataAppend(pColInfoData, i, data, (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data))); + colDataAppend(pColInfoData, i, data, + (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data))); } if (data && (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) && p != NULL && @@ -776,7 +777,7 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) { if (!needRead) { return false; } - STableScanInfo* pTableScanInfo = pInfo->pOperatorDumy->info; + STableScanInfo* pTableScanInfo = pInfo->pSnapshotReadOp->info; pTableScanInfo->cond.twindows[0] = win; pTableScanInfo->curTWinIdx = 0; tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0); @@ -821,11 +822,11 @@ static uint64_t getGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_ static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo) { while (1) { SSDataBlock* pResult = NULL; - pResult = doTableScan(pInfo->pOperatorDumy); + pResult = doTableScan(pInfo->pSnapshotReadOp); if (pResult == NULL) { if (prepareDataScan(pInfo)) { // scan next window data - pResult = doTableScan(pInfo->pOperatorDumy); + pResult = doTableScan(pInfo->pSnapshotReadOp); } } if (!pResult) { @@ -860,11 +861,11 @@ static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDa ASSERT(pBlock->info.numOfCols == pUpdateBlock->info.numOfCols); int32_t rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, pInfo->tsArrayIndex); - pInfo->groupId = getGroupId(pInfo->pOperatorDumy, pBlock, rowId); + pInfo->groupId = getGroupId(pInfo->pSnapshotReadOp, pBlock, rowId); int32_t i = 0; for (; i < size; i++) { rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, i + pInfo->tsArrayIndex); - uint64_t id = getGroupId(pInfo->pOperatorDumy, pBlock, rowId); + uint64_t id = getGroupId(pInfo->pSnapshotReadOp, pBlock, rowId); if (pInfo->groupId != id) { break; } @@ -928,7 +929,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current); blockDataUpdateTsWindow(pBlock, 0); return pBlock; - } else { + } else if (pInfo->blockType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { if (pInfo->scanMode == STREAM_SCAN_FROM_RES) { blockDataDestroy(pInfo->pUpdateRes); pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; @@ -1062,6 +1063,15 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { } return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes; + } else if (pInfo->blockType == STREAM_DATA_TYPE_FROM_SNAPSHOT) { + SSDataBlock* pResult = doTableScan(pInfo->pSnapshotReadOp); + if (pResult) { + return pResult->info.rows > 0 ? pResult : NULL; + } + return NULL; + } else { + ASSERT(0); + return NULL; } } @@ -1126,7 +1136,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan } else { pInfo->pUpdateInfo = NULL; } - pInfo->pOperatorDumy = pTableScanDummy; + pInfo->pSnapshotReadOp = pTableScanDummy; pInfo->interval = pSTInfo->interval; pInfo->readHandle = *pHandle; @@ -1548,7 +1558,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { return NULL; } - int32_t msgType = (strcasecmp(name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) ? TDMT_DND_SYSTABLE_RETRIEVE : TDMT_MND_SYSTABLE_RETRIEVE; + int32_t msgType = (strcasecmp(name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) ? TDMT_DND_SYSTABLE_RETRIEVE + : TDMT_MND_SYSTABLE_RETRIEVE; pMsgSendInfo->param = pOperator; pMsgSendInfo->msgInfo.pData = buf1; @@ -1834,7 +1845,8 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { } else { data = (char*)p; } - colDataAppend(pDst, count, data, (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data))); + colDataAppend(pDst, count, data, + (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data))); if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) && data != NULL) { @@ -1887,11 +1899,11 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi goto _error; } - pInfo->pTableList = pTableListInfo; - pInfo->pColMatchInfo = colList; - pInfo->pRes = createResDataBlock(pDescNode); - pInfo->readHandle = *pReadHandle; - pInfo->curPos = 0; + pInfo->pTableList = pTableListInfo; + pInfo->pColMatchInfo = colList; + pInfo->pRes = createResDataBlock(pDescNode); + pInfo->readHandle = *pReadHandle; + pInfo->curPos = 0; pOperator->name = "TagScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 19f0fd4ea77ab9b4078d0f70365f19235f45d5b4..6813c6354244503b376c8c13f76307f834360f91 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2238,7 +2238,8 @@ static void clearUpdateDataBlock(SSDataBlock* pBlock) { } void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex) { - ASSERT(pDest->info.capacity >= pSource->info.rows); + // ASSERT(pDest->info.capacity >= pSource->info.rows); + blockDataEnsureCapacity(pDest, pSource->info.rows); clearUpdateDataBlock(pDest); SColumnInfoData* pDestCol = taosArrayGet(pDest->pDataBlock, 0); SColumnInfoData* pSourceCol = taosArrayGet(pSource->pDataBlock, tsColIndex); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 28181d7e11291a984ae53b0bff2bc46282977548..4250785f7cca6a041776d55a992276fc316d74b6 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -271,7 +271,7 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen SDeleterRes* pDelRes = (SDeleterRes*)output.pData; rsp.affectedRows = pDelRes->affectedRows; - pRes->uid = pDelRes->uid; + pRes->suid = pDelRes->suid; pRes->uidList = pDelRes->uidList; pRes->skey = pDelRes->skey; pRes->ekey = pDelRes->ekey; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 69cb5b43faee6ea22acc0eff0007075794681417..6fada59c84e38392db20e6862bb847468de46eb2 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -80,7 +80,6 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { } qRes->type = STREAM_INPUT__DATA_BLOCK; qRes->blocks = pRes; - /*qRes->sourceVg = pTask->nodeId;*/ if (streamTaskOutput(pTask, qRes) < 0) { streamQueueProcessFail(pTask->inputQueue); taosArrayDestroy(pRes); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index b89e908df7d4907f9412b06d99d17216742266dd..fa10fc26dd7cf3817e70cb00fb76ceb4522dadf3 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -76,6 +76,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NEED_RETRY, "Retry needed") TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE, "Out of memory in rpc queue") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TIMESTAMP, "Invalid timestamp format") TAOS_DEFINE_ERROR(TSDB_CODE_MSG_DECODE_ERROR, "Msg decode error") +TAOS_DEFINE_ERROR(TSDB_CODE_NO_AVAIL_DISK, "No available disk") TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs") @@ -356,7 +357,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM, "No table data in memo TAOS_DEFINE_ERROR(TSDB_CODE_TDB_FILE_ALREADY_EXISTS, "File already exists") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_RECONFIGURE, "Need to reconfigure table") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO, "Invalid information to create table") -TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_AVAIL_DISK, "No available disk") +TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_AVAIL_DISK, "TSDB no available disk") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_MESSED_MSG, "TSDB messed message") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVLD_TAG_VAL, "TSDB invalid tag value") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_CACHE_LAST_ROW, "TSDB no cache last row data") diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 76b4f6b153a64d7ccf396f6f826198cef4393f5f..cc28b19de9816b751d99e63d4b8b046b3be9814a 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -109,6 +109,7 @@ ./test.sh -f tsim/tmq/basic4Of2Cons.sim ./test.sh -f tsim/tmq/basic2Of2ConsOverlap.sim ./test.sh -f tsim/tmq/topic.sim +./test.sh -f tsim/tmq/snapshot.sim # --- stable ./test.sh -f tsim/stable/disk.sim diff --git a/tests/script/tsim/mnode/basic5.sim b/tests/script/tsim/mnode/basic5.sim index 23f5f6d7826661c076f56edf874f1cbc0cc20d42..e298dcbe55f3581b96098969c34c0a1f69149b10 100644 --- a/tests/script/tsim/mnode/basic5.sim +++ b/tests/script/tsim/mnode/basic5.sim @@ -233,6 +233,7 @@ if $data(1)[3] != dropping then endi print =============== step9: start mnode1 and wait it dropped +sleep 3000 system sh/exec.sh -n dnode1 -s start $x = 0 diff --git a/tests/script/tsim/show/basic.sim b/tests/script/tsim/show/basic.sim index 8f9362fc6968bf98f03e6c58e84944a10fc931de..4d646f39e38dca6844def98eb0b960f8309a2965 100644 --- a/tests/script/tsim/show/basic.sim +++ b/tests/script/tsim/show/basic.sim @@ -233,12 +233,12 @@ if $rows != 4 then endi sql show dnode 1 variables; -if $rows != 114 then +if $rows <= 0 then return -1 endi sql show local variables; -if $rows != 50 then +if $rows <= 0 then return -1 endi diff --git a/tests/script/tsim/tmq/consume.sh b/tests/script/tsim/tmq/consume.sh index 3fa71d6edd96b6c664d5c680fb2615c730e14f8e..001ce6ae496cc7e5f7b4c5a3bf5b92b0c7cf12b3 100755 --- a/tests/script/tsim/tmq/consume.sh +++ b/tests/script/tsim/tmq/consume.sh @@ -17,8 +17,9 @@ VALGRIND=0 SIGNAL=SIGINT SHOW_MSG=0 SHOW_ROW=0 +EXP_USE_SNAPSHOT=0 -while getopts "d:s:v:y:x:g:r:w:" arg +while getopts "d:s:v:y:x:g:r:w:e:" arg do case $arg in d) @@ -45,6 +46,9 @@ do w) CDB_NAME=$OPTARG ;; + e) + EXP_USE_SNAPSHOT=$OPTARG + ;; ?) echo "unkown argument" ;; @@ -91,8 +95,8 @@ if [ "$EXEC_OPTON" = "start" ]; then echo nohup valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${LOG_DIR}/valgrind-tmq_sim.log $PROGRAM -c $CFG_DIR -y $POLL_DELAY -d $DB_NAME -g $SHOW_MSG -r $SHOW_ROW > /dev/null 2>&1 & nohup valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${LOG_DIR}/valgrind-tmq_sim.log $PROGRAM -c $CFG_DIR -y $POLL_DELAY -d $DB_NAME -g $SHOW_MSG -r $SHOW_ROW > /dev/null 2>&1 & else - echo "nohup $PROGRAM -c $CFG_DIR -y $POLL_DELAY -d $DB_NAME -g $SHOW_MSG -r $SHOW_ROW -w $CDB_NAME > /dev/null 2>&1 &" - nohup $PROGRAM -c $CFG_DIR -y $POLL_DELAY -d $DB_NAME -g $SHOW_MSG -r $SHOW_ROW -w $CDB_NAME > /dev/null 2>&1 & + echo "nohup $PROGRAM -c $CFG_DIR -y $POLL_DELAY -d $DB_NAME -g $SHOW_MSG -r $SHOW_ROW -w $CDB_NAME -e $EXP_USE_SNAPSHOT > /dev/null 2>&1 &" + nohup $PROGRAM -c $CFG_DIR -y $POLL_DELAY -d $DB_NAME -g $SHOW_MSG -r $SHOW_ROW -w $CDB_NAME -e $EXP_USE_SNAPSHOT > /dev/null 2>&1 & fi else PID=`ps -ef|grep tmq_sim | grep -v grep | awk '{print $2}'` diff --git a/tests/script/tsim/tmq/snapshot.sim b/tests/script/tsim/tmq/snapshot.sim new file mode 100644 index 0000000000000000000000000000000000000000..5683aaa55987a07ba09a3fc88b9da613792c3321 --- /dev/null +++ b/tests/script/tsim/tmq/snapshot.sim @@ -0,0 +1,289 @@ +#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406 +#basic1.sim: vgroups=1, one topic for one consumer, firstly insert data, then start consume. Include six topics +#basic2.sim: vgroups=1, multi topics for one consumer, firstly insert data, then start consume. Include six topics +#basic3.sim: vgroups=4, one topic for one consumer, firstly insert data, then start consume. Include six topics +#basic4.sim: vgroups=4, multi topics for one consumer, firstly insert data, then start consume. Include six topics + +# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN +# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5; +# +# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval). +# + +run tsim/tmq/prepareBasicEnv-1vgrp.sim + +#---- global parameters start ----# +$dbName = db +$vgroups = 1 +$stbPrefix = stb +$ctbPrefix = ctb +$ntbPrefix = ntb +$stbNum = 1 +$ctbNum = 10 +$ntbNum = 10 +$rowsPerCtb = 10 +$tstart = 1640966400000 # 2022-01-01 00:00:00.000 +#---- global parameters end ----# + +$pullDelay = 3 +$ifcheckdata = 1 +$ifmanualcommit = 1 +$showMsg = 1 +$showRow = 0 + +sql connect +sql use $dbName + +print == create topics from super table +sql create topic topic_stb_column as select ts, c3 from stb +sql create topic topic_stb_all as select ts, c1, c2, c3 from stb +sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb + +print == create topics from child table +sql create topic topic_ctb_column as select ts, c3 from ctb0 +sql create topic topic_ctb_all as select * from ctb0 +sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0 + +print == create topics from normal table +sql create topic topic_ntb_column as select ts, c3 from ntb0 +sql create topic topic_ntb_all as select * from ntb0 +sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0 + +#sql show topics +#if $rows != 9 then +# return -1 +#endi + +#'group.id:cgrp1,enable.auto.commit:false,auto.commit.interval.ms:6000,auto.offset.reset:earliest' +$keyList = ' . group.id:cgrp1 +$keyList = $keyList . , +$keyList = $keyList . enable.auto.commit:false +#$keyList = $keyList . , +#$keyList = $keyList . auto.commit.interval.ms:6000 +#$keyList = $keyList . , +#$keyList = $keyList . auto.offset.reset:earliest +$keyList = $keyList . ' +print ========== key list: $keyList + + +$cdb_index = 0 +#=============================== start consume =============================# + +print ================ test consume from stb +$loop_cnt = 0 +loop_consume_diff_topic_from_stb: + +####################################################################################### +# clear consume info and consume result +#run tsim/tmq/clearConsume.sim +# because drop table function no stable, so by create new db for consume info and result. Modify it later +$cdb_index = $cdb_index + 1 +$cdbName = cdb . $cdb_index +sql create database $cdbName vgroups 1 +sleep 500 +sql use $cdbName + +print == create consume info table and consume result table +sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int) +sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) + +sql show tables +if $rows != 2 then + return -1 +endi +####################################################################################### + +if $loop_cnt == 0 then + print == scenario 1: topic_stb_column + $topicList = ' . topic_stb_column + $topicList = $topicList . ' +elif $loop_cnt == 1 then + print == scenario 2: topic_stb_all + $topicList = ' . topic_stb_all + $topicList = $topicList . ' +elif $loop_cnt == 2 then + print == scenario 3: topic_stb_function + $topicList = ' . topic_stb_function + $topicList = $topicList . ' +else + goto loop_consume_diff_topic_from_stb_end +endi + +$consumerId = 0 +$totalMsgOfStb = $ctbNum * $rowsPerCtb +$expectmsgcnt = 1 +$expectrowcnt = 100 +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) + +print == start consumer to pull msgs from stb +print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -e 1 -s start +system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -e 1 -s start + +print == check consume result +wait_consumer_end_from_stb: +sql select * from consumeresult +print ==> rows: $rows +print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] +if $rows != 1 then + sleep 1000 + goto wait_consumer_end_from_stb +endi +if $data[0][1] != $consumerId then + return -1 +endi +if $data[0][2] != $expectmsgcnt then + return -1 +endi +if $data[0][3] != $expectrowcnt then + return -1 +endi +$loop_cnt = $loop_cnt + 1 +goto loop_consume_diff_topic_from_stb +loop_consume_diff_topic_from_stb_end: + +print ================ test consume from ctb +$loop_cnt = 0 +loop_consume_diff_topic_from_ctb: + +####################################################################################### +# clear consume info and consume result +#run tsim/tmq/clearConsume.sim +# because drop table function no stable, so by create new db for consume info and result. Modify it later +$cdb_index = $cdb_index + 1 +$cdbName = cdb . $cdb_index +sql create database $cdbName vgroups 1 +sleep 500 +sql use $cdbName + +print == create consume info table and consume result table +sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int) +sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) + +sql show tables +if $rows != 2 then + return -1 +endi +####################################################################################### + +if $loop_cnt == 0 then + print == scenario 1: topic_ctb_column + $topicList = ' . topic_ctb_column + $topicList = $topicList . ' +elif $loop_cnt == 1 then + print == scenario 2: topic_ctb_all + $topicList = ' . topic_ctb_all + $topicList = $topicList . ' +elif $loop_cnt == 2 then + print == scenario 3: topic_ctb_function + $topicList = ' . topic_ctb_function + $topicList = $topicList . ' +else + goto loop_consume_diff_topic_from_ctb_end +endi + +$consumerId = 0 +$totalMsgOfCtb = $rowsPerCtb +$expectmsgcnt = 1 +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) + +print == start consumer to pull msgs from ctb +print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start -e 1 +system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start -e 1 + +print == check consume result +wait_consumer_end_from_ctb: +sql select * from consumeresult +print ==> rows: $rows +print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] +if $rows != 1 then + sleep 1000 + goto wait_consumer_end_from_ctb +endi +if $data[0][1] != $consumerId then + return -1 +endi +if $data[0][2] != 1 then + return -1 +endi +if $data[0][3] != 10 then + return -1 +endi +$loop_cnt = $loop_cnt + 1 +goto loop_consume_diff_topic_from_ctb +loop_consume_diff_topic_from_ctb_end: + +print ================ test consume from ntb +$loop_cnt = 0 +loop_consume_diff_topic_from_ntb: + +####################################################################################### +# clear consume info and consume result +#run tsim/tmq/clearConsume.sim +# because drop table function no stable, so by create new db for consume info and result. Modify it later +$cdb_index = $cdb_index + 1 +$cdbName = cdb . $cdb_index +sql create database $cdbName vgroups 1 +sleep 500 +sql use $cdbName + +print == create consume info table and consume result table +sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int) +sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) + +sql show tables +if $rows != 2 then + return -1 +endi +####################################################################################### + +if $loop_cnt == 0 then + print == scenario 1: topic_ntb_column + $topicList = ' . topic_ntb_column + $topicList = $topicList . ' +elif $loop_cnt == 1 then + print == scenario 2: topic_ntb_all + $topicList = ' . topic_ntb_all + $topicList = $topicList . ' +elif $loop_cnt == 2 then + print == scenario 3: topic_ntb_function + $topicList = ' . topic_ntb_function + $topicList = $topicList . ' +else + goto loop_consume_diff_topic_from_ntb_end +endi + +$consumerId = 0 +$totalMsgOfNtb = $rowsPerCtb +$expectmsgcnt = $totalMsgOfNtb +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) + +print == start consumer to pull msgs from ntb +print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start -e 1 +system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start -e 1 + +print == check consume result from ntb +wait_consumer_end_from_ntb: +sql select * from consumeresult +print ==> rows: $rows +print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] +if $rows != 1 then + sleep 1000 + goto wait_consumer_end_from_ntb +endi +if $data[0][1] != $consumerId then + return -1 +endi +if $data[0][2] != 1 then + return -1 +endi +if $data[0][3] != $totalMsgOfNtb then + return -1 +endi +$loop_cnt = $loop_cnt + 1 +goto loop_consume_diff_topic_from_ntb +loop_consume_diff_topic_from_ntb_end: + +#------ not need stop consumer, because it exit after pull msg overthan expect msg +#system tsim/tmq/consume.sh -s stop -x SIGINT + +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/system-test/simpletest.bat b/tests/system-test/simpletest.bat index 74f2cdfcbaea4dd1d6084cfc198dfe3bf3ce268f..0b1f16d2164e7509efaa0b82dbdeb38c2859af2f 100644 --- a/tests/system-test/simpletest.bat +++ b/tests/system-test/simpletest.bat @@ -4,7 +4,7 @@ python3 .\test.py -f 0-others\taosShellNetChk.py python3 .\test.py -f 0-others\telemetry.py python3 .\test.py -f 0-others\taosdMonitor.py -@REM python3 .\test.py -f 0-others\udfTest.py +python3 .\test.py -f 0-others\udfTest.py @REM python3 .\test.py -f 0-others\udf_create.py @REM python3 .\test.py -f 0-others\udf_restart_taosd.py @REM python3 .\test.py -f 0-others\cachelast.py diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index c28c676a87a25eb34b33e995792d8377ee0f14c2..053cd79432319dc35dbd50bb088d0d918b1e2099 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -22,9 +22,9 @@ #include #include "taos.h" +#include "taosdef.h" #include "taoserror.h" #include "tlog.h" -#include "taosdef.h" #include "types.h" #define GREEN "\033[1;32m" @@ -36,11 +36,7 @@ #define MAX_CONSUMER_THREAD_CNT (16) #define MAX_VGROUP_CNT (32) -typedef enum { - NOTIFY_CMD_START_CONSUM, - NOTIFY_CMD_START_COMMIT, - NOTIFY_CMD_ID_BUTT -}NOTIFY_CMD_ID; +typedef enum { NOTIFY_CMD_START_CONSUM, NOTIFY_CMD_START_COMMIT, NOTIFY_CMD_ID_BUTT } NOTIFY_CMD_ID; typedef struct { TdThread thread; @@ -52,8 +48,8 @@ typedef struct { // char autoOffsetRest[16]; // none, earliest, latest TdFilePtr pConsumeRowsFile; - int32_t ifCheckData; - int64_t expectMsgCnt; + int32_t ifCheckData; + int64_t expectMsgCnt; int64_t consumeMsgCnt; int64_t consumeRowCnt; @@ -89,6 +85,7 @@ typedef struct { int32_t saveRowFlag; int32_t consumeDelay; // unit s int32_t numOfThread; + int32_t useSnapshot; SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT]; } SConfInfo; @@ -96,6 +93,8 @@ static SConfInfo g_stConfInfo; TdFilePtr g_fp = NULL; static int running = 1; +int8_t useSnapshot = 0; + // char* g_pRowValue = NULL; // TdFilePtr g_fp = NULL; @@ -205,6 +204,8 @@ void parseArgument(int32_t argc, char* argv[]) { g_stConfInfo.saveRowFlag = atol(argv[++i]); } else if (strcmp(argv[i], "-y") == 0) { g_stConfInfo.consumeDelay = atol(argv[++i]); + } else if (strcmp(argv[i], "-e") == 0) { + useSnapshot = (int8_t)atol(argv[++i]); } else { pError("%s unknow para: %s %s", GREEN, argv[++i], NC); exit(-1); @@ -298,11 +299,11 @@ int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) { return 0; } -static char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision) { - //if (shell.args.is_raw_time) { - // sprintf(buf, "%" PRId64, val); - // return buf; - //} +static char* shellFormatTimestamp(char* buf, int64_t val, int32_t precision) { + // if (shell.args.is_raw_time) { + // sprintf(buf, "%" PRId64, val); + // return buf; + // } time_t tt; int32_t ms = 0; @@ -340,7 +341,7 @@ static char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision) { } } - struct tm *ptm = taosLocalTime(&tt, NULL); + struct tm* ptm = taosLocalTime(&tt, NULL); size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm); if (precision == TSDB_TIME_PRECISION_NANO) { @@ -354,7 +355,8 @@ static char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision) { return buf; } -static void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, int32_t length, int32_t precision) { +static void shellDumpFieldToFile(TdFilePtr pFile, const char* val, TAOS_FIELD* field, int32_t length, + int32_t precision) { if (val == NULL) { taosFprintfFile(pFile, "%s", TSDB_DATA_NULL_STR); return; @@ -364,31 +366,31 @@ static void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *f char buf[TSDB_MAX_BYTES_PER_ROW]; switch (field->type) { case TSDB_DATA_TYPE_BOOL: - taosFprintfFile(pFile, "%d", ((((int32_t)(*((char *)val))) == 1) ? 1 : 0)); + taosFprintfFile(pFile, "%d", ((((int32_t)(*((char*)val))) == 1) ? 1 : 0)); break; case TSDB_DATA_TYPE_TINYINT: - taosFprintfFile(pFile, "%d", *((int8_t *)val)); + taosFprintfFile(pFile, "%d", *((int8_t*)val)); break; case TSDB_DATA_TYPE_UTINYINT: - taosFprintfFile(pFile, "%u", *((uint8_t *)val)); + taosFprintfFile(pFile, "%u", *((uint8_t*)val)); break; case TSDB_DATA_TYPE_SMALLINT: - taosFprintfFile(pFile, "%d", *((int16_t *)val)); + taosFprintfFile(pFile, "%d", *((int16_t*)val)); break; case TSDB_DATA_TYPE_USMALLINT: - taosFprintfFile(pFile, "%u", *((uint16_t *)val)); + taosFprintfFile(pFile, "%u", *((uint16_t*)val)); break; case TSDB_DATA_TYPE_INT: - taosFprintfFile(pFile, "%d", *((int32_t *)val)); + taosFprintfFile(pFile, "%d", *((int32_t*)val)); break; case TSDB_DATA_TYPE_UINT: - taosFprintfFile(pFile, "%u", *((uint32_t *)val)); + taosFprintfFile(pFile, "%u", *((uint32_t*)val)); break; case TSDB_DATA_TYPE_BIGINT: - taosFprintfFile(pFile, "%" PRId64, *((int64_t *)val)); + taosFprintfFile(pFile, "%" PRId64, *((int64_t*)val)); break; case TSDB_DATA_TYPE_UBIGINT: - taosFprintfFile(pFile, "%" PRIu64, *((uint64_t *)val)); + taosFprintfFile(pFile, "%" PRIu64, *((uint64_t*)val)); break; case TSDB_DATA_TYPE_FLOAT: taosFprintfFile(pFile, "%.5f", GET_FLOAT_VAL(val)); @@ -409,7 +411,7 @@ static void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *f taosFprintfFile(pFile, "\'%s\'", buf); break; case TSDB_DATA_TYPE_TIMESTAMP: - shellFormatTimestamp(buf, *(int64_t *)val, precision); + shellFormatTimestamp(buf, *(int64_t*)val, precision); taosFprintfFile(pFile, "'%s'", buf); break; default: @@ -417,12 +419,13 @@ static void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *f } } -static void dumpToFileForCheck(TdFilePtr pFile, TAOS_ROW row, TAOS_FIELD* fields, int32_t* length, int32_t num_fields, int32_t precision) { +static void dumpToFileForCheck(TdFilePtr pFile, TAOS_ROW row, TAOS_FIELD* fields, int32_t* length, int32_t num_fields, + int32_t precision) { for (int32_t i = 0; i < num_fields; i++) { if (i > 0) { taosFprintfFile(pFile, "\n"); } - shellDumpFieldToFile(pFile, (const char *)row[i], fields + i, length[i], precision); + shellDumpFieldToFile(pFile, (const char*)row[i], fields + i, length[i], precision); } taosFprintfFile(pFile, "\n"); } @@ -432,31 +435,32 @@ static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex) int32_t totalRows = 0; // printf("topic: %s\n", tmq_get_topic_name(msg)); - int32_t vgroupId = tmq_get_vgroup_id(msg); - const char* dbName = tmq_get_db_name(msg); + int32_t vgroupId = tmq_get_vgroup_id(msg); + const char* dbName = tmq_get_db_name(msg); taosFprintfFile(g_fp, "consumerId: %d, msg index:%" PRId64 "\n", pInfo->consumerId, msgIndex); - taosFprintfFile(g_fp, "dbName: %s, topic: %s, vgroupId: %d\n", dbName != NULL ? dbName : "invalid table", tmq_get_topic_name(msg), vgroupId); + taosFprintfFile(g_fp, "dbName: %s, topic: %s, vgroupId: %d\n", dbName != NULL ? dbName : "invalid table", + tmq_get_topic_name(msg), vgroupId); while (1) { TAOS_ROW row = taos_fetch_row(msg); if (row == NULL) break; - TAOS_FIELD* fields = taos_fetch_fields(msg); + TAOS_FIELD* fields = taos_fetch_fields(msg); int32_t numOfFields = taos_field_count(msg); - int32_t* length = taos_fetch_lengths(msg); - int32_t precision = taos_result_precision(msg); - const char* tbName = tmq_get_table_name(msg); + int32_t* length = taos_fetch_lengths(msg); + int32_t precision = taos_result_precision(msg); + const char* tbName = tmq_get_table_name(msg); - dumpToFileForCheck(pInfo->pConsumeRowsFile, row, fields, length, numOfFields, precision); + dumpToFileForCheck(pInfo->pConsumeRowsFile, row, fields, length, numOfFields, precision); taos_print_row(buf, row, fields, numOfFields); if (0 != g_stConfInfo.showRowFlag) { taosFprintfFile(g_fp, "tbname:%s, rows[%d]: %s\n", (tbName != NULL ? tbName : "null table"), totalRows, buf); - //if (0 != g_stConfInfo.saveRowFlag) { - // saveConsumeContentToTbl(pInfo, buf); - //} + // if (0 != g_stConfInfo.saveRowFlag) { + // saveConsumeContentToTbl(pInfo, buf); + // } } totalRows++; @@ -479,8 +483,7 @@ int queryDB(TAOS* taos, char* command) { return 0; } -static void appNothing(void* param, TAOS_RES* res, int32_t numOfRows) { -} +static void appNothing(void* param, TAOS_RES* res, int32_t numOfRows) {} int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) { char sqlStr[1024] = {0}; @@ -488,11 +491,8 @@ int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) { int64_t now = taosGetTimestampMs(); // schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int - sprintf(sqlStr, "insert into %s.notifyinfo values (%"PRId64", %d, %d)", - g_stConfInfo.cdbName, - now, - cmdId, - pInfo->consumerId); + sprintf(sqlStr, "insert into %s.notifyinfo values (%" PRId64 ", %d, %d)", g_stConfInfo.cdbName, now, cmdId, + pInfo->consumerId); taos_query_a(pInfo->taos, sqlStr, appNothing, NULL); @@ -502,12 +502,12 @@ int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) { } static int32_t g_once_commit_flag = 0; -static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { - pError("tmq_commit_cb_print() commit %d\n", code); +static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { + pError("tmq_commit_cb_print() commit %d\n", code); if (0 == g_once_commit_flag) { g_once_commit_flag = 1; - notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT); + notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT); } taosFprintfFile(g_fp, "tmq_commit_cb_print() be called\n"); } @@ -541,6 +541,10 @@ void build_consumer(SThreadInfo* pInfo) { // tmq_conf_set(conf, "auto.offset.reset", "none"); // tmq_conf_set(conf, "auto.offset.reset", "earliest"); // tmq_conf_set(conf, "auto.offset.reset", "latest"); + // + if (useSnapshot) { + tmq_conf_set(conf, "experiment.use.snapshot", "true"); + } pInfo->tmq = tmq_consumer_new(conf, NULL, 0); @@ -595,12 +599,13 @@ void loop_consume(SThreadInfo* pInfo) { pInfo->consumerId); pInfo->ts = taosGetTimestampMs(); - + if (pInfo->ifCheckData) { - char filename[256] = {0}; + char filename[256] = {0}; char tmpString[128]; - //sprintf(filename, "%s/../log/consumerid_%d_%s.txt", configDir, pInfo->consumerId, getCurrentTimeString(tmpString)); - sprintf(filename, "%s/../log/consumerid_%d.txt", configDir, pInfo->consumerId); + // sprintf(filename, "%s/../log/consumerid_%d_%s.txt", configDir, pInfo->consumerId, + // getCurrentTimeString(tmpString)); + sprintf(filename, "%s/../log/consumerid_%d.txt", configDir, pInfo->consumerId); pInfo->pConsumeRowsFile = taosOpenFile(filename, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM); if (pInfo->pConsumeRowsFile == NULL) { taosFprintfFile(g_fp, "%s create file fail for save rows content\n", getCurrentTimeString(tmpString)); @@ -619,10 +624,10 @@ void loop_consume(SThreadInfo* pInfo) { totalMsgs++; - if (0 == once_flag) { + if (0 == once_flag) { once_flag = 1; - notifyMainScript(pInfo, NOTIFY_CMD_START_CONSUM); - } + notifyMainScript(pInfo, NOTIFY_CMD_START_CONSUM); + } if (totalRows >= pInfo->expectMsgCnt) { char tmpString[128]; @@ -651,7 +656,7 @@ void* consumeThreadFunc(void* param) { pInfo->taos = taos_connect(NULL, "root", "taosdata", NULL, 0); if (pInfo->taos == NULL) { taosFprintfFile(g_fp, "taos_connect() fail, can not notify and save consume result to main scripte\n"); - exit(-1); + exit(-1); } build_consumer(pInfo);