提交 5730fac7 编写于 作者: H Haojun Liao

Merge branch '3.0' into fix/liaohj

......@@ -31,7 +31,8 @@ Websocket connections are supported on all platforms that can run Go.
| connector-rust version | TDengine version | major features |
| :----------------: | :--------------: | :--------------------------------------------------: |
| v0.8.12 | 3.0.5.0 or later | TMQ: Get consuming progress and seek offset to consume. |
| v0.9.2 | 3.0.7.0 or later | STMT: Get tag_fields and col_fields under ws. |
| v0.8.12 | 3.0.5.0 | TMQ: Get consuming progress and seek offset to consume. |
| v0.8.0 | 3.0.4.0 | Support schemaless insert. |
| v0.7.6 | 3.0.3.0 | Support req_id in query. |
| v0.6.0 | 3.0.0.0 | Base features. |
......
......@@ -30,7 +30,8 @@ Websocket 连接支持所有能运行 Rust 的平台。
| Rust 连接器版本 | TDengine 版本 | 主要功能 |
| :----------------: | :--------------: | :--------------------------------------------------: |
| v0.8.12 | 3.0.5.0 or later | 消息订阅:获取消费进度及按照指定进度开始消费。 |
| v0.9.2 | 3.0.7.0 or later | STMT:ws 下获取 tag_fields、col_fields。 |
| v0.8.12 | 3.0.5.0 | 消息订阅:获取消费进度及按照指定进度开始消费。 |
| v0.8.0 | 3.0.4.0 | 支持无模式写入。 |
| v0.7.6 | 3.0.3.0 | 支持在请求中使用 req_id。 |
| v0.6.0 | 3.0.0.0 | 基础功能。 |
......
......@@ -1028,55 +1028,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
return code;
}
/*
int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) {
int32_t code = 0;
SLRUCache *pCache = pTsdb->lruCache;
SArray *pCidList = pr->pCidList;
int num_keys = TARRAY_SIZE(pCidList);
for (int i = 0; i < num_keys; ++i) {
SLastCol *pLastCol = NULL;
int16_t cid = *(int16_t *)taosArrayGet(pCidList, i);
SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid};
LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN);
if (!h) {
taosThreadMutexLock(&pTsdb->lruMutex);
h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN);
if (!h) {
pLastCol = tsdbCacheLoadCol(pTsdb, pr, pr->pSlotIds[i], uid, cid, ltype);
size_t charge = sizeof(*pLastCol);
if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) {
charge += pLastCol->colVal.value.nData;
}
LRUStatus status = taosLRUCacheInsert(pCache, key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, &h,
TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
}
taosThreadMutexUnlock(&pTsdb->lruMutex);
}
pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
SLastCol lastCol = *pLastCol;
reallocVarData(&lastCol.colVal);
if (h) {
taosLRUCacheRelease(pCache, h, false);
}
taosArrayPush(pLastArray, &lastCol);
}
return code;
}
*/
int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) {
int32_t code = 0;
// fetch schema
......@@ -1108,6 +1060,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
char **values_list = taosMemoryCalloc(num_keys * 2, sizeof(char *));
size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t));
char **errs = taosMemoryCalloc(num_keys * 2, sizeof(char *));
taosThreadMutexLock(&pTsdb->lruMutex);
taosThreadMutexLock(&pTsdb->rCache.rMutex);
rocksMayWrite(pTsdb, true, false, false);
rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list,
......@@ -1137,7 +1090,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
rocksdb_free(values_list[i]);
rocksdb_free(values_list[i + num_keys]);
taosThreadMutexLock(&pTsdb->lruMutex);
// taosThreadMutexLock(&pTsdb->lruMutex);
LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[i], klen);
if (h) {
......@@ -1159,7 +1112,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
}
taosLRUCacheErase(pTsdb->lruCache, keys_list[num_keys + i], klen);
taosThreadMutexUnlock(&pTsdb->lruMutex);
// taosThreadMutexUnlock(&pTsdb->lruMutex);
}
for (int i = 0; i < num_keys; ++i) {
taosMemoryFree(keys_list[i]);
......@@ -1171,6 +1124,8 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
rocksMayWrite(pTsdb, true, false, true);
taosThreadMutexUnlock(&pTsdb->lruMutex);
_exit:
taosMemoryFree(pTSchema);
......@@ -1311,62 +1266,7 @@ int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
return code;
}
/*
int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
int32_t code = 0;
char key[32] = {0};
int keyLen = 0;
// getTableCacheKey(uid, "lr", key, &keyLen);
getTableCacheKey(uid, 0, key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) {
SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
bool invalidate = false;
int16_t nCol = taosArrayGetSize(pLast);
for (int16_t iCol = 0; iCol < nCol; ++iCol) {
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
if (eKey >= tTsVal->ts) {
invalidate = true;
break;
}
}
if (invalidate) {
taosLRUCacheRelease(pCache, h, true);
} else {
taosLRUCacheRelease(pCache, h, false);
}
}
// getTableCacheKey(uid, "l", key, &keyLen);
getTableCacheKey(uid, 1, key, &keyLen);
h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) {
SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
bool invalidate = false;
int16_t nCol = taosArrayGetSize(pLast);
for (int16_t iCol = 0; iCol < nCol; ++iCol) {
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
if (eKey >= tTsVal->ts) {
invalidate = true;
break;
}
}
if (invalidate) {
taosLRUCacheRelease(pCache, h, true);
} else {
taosLRUCacheRelease(pCache, h, false);
}
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
}
return code;
}
*/
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, TSDBROW *row, bool dup) {
int32_t code = 0;
STSRow *cacheRow = NULL;
......@@ -1767,6 +1667,10 @@ static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsRea
}
if (record.version <= pReader->info.verRange.maxVer) {
/*
tsdbError("tomb xx load/cache: vgId:%d fid:%d commit %" PRId64 "~%" PRId64 "~%" PRId64 " tomb records",
TD_VID(pReader->pTsdb->pVnode), pReader->pCurFileSet->fid, record.skey, record.ekey, uid);
*/
SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
taosArrayPush(pInfo->pTombData, &delData);
}
......@@ -1977,9 +1881,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
goto _err;
}
loadDataTomb(state->pr, state->pr->pFileReader);
state->pr->pCurFileSet = state->pFileSet;
loadDataTomb(state->pr, state->pr->pFileReader);
}
if (!state->pIndexList) {
......@@ -2017,6 +1921,10 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
state->iBrinIndex = indexSize;
}
if (state->pFileSet != state->pr->pCurFileSet) {
state->pr->pCurFileSet = state->pFileSet;
}
code = lastIterOpen(&state->lastIter, state->pFileSet, state->pTsdb, state->pTSchema, state->suid, state->uid,
state->pr, state->lastTs, aCols, nCols);
if (code != TSDB_CODE_SUCCESS) {
......
......@@ -2779,7 +2779,7 @@ _error:
return NULL;
}
static SSDataBlock* getTableDataBlockImpl(void* param) {
static SSDataBlock* getBlockForTableMergeScan(void* param) {
STableMergeScanSortSourceParam* source = param;
SOperatorInfo* pOperator = source->pOperator;
STableMergeScanInfo* pInfo = pOperator->info;
......@@ -2797,6 +2797,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext);
if (code != 0) {
pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
qError("table merge scan fetch next data block error code: %d, %s", code, GET_TASKID(pTaskInfo));
T_LONG_JMP(pTaskInfo->env, code);
}
......@@ -2805,8 +2806,9 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
}
if (isTaskKilled(pTaskInfo)) {
qInfo("table merge scan fetch next data block found task killed. %s", GET_TASKID(pTaskInfo));
pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
break;
}
// process this data block based on the probabilities
......@@ -2819,6 +2821,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
// code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
if (code != TSDB_CODE_SUCCESS) {
qInfo("table merge scan load datablock code %d, %s", code, GET_TASKID(pTaskInfo));
T_LONG_JMP(pTaskInfo->env, code);
}
......@@ -2909,7 +2912,8 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
tsortSetMergeLimit(pInfo->pSortHandle, mergeLimit);
}
tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL);
tsortSetFetchRawDataFp(pInfo->pSortHandle, getBlockForTableMergeScan, NULL, NULL);
// one table has one data block
int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
......
......@@ -1049,12 +1049,24 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
}
if (pBlk == NULL) {
break;
};
}
if (tsortIsClosed(pHandle)) {
tSimpleHashClear(mUidBlk);
for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
blockDataDestroy(taosArrayGetP(aBlkSort, i));
}
taosArrayClear(aBlkSort);
break;
}
}
tSimpleHashCleanup(mUidBlk);
taosArrayDestroy(aBlkSort);
tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
taosArrayAddAll(pHandle->pOrderedSource, aExtSrc);
if (!tsortIsClosed(pHandle)) {
taosArrayAddAll(pHandle->pOrderedSource, aExtSrc);
}
taosArrayDestroy(aExtSrc);
pHandle->type = SORT_SINGLESOURCE_SORT;
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
#
# The option for wal_retetion_period and wal_retention_size is work well
#
import taos
from taos.tmq import Consumer
import os
import sys
import threading
import json
import time
import random
from datetime import date
from datetime import datetime
from datetime import timedelta
from os import path
topicName = "topic"
topicNum = 100
# consume topic
def consume_topic(topic_name, group,consume_cnt, index, wait):
consumer = Consumer(
{
"group.id": group,
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"enable.auto.commit": "true",
}
)
print(f"start consumer topic:{topic_name} group={group} index={index} ...")
consumer.subscribe([topic_name])
cnt = 0
try:
while True and cnt < consume_cnt:
res = consumer.poll(1)
if not res:
if wait:
continue
else:
break
err = res.error()
if err is not None:
raise err
val = res.value()
cnt += 1
print(f" consume {cnt} ")
for block in val:
datas = block.fetchall()
data = datas[0][:50]
print(f" {topic_name}_{group}_{index} {cnt} {data}")
finally:
consumer.unsubscribe()
consumer.close()
def consumerThread(index):
global topicName, topicNum
print(f' thread {index} start...')
while True:
idx = random.randint(0, topicNum - 1)
name = f"{topicName}{idx}"
group = f"group_{index}_{idx}"
consume_topic(name, group, 100, index, True)
if __name__ == "__main__":
print(sys.argv)
threadCnt = 10
if len(sys.argv) == 1:
threadCnt = int(sys.argv[1])
threads = []
print(f'consumer with {threadCnt} threads...')
for i in range(threadCnt):
x = threading.Thread(target=consumerThread, args=(i,))
x.start()
threads.append(x)
# wait
for i, thread in enumerate(threads):
thread.join()
print(f'join thread {i} end.')
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import os
import sys
import random
import time
from util.log import *
from util.cases import *
from util.sql import *
from util.common import *
from util.sqlset import *
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
self.setsql = TDSetSql()
# prepareEnv
def prepareEnv(self):
self.dbName = "mullevel"
self.stbName = "meters"
self.topicName = "topic"
self.topicNum = 100
self.loop = 50000
sql = f"use {self.dbName}"
tdSql.execute(sql)
# generate topic sql
self.sqls = [
f"select * from {self.stbName}",
f"select * from {self.stbName} where ui < 200",
f"select * from {self.stbName} where fc > 20.1",
f"select * from {self.stbName} where nch like '%%a%%'",
f"select * from {self.stbName} where fc > 20.1",
f"select lower(bin) from {self.stbName} where length(bin) < 10;",
f"select upper(bin) from {self.stbName} where length(nch) > 10;",
f"select upper(bin) from {self.stbName} where ti > 10 or ic < 40;",
f"select * from {self.stbName} where ic < 100 "
]
# prepareEnv
def createTopics(self):
for i in range(self.topicNum):
topicName = f"{self.topicName}{i}"
sql = random.choice(self.sqls)
createSql = f"create topic if not exists {topicName} as {sql}"
try:
tdSql.execute(createSql, 3, True)
except:
tdLog.info(f" create topic {topicName} failed.")
# random del topic
def managerTopics(self):
for i in range(self.loop):
tdLog.info(f"start modify loop={i}")
idx = random.randint(0, self.topicNum - 1)
# delete
topicName = f"{self.topicName}{idx}"
sql = f"drop topic if exist {topicName}"
try:
tdSql.execute(sql, 3, True)
except:
tdLog.info(f" drop topic {topicName} failed.")
# create topic
sql = random.choice(self.sqls)
createSql = f"create topic if not exists {topicName} as {sql}"
try:
tdSql.execute(createSql, 3, True)
except:
tdLog.info(f" create topic {topicName} failed.")
seconds = [0.1, 0.5, 3, 2.5, 1.5, 0.4, 5.2, 2.6, 0.4, 0.2]
time.sleep(random.choice(seconds))
# run
def run(self):
# prepare env
self.prepareEnv()
# create topic
self.createTopics()
# modify topic
self.managerTopics()
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册