diff --git a/CMakeLists.txt b/CMakeLists.txt index 90e841d5e04fd72338f38ca11f1dd5a522b61918..566d4ad29d7dff9c43ab61cac03da9a4dfdef63a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -17,11 +17,12 @@ include(${TD_SUPPORT_DIR}/cmake.platform) include(${TD_SUPPORT_DIR}/cmake.define) include(${TD_SUPPORT_DIR}/cmake.options) include(${TD_SUPPORT_DIR}/cmake.version) -include(${TD_SUPPORT_DIR}/cmake.install) # contrib add_subdirectory(contrib) +set_property(GLOBAL PROPERTY GLOBAL_DEPENDS_NO_CYCLES OFF) + # api add_library(api INTERFACE) target_include_directories(api INTERFACE "include/client") @@ -36,8 +37,7 @@ add_subdirectory(source) add_subdirectory(tools) add_subdirectory(utils) add_subdirectory(examples/c) +include(${TD_SUPPORT_DIR}/cmake.install) # docs -add_subdirectory(docs/doxgen) - -# tests (TODO) +add_subdirectory(docs/doxgen) \ No newline at end of file diff --git a/docs/en/14-reference/05-taosbenchmark.md b/docs/en/14-reference/05-taosbenchmark.md index bde5e3303491ee0b748be2372d7fa18b94f6b971..d95086d4c498bf61564eb953ea54da93a8bac404 100644 --- a/docs/en/14-reference/05-taosbenchmark.md +++ b/docs/en/14-reference/05-taosbenchmark.md @@ -5,7 +5,7 @@ toc_max_heading_level: 4 description: "taosBenchmark (once called taosdemo ) is a tool for testing the performance of TDengine." --- -## Introduction +# Introduction taosBenchmark (formerly taosdemo ) is a tool for testing the performance of TDengine products. taosBenchmark can test the performance of TDengine's insert, query, and subscription functions and simulate large amounts of data generated by many devices. taosBenchmark can be configured to generate user defined databases, supertables, subtables, and the time series data to populate these for performance benchmarking. taosBenchmark is highly configurable and some of the configurations include the time interval for inserting data, the number of working threads and the capability to insert disordered data. The installer provides taosdemo as a soft link to taosBenchmark for compatibility with past users. @@ -334,9 +334,9 @@ The configuration parameters for specifying super table tag columns and data col - **name** : The name of the column, if used together with count, e.g. "name": "current", "count":3, then the names of the 3 columns are current, current_2. current_3. -- **min**: The minimum value of the column/label of the data type. +- **min**: The minimum value of the column/label of the data type. The generated value will equal or large than the minimum value. -- **max**: The maximum value of the column/label of the data type. +- **max**: The maximum value of the column/label of the data type. The generated value will less than the maxium value. - **values**: The value field of the nchar/binary column/label, which will be chosen randomly from the values. diff --git a/docs/zh/14-reference/05-taosbenchmark.md b/docs/zh/14-reference/05-taosbenchmark.md index 6a6d9e3878638357131985757418c3c2aa5cd8ef..9f4f728f78438bab299360717ba467a7fcea47ca 100644 --- a/docs/zh/14-reference/05-taosbenchmark.md +++ b/docs/zh/14-reference/05-taosbenchmark.md @@ -334,9 +334,9 @@ taosBenchmark -A INT,DOUBLE,NCHAR,BINARY\(16\) - **name** : 列的名字,若与 count 同时使用,比如 "name":"current", "count":3, 则 3 个列的名字分别为 current, current_2. current_3。 -- **min** : 数据类型的 列/标签 的最小值。 +- **min** : 数据类型的 列/标签 的最小值。生成的值将大于或等于最小值。 -- **max** : 数据类型的 列/标签 的最大值。 +- **max** : 数据类型的 列/标签 的最大值。生成的值将小于最小值。 - **values** : nchar/binary 列/标签的值域,将从值中随机选择。 diff --git a/examples/c/stream_demo.c b/examples/c/stream_demo.c index 1c9d11b755f77bf259e45d77c6e5983c3747835a..46c8297fbac8af6ec7fc3558d690313231569711 100644 --- a/examples/c/stream_demo.c +++ b/examples/c/stream_demo.c @@ -49,28 +49,28 @@ int32_t init_env() { } taos_free_result(pRes); - pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)"); + pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int, j varchar(20)) tags(a varchar(20))"); if (taos_errno(pRes) != 0) { printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); return -1; } taos_free_result(pRes); - pRes = taos_query(pConn, "create table if not exists tu1 using st1 tags(1)"); + pRes = taos_query(pConn, "create table if not exists tu1 using st1 tags('c1')"); if (taos_errno(pRes) != 0) { printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes)); return -1; } taos_free_result(pRes); - pRes = taos_query(pConn, "create table if not exists tu2 using st1 tags(2)"); + pRes = taos_query(pConn, "create table if not exists tu2 using st1 tags('c2')"); if (taos_errno(pRes) != 0) { printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes)); return -1; } taos_free_result(pRes); - pRes = taos_query(pConn, "create table if not exists tu3 using st1 tags(3)"); + pRes = taos_query(pConn, "create table if not exists tu3 using st1 tags('c3')"); if (taos_errno(pRes) != 0) { printf("failed to create child table tu3, reason:%s\n", taos_errstr(pRes)); return -1; @@ -96,7 +96,8 @@ int32_t create_stream() { taos_free_result(pRes); pRes = taos_query(pConn, - "create stream stream1 trigger at_once watermark 10s into outstb as select _wstart start, avg(k) from st1 partition by tbname interval(10s)"); + /*"create stream stream1 trigger at_once watermark 10s into outstb as select _wstart start, avg(k) from st1 partition by tbname interval(10s)");*/ + "create stream stream2 into outstb subtable(concat(concat(concat('prefix_', tname), '_suffix_'), cast(k1 as varchar(20)))) as select _wstart wstart, avg(k) from st1 partition by tbname tname, a k1 interval(10s);"); if (taos_errno(pRes) != 0) { printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index c853828bf26ed6420cb795fa5da4f0d6beceeaa3..7ffa3cef9992d998f36f34392c0f45d1fdc9d955 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -117,6 +117,7 @@ static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) { if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL; mndTransDrop(pTrans); + tDeleteSMqConsumerObj(pConsumerNew); return 0; FAIL: tDeleteSMqConsumerObj(pConsumerNew); diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 826e1d2fd0d8d1bd5936f8a22a1c2e195e29dcf2..09eef7b4d63ce81aedf946d64de28519b843d3a8 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -239,7 +239,9 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) { pEpSet->inUse = (pEpSet->numOfEps + 1) % totalMnodes; } } - addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port); + if (pObj->pDnode != NULL) { + addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port); + } sdbRelease(pSdb, pObj); } diff --git a/source/dnode/mnode/impl/src/mndOffset.c b/source/dnode/mnode/impl/src/mndOffset.c index 2ada3e00bb4e9c9f216cef86d373e03b0fba904a..8d779f00217a7070873633249081d75f68f643ab 100644 --- a/source/dnode/mnode/impl/src/mndOffset.c +++ b/source/dnode/mnode/impl/src/mndOffset.c @@ -182,6 +182,11 @@ static int32_t mndProcessCommitOffsetReq(SRpcMsg *pMsg) { tDecodeSMqCMCommitOffsetReq(&decoder, &commitOffsetReq); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "commit-offset"); + if (pTrans == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tDecoderClear(&decoder); + return -1; + } for (int32_t i = 0; i < commitOffsetReq.num; i++) { SMqOffset *pOffset = &commitOffsetReq.offsets[i]; diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 5d3a2be79aa8c9a1669d10390e03cd102a89769b..ae259b95be220a3c1cb0d3c2178b92e322d65ace 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -222,6 +222,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { } SDB_GET_BINARY(pRaw, dataPos, buf, len, TOPIC_DECODE_OVER); if (taosDecodeSSchemaWrapper(buf, &pTopic->schema) == NULL) { + taosMemoryFree(buf); goto TOPIC_DECODE_OVER; } taosMemoryFree(buf); diff --git a/source/dnode/vnode/src/meta/metaOpen.c b/source/dnode/vnode/src/meta/metaOpen.c index fb450f359429de2adac26b614712df45dbf124d5..515fd31e9d0930296c04a29f2945d7923ea542e2 100644 --- a/source/dnode/vnode/src/meta/metaOpen.c +++ b/source/dnode/vnode/src/meta/metaOpen.c @@ -35,7 +35,11 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) { *ppMeta = NULL; // create handle - slen = strlen(tfsGetPrimaryPath(pVnode->pTfs)) + strlen(pVnode->path) + strlen(VNODE_META_DIR) + 3; + if (pVnode->pTfs) { + slen = strlen(tfsGetPrimaryPath(pVnode->pTfs)) + strlen(pVnode->path) + strlen(VNODE_META_DIR) + 3; + } else { + slen = strlen(pVnode->path) + strlen(VNODE_META_DIR) + 2; + } if ((pMeta = taosMemoryCalloc(1, sizeof(*pMeta) + slen)) == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -43,8 +47,12 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) { metaInitLock(pMeta); pMeta->path = (char *)&pMeta[1]; - sprintf(pMeta->path, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP, - VNODE_META_DIR); + if (pVnode->pTfs) { + sprintf(pMeta->path, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP, + VNODE_META_DIR); + } else { + sprintf(pMeta->path, "%s%s%s", pVnode->path, TD_DIRSEP, VNODE_META_DIR); + } taosRealPath(pMeta->path, NULL, slen); pMeta->pVnode = pVnode; diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 1a57a391b13d6c51abb95fb48f8e0e7bef0d52b8..1c5eee73785841eb23debb0909ebd8fd1b12ac2e 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -170,6 +170,7 @@ int32_t tqMetaRestoreCheckInfo(STQ* pTq) { tDecoderInit(&decoder, (uint8_t*)pVal, vLen); if (tDecodeSTqCheckInfo(&decoder, &info) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; + tdbFree(pKey); tdbTbcClose(pCur); return -1; } diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index 10926ae6ad31ae8d609dd20819ad03bec2e26c57..85514ed5b607814ef441ad239724e045002c44b8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -417,6 +417,7 @@ _err: // EXPOSED APIS ==================================================================================== int32_t tsdbFSOpen(STsdb *pTsdb) { int32_t code = 0; + SVnode *pVnode = pTsdb->pVnode; // open handle pTsdb->fs.pDelFile = NULL; @@ -429,8 +430,12 @@ int32_t tsdbFSOpen(STsdb *pTsdb) { // load fs or keep empty char fname[TSDB_FILENAME_LEN]; - snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP, - pTsdb->path, TD_DIRSEP); + if (pVnode->pTfs) { + snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP, + pTsdb->path, TD_DIRSEP); + } else { + snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%sCURRENT", pTsdb->path, TD_DIRSEP); + } if (!taosCheckExistFile(fname)) { // empty one diff --git a/source/dnode/vnode/src/tsdb/tsdbOpen.c b/source/dnode/vnode/src/tsdb/tsdbOpen.c index fcbcff924879995639f1a07a2f267506ca72a4af..e4080ccf1e36bd04032d0c16fe11b094c3b69d31 100644 --- a/source/dnode/vnode/src/tsdb/tsdbOpen.c +++ b/source/dnode/vnode/src/tsdb/tsdbOpen.c @@ -57,10 +57,13 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee } else { memcpy(&pTsdb->keepCfg, pKeepCfg, sizeof(STsdbKeepCfg)); } - // pTsdb->fs = tsdbNewFS(REPO_KEEP_CFG(pTsdb)); // create dir - tfsMkdir(pVnode->pTfs, pTsdb->path); + if (pVnode->pTfs) { + tfsMkdir(pVnode->pTfs, pTsdb->path); + } else { + taosMkDir(pTsdb->path); + } // open tsdb if (tsdbFSOpen(pTsdb) < 0) { diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 6dc3ef86a724b9d16ae5205ff99f414d21adf35c..07d9b9626137bf7a80f049367fa9f694cbf127fd 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -227,7 +227,11 @@ int vnodeCommit(SVnode *pVnode) { info.state.committed = pVnode->state.applied; info.state.commitTerm = pVnode->state.applyTerm; info.state.commitID = pVnode->state.commitID; - snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path); + if (pVnode->pTfs) { + snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path); + } else { + snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path); + } if (vnodeSaveInfo(dir, &info) < 0) { ASSERT(0); return -1; diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 414834e2eb12fb4d383f6d78f5b9b08b07ddaf55..de3af7cde6933d1bce189f3b3c8ba81b52293056 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -138,12 +138,6 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_WAL_DIR); taosRealPath(tdir, NULL, sizeof(tdir)); -// for test tsdb snapshot -#if 0 - pVnode->config.walCfg.segSize = 200; - pVnode->config.walCfg.retentionSize = 2000; -#endif - pVnode->pWal = walOpen(tdir, &(pVnode->config.walCfg)); if (pVnode->pWal == NULL) { vError("vgId:%d, failed to open vnode wal since %s", TD_VID(pVnode), tstrerror(terrno)); @@ -159,12 +153,14 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { goto _err; } +#if !VNODE_AS_LIB // open query if (vnodeQueryOpen(pVnode)) { vError("vgId:%d, failed to open vnode query since %s", TD_VID(pVnode), tstrerror(terrno)); terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } +#endif // vnode begin if (vnodeBegin(pVnode) < 0) { @@ -173,11 +169,13 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { goto _err; } +#if !VNODE_AS_LIB // open sync if (vnodeSyncOpen(pVnode, dir)) { vError("vgId:%d, failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } +#endif return pVnode; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 2482ca1a5a5a0bd8d1075508f09d32f4eb580362..131327976f06d7a610a2319b1f4a40e2c097c51f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -819,11 +819,12 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq int32_t tsize, ret; SEncoder encoder = {0}; SArray *newTbUids = NULL; + SVStatis statis = {0}; terrno = TSDB_CODE_SUCCESS; pRsp->code = 0; pSubmitReq->version = version; - atomic_fetch_add_64(&pVnode->statis.nBatchInsert, 1); + statis.nBatchInsert = 1; #ifdef TD_DEBUG_PRINT_ROW vnodeDebugPrintSubmitMsg(pVnode, pReq, __func__); @@ -943,18 +944,21 @@ _exit: taosArrayDestroyEx(submitRsp.pArray, tFreeSSubmitBlkRsp); - atomic_fetch_add_64(&pVnode->statis.nInsert, submitRsp.numOfRows); - atomic_fetch_add_64(&pVnode->statis.nInsertSuccess, submitRsp.affectedRows); - // TODO: the partial success scenario and the error case // => If partial success, extract the success submitted rows and reconstruct a new submit msg, and push to level // 1/level 2. // TODO: refactor if ((terrno == TSDB_CODE_SUCCESS) && (pRsp->code == TSDB_CODE_SUCCESS)) { - atomic_fetch_add_64(&pVnode->statis.nBatchInsertSuccess, 1); + statis.nBatchInsertSuccess = 1; tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT); } + // N.B. not strict as the following procedure is not atomic + atomic_add_fetch_64(&pVnode->statis.nInsert, submitRsp.numOfRows); + atomic_add_fetch_64(&pVnode->statis.nInsertSuccess, submitRsp.affectedRows); + atomic_add_fetch_64(&pVnode->statis.nBatchInsert, statis.nBatchInsert); + atomic_add_fetch_64(&pVnode->statis.nBatchInsertSuccess, statis.nBatchInsertSuccess); + vDebug("vgId:%d, submit success, index:%" PRId64, pVnode->config.vgId, version); return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index f773de52802148453682e72b294d7194e12bbf1b..353c3874c00173ad1628cdf28747e08382cc8152 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -474,6 +474,10 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { + if (msgcb == NULL) { + return -1; + } + int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); if (code != 0) { rpcFreeCont(pMsg->pCont); diff --git a/source/libs/CMakeLists.txt b/source/libs/CMakeLists.txt index ade2487239124d9878e7c1d954ecbfb7f0d5da76..72459f4d35437c704f7021d14126f6166388bbe5 100644 --- a/source/libs/CMakeLists.txt +++ b/source/libs/CMakeLists.txt @@ -1,20 +1,21 @@ +add_subdirectory(tdb) +add_subdirectory(cache) add_subdirectory(transport) +add_subdirectory(wal) +add_subdirectory(monitor) +add_subdirectory(tfs) add_subdirectory(sync) -add_subdirectory(tdb) +add_subdirectory(qcom) +add_subdirectory(nodes) +add_subdirectory(catalog) + +add_subdirectory(scalar) +add_subdirectory(function) add_subdirectory(index) -add_subdirectory(wal) add_subdirectory(parser) add_subdirectory(scheduler) -add_subdirectory(cache) -add_subdirectory(catalog) add_subdirectory(executor) add_subdirectory(stream) add_subdirectory(planner) -add_subdirectory(function) -add_subdirectory(qcom) add_subdirectory(qworker) -add_subdirectory(tfs) -add_subdirectory(monitor) -add_subdirectory(nodes) -add_subdirectory(scalar) add_subdirectory(command) \ No newline at end of file diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index df7e5ff06fbb1bbf8ed637dd9deda462d90b2424..0cfdd2b68e49d1b89c851cc87ed867698921a65c 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1341,7 +1341,8 @@ static void calBlockTbName(SExprSupp* pTbNameCalSup, SSDataBlock* pBlock) { void* pData = colDataGetData(pCol, 0); // TODO check tbname validation if (pData != (void*)-1 && pData != NULL) { - memcpy(pBlock->info.parTbName, varDataVal(pData), varDataLen(pData)); + memcpy(pBlock->info.parTbName, varDataVal(pData), TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN)); + pBlock->info.parTbName[TSDB_TABLE_NAME_LEN - 1] = 0; } else { pBlock->info.parTbName[0] = 0; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index c968f827020999e177f2dba61886267789a3594a..560a1d319a8797e9c63049a3de9d1ceab8b5232e 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3611,6 +3611,7 @@ void destroyStreamSessionAggOperatorInfo(void* param) { blockDataDestroy(pInfo->pUpdateRes); destroySqlFunctionCtx(pInfo->pDummyCtx, 0); taosHashCleanup(pInfo->pStDeleted); + taosHashCleanup(pInfo->pGroupIdTbNameMap); taosMemoryFreeClear(param); } @@ -4670,6 +4671,7 @@ void destroyStreamStateOperatorInfo(void* param) { colDataDestroy(&pInfo->twAggSup.timeWindowData); blockDataDestroy(pInfo->pDelRes); taosHashCleanup(pInfo->pSeDeleted); + taosHashCleanup(pInfo->pGroupIdTbNameMap); destroySqlFunctionCtx(pInfo->pDummyCtx, 0); taosMemoryFreeClear(param); diff --git a/source/libs/function/CMakeLists.txt b/source/libs/function/CMakeLists.txt index dd048a047aeafdddce06dd0841939afe20bfd3d9..913dd24a491748829e19cd7ff486b3eadfa78520 100644 --- a/source/libs/function/CMakeLists.txt +++ b/source/libs/function/CMakeLists.txt @@ -14,7 +14,14 @@ target_include_directories( target_link_libraries( function - PRIVATE os util common nodes scalar qcom transport stream + PRIVATE os + PRIVATE util + PRIVATE common + PRIVATE nodes + PRIVATE qcom + PRIVATE scalar + PRIVATE transport + PRIVATE stream PUBLIC uv_a ) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index ab337a985f6049a8e4b3cdb2cecd029b935fdb99..1c7ef05087e1e29a73ef63d1205aa02a7e8e13cd 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2914,6 +2914,7 @@ static int32_t checkFill(STranslateContext* pCxt, SFillNode* pFill, SValueNode* } else { intervalRange = pInterval->datum.i; } + if ((timeRange == 0) || (timeRange / intervalRange) >= MAX_INTERVAL_TIME_WINDOW) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE); } @@ -3140,6 +3141,12 @@ static int32_t translateInterpEvery(STranslateContext* pCxt, SNode** pEvery) { code = translateExpr(pCxt, pEvery); } + int64_t interval = ((SValueNode*)(*pEvery))->datum.i; + if (interval == 0) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_WRONG_VALUE_TYPE, + "Unsupported time unit in EVERY clause"); + } + return code; } diff --git a/source/libs/qcom/CMakeLists.txt b/source/libs/qcom/CMakeLists.txt index 6e7b5cb610b64f6b045684b6d4c86a7754056a59..715c3b8ef20f8aaccf1e296e561331b7e42734af 100644 --- a/source/libs/qcom/CMakeLists.txt +++ b/source/libs/qcom/CMakeLists.txt @@ -8,7 +8,7 @@ target_include_directories( target_link_libraries( qcom - PRIVATE os util transport nodes + PRIVATE os util transport ) if(${BUILD_TEST}) diff --git a/source/libs/scalar/CMakeLists.txt b/source/libs/scalar/CMakeLists.txt index 776abd93e8bfec132502e2e13a7fa40f5d694459..c34c5e2877951cd6ba4d3b6ae72b321bd9dc6231 100644 --- a/source/libs/scalar/CMakeLists.txt +++ b/source/libs/scalar/CMakeLists.txt @@ -8,8 +8,14 @@ target_include_directories( ) target_link_libraries(scalar - PRIVATE os util common nodes function qcom vnode - ) + PRIVATE os + PRIVATE util + PRIVATE common + PRIVATE nodes + PRIVATE function + PRIVATE qcom + PRIVATE vnode +) if(${BUILD_TEST}) ADD_SUBDIRECTORY(test) diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 24c5defee5c474e3434b73b5a34ca2e04294890d..85592881cd9b160077fa6d8c8ada783db6ef6cfd 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -73,7 +73,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath) { } // todo refactor - if (tdbTbOpen("func.state.db", sizeof(SWinKey), -1, winKeyCmpr, pState->db, &pState->pFillStateDb) < 0) { + if (tdbTbOpen("fill.state.db", sizeof(SWinKey), -1, winKeyCmpr, pState->db, &pState->pFillStateDb) < 0) { goto _err; } diff --git a/source/libs/transport/CMakeLists.txt b/source/libs/transport/CMakeLists.txt index 62eb1a985bbb5eeddbb415b8e58e60041e95d968..a48926d2d4472f24252daf7519a69f4c0deff941 100644 --- a/source/libs/transport/CMakeLists.txt +++ b/source/libs/transport/CMakeLists.txt @@ -16,11 +16,6 @@ target_link_libraries( ) if (${BUILD_WITH_UV_TRANS}) if (${BUILD_WITH_UV}) - target_include_directories( - transport - PUBLIC "${TD_SOURCE_DIR}/contrib/libuv/include" - ) - target_link_libraries( transport PUBLIC uv_a diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 233d6a87b86fee566763fdd1c8aec9262008ab42..e49a9631911e57d7f30cc66d5b1db4036d8e3dac 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -35,8 +35,8 @@ int64_t FORCE_INLINE walGetCommittedVer(SWal* pWal) { return pWal->vers.commitVe int64_t FORCE_INLINE walGetAppliedVer(SWal* pWal) { return pWal->vers.appliedVer; } -static FORCE_INLINE int walBuildMetaName(SWal* pWal, int metaVer, char* buf) { - return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer); +static FORCE_INLINE void walBuildMetaName(SWal* pWal, int metaVer, char* buf) { + sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer); } static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) { @@ -150,6 +150,7 @@ int walCheckAndRepairMeta(SWal* pWal) { const char* idxPattern = "^[0-9]+.idx$"; regex_t logRegPattern; regex_t idxRegPattern; + bool fixed = false; regcomp(&logRegPattern, logPattern, REG_EXTENDED); regcomp(&idxRegPattern, idxPattern, REG_EXTENDED); @@ -206,6 +207,77 @@ int walCheckAndRepairMeta(SWal* pWal) { actualFileNum = taosArrayGetSize(pLogInfoArray); #endif + { + int32_t i = 0, j = 0; + while (i < actualFileNum && j < metaFileNum) { + SWalFileInfo* pActualFile = taosArrayGet(actualLog, i); + SWalFileInfo* pMetaFile = taosArrayGet(pWal->fileInfoSet, j); + if (pActualFile->firstVer < pMetaFile->firstVer) { + char fNameStr[WAL_FILE_LEN]; + walBuildLogName(pWal, pActualFile->firstVer, fNameStr); + taosRemoveFile(fNameStr); + walBuildIdxName(pWal, pActualFile->firstVer, fNameStr); + taosRemoveFile(fNameStr); + i++; + } else if (pActualFile->firstVer > pMetaFile->firstVer) { + taosArrayRemove(pWal->fileInfoSet, j); + metaFileNum--; + } else { + i++; + j++; + } + } + if (i == actualFileNum && j == metaFileNum) { + if (j > 0) { + SWalFileInfo* pLastInfo = taosArrayGet(pWal->fileInfoSet, j - 1); + int64_t fsize = 0; + char fNameStr[WAL_FILE_LEN]; + walBuildLogName(pWal, pLastInfo->firstVer, fNameStr); + taosStatFile(fNameStr, &fsize, NULL); + if (pLastInfo->fileSize != fsize) { + fixed = true; + pLastInfo->fileSize = fsize; + pLastInfo->lastVer = walScanLogGetLastVer(pWal); + } + } + } else { + fixed = true; + while (i < actualFileNum) { + SWalFileInfo* pActualFile = taosArrayGet(actualLog, i); + char fNameStr[WAL_FILE_LEN]; + walBuildLogName(pWal, pActualFile->firstVer, fNameStr); + taosStatFile(fNameStr, &pActualFile->fileSize, NULL); + + if (pActualFile->fileSize == 0) { + ASSERT(i == actualFileNum - 1); + taosRemoveFile(fNameStr); + + walBuildIdxName(pWal, pActualFile->firstVer, fNameStr); + taosRemoveFile(fNameStr); + break; + } + + if (i < actualFileNum - 1) { + pActualFile->lastVer = ((SWalFileInfo*)taosArrayGet(actualLog, i + 1))->firstVer - 1; + taosArrayPush(pWal->fileInfoSet, pActualFile); + i++; + } else { + pActualFile = taosArrayPush(pWal->fileInfoSet, pActualFile); + pActualFile->lastVer = walScanLogGetLastVer(pWal); + if (pActualFile->lastVer == -1) { + taosRemoveFile(fNameStr); + + walBuildIdxName(pWal, pActualFile->firstVer, fNameStr); + taosRemoveFile(fNameStr); + taosArrayPop(pWal->fileInfoSet); + } + break; + } + } + } + } + +#if 0 if (metaFileNum > actualFileNum) { taosArrayPopFrontBatch(pWal->fileInfoSet, metaFileNum - actualFileNum); } else if (metaFileNum < actualFileNum) { @@ -214,32 +286,30 @@ int walCheckAndRepairMeta(SWal* pWal) { taosArrayPush(pWal->fileInfoSet, pFileInfo); } } +#endif + taosArrayDestroy(actualLog); + actualFileNum = taosArrayGetSize(pWal->fileInfoSet); pWal->writeCur = actualFileNum - 1; - if (actualFileNum > 0) { - pWal->vers.firstVer = ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer; - - SWalFileInfo* pLastFileInfo = taosArrayGet(pWal->fileInfoSet, actualFileNum - 1); - char fnameStr[WAL_FILE_LEN]; - walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr); - int64_t fileSize = 0; - taosStatFile(fnameStr, &fileSize, NULL); - /*ASSERT(fileSize != 0);*/ - - if (metaFileNum != actualFileNum || pLastFileInfo->fileSize != fileSize) { - pLastFileInfo->fileSize = fileSize; - pWal->vers.lastVer = walScanLogGetLastVer(pWal); - ((SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->lastVer = pWal->vers.lastVer; - ASSERT(pWal->vers.lastVer != -1); - int code = walSaveMeta(pWal); - if (code < 0) { - return -1; - } + if (actualFileNum > 0) { + int64_t fLastVer = ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur))->lastVer; + if (fLastVer != -1 && pWal->vers.lastVer != fLastVer) { + fixed = true; + pWal->vers.lastVer = fLastVer; + } + int64_t fFirstVer = ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer; + if (fFirstVer != pWal->vers.firstVer) { + fixed = true; + pWal->vers.firstVer = fFirstVer; } } + if (fixed) { + walSaveMeta(pWal); + } + return 0; } @@ -530,6 +600,11 @@ int walLoadMeta(SWal* pWal) { // read metafile int64_t fileSize = 0; taosStatFile(fnameStr, &fileSize, NULL); + if (fileSize == 0) { + taosRemoveFile(fnameStr); + wDebug("vgId:%d wal find empty meta ver %d", pWal->cfg.vgId, metaVer); + return -1; + } int size = (int)fileSize; char* buf = taosMemoryMalloc(size + 5); if (buf == NULL) { @@ -540,6 +615,7 @@ int walLoadMeta(SWal* pWal) { TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ); if (pFile == NULL) { terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + taosMemoryFree(buf); return -1; } if (taosReadFile(pFile, buf, size) != size) { diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index cc6f827b8e74fbfc1549b76461eacdc991c954a9..c4f1a81eeb10a46594efc9e2c4bfa89228b16ed6 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -279,7 +279,7 @@ static int32_t walFetchBodyNew(SWalReader *pRead) { terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; return -1; } - pRead->pHead = ptr; + pRead->pHead = (SWalCkHead *)ptr; pReadHead = &pRead->pHead->head; pRead->capacity = pReadHead->bodyLen; } @@ -399,7 +399,7 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) { terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; return -1; } - *ppHead = ptr; + *ppHead = (SWalCkHead *)ptr; pReadHead = &((*ppHead)->head); pRead->capacity = pReadHead->bodyLen; }