提交 7e3f1d92 编写于 作者: H Haojun Liao

Merge branch 'main' into fix/liaohj

...@@ -82,7 +82,7 @@ INSERT INTO d1001 (ts, current, phase) VALUES ('2021-07-13 14:06:33.196', 10.27, ...@@ -82,7 +82,7 @@ INSERT INTO d1001 (ts, current, phase) VALUES ('2021-07-13 14:06:33.196', 10.27,
```sql ```sql
INSERT INTO d1001 VALUES ('2021-07-13 14:06:34.630', 10.2, 219, 0.32) ('2021-07-13 14:06:35.779', 10.15, 217, 0.33) INSERT INTO d1001 VALUES ('2021-07-13 14:06:34.630', 10.2, 219, 0.32) ('2021-07-13 14:06:35.779', 10.15, 217, 0.33)
d1002 (ts, current, phase) VALUES ('2021-07-13 14:06:34.255', 10.27, 0.31; d1002 (ts, current, phase) VALUES ('2021-07-13 14:06:34.255', 10.27, 0.31);
``` ```
## 插入记录时自动建表 ## 插入记录时自动建表
......
...@@ -95,30 +95,11 @@ taos -C ...@@ -95,30 +95,11 @@ taos -C
### maxShellConns ### maxShellConns
| 属性 | 说明 | | 属性 | 说明 |
| --------| ----------------------- | | -------- | ----------------------- |
| 适用范围 | 仅服务端适用 | | 适用范围 | 仅服务端适用 |
| 含义 | 一个 dnode 容许的连接数 | | 含义 | 一个 dnode 容许的连接数 |
| 取值范围 | 10-50000000 | | 取值范围 | 10-50000000 |
| 缺省值 | 5000 | | 缺省值 | 5000 |
### numOfRpcSessions
| 属性 | 说明 |
| --------| ---------------------- |
| 适用范围 | 客户端和服务端都适用 |
| 含义 | 一个客户端能创建的最大连接数|
| 取值范围 | 100-100000 |
| 缺省值 | 10000 |
### timeToGetAvailableConn
| 属性 | 说明 |
| -------- | --------------------|
| 适用范围 | 客户端和服务端都适用 |
| 含义 |获得可用连接的最长等待时间|
| 取值范围 | 10-50000000(单位为毫秒)|
| 缺省值 | 500000 |
### numOfRpcSessions ### numOfRpcSessions
...@@ -127,7 +108,7 @@ taos -C ...@@ -127,7 +108,7 @@ taos -C
| 适用范围 | 客户端和服务端都适用 | | 适用范围 | 客户端和服务端都适用 |
| 含义 | 一个客户端能创建的最大连接数 | | 含义 | 一个客户端能创建的最大连接数 |
| 取值范围 | 100-100000 | | 取值范围 | 100-100000 |
| 缺省值 | 10000 | | 缺省值 | 30000 |
### timeToGetAvailableConn ### timeToGetAvailableConn
...@@ -392,12 +373,12 @@ charset 的有效值是 UTF-8。 ...@@ -392,12 +373,12 @@ charset 的有效值是 UTF-8。
### metaCacheMaxSize ### metaCacheMaxSize
| 属性 | 说明 | | 属性 | 说明 |
| -------- | ---------------------------------------------- | | -------- | ------------------------------------ |
| 适用范围 | 仅客户端适用 | | 适用范围 | 仅客户端适用 |
| 含义 | 指定单个客户端元数据缓存大小的最大值 | | 含义 | 指定单个客户端元数据缓存大小的最大值 |
| 单位 | MB | | 单位 | MB |
| 缺省值 | -1 (无限制) | | 缺省值 | -1 (无限制) |
## 集群相关 ## 集群相关
...@@ -479,13 +460,13 @@ charset 的有效值是 UTF-8。 ...@@ -479,13 +460,13 @@ charset 的有效值是 UTF-8。
### slowLogScope ### slowLogScope
| 属性 | 说明 | | 属性 | 说明 |
| -------- | --------------------------------------------------------------| | -------- | ---------------------------------------------------------- |
| 适用范围 | 仅客户端适用 | | 适用范围 | 仅客户端适用 |
| 含义 | 指定启动记录哪些类型的慢查询 | | 含义 | 指定启动记录哪些类型的慢查询 |
| 可选值 | ALL, QUERY, INSERT, OTHERS, NONE | | 可选值 | ALL, QUERY, INSERT, OTHERS, NONE |
| 缺省值 | ALL | | 缺省值 | ALL |
| 补充说明 | 默认记录所有类型的慢查询,可通过配置只记录某一类型的慢查询 | | 补充说明 | 默认记录所有类型的慢查询,可通过配置只记录某一类型的慢查询 |
### debugFlag ### debugFlag
...@@ -685,16 +666,16 @@ charset 的有效值是 UTF-8。 ...@@ -685,16 +666,16 @@ charset 的有效值是 UTF-8。
| 适用范围 | 仅客户端适用 | | 适用范围 | 仅客户端适用 |
| 含义 | schemaless 列数据是否顺序一致,从3.0.3.0开始,该配置废弃 | | 含义 | schemaless 列数据是否顺序一致,从3.0.3.0开始,该配置废弃 |
| 值域 | 0:不一致;1: 一致 | | 值域 | 0:不一致;1: 一致 |
| 缺省值 | 0 | 缺省值 | 0 |
### smlTsDefaultName ### smlTsDefaultName
| 属性 | 说明 | | 属性 | 说明 |
| -------- | -------------------------------------------------------- | | -------- | -------------------------------------------- |
| 适用范围 | 仅客户端适用 | | 适用范围 | 仅客户端适用 |
| 含义 | schemaless自动建表的时间列名字通过该配置设置 | | 含义 | schemaless自动建表的时间列名字通过该配置设置 |
| 类型 | 字符串 | | 类型 | 字符串 |
| 缺省值 | _ts | | 缺省值 | _ts |
## 其他 ## 其他
...@@ -728,31 +709,31 @@ charset 的有效值是 UTF-8。 ...@@ -728,31 +709,31 @@ charset 的有效值是 UTF-8。
### ttlChangeOnWrite ### ttlChangeOnWrite
| 属性 | 说明 | | 属性 | 说明 |
| -------- | ------------------ | | -------- | ------------------------------------ |
| 适用范围 | 仅服务端适用 | | 适用范围 | 仅服务端适用 |
| 含义 | ttl 到期时间是否伴随表的修改操作改变 | | 含义 | ttl 到期时间是否伴随表的修改操作改变 |
| 取值范围 | 0: 不改变;1:改变 | | 取值范围 | 0: 不改变;1:改变 |
| 缺省值 | 0 | | 缺省值 | 0 |
### keepTimeOffset ### keepTimeOffset
| 属性 | 说明 | | 属性 | 说明 |
| -------- | ------------------ | | -------- | -------------- |
| 适用范围 | 仅服务端适用 | | 适用范围 | 仅服务端适用 |
| 含义 | 迁移操作的延时 | | 含义 | 迁移操作的延时 |
| 单位 | 小时 | | 单位 | 小时 |
| 取值范围 | 0-23 | | 取值范围 | 0-23 |
| 缺省值 | 0 | | 缺省值 | 0 |
### tmqMaxTopicNum ### tmqMaxTopicNum
| 属性 | 说明 | | 属性 | 说明 |
| -------- | ------------------ | | -------- | --------------------------- |
| 适用范围 | 仅服务端适用 | | 适用范围 | 仅服务端适用 |
| 含义 | 订阅最多可建立的 topic 数量 | | 含义 | 订阅最多可建立的 topic 数量 |
| 取值范围 | 1-10000| | 取值范围 | 1-10000 |
| 缺省值 | 20 | | 缺省值 | 20 |
## 压缩参数 ## 压缩参数
......
...@@ -89,7 +89,7 @@ typedef struct SRpcInit { ...@@ -89,7 +89,7 @@ typedef struct SRpcInit {
int32_t retryMinInterval; // retry init interval int32_t retryMinInterval; // retry init interval
int32_t retryStepFactor; // retry interval factor int32_t retryStepFactor; // retry interval factor
int32_t retryMaxInterval; // retry max interval int32_t retryMaxInterval; // retry max interval
int64_t retryMaxTimouet; int64_t retryMaxTimeout;
int32_t failFastThreshold; int32_t failFastThreshold;
int32_t failFastInterval; int32_t failFastInterval;
......
...@@ -123,8 +123,8 @@ function clean_bin() { ...@@ -123,8 +123,8 @@ function clean_bin() {
${csudo}rm -f ${bin_link_dir}/set_core || : ${csudo}rm -f ${bin_link_dir}/set_core || :
${csudo}rm -f ${bin_link_dir}/TDinsight.sh || : ${csudo}rm -f ${bin_link_dir}/TDinsight.sh || :
${csudo}rm -f ${bin_link_dir}/${keeperName2} || : ${csudo}rm -f ${bin_link_dir}/${keeperName2} || :
${csudo}rm -f ${bin_link_dir}/${xName2} || : # ${csudo}rm -f ${bin_link_dir}/${xName2} || :
${csudo}rm -f ${bin_link_dir}/${explorerName2} || : # ${csudo}rm -f ${bin_link_dir}/${explorerName2} || :
if [ "$verMode" == "cluster" ] && [ "$clientName" != "$clientName2" ]; then if [ "$verMode" == "cluster" ] && [ "$clientName" != "$clientName2" ]; then
${csudo}rm -f ${bin_link_dir}/${clientName2} || : ${csudo}rm -f ${bin_link_dir}/${clientName2} || :
...@@ -194,26 +194,26 @@ function clean_service_on_systemd() { ...@@ -194,26 +194,26 @@ function clean_service_on_systemd() {
fi fi
${csudo}systemctl disable ${tarbitrator_service_name} &>/dev/null || echo &>/dev/null ${csudo}systemctl disable ${tarbitrator_service_name} &>/dev/null || echo &>/dev/null
x_service_config="${service_config_dir}/${xName2}.service" # x_service_config="${service_config_dir}/${xName2}.service"
if [ -e "$x_service_config" ]; then # if [ -e "$x_service_config" ]; then
if systemctl is-active --quiet ${xName2}; then # if systemctl is-active --quiet ${xName2}; then
echo "${productName2} ${xName2} is running, stopping it..." # echo "${productName2} ${xName2} is running, stopping it..."
${csudo}systemctl stop ${xName2} &>/dev/null || echo &>/dev/null # ${csudo}systemctl stop ${xName2} &>/dev/null || echo &>/dev/null
fi # fi
${csudo}systemctl disable ${xName2} &>/dev/null || echo &>/dev/null # ${csudo}systemctl disable ${xName2} &>/dev/null || echo &>/dev/null
${csudo}rm -f ${x_service_config} # ${csudo}rm -f ${x_service_config}
fi # fi
explorer_service_config="${service_config_dir}/${explorerName2}.service" # explorer_service_config="${service_config_dir}/${explorerName2}.service"
if [ -e "$explorer_service_config" ]; then # if [ -e "$explorer_service_config" ]; then
if systemctl is-active --quiet ${explorerName2}; then # if systemctl is-active --quiet ${explorerName2}; then
echo "${productName2} ${explorerName2} is running, stopping it..." # echo "${productName2} ${explorerName2} is running, stopping it..."
${csudo}systemctl stop ${explorerName2} &>/dev/null || echo &>/dev/null # ${csudo}systemctl stop ${explorerName2} &>/dev/null || echo &>/dev/null
fi # fi
${csudo}systemctl disable ${explorerName2} &>/dev/null || echo &>/dev/null # ${csudo}systemctl disable ${explorerName2} &>/dev/null || echo &>/dev/null
${csudo}rm -f ${explorer_service_config} # ${csudo}rm -f ${explorer_service_config}
${csudo}rm -f /etc/${clientName2}/explorer.toml # ${csudo}rm -f /etc/${clientName2}/explorer.toml
fi # fi
} }
function clean_service_on_sysvinit() { function clean_service_on_sysvinit() {
......
...@@ -169,7 +169,7 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { ...@@ -169,7 +169,7 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
rpcInit.retryMinInterval = tsRedirectPeriod; rpcInit.retryMinInterval = tsRedirectPeriod;
rpcInit.retryStepFactor = tsRedirectFactor; rpcInit.retryStepFactor = tsRedirectFactor;
rpcInit.retryMaxInterval = tsRedirectMaxPeriod; rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
rpcInit.retryMaxTimouet = tsMaxRetryWaitTime; rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;
int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3); int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
connLimitNum = TMAX(connLimitNum, 10); connLimitNum = TMAX(connLimitNum, 10);
......
...@@ -47,7 +47,7 @@ bool tsPrintAuth = false; ...@@ -47,7 +47,7 @@ bool tsPrintAuth = false;
// queue & threads // queue & threads
int32_t tsNumOfRpcThreads = 1; int32_t tsNumOfRpcThreads = 1;
int32_t tsNumOfRpcSessions = 10000; int32_t tsNumOfRpcSessions = 30000;
int32_t tsTimeToGetAvailableConn = 500000; int32_t tsTimeToGetAvailableConn = 500000;
int32_t tsKeepAliveIdle = 60; int32_t tsKeepAliveIdle = 60;
...@@ -1281,9 +1281,9 @@ int32_t taosApplyLocalCfg(SConfig *pCfg, char *name) { ...@@ -1281,9 +1281,9 @@ int32_t taosApplyLocalCfg(SConfig *pCfg, char *name) {
// tsSmlDataFormat = cfgGetItem(pCfg, "smlDataFormat")->bval; // tsSmlDataFormat = cfgGetItem(pCfg, "smlDataFormat")->bval;
// } else if (strcasecmp("smlBatchSize", name) == 0) { // } else if (strcasecmp("smlBatchSize", name) == 0) {
// tsSmlBatchSize = cfgGetItem(pCfg, "smlBatchSize")->i32; // tsSmlBatchSize = cfgGetItem(pCfg, "smlBatchSize")->i32;
} else if(strcasecmp("smlTsDefaultName", name) == 0) { } else if (strcasecmp("smlTsDefaultName", name) == 0) {
tstrncpy(tsSmlTsDefaultName, cfgGetItem(pCfg, "smlTsDefaultName")->str, TSDB_COL_NAME_LEN); tstrncpy(tsSmlTsDefaultName, cfgGetItem(pCfg, "smlTsDefaultName")->str, TSDB_COL_NAME_LEN);
} else if(strcasecmp("smlDot2Underline", name) == 0) { } else if (strcasecmp("smlDot2Underline", name) == 0) {
tsSmlDot2Underline = cfgGetItem(pCfg, "smlDot2Underline")->bval; tsSmlDot2Underline = cfgGetItem(pCfg, "smlDot2Underline")->bval;
} else if (strcasecmp("shellActivityTimer", name) == 0) { } else if (strcasecmp("shellActivityTimer", name) == 0) {
tsShellActivityTimer = cfgGetItem(pCfg, "shellActivityTimer")->i32; tsShellActivityTimer = cfgGetItem(pCfg, "shellActivityTimer")->i32;
......
...@@ -299,7 +299,7 @@ int32_t dmInitClient(SDnode *pDnode) { ...@@ -299,7 +299,7 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.retryMinInterval = tsRedirectPeriod; rpcInit.retryMinInterval = tsRedirectPeriod;
rpcInit.retryStepFactor = tsRedirectFactor; rpcInit.retryStepFactor = tsRedirectFactor;
rpcInit.retryMaxInterval = tsRedirectMaxPeriod; rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
rpcInit.retryMaxTimouet = tsMaxRetryWaitTime; rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;
rpcInit.failFastInterval = 5000; // interval threshold(ms) rpcInit.failFastInterval = 5000; // interval threshold(ms)
rpcInit.failFastThreshold = 3; // failed threshold rpcInit.failFastThreshold = 3; // failed threshold
......
...@@ -439,7 +439,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void ...@@ -439,7 +439,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void
return code; return code;
_end: _end:
tsdbReaderClose(pReader); tsdbReaderClose2(pReader);
*ppReader = NULL; *ppReader = NULL;
return code; return code;
} }
...@@ -1731,41 +1731,45 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader ...@@ -1731,41 +1731,45 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
// row in last file block // row in last file block
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader); int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
if (ASCENDING_TRAVERSE(pReader->info.order)) { if (ASCENDING_TRAVERSE(pReader->info.order)) {
if (key < ts) { // imem, mem are all empty, file blocks (data blocks and last block) exist if (key < tsLast) {
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
} else if (key == ts) { } else if (key > tsLast) {
SRow* pTSRow = NULL; return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema); }
if (code != TSDB_CODE_SUCCESS) { } else {
return code; if (key > tsLast) {
} return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
} else if (key < tsLast) {
return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
}
}
// the following for key == tsLast
SRow* pTSRow = NULL;
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tsdbRowMergerAdd(pMerger, pRow1, NULL); tsdbRowMergerAdd(pMerger, pRow1, NULL);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, pMerger, &pReader->info.verRange, pReader->idStr); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
code = tsdbRowMergerGetRow(pMerger, &pTSRow); code = tsdbRowMergerGetRow(pMerger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo); code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
tsdbRowMergerClear(pMerger);
return code;
taosMemoryFree(pTSRow);
tsdbRowMergerClear(pMerger);
return code;
} else { // key > ts
return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
}
} else { // desc order
return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, pBlockData, true);
}
} else { // only last block exists } else { // only last block exists
return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false); return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
} }
...@@ -2192,7 +2196,8 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI ...@@ -2192,7 +2196,8 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
TSDBROW *pRow = NULL, *piRow = NULL; TSDBROW *pRow = NULL, *piRow = NULL;
int64_t key = (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN; int64_t key = (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] :
(ASCENDING_TRAVERSE(pReader->info.order) ? INT64_MAX : INT64_MIN);
if (pBlockScanInfo->iter.hasVal) { if (pBlockScanInfo->iter.hasVal) {
pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
} }
...@@ -2566,9 +2571,18 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { ...@@ -2566,9 +2571,18 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
// load the last data block of current table // load the last data block of current table
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter; STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;
if (pScanInfo == NULL) {
tsdbError("table Iter is null, invalid pScanInfo, try next table %s", pReader->idStr);
bool hasNexTable = moveToNextTable(pUidList, pStatus);
if (!hasNexTable) {
return TSDB_CODE_SUCCESS;
}
continue;
}
if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pScanInfo->uid, sizeof(pScanInfo->uid))) { if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pScanInfo->uid, sizeof(pScanInfo->uid))) {
// reset the index in last block when handing a new file // reset the index in last block when handing a new file
// doCleanupTableScanInfo(pScanInfo);
bool hasNexTable = moveToNextTable(pUidList, pStatus); bool hasNexTable = moveToNextTable(pUidList, pStatus);
if (!hasNexTable) { if (!hasNexTable) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -2577,9 +2591,6 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { ...@@ -2577,9 +2591,6 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
continue; continue;
} }
// reset the index in last block when handing a new file
// doCleanupTableScanInfo(pScanInfo);
bool hasDataInLastFile = initLastBlockReader(pLastBlockReader, pScanInfo, pReader); bool hasDataInLastFile = initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
if (!hasDataInLastFile) { if (!hasDataInLastFile) {
bool hasNexTable = moveToNextTable(pUidList, pStatus); bool hasNexTable = moveToNextTable(pUidList, pStatus);
...@@ -2669,16 +2680,32 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { ...@@ -2669,16 +2680,32 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
(ASCENDING_TRAVERSE(pReader->info.order)) ? pBlockInfo->record.firstKey : pBlockInfo->record.lastKey; (ASCENDING_TRAVERSE(pReader->info.order)) ? pBlockInfo->record.firstKey : pBlockInfo->record.lastKey;
code = buildDataBlockFromBuf(pReader, pScanInfo, endKey); code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
} else { } else {
if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->info.order)) { bool bHasDataInLastBlock = hasDataInLastBlock(pLastBlockReader);
// only return the rows in last block int64_t tsLast = bHasDataInLastBlock ? getCurrentKeyInLastBlock(pLastBlockReader) : INT64_MIN;
int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader); if (!bHasDataInLastBlock || ((ASCENDING_TRAVERSE(pReader->info.order) && pBlockInfo->record.lastKey < tsLast) ||
ASSERT(tsLast >= pBlockInfo->record.lastKey); (!ASCENDING_TRAVERSE(pReader->info.order) && pBlockInfo->record.firstKey > tsLast))) {
// whole block is required, return it directly
SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info;
pInfo->rows = pBlockInfo->record.numRow;
pInfo->id.uid = pScanInfo->uid;
pInfo->dataLoad = 0;
pInfo->window = (STimeWindow){.skey = pBlockInfo->record.firstKey, .ekey = pBlockInfo->record.lastKey};
setComposedBlockFlag(pReader, false);
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->info.order);
// update the last key for the corresponding table
pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->info.order) ? pInfo->window.ekey : pInfo->window.skey;
tsdbDebug("%p uid:%" PRIu64
" clean file block retrieved from file, global index:%d, "
"table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlockInfo->record.numRow,
pBlockInfo->record.firstKey, pBlockInfo->record.lastKey, pReader->idStr);
} else {
SBlockData* pBData = &pReader->status.fileBlockData; SBlockData* pBData = &pReader->status.fileBlockData;
tBlockDataReset(pBData); tBlockDataReset(pBData);
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
tsdbDebug("load data in last block firstly, due to desc scan data, %s", pReader->idStr); tsdbDebug("load data in last block firstly %s", pReader->idStr);
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
...@@ -2709,23 +2736,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { ...@@ -2709,23 +2736,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
pResBlock->info.rows, el, pReader->idStr); pResBlock->info.rows, el, pReader->idStr);
} }
} else { // whole block is required, return it directly
SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info;
pInfo->rows = pBlockInfo->record.numRow;
pInfo->id.uid = pScanInfo->uid;
pInfo->dataLoad = 0;
pInfo->window = (STimeWindow){.skey = pBlockInfo->record.firstKey, .ekey = pBlockInfo->record.lastKey};
setComposedBlockFlag(pReader, false);
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->info.order);
// update the last key for the corresponding table
pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->info.order) ? pInfo->window.ekey : pInfo->window.skey;
tsdbDebug("%p uid:%" PRIu64
" clean file block retrieved from file, global index:%d, "
"table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlockInfo->record.numRow,
pBlockInfo->record.firstKey, pBlockInfo->record.lastKey, pReader->idStr);
} }
} }
return (pReader->code != TSDB_CODE_SUCCESS) ? pReader->code : code; return (pReader->code != TSDB_CODE_SUCCESS) ? pReader->code : code;
...@@ -4098,12 +4110,10 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) { ...@@ -4098,12 +4110,10 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
} }
tsdbDataFileReaderClose(&pReader->pFileReader); tsdbDataFileReaderClose(&pReader->pFileReader);
int64_t loadBlocks = 0; int64_t loadBlocks = 0;
double elapse = 0; double elapse = 0;
pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray, &loadBlocks, &elapse); pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray, &loadBlocks, &elapse);
pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES); pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
// resetDataBlockScanInfo excluding lastKey // resetDataBlockScanInfo excluding lastKey
STableBlockScanInfo** p = NULL; STableBlockScanInfo** p = NULL;
int32_t iter = 0; int32_t iter = 0;
......
...@@ -46,14 +46,14 @@ typedef struct { ...@@ -46,14 +46,14 @@ typedef struct {
int8_t connType; int8_t connType;
char label[TSDB_LABEL_LEN]; char label[TSDB_LABEL_LEN];
char user[TSDB_UNI_LEN]; // meter ID char user[TSDB_UNI_LEN]; // meter ID
int32_t compatibilityVer; int32_t compatibilityVer;
int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size
int8_t encryption; // encrypt or not int8_t encryption; // encrypt or not
int32_t retryMinInterval; // retry init interval int32_t retryMinInterval; // retry init interval
int32_t retryStepFactor; // retry interval factor int32_t retryStepFactor; // retry interval factor
int32_t retryMaxInterval; // retry max interval int32_t retryMaxInterval; // retry max interval
int32_t retryMaxTimouet; int32_t retryMaxTimeout;
int32_t failFastThreshold; int32_t failFastThreshold;
int32_t failFastInterval; int32_t failFastInterval;
......
...@@ -55,7 +55,7 @@ void* rpcOpen(const SRpcInit* pInit) { ...@@ -55,7 +55,7 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc->retryMinInterval = pInit->retryMinInterval; // retry init interval pRpc->retryMinInterval = pInit->retryMinInterval; // retry init interval
pRpc->retryStepFactor = pInit->retryStepFactor; pRpc->retryStepFactor = pInit->retryStepFactor;
pRpc->retryMaxInterval = pInit->retryMaxInterval; pRpc->retryMaxInterval = pInit->retryMaxInterval;
pRpc->retryMaxTimouet = pInit->retryMaxTimouet; pRpc->retryMaxTimeout = pInit->retryMaxTimeout;
pRpc->failFastThreshold = pInit->failFastThreshold; pRpc->failFastThreshold = pInit->failFastThreshold;
pRpc->failFastInterval = pInit->failFastInterval; pRpc->failFastInterval = pInit->failFastInterval;
......
...@@ -2287,7 +2287,7 @@ bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { ...@@ -2287,7 +2287,7 @@ bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
pCtx->retryMinInterval = pTransInst->retryMinInterval; pCtx->retryMinInterval = pTransInst->retryMinInterval;
pCtx->retryMaxInterval = pTransInst->retryMaxInterval; pCtx->retryMaxInterval = pTransInst->retryMaxInterval;
pCtx->retryStepFactor = pTransInst->retryStepFactor; pCtx->retryStepFactor = pTransInst->retryStepFactor;
pCtx->retryMaxTimeout = pTransInst->retryMaxTimouet; pCtx->retryMaxTimeout = pTransInst->retryMaxTimeout;
pCtx->retryInitTimestamp = taosGetTimestampMs(); pCtx->retryInitTimestamp = taosGetTimestampMs();
pCtx->retryNextInterval = pCtx->retryMinInterval; pCtx->retryNextInterval = pCtx->retryMinInterval;
pCtx->retryStep = 0; pCtx->retryStep = 0;
......
...@@ -726,7 +726,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { ...@@ -726,7 +726,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
tError("read error %s", uv_err_name(nread)); tError("read error %s", uv_err_name(nread));
} }
// TODO(log other failure reason) // TODO(log other failure reason)
tWarn("failed to create connect:%p", q); tWarn("failed to create connect:%p, reason: %s", q, uv_err_name(nread));
taosMemoryFree(buf->base); taosMemoryFree(buf->base);
uv_close((uv_handle_t*)q, NULL); uv_close((uv_handle_t*)q, NULL);
return; return;
...@@ -741,10 +741,17 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { ...@@ -741,10 +741,17 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
uv_pipe_t* pipe = (uv_pipe_t*)q; uv_pipe_t* pipe = (uv_pipe_t*)q;
if (!uv_pipe_pending_count(pipe)) { if (!uv_pipe_pending_count(pipe)) {
tError("No pending count"); tError("No pending count");
uv_close((uv_handle_t*)q, NULL);
return;
}
if (pThrd->quit) {
tWarn("thread already received quit msg, ignore incoming conn");
uv_close((uv_handle_t*)q, NULL);
return; return;
} }
uv_handle_type pending = uv_pipe_pending_type(pipe); // uv_handle_type pending = uv_pipe_pending_type(pipe);
SSvrConn* pConn = createConn(pThrd); SSvrConn* pConn = createConn(pThrd);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册