diff --git a/docs/en/14-reference/03-connector/06-rust.mdx b/docs/en/14-reference/03-connector/06-rust.mdx index 56f5e20cb44be5ebaa9f112c70233f1b6bd5a6be..a98683d43c169c6a2f76dc154035c0af84464287 100644 --- a/docs/en/14-reference/03-connector/06-rust.mdx +++ b/docs/en/14-reference/03-connector/06-rust.mdx @@ -31,7 +31,8 @@ Websocket connections are supported on all platforms that can run Go. | connector-rust version | TDengine version | major features | | :----------------: | :--------------: | :--------------------------------------------------: | -| v0.8.12 | 3.0.5.0 or later | TMQ: Get consuming progress and seek offset to consume. | +| v0.9.2 | 3.0.7.0 or later | STMT: Get tag_fields and col_fields under ws. | +| v0.8.12 | 3.0.5.0 | TMQ: Get consuming progress and seek offset to consume. | | v0.8.0 | 3.0.4.0 | Support schemaless insert. | | v0.7.6 | 3.0.3.0 | Support req_id in query. | | v0.6.0 | 3.0.0.0 | Base features. | diff --git a/docs/zh/08-connector/26-rust.mdx b/docs/zh/08-connector/26-rust.mdx index 79a6badfead70c27fc344b1e506aa8ea5afb624d..3e51aa72bb85841e219f89c1b91b4ff4e4f791cc 100644 --- a/docs/zh/08-connector/26-rust.mdx +++ b/docs/zh/08-connector/26-rust.mdx @@ -30,7 +30,8 @@ Websocket 连接支持所有能运行 Rust 的平台。 | Rust 连接器版本 | TDengine 版本 | 主要功能 | | :----------------: | :--------------: | :--------------------------------------------------: | -| v0.8.12 | 3.0.5.0 or later | 消息订阅:获取消费进度及按照指定进度开始消费。 | +| v0.9.2 | 3.0.7.0 or later | STMT:ws 下获取 tag_fields、col_fields。 | +| v0.8.12 | 3.0.5.0 | 消息订阅:获取消费进度及按照指定进度开始消费。 | | v0.8.0 | 3.0.4.0 | 支持无模式写入。 | | v0.7.6 | 3.0.3.0 | 支持在请求中使用 req_id。 | | v0.6.0 | 3.0.0.0 | 基础功能。 | diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index df6cc1b5e7943205a61d9a3ff0e01400abef6002..32429ff8c6eb6ff96b9877d4d88066df7fababd1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1028,55 +1028,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache return code; } -/* -int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) { - int32_t code = 0; - SLRUCache *pCache = pTsdb->lruCache; - SArray *pCidList = pr->pCidList; - int num_keys = TARRAY_SIZE(pCidList); - - for (int i = 0; i < num_keys; ++i) { - SLastCol *pLastCol = NULL; - int16_t cid = *(int16_t *)taosArrayGet(pCidList, i); - - SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid}; - LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN); - if (!h) { - taosThreadMutexLock(&pTsdb->lruMutex); - h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN); - if (!h) { - pLastCol = tsdbCacheLoadCol(pTsdb, pr, pr->pSlotIds[i], uid, cid, ltype); - - size_t charge = sizeof(*pLastCol); - if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) { - charge += pLastCol->colVal.value.nData; - } - - LRUStatus status = taosLRUCacheInsert(pCache, key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, &h, - TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState); - if (status != TAOS_LRU_STATUS_OK) { - code = -1; - } - } - - taosThreadMutexUnlock(&pTsdb->lruMutex); - } - - pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); - - SLastCol lastCol = *pLastCol; - reallocVarData(&lastCol.colVal); - if (h) { - taosLRUCacheRelease(pCache, h, false); - } - - taosArrayPush(pLastArray, &lastCol); - } - - return code; -} -*/ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) { int32_t code = 0; // fetch schema @@ -1108,6 +1060,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE char **values_list = taosMemoryCalloc(num_keys * 2, sizeof(char *)); size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); char **errs = taosMemoryCalloc(num_keys * 2, sizeof(char *)); + taosThreadMutexLock(&pTsdb->lruMutex); taosThreadMutexLock(&pTsdb->rCache.rMutex); rocksMayWrite(pTsdb, true, false, false); rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list, @@ -1137,7 +1090,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE rocksdb_free(values_list[i]); rocksdb_free(values_list[i + num_keys]); - taosThreadMutexLock(&pTsdb->lruMutex); + // taosThreadMutexLock(&pTsdb->lruMutex); LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[i], klen); if (h) { @@ -1159,7 +1112,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE } taosLRUCacheErase(pTsdb->lruCache, keys_list[num_keys + i], klen); - taosThreadMutexUnlock(&pTsdb->lruMutex); + // taosThreadMutexUnlock(&pTsdb->lruMutex); } for (int i = 0; i < num_keys; ++i) { taosMemoryFree(keys_list[i]); @@ -1171,6 +1124,8 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE rocksMayWrite(pTsdb, true, false, true); + taosThreadMutexUnlock(&pTsdb->lruMutex); + _exit: taosMemoryFree(pTSchema); @@ -1311,62 +1266,7 @@ int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) { return code; } -/* -int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) { - int32_t code = 0; - char key[32] = {0}; - int keyLen = 0; - // getTableCacheKey(uid, "lr", key, &keyLen); - getTableCacheKey(uid, 0, key, &keyLen); - LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); - if (h) { - SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h); - bool invalidate = false; - int16_t nCol = taosArrayGetSize(pLast); - - for (int16_t iCol = 0; iCol < nCol; ++iCol) { - SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol); - if (eKey >= tTsVal->ts) { - invalidate = true; - break; - } - } - - if (invalidate) { - taosLRUCacheRelease(pCache, h, true); - } else { - taosLRUCacheRelease(pCache, h, false); - } - } - - // getTableCacheKey(uid, "l", key, &keyLen); - getTableCacheKey(uid, 1, key, &keyLen); - h = taosLRUCacheLookup(pCache, key, keyLen); - if (h) { - SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h); - bool invalidate = false; - int16_t nCol = taosArrayGetSize(pLast); - - for (int16_t iCol = 0; iCol < nCol; ++iCol) { - SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol); - if (eKey >= tTsVal->ts) { - invalidate = true; - break; - } - } - - if (invalidate) { - taosLRUCacheRelease(pCache, h, true); - } else { - taosLRUCacheRelease(pCache, h, false); - } - // void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen); - } - - return code; -} -*/ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, TSDBROW *row, bool dup) { int32_t code = 0; STSRow *cacheRow = NULL; @@ -1767,6 +1667,10 @@ static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsRea } if (record.version <= pReader->info.verRange.maxVer) { + /* + tsdbError("tomb xx load/cache: vgId:%d fid:%d commit %" PRId64 "~%" PRId64 "~%" PRId64 " tomb records", + TD_VID(pReader->pTsdb->pVnode), pReader->pCurFileSet->fid, record.skey, record.ekey, uid); + */ SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey}; taosArrayPush(pInfo->pTombData, &delData); } @@ -1977,9 +1881,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie goto _err; } - loadDataTomb(state->pr, state->pr->pFileReader); - state->pr->pCurFileSet = state->pFileSet; + + loadDataTomb(state->pr, state->pr->pFileReader); } if (!state->pIndexList) { @@ -2017,6 +1921,10 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie state->iBrinIndex = indexSize; } + if (state->pFileSet != state->pr->pCurFileSet) { + state->pr->pCurFileSet = state->pFileSet; + } + code = lastIterOpen(&state->lastIter, state->pFileSet, state->pTsdb, state->pTSchema, state->suid, state->uid, state->pr, state->lastTs, aCols, nCols); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 47684e71824fcdb0027e61209b5905a3da327019..6eee9df52823f38bfcd1bdbae69952edc58cd245 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2779,7 +2779,7 @@ _error: return NULL; } -static SSDataBlock* getTableDataBlockImpl(void* param) { +static SSDataBlock* getBlockForTableMergeScan(void* param) { STableMergeScanSortSourceParam* source = param; SOperatorInfo* pOperator = source->pOperator; STableMergeScanInfo* pInfo = pOperator->info; @@ -2797,6 +2797,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext); if (code != 0) { pAPI->tsdReader.tsdReaderReleaseDataBlock(reader); + qError("table merge scan fetch next data block error code: %d, %s", code, GET_TASKID(pTaskInfo)); T_LONG_JMP(pTaskInfo->env, code); } @@ -2805,8 +2806,9 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { } if (isTaskKilled(pTaskInfo)) { + qInfo("table merge scan fetch next data block found task killed. %s", GET_TASKID(pTaskInfo)); pAPI->tsdReader.tsdReaderReleaseDataBlock(reader); - T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); + break; } // process this data block based on the probabilities @@ -2819,6 +2821,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status); // code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status); if (code != TSDB_CODE_SUCCESS) { + qInfo("table merge scan load datablock code %d, %s", code, GET_TASKID(pTaskInfo)); T_LONG_JMP(pTaskInfo->env, code); } @@ -2909,7 +2912,8 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { tsortSetMergeLimit(pInfo->pSortHandle, mergeLimit); } - tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL); + + tsortSetFetchRawDataFp(pInfo->pSortHandle, getBlockForTableMergeScan, NULL, NULL); // one table has one data block int32_t numOfTable = tableEndIdx - tableStartIdx + 1; diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 7784bc0c94f765a356faeac0d360a3d327e933f6..0a8d7ee376b9a92a7c4bc5fb7544de0f6bb77030 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1049,12 +1049,24 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { } if (pBlk == NULL) { break; - }; + } + + if (tsortIsClosed(pHandle)) { + tSimpleHashClear(mUidBlk); + for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) { + blockDataDestroy(taosArrayGetP(aBlkSort, i)); + } + taosArrayClear(aBlkSort); + break; + } } + tSimpleHashCleanup(mUidBlk); taosArrayDestroy(aBlkSort); tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL); - taosArrayAddAll(pHandle->pOrderedSource, aExtSrc); + if (!tsortIsClosed(pHandle)) { + taosArrayAddAll(pHandle->pOrderedSource, aExtSrc); + } taosArrayDestroy(aExtSrc); pHandle->type = SORT_SINGLESOURCE_SORT; diff --git a/tests/system-test/eco-system/manager/cmul.py b/tests/system-test/eco-system/manager/cmul.py new file mode 100644 index 0000000000000000000000000000000000000000..ac2fa5e4f28743513e7c78c3a9c3ecbc5c7592e9 --- /dev/null +++ b/tests/system-test/eco-system/manager/cmul.py @@ -0,0 +1,104 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +# +# The option for wal_retetion_period and wal_retention_size is work well +# + +import taos +from taos.tmq import Consumer + +import os +import sys +import threading +import json +import time +import random +from datetime import date +from datetime import datetime +from datetime import timedelta +from os import path + + +topicName = "topic" +topicNum = 100 + +# consume topic +def consume_topic(topic_name, group,consume_cnt, index, wait): + consumer = Consumer( + { + "group.id": group, + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "enable.auto.commit": "true", + } + ) + + print(f"start consumer topic:{topic_name} group={group} index={index} ...") + consumer.subscribe([topic_name]) + cnt = 0 + try: + while True and cnt < consume_cnt: + res = consumer.poll(1) + if not res: + if wait: + continue + else: + break + err = res.error() + if err is not None: + raise err + val = res.value() + cnt += 1 + print(f" consume {cnt} ") + for block in val: + datas = block.fetchall() + data = datas[0][:50] + + print(f" {topic_name}_{group}_{index} {cnt} {data}") + + finally: + consumer.unsubscribe() + consumer.close() + +def consumerThread(index): + global topicName, topicNum + print(f' thread {index} start...') + while True: + idx = random.randint(0, topicNum - 1) + name = f"{topicName}{idx}" + group = f"group_{index}_{idx}" + consume_topic(name, group, 100, index, True) + + + +if __name__ == "__main__": + print(sys.argv) + threadCnt = 10 + + if len(sys.argv) == 1: + threadCnt = int(sys.argv[1]) + + + threads = [] + print(f'consumer with {threadCnt} threads...') + for i in range(threadCnt): + x = threading.Thread(target=consumerThread, args=(i,)) + x.start() + threads.append(x) + + # wait + for i, thread in enumerate(threads): + thread.join() + print(f'join thread {i} end.') + diff --git a/tests/system-test/eco-system/manager/mul.py b/tests/system-test/eco-system/manager/mul.py new file mode 100644 index 0000000000000000000000000000000000000000..d78b63d386209d06a78ff9f77aa8552d08f1a181 --- /dev/null +++ b/tests/system-test/eco-system/manager/mul.py @@ -0,0 +1,114 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import os +import sys +import random +import time + +from util.log import * +from util.cases import * +from util.sql import * +from util.common import * +from util.sqlset import * + +class TDTestCase: + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor()) + self.setsql = TDSetSql() + + # prepareEnv + def prepareEnv(self): + self.dbName = "mullevel" + self.stbName = "meters" + self.topicName = "topic" + self.topicNum = 100 + self.loop = 50000 + + sql = f"use {self.dbName}" + tdSql.execute(sql) + + # generate topic sql + self.sqls = [ + f"select * from {self.stbName}", + f"select * from {self.stbName} where ui < 200", + f"select * from {self.stbName} where fc > 20.1", + f"select * from {self.stbName} where nch like '%%a%%'", + f"select * from {self.stbName} where fc > 20.1", + f"select lower(bin) from {self.stbName} where length(bin) < 10;", + f"select upper(bin) from {self.stbName} where length(nch) > 10;", + f"select upper(bin) from {self.stbName} where ti > 10 or ic < 40;", + f"select * from {self.stbName} where ic < 100 " + ] + + + + # prepareEnv + def createTopics(self): + for i in range(self.topicNum): + topicName = f"{self.topicName}{i}" + sql = random.choice(self.sqls) + createSql = f"create topic if not exists {topicName} as {sql}" + try: + tdSql.execute(createSql, 3, True) + except: + tdLog.info(f" create topic {topicName} failed.") + + + # random del topic + def managerTopics(self): + + for i in range(self.loop): + tdLog.info(f"start modify loop={i}") + idx = random.randint(0, self.topicNum - 1) + # delete + topicName = f"{self.topicName}{idx}" + sql = f"drop topic if exist {topicName}" + try: + tdSql.execute(sql, 3, True) + except: + tdLog.info(f" drop topic {topicName} failed.") + + + # create topic + sql = random.choice(self.sqls) + createSql = f"create topic if not exists {topicName} as {sql}" + try: + tdSql.execute(createSql, 3, True) + except: + tdLog.info(f" create topic {topicName} failed.") + + seconds = [0.1, 0.5, 3, 2.5, 1.5, 0.4, 5.2, 2.6, 0.4, 0.2] + time.sleep(random.choice(seconds)) + + + # run + def run(self): + # prepare env + self.prepareEnv() + + # create topic + self.createTopics() + + # modify topic + self.managerTopics() + + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase())