提交 60fd3c4e 编写于 作者: S Shengliang Guan

Merge branch '3.0' into fix/TD-20052

...@@ -102,6 +102,12 @@ IF ("${CPUTYPE}" STREQUAL "") ...@@ -102,6 +102,12 @@ IF ("${CPUTYPE}" STREQUAL "")
SET(TD_ARM_64 TRUE) SET(TD_ARM_64 TRUE)
ADD_DEFINITIONS("-D_TD_ARM_") ADD_DEFINITIONS("-D_TD_ARM_")
ADD_DEFINITIONS("-D_TD_ARM_64") ADD_DEFINITIONS("-D_TD_ARM_64")
ELSEIF (CMAKE_SYSTEM_PROCESSOR MATCHES "loongarch64")
MESSAGE(STATUS "The current platform is loongarch64")
SET(PLATFORM_ARCH_STR "loongarch64")
SET(TD_LOONGARCH_64 TRUE)
ADD_DEFINITIONS("-D_TD_LOONGARCH_")
ADD_DEFINITIONS("-D_TD_LOONGARCH_64")
ENDIF () ENDIF ()
ELSE () ELSE ()
# if generate ARM version: # if generate ARM version:
...@@ -118,6 +124,12 @@ ELSE () ...@@ -118,6 +124,12 @@ ELSE ()
ADD_DEFINITIONS("-D_TD_ARM_") ADD_DEFINITIONS("-D_TD_ARM_")
ADD_DEFINITIONS("-D_TD_ARM_64") ADD_DEFINITIONS("-D_TD_ARM_64")
SET(TD_ARM_64 TRUE) SET(TD_ARM_64 TRUE)
ELSEIF (${CPUTYPE} MATCHES "loongarch64")
SET(PLATFORM_ARCH_STR "loongarch64")
MESSAGE(STATUS "input cpuType: loongarch64")
SET(TD_LOONGARCH_64 TRUE)
ADD_DEFINITIONS("-D_TD_LOONGARCH_")
ADD_DEFINITIONS("-D_TD_LOONGARCH_64")
ELSEIF (${CPUTYPE} MATCHES "mips64") ELSEIF (${CPUTYPE} MATCHES "mips64")
SET(PLATFORM_ARCH_STR "mips") SET(PLATFORM_ARCH_STR "mips")
MESSAGE(STATUS "input cpuType: mips64") MESSAGE(STATUS "input cpuType: mips64")
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
# taos-tools # taos-tools
ExternalProject_Add(taos-tools ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG a921bd4 GIT_TAG 23e2b73
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR "" BINARY_DIR ""
#BUILD_IN_SOURCE TRUE #BUILD_IN_SOURCE TRUE
......
...@@ -14,7 +14,7 @@ Note: ● means officially tested and verified, ○ means unofficially tested an ...@@ -14,7 +14,7 @@ Note: ● means officially tested and verified, ○ means unofficially tested an
## List of supported platforms for TDengine clients and connectors ## List of supported platforms for TDengine clients and connectors
TDengine's connector can support a wide range of platforms, including X64/X86/ARM64/ARM32/MIPS/Alpha hardware platforms and Linux/Win64/Win32/macOS development environments. TDengine's connector can support a wide range of platforms, including X64/X86/ARM64/ARM32/MIPS/Alpha/LoongArch64 hardware platforms and Linux/Win64/Win32/macOS development environments.
The comparison matrix is as follows. The comparison matrix is as follows.
......
...@@ -3,6 +3,8 @@ title: 立即开始 ...@@ -3,6 +3,8 @@ title: 立即开始
description: '快速设置 TDengine 环境并体验其高效写入和查询' description: '快速设置 TDengine 环境并体验其高效写入和查询'
--- ---
import xiaot from './tdengine.webp'
TDengine 完整的软件包包括服务端(taosd)、用于与第三方系统对接并提供 RESTful 接口的 taosAdapter、应用驱动(taosc)、命令行程序 (CLI,taos) 和一些工具软件。TDengine 除了提供多种语言的连接器之外,还通过 [taosAdapter](../reference/taosadapter) 提供 [RESTful 接口](../connector/rest-api) TDengine 完整的软件包包括服务端(taosd)、用于与第三方系统对接并提供 RESTful 接口的 taosAdapter、应用驱动(taosc)、命令行程序 (CLI,taos) 和一些工具软件。TDengine 除了提供多种语言的连接器之外,还通过 [taosAdapter](../reference/taosadapter) 提供 [RESTful 接口](../connector/rest-api)
本章主要介绍如何利用 Docker 或者安装包快速设置 TDengine 环境并体验其高效写入和查询。 本章主要介绍如何利用 Docker 或者安装包快速设置 TDengine 环境并体验其高效写入和查询。
...@@ -12,4 +14,10 @@ import DocCardList from '@theme/DocCardList'; ...@@ -12,4 +14,10 @@ import DocCardList from '@theme/DocCardList';
import {useCurrentSidebarCategory} from '@docusaurus/theme-common'; import {useCurrentSidebarCategory} from '@docusaurus/theme-common';
<DocCardList items={useCurrentSidebarCategory().items}/> <DocCardList items={useCurrentSidebarCategory().items}/>
``` ```
\ No newline at end of file
### 开发者技术交流群
微信扫描下面二维码,加“小 T”为好友,即可加入“物联网大数据技术前沿群”,与大家共同交流物联网大数据技术应用、TDengine 使用问题和技巧等话题。
<img src={xiaot} alt="小 T 的二维码" width="200" />
...@@ -115,6 +115,7 @@ TDengine 客户端驱动的安装请参考 [安装指南](../#安装步骤) ...@@ -115,6 +115,7 @@ TDengine 客户端驱动的安装请参考 [安装指南](../#安装步骤)
<summary>订阅和消费</summary> <summary>订阅和消费</summary>
```c ```c
{{#include examples/c/tmq.c}}
``` ```
</details> </details>
......
...@@ -16,7 +16,7 @@ description: "TDengine 服务端、客户端和连接器支持的平台列表" ...@@ -16,7 +16,7 @@ description: "TDengine 服务端、客户端和连接器支持的平台列表"
## TDengine 客户端和连接器支持的平台列表 ## TDengine 客户端和连接器支持的平台列表
目前 TDengine 的连接器可支持的平台广泛,目前包括:X64/X86/ARM64/ARM32/MIPS/Alpha 等硬件平台,以及 Linux/Win64/Win32/macOS 等开发环境。 目前 TDengine 的连接器可支持的平台广泛,目前包括:X64/X86/ARM64/ARM32/MIPS/LoongArch64 等硬件平台,以及 Linux/Win64/Win32/macOS 等开发环境。
对照矩阵如下: 对照矩阵如下:
......
--- ---
sidebar_label: TDengine 发布历史 sidebar_label: TDengine 发布历史
title: TDengine 发布历史 title: TDengine 发布历史及下载链接
description: TDengine 发布历史、Release Notes 及下载链接 description: TDengine 发布历史、Release Notes 及下载链接
--- ---
各版本 TDengine 安装包下载链接如下:
import Release from "/components/ReleaseV3"; import Release from "/components/ReleaseV3";
## 3.0.1.6 ## 3.0.1.6
...@@ -33,4 +35,3 @@ import Release from "/components/ReleaseV3"; ...@@ -33,4 +35,3 @@ import Release from "/components/ReleaseV3";
## 3.0.1.0 ## 3.0.1.0
<Release type="tdengine" version="3.0.1.0" /> <Release type="tdengine" version="3.0.1.0" />
--- ---
sidebar_label: taosTools 发布历史 sidebar_label: taosTools 发布历史
title: taosTools 发布历史 title: taosTools 发布历史及下载链接
description: taosTools 的发布历史、Release Notes 和下载链接 description: taosTools 的发布历史、Release Notes 和下载链接
--- ---
各版本 taosTools 安装包下载链接如下:
import Release from "/components/ReleaseV3"; import Release from "/components/ReleaseV3";
## 2.2.7 ## 2.2.7
......
...@@ -418,13 +418,17 @@ static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWr ...@@ -418,13 +418,17 @@ static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWr
static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapper* pSW) { static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapper* pSW) {
buf = taosDecodeVariantI32(buf, &pSW->nCols); buf = taosDecodeVariantI32(buf, &pSW->nCols);
buf = taosDecodeVariantI32(buf, &pSW->version); buf = taosDecodeVariantI32(buf, &pSW->version);
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); if (pSW->nCols > 0) {
if (pSW->pSchema == NULL) { pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
return NULL; if (pSW->pSchema == NULL) {
} return NULL;
}
for (int32_t i = 0; i < pSW->nCols; i++) { for (int32_t i = 0; i < pSW->nCols; i++) {
buf = taosDecodeSSchema(buf, &pSW->pSchema[i]); buf = taosDecodeSSchema(buf, &pSW->pSchema[i]);
}
} else {
pSW->pSchema = NULL;
} }
return (void*)buf; return (void*)buf;
} }
...@@ -839,7 +843,7 @@ typedef struct { ...@@ -839,7 +843,7 @@ typedef struct {
int64_t dbId; int64_t dbId;
int32_t vgVersion; int32_t vgVersion;
int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT
int64_t stateTs; // ms int64_t stateTs; // ms
} SUseDbReq; } SUseDbReq;
int32_t tSerializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq); int32_t tSerializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq);
...@@ -2990,7 +2994,8 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE ...@@ -2990,7 +2994,8 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE
} }
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) {
if (pSubTopicEp->schema.nCols) taosMemoryFreeClear(pSubTopicEp->schema.pSchema); taosMemoryFreeClear(pSubTopicEp->schema.pSchema);
pSubTopicEp->schema.nCols = 0;
taosArrayDestroy(pSubTopicEp->vgs); taosArrayDestroy(pSubTopicEp->vgs);
} }
......
...@@ -5,7 +5,7 @@ set -e ...@@ -5,7 +5,7 @@ set -e
#set -x #set -x
# dockerbuild.sh # dockerbuild.sh
# -c [aarch32 | aarch64 | amd64 | x86 | mips64 ...] # -c [aarch32 | aarch64 | amd64 | x86 | mips64 | loongarch64...]
# -n [version number] # -n [version number]
# -p [password for docker hub] # -p [password for docker hub]
# -V [stable | beta] # -V [stable | beta]
...@@ -57,7 +57,7 @@ do ...@@ -57,7 +57,7 @@ do
dockerLatest=$(echo $OPTARG) dockerLatest=$(echo $OPTARG)
;; ;;
h) h)
echo "Usage: `basename $0` -c [aarch32 | aarch64 | amd64 | x86 | mips64 ...] " echo "Usage: `basename $0` -c [aarch32 | aarch64 | amd64 | x86 | mips64 | loongarch64...] "
echo " -n [version number] " echo " -n [version number] "
echo " -p [password for docker hub] " echo " -p [password for docker hub] "
echo " -V [stable | beta] " echo " -V [stable | beta] "
...@@ -136,4 +136,4 @@ if [ "$cloudBuild" != "y" ] && [ ${dockerLatest} == 'y' ] ;then ...@@ -136,4 +136,4 @@ if [ "$cloudBuild" != "y" ] && [ ${dockerLatest} == 'y' ] ;then
docker push tdengine/tdengine-${dockername}:latest docker push tdengine/tdengine-${dockername}:latest
fi fi
rm -f ${pkgFile} rm -f ${pkgFile}
\ No newline at end of file
...@@ -5,7 +5,7 @@ set -e ...@@ -5,7 +5,7 @@ set -e
#set -x #set -x
# dockerbuild.sh # dockerbuild.sh
# -c [aarch32 | aarch64 | amd64 | x86 | mips64 ...] # -c [aarch32 | aarch64 | amd64 | x86 | mips64 | loongarch64...]
# -n [version number] # -n [version number]
# -p [password for docker hub] # -p [password for docker hub]
...@@ -30,7 +30,7 @@ do ...@@ -30,7 +30,7 @@ do
passWord=$(echo $OPTARG) passWord=$(echo $OPTARG)
;; ;;
h) h)
echo "Usage: `basename $0` -c [aarch32 | aarch64 | amd64 | x86 | mips64 ...] " echo "Usage: `basename $0` -c [aarch32 | aarch64 | amd64 | x86 | mips64 | loongarch64...] "
echo " -n [version number] " echo " -n [version number] "
echo " -p [password for docker hub] " echo " -p [password for docker hub] "
exit 0 exit 0
......
...@@ -6,7 +6,7 @@ set -e ...@@ -6,7 +6,7 @@ set -e
#set -x #set -x
# release.sh -v [cluster | edge] # release.sh -v [cluster | edge]
# -c [aarch32 | aarch64 | x64 | x86 | mips64 ...] # -c [aarch32 | aarch64 | x64 | x86 | mips64 | loongarch64...]
# -o [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...] # -o [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...]
# -V [stable | beta] # -V [stable | beta]
# -l [full | lite] # -l [full | lite]
...@@ -19,7 +19,7 @@ set -e ...@@ -19,7 +19,7 @@ set -e
# set parameters by default value # set parameters by default value
verMode=edge # [cluster, edge, cloud] verMode=edge # [cluster, edge, cloud]
verType=stable # [stable, beta] verType=stable # [stable, beta]
cpuType=x64 # [aarch32 | aarch64 | x64 | x86 | mips64 ...] cpuType=x64 # [aarch32 | aarch64 | x64 | x86 | mips64 loongarch64...]
osType=Linux # [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...] osType=Linux # [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...]
pagMode=full # [full | lite] pagMode=full # [full | lite]
soMode=dynamic # [static | dynamic] soMode=dynamic # [static | dynamic]
...@@ -77,7 +77,7 @@ while getopts "hv:V:c:o:l:s:d:a:n:m:H:" arg; do ...@@ -77,7 +77,7 @@ while getopts "hv:V:c:o:l:s:d:a:n:m:H:" arg; do
;; ;;
h) h)
echo "Usage: $(basename $0) -v [cluster | edge] " echo "Usage: $(basename $0) -v [cluster | edge] "
echo " -c [aarch32 | aarch64 | x64 | x86 | mips64 ...] " echo " -c [aarch32 | aarch64 | x64 | x86 | mips64 | loongarch64 ...] "
echo " -o [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...] " echo " -o [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...] "
echo " -V [stable | beta] " echo " -V [stable | beta] "
echo " -l [full | lite] " echo " -l [full | lite] "
...@@ -216,7 +216,7 @@ else ...@@ -216,7 +216,7 @@ else
fi fi
# check support cpu type # check support cpu type
if [[ "$cpuType" == "x64" ]] || [[ "$cpuType" == "aarch64" ]] || [[ "$cpuType" == "aarch32" ]] || [[ "$cpuType" == "arm64" ]] || [[ "$cpuType" == "arm32" ]] || [[ "$cpuType" == "mips64" ]]; then if [[ "$cpuType" == "x64" ]] || [[ "$cpuType" == "aarch64" ]] || [[ "$cpuType" == "aarch32" ]] || [[ "$cpuType" == "arm64" ]] || [[ "$cpuType" == "arm32" ]] || [[ "$cpuType" == "mips64" ]] || [[ "$cpuType" == "loongarch64" ]] ; then
if [ "$verMode" == "edge" ]; then if [ "$verMode" == "edge" ]; then
# community-version compile # community-version compile
cmake ../ -DCPUTYPE=${cpuType} -DWEBSOCKET=true -DOSTYPE=${osType} -DSOMODE=${soMode} -DDBNAME=${dbName} -DVERTYPE=${verType} -DVERDATE="${build_time}" -DGITINFO=${gitinfo} -DGITINFOI=${gitinfoOfInternal} -DVERNUMBER=${verNumber} -DVERCOMPATIBLE=${verNumberComp} -DPAGMODE=${pagMode} -DBUILD_HTTP=${BUILD_HTTP} -DBUILD_TOOLS=${BUILD_TOOLS} ${allocator_macro} cmake ../ -DCPUTYPE=${cpuType} -DWEBSOCKET=true -DOSTYPE=${osType} -DSOMODE=${soMode} -DDBNAME=${dbName} -DVERTYPE=${verType} -DVERDATE="${build_time}" -DGITINFO=${gitinfo} -DGITINFOI=${gitinfoOfInternal} -DVERNUMBER=${verNumber} -DVERCOMPATIBLE=${verNumberComp} -DPAGMODE=${pagMode} -DBUILD_HTTP=${BUILD_HTTP} -DBUILD_TOOLS=${BUILD_TOOLS} ${allocator_macro}
......
...@@ -61,7 +61,8 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog ...@@ -61,7 +61,8 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
int32_t numOfBatchs = taosArrayGetSize(batchUseRsp.pArray); int32_t numOfBatchs = taosArrayGetSize(batchUseRsp.pArray);
for (int32_t i = 0; i < numOfBatchs; ++i) { for (int32_t i = 0; i < numOfBatchs; ++i) {
SUseDbRsp *rsp = taosArrayGet(batchUseRsp.pArray, i); SUseDbRsp *rsp = taosArrayGet(batchUseRsp.pArray, i);
tscDebug("hb db rsp, db:%s, vgVersion:%d, stateTs:%" PRId64 ", uid:%" PRIx64, rsp->db, rsp->vgVersion, rsp->stateTs, rsp->uid); tscDebug("hb db rsp, db:%s, vgVersion:%d, stateTs:%" PRId64 ", uid:%" PRIx64, rsp->db, rsp->vgVersion, rsp->stateTs,
rsp->uid);
if (rsp->vgVersion < 0) { if (rsp->vgVersion < 0) {
code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid); code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid);
...@@ -293,6 +294,7 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) { ...@@ -293,6 +294,7 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
taosThreadMutexUnlock(&appInfo.mutex); taosThreadMutexUnlock(&appInfo.mutex);
tscError("cluster not exist, key:%s", key); tscError("cluster not exist, key:%s", key);
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
tFreeClientHbBatchRsp(&pRsp); tFreeClientHbBatchRsp(&pRsp);
return -1; return -1;
} }
...@@ -322,6 +324,7 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) { ...@@ -322,6 +324,7 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
tFreeClientHbBatchRsp(&pRsp); tFreeClientHbBatchRsp(&pRsp);
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
return code; return code;
} }
......
...@@ -2082,7 +2082,7 @@ static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo * ...@@ -2082,7 +2082,7 @@ static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo *
static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql, const int len) { static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql, const int len) {
SSmlLineInfo elements = {0}; SSmlLineInfo elements = {0};
uDebug("SML:0x%" PRIx64 " smlParseInfluxLine sql:%s, hello", info->id, sql); uDebug("SML:0x%" PRIx64 " smlParseInfluxLine sql", info->id);
int ret = smlParseInfluxString(sql, sql + len, &elements, &info->msgBuf); int ret = smlParseInfluxString(sql, sql + len, &elements, &info->msgBuf);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
......
...@@ -532,6 +532,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea ...@@ -532,6 +532,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
streamObj.sql = strdup(pCreate->sql); streamObj.sql = strdup(pCreate->sql);
streamObj.smaId = smaObj.uid; streamObj.smaId = smaObj.uid;
streamObj.watermark = pCreate->watermark; streamObj.watermark = pCreate->watermark;
streamObj.fillHistory = STREAM_FILL_HISTORY_ON;
streamObj.trigger = STREAM_TRIGGER_WINDOW_CLOSE; streamObj.trigger = STREAM_TRIGGER_WINDOW_CLOSE;
streamObj.triggerParam = pCreate->maxDelay; streamObj.triggerParam = pCreate->maxDelay;
streamObj.ast = strdup(smaObj.ast); streamObj.ast = strdup(smaObj.ast);
......
...@@ -637,6 +637,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { ...@@ -637,6 +637,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
if (pIter == NULL) break; if (pIter == NULL) break;
if (pConsumer->status == MQ_CONSUMER_STATUS__LOST_REBD) continue; if (pConsumer->status == MQ_CONSUMER_STATUS__LOST_REBD) continue;
int32_t sz = taosArrayGetSize(pConsumer->assignedTopics); int32_t sz = taosArrayGetSize(pConsumer->assignedTopics);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
char *name = taosArrayGetP(pConsumer->assignedTopics, i); char *name = taosArrayGetP(pConsumer->assignedTopics, i);
...@@ -649,6 +650,33 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { ...@@ -649,6 +650,33 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
return -1; return -1;
} }
} }
sz = taosArrayGetSize(pConsumer->rebNewTopics);
for (int32_t i = 0; i < sz; i++) {
char *name = taosArrayGetP(pConsumer->rebNewTopics, i);
if (strcmp(name, pTopic->name) == 0) {
mndReleaseConsumer(pMnode, pConsumer);
mndReleaseTopic(pMnode, pTopic);
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)",
dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
return -1;
}
}
sz = taosArrayGetSize(pConsumer->rebRemovedTopics);
for (int32_t i = 0; i < sz; i++) {
char *name = taosArrayGetP(pConsumer->rebRemovedTopics, i);
if (strcmp(name, pTopic->name) == 0) {
mndReleaseConsumer(pMnode, pConsumer);
mndReleaseTopic(pMnode, pTopic);
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb remove)",
dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
return -1;
}
}
sdbRelease(pSdb, pConsumer); sdbRelease(pSdb, pConsumer);
} }
...@@ -675,15 +703,6 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { ...@@ -675,15 +703,6 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
mInfo("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name); mInfo("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);
#if 0
if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) {
ASSERT(0);
mndTransDrop(pTrans);
mndReleaseTopic(pMnode, pTopic);
return -1;
}
#endif
// TODO check if rebalancing // TODO check if rebalancing
if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) { if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) {
/*ASSERT(0);*/ /*ASSERT(0);*/
......
...@@ -70,6 +70,7 @@ int32_t metaCacheDrop(SMeta* pMeta, int64_t uid); ...@@ -70,6 +70,7 @@ int32_t metaCacheDrop(SMeta* pMeta, int64_t uid);
int32_t metaStatsCacheUpsert(SMeta* pMeta, SMetaStbStats* pInfo); int32_t metaStatsCacheUpsert(SMeta* pMeta, SMetaStbStats* pInfo);
int32_t metaStatsCacheDrop(SMeta* pMeta, int64_t uid); int32_t metaStatsCacheDrop(SMeta* pMeta, int64_t uid);
int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo); int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo);
void metaUpdateStbStats(SMeta *pMeta, int64_t uid, int64_t delta);
struct SMeta { struct SMeta {
TdThreadRwlock lock; TdThreadRwlock lock;
......
...@@ -61,14 +61,14 @@ struct SVBufPoolNode { ...@@ -61,14 +61,14 @@ struct SVBufPoolNode {
}; };
struct SVBufPool { struct SVBufPool {
SVBufPool* next; SVBufPool* next;
SVnode* pVnode; SVnode* pVnode;
volatile int32_t nRef; TdThreadSpinlock* lock;
TdThreadSpinlock lock; volatile int32_t nRef;
int64_t size; int64_t size;
uint8_t* ptr; uint8_t* ptr;
SVBufPoolNode* pTail; SVBufPoolNode* pTail;
SVBufPoolNode node; SVBufPoolNode node;
}; };
int32_t vnodeOpenBufPool(SVnode* pVnode); int32_t vnodeOpenBufPool(SVnode* pVnode);
......
...@@ -1445,3 +1445,13 @@ int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo) { ...@@ -1445,3 +1445,13 @@ int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo) {
_exit: _exit:
return code; return code;
} }
void metaUpdateStbStats(SMeta *pMeta, int64_t uid, int64_t delta) {
SMetaStbStats stats = {0};
if (metaStatsCacheGet(pMeta, uid, &stats) == TSDB_CODE_SUCCESS) {
stats.ctbNum += delta;
metaStatsCacheUpsert(pMeta, &stats);
}
}
...@@ -371,7 +371,7 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { ...@@ -371,7 +371,7 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
// update uid index // update uid index
metaUpdateUidIdx(pMeta, &nStbEntry); metaUpdateUidIdx(pMeta, &nStbEntry);
metaStatsCacheDrop(pMeta, nStbEntry.uid); // metaStatsCacheDrop(pMeta, nStbEntry.uid);
metaULock(pMeta); metaULock(pMeta);
...@@ -450,6 +450,10 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq, STableMe ...@@ -450,6 +450,10 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq, STableMe
#endif #endif
++pMeta->pVnode->config.vndStats.numOfCTables; ++pMeta->pVnode->config.vndStats.numOfCTables;
metaWLock(pMeta);
metaUpdateStbStats(pMeta, me.ctbEntry.suid, 1);
metaULock(pMeta);
} else { } else {
me.ntbEntry.ctime = pReq->ctime; me.ntbEntry.ctime = pReq->ctime;
me.ntbEntry.ttlDays = pReq->ttl; me.ntbEntry.ttlDays = pReq->ttl;
...@@ -670,6 +674,8 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) { ...@@ -670,6 +674,8 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
tdbTbDelete(pMeta->pCtbIdx, &(SCtbIdxKey){.suid = e.ctbEntry.suid, .uid = uid}, sizeof(SCtbIdxKey), &pMeta->txn); tdbTbDelete(pMeta->pCtbIdx, &(SCtbIdxKey){.suid = e.ctbEntry.suid, .uid = uid}, sizeof(SCtbIdxKey), &pMeta->txn);
--pMeta->pVnode->config.vndStats.numOfCTables; --pMeta->pVnode->config.vndStats.numOfCTables;
metaUpdateStbStats(pMeta, e.ctbEntry.suid, -1);
} else if (e.type == TSDB_NORMAL_TABLE) { } else if (e.type == TSDB_NORMAL_TABLE) {
// drop schema.db (todo) // drop schema.db (todo)
......
...@@ -582,10 +582,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -582,10 +582,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
code = -1; code = -1;
} }
tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, send data blockNum:%d, offset type:%d, uid:%" PRId64 tqDebug("tmq poll: consumer %" PRId64
", version:%" PRId64 "", ", subkey %s, vg %d, send data blockNum:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "",
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type, consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type,
dataRsp.rspOffset.uid, dataRsp.rspOffset.version); dataRsp.rspOffset.uid, dataRsp.rspOffset.ts);
tDeleteSMqDataRsp(&dataRsp); tDeleteSMqDataRsp(&dataRsp);
return code; return code;
......
...@@ -244,7 +244,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem ...@@ -244,7 +244,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
int32_t rows = pDataBlock->info.rows; int32_t rows = pDataBlock->info.rows;
tqDebug("tq sink, convert block %d, rows: %d", i, rows); tqDebug("tq sink, convert block1 %d, rows: %d", i, rows);
int32_t dataLen = 0; int32_t dataLen = 0;
int32_t schemaLen = 0; int32_t schemaLen = 0;
...@@ -486,7 +486,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d ...@@ -486,7 +486,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
blkHead->uid = 0; blkHead->uid = 0;
blkHead->schemaLen = 0; blkHead->schemaLen = 0;
tqDebug("tq sink, convert block %d, rows: %d", i, rows); tqDebug("tq sink, convert block2 %d, rows: %d", i, rows);
int32_t dataLen = 0; int32_t dataLen = 0;
void* blkSchema = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk)); void* blkSchema = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk));
...@@ -514,6 +514,9 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d ...@@ -514,6 +514,9 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, pColumn->offset, k); tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, pColumn->offset, k);
} else { } else {
void* colData = colDataGetData(pColData, j); void* colData = colDataGetData(pColData, j);
if (k == 0) {
tqDebug("tq sink, row %d ts %" PRId64, j, *(int64_t*)colData);
}
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, pColumn->offset, k); tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, pColumn->offset, k);
} }
} }
......
...@@ -458,11 +458,10 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p ...@@ -458,11 +458,10 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p
} }
static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) { static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) {
int8_t level = 1; int8_t level = 1;
int8_t tlevel = TMIN(pSl->maxLevel, pSl->level + 1); int8_t tlevel = TMIN(pSl->maxLevel, pSl->level + 1);
const uint32_t factor = 4;
while ((taosRandR(&pSl->seed) % factor) == 0 && level < tlevel) { while ((taosRandR(&pSl->seed) & 0x3) == 0 && level < tlevel) {
level++; level++;
} }
...@@ -568,7 +567,9 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i ...@@ -568,7 +567,9 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i
do { do {
key.ts = row.pTSRow->ts; key.ts = row.pTSRow->ts;
nRow++; nRow++;
tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS); if (SL_NODE_FORWARD(pos[0], 0) != pTbData->sl.pTail) {
tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS);
}
code = tbDataDoPut(pMemTable, pTbData, pos, &row, 1); code = tbDataDoPut(pMemTable, pTbData, pos, &row, 1);
if (code) { if (code) {
goto _err; goto _err;
......
...@@ -1015,6 +1015,191 @@ _err: ...@@ -1015,6 +1015,191 @@ _err:
return code; return code;
} }
static int32_t tBlockDataAppendBlockRow(SBlockData *pBlockData, SBlockData *pBlockDataFrom, int32_t iRow) {
int32_t code = 0;
SColVal cv = {0};
int32_t iColDataFrom = 0;
SColData *pColDataFrom =
(iColDataFrom < pBlockDataFrom->nColData) ? &((SColData *)pBlockDataFrom->aColData->pData)[iColDataFrom] : NULL;
for (int32_t iColDataTo = 0; iColDataTo < pBlockData->nColData; iColDataTo++) {
SColData *pColDataTo = &((SColData *)pBlockData->aColData->pData)[iColDataTo];
while (pColDataFrom && pColDataFrom->cid < pColDataTo->cid) {
iColDataFrom++;
pColDataFrom = (iColDataFrom < pBlockDataFrom->nColData)
? &((SColData *)pBlockDataFrom->aColData->pData)[iColDataFrom]
: NULL;
}
if (pColDataFrom == NULL || pColDataFrom->cid > pColDataTo->cid) {
code = tColDataAppendValue(pColDataTo, &COL_VAL_NONE(pColDataTo->cid, pColDataTo->type));
if (code) goto _exit;
} else {
tColDataGetValue(pColDataFrom, iRow, &cv);
code = tColDataAppendValue(pColDataTo, &cv);
if (code) goto _exit;
iColDataFrom++;
pColDataFrom = (iColDataFrom < pBlockDataFrom->nColData)
? &((SColData *)pBlockDataFrom->aColData->pData)[iColDataFrom]
: NULL;
}
}
_exit:
return code;
}
static int32_t tBlockDataAppendTPRow(SBlockData *pBlockData, STSRow *pRow, STSchema *pTSchema) {
int32_t code = 0;
int32_t iTColumn = 1;
STColumn *pTColumn = (iTColumn < pTSchema->numOfCols) ? &pTSchema->columns[iTColumn] : NULL;
void *pBitmap = pRow->statis ? tdGetBitmapAddrTp(pRow, pTSchema->flen) : NULL;
for (int32_t iColData = 0; iColData < pBlockData->nColData; iColData++) {
SColData *pColData = &((SColData *)pBlockData->aColData->pData)[iColData];
while (pTColumn && pTColumn->colId < pColData->cid) {
iTColumn++;
pTColumn = (iTColumn < pTSchema->numOfCols) ? &pTSchema->columns[iTColumn] : NULL;
}
if (pTColumn == NULL || pTColumn->colId > pColData->cid) {
code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type));
if (code) goto _exit;
} else {
ASSERT(pTColumn->type == pColData->type);
SColVal cv = {.cid = pTColumn->colId, .type = pTColumn->type};
if (pRow->statis) {
TDRowValT vt = TD_VTYPE_MAX;
tdGetBitmapValTypeII(pBitmap, iTColumn - 1, &vt);
if (vt == TD_VTYPE_NORM) {
cv.flag = CV_FLAG_VALUE;
if (IS_VAR_DATA_TYPE(pTColumn->type)) {
void *pData = (char*)pRow + *(int32_t *)(pRow->data + pTColumn->offset - sizeof(TSKEY));
cv.value.nData = varDataLen(pData);
cv.value.pData = varDataVal(pData);
} else {
memcpy(&cv.value.val, pRow->data + pTColumn->offset - sizeof(TSKEY), pTColumn->bytes);
}
code = tColDataAppendValue(pColData, &cv);
if (code) goto _exit;
} else if (vt == TD_VTYPE_NONE) {
code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type));
if (code) goto _exit;
} else if (vt == TD_VTYPE_NULL) {
code = tColDataAppendValue(pColData, &COL_VAL_NULL(pColData->cid, pColData->type));
if (code) goto _exit;
} else {
ASSERT(0);
}
} else {
cv.flag = CV_FLAG_VALUE;
if (IS_VAR_DATA_TYPE(pTColumn->type)) {
void *pData = (char*)pRow + *(int32_t *)(pRow->data + pTColumn->offset - sizeof(TSKEY));
cv.value.nData = varDataLen(pData);
cv.value.pData = varDataVal(pData);
} else {
memcpy(&cv.value.val, pRow->data + pTColumn->offset - sizeof(TSKEY), pTColumn->bytes);
}
code = tColDataAppendValue(pColData, &cv);
if (code) goto _exit;
}
iTColumn++;
pTColumn = (iTColumn < pTSchema->numOfCols) ? &pTSchema->columns[iTColumn] : NULL;
}
}
_exit:
return code;
}
static int32_t tBlockDataAppendKVRow(SBlockData *pBlockData, STSRow *pRow, STSchema *pTSchema) {
int32_t code = 0;
col_id_t kvIter = 0;
col_id_t nKvCols = tdRowGetNCols(pRow) - 1;
void *pColIdx = TD_ROW_COL_IDX(pRow);
void *pBitmap = tdGetBitmapAddrKv(pRow, tdRowGetNCols(pRow));
int32_t iTColumn = 1;
STColumn *pTColumn = (iTColumn < pTSchema->numOfCols) ? &pTSchema->columns[iTColumn] : NULL;
for (int32_t iColData = 0; iColData < pBlockData->nColData; iColData++) {
SColData *pColData = &((SColData *)pBlockData->aColData->pData)[iColData];
while (pTColumn && pTColumn->colId < pColData->cid) {
iTColumn++;
pTColumn = (iTColumn < pTSchema->numOfCols) ? &pTSchema->columns[iTColumn] : NULL;
}
if (pTColumn == NULL || pTColumn->colId > pColData->cid) {
code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type));
if (code) goto _exit;
} else {
ASSERT(pTColumn->type == pColData->type);
SColVal cv = {.cid = pTColumn->colId, .type = pTColumn->type};
TDRowValT vt = TD_VTYPE_NONE; // default is NONE
SKvRowIdx *pKvIdx = NULL;
while (kvIter < nKvCols) {
pKvIdx = (SKvRowIdx *)POINTER_SHIFT(pColIdx, kvIter * sizeof(SKvRowIdx));
if (pKvIdx->colId == pTColumn->colId) {
tdGetBitmapValTypeII(pBitmap, kvIter, &vt);
++kvIter;
break;
} else if (pKvIdx->colId > pTColumn->colId) {
vt = TD_VTYPE_NONE;
break;
} else {
++kvIter;
}
}
if (vt == TD_VTYPE_NORM) {
cv.flag = CV_FLAG_VALUE;
void *pData = POINTER_SHIFT(pRow, pKvIdx->offset);
if (IS_VAR_DATA_TYPE(pTColumn->type)) {
cv.value.nData = varDataLen(pData);
cv.value.pData = varDataVal(pData);
} else {
memcpy(&cv.value.val, pData, pTColumn->bytes);
}
code = tColDataAppendValue(pColData, &cv);
if (code) goto _exit;
} else if (vt == TD_VTYPE_NONE) {
code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type));
if (code) goto _exit;
} else if (vt == TD_VTYPE_NULL) {
code = tColDataAppendValue(pColData, &COL_VAL_NULL(pColData->cid, pColData->type));
if (code) goto _exit;
} else {
ASSERT(0);
}
iTColumn++;
pTColumn = (iTColumn < pTSchema->numOfCols) ? &pTSchema->columns[iTColumn] : NULL;
}
}
_exit:
return code;
}
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid) { int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid) {
int32_t code = 0; int32_t code = 0;
...@@ -1036,27 +1221,20 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS ...@@ -1036,27 +1221,20 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
if (code) goto _err; if (code) goto _err;
pBlockData->aTSKEY[pBlockData->nRow] = TSDBROW_TS(pRow); pBlockData->aTSKEY[pBlockData->nRow] = TSDBROW_TS(pRow);
// OTHER SColVal cv = {0};
SRowIter rIter = {0}; if (pRow->type == 0) {
SColVal *pColVal; if (TD_IS_TP_ROW(pRow->pTSRow)) {
code = tBlockDataAppendTPRow(pBlockData, pRow->pTSRow, pTSchema);
tRowIterInit(&rIter, pRow, pTSchema);
pColVal = tRowIterNext(&rIter);
for (int32_t iColData = 0; iColData < pBlockData->nColData; iColData++) {
SColData *pColData = &((SColData *)pBlockData->aColData->pData)[iColData];
while (pColVal && pColVal->cid < pColData->cid) {
pColVal = tRowIterNext(&rIter);
}
if (pColVal == NULL || pColVal->cid > pColData->cid) {
code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type));
if (code) goto _err; if (code) goto _err;
} else { } else if (TD_IS_KV_ROW(pRow->pTSRow)) {
code = tColDataAppendValue(pColData, pColVal); code = tBlockDataAppendKVRow(pBlockData, pRow->pTSRow, pTSchema);
if (code) goto _err; if (code) goto _err;
pColVal = tRowIterNext(&rIter); } else {
ASSERT(0);
} }
} else {
code = tBlockDataAppendBlockRow(pBlockData, pRow->pBlockData, pRow->iRow);
if (code) goto _err;
} }
pBlockData->nRow++; pBlockData->nRow++;
......
...@@ -27,10 +27,21 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) ...@@ -27,10 +27,21 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool)
return -1; return -1;
} }
if (taosThreadSpinInit(&pPool->lock, 0) != 0) { if (VND_IS_RSMA(pVnode)) {
taosMemoryFree(pPool); pPool->lock = taosMemoryMalloc(sizeof(TdThreadSpinlock));
terrno = TAOS_SYSTEM_ERROR(errno); if (!pPool->lock) {
return -1; taosMemoryFree(pPool);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (taosThreadSpinInit(pPool->lock, 0) != 0) {
taosMemoryFree((void*)pPool->lock);
taosMemoryFree(pPool);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
} else {
pPool->lock = NULL;
} }
pPool->next = NULL; pPool->next = NULL;
...@@ -49,7 +60,10 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) ...@@ -49,7 +60,10 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool)
static int vnodeBufPoolDestroy(SVBufPool *pPool) { static int vnodeBufPoolDestroy(SVBufPool *pPool) {
vnodeBufPoolReset(pPool); vnodeBufPoolReset(pPool);
taosThreadSpinDestroy(&pPool->lock); if (pPool->lock) {
taosThreadSpinDestroy(pPool->lock);
taosMemoryFree((void*)pPool->lock);
}
taosMemoryFree(pPool); taosMemoryFree(pPool);
return 0; return 0;
} }
...@@ -114,7 +128,7 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) { ...@@ -114,7 +128,7 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) {
void *p = NULL; void *p = NULL;
ASSERT(pPool != NULL); ASSERT(pPool != NULL);
taosThreadSpinLock(&pPool->lock); if (pPool->lock) taosThreadSpinLock(pPool->lock);
if (pPool->node.size >= pPool->ptr - pPool->node.data + size) { if (pPool->node.size >= pPool->ptr - pPool->node.data + size) {
// allocate from the anchor node // allocate from the anchor node
p = pPool->ptr; p = pPool->ptr;
...@@ -125,7 +139,7 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) { ...@@ -125,7 +139,7 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) {
pNode = taosMemoryMalloc(sizeof(*pNode) + size); pNode = taosMemoryMalloc(sizeof(*pNode) + size);
if (pNode == NULL) { if (pNode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
taosThreadSpinUnlock(&pPool->lock); if (pPool->lock) taosThreadSpinUnlock(pPool->lock);
return NULL; return NULL;
} }
...@@ -138,7 +152,7 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) { ...@@ -138,7 +152,7 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) {
pPool->size = pPool->size + sizeof(*pNode) + size; pPool->size = pPool->size + sizeof(*pNode) + size;
} }
taosThreadSpinUnlock(&pPool->lock); if (pPool->lock) taosThreadSpinUnlock(pPool->lock);
return p; return p;
} }
......
...@@ -981,6 +981,7 @@ void setTaskKilled(SExecTaskInfo* pTaskInfo); ...@@ -981,6 +981,7 @@ void setTaskKilled(SExecTaskInfo* pTaskInfo);
void queryCostStatis(SExecTaskInfo* pTaskInfo); void queryCostStatis(SExecTaskInfo* pTaskInfo);
void doDestroyTask(SExecTaskInfo* pTaskInfo); void doDestroyTask(SExecTaskInfo* pTaskInfo);
void destroyOperatorInfo(SOperatorInfo* pOperator);
int32_t getMaximumIdleDurationSec(); int32_t getMaximumIdleDurationSec();
/* /*
......
...@@ -91,9 +91,6 @@ static void destroyAggOperatorInfo(void* param); ...@@ -91,9 +91,6 @@ static void destroyAggOperatorInfo(void* param);
static void destroyIntervalOperatorInfo(void* param); static void destroyIntervalOperatorInfo(void* param);
static void destroyOperatorInfo(SOperatorInfo* pOperator);
void setOperatorCompleted(SOperatorInfo* pOperator) { void setOperatorCompleted(SOperatorInfo* pOperator) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
ASSERT(pOperator->pTaskInfo != NULL); ASSERT(pOperator->pTaskInfo != NULL);
...@@ -2172,7 +2169,7 @@ void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) { ...@@ -2172,7 +2169,7 @@ void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
} }
} }
static void destroyOperatorInfo(SOperatorInfo* pOperator) { void destroyOperatorInfo(SOperatorInfo* pOperator) {
if (pOperator == NULL) { if (pOperator == NULL) {
return; return;
} }
...@@ -3426,6 +3423,7 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta ...@@ -3426,6 +3423,7 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta
ASSERT(code == 0); ASSERT(code == 0);
if (code == -1) { if (code == -1) {
// coverity scan // coverity scan
pGroupResInfo->index += 1;
continue; continue;
} }
SResultRow* pRow = (SResultRow*)pVal; SResultRow* pRow = (SResultRow*)pVal;
......
...@@ -816,12 +816,12 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition ...@@ -816,12 +816,12 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
goto _error; goto _error;
} }
setOperatorInfo(pOperator, "PartitionOperator", QUERY_NODE_PHYSICAL_PLAN_PARTITION, false, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "PartitionOperator", QUERY_NODE_PHYSICAL_PLAN_PARTITION, false, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->fpSet = pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, NULL);
createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
...@@ -900,7 +900,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { ...@@ -900,7 +900,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
// TODO check tbname validity // TODO check tbname validity
if (pData != (void*)-1) { if (pData != (void*)-1) {
memset(pDest->info.parTbName, 0, TSDB_TABLE_NAME_LEN); memset(pDest->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN); int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1);
memcpy(pDest->info.parTbName, varDataVal(pData), len); memcpy(pDest->info.parTbName, varDataVal(pData), len);
/*pDest->info.parTbName[len + 1] = 0;*/ /*pDest->info.parTbName[len + 1] = 0;*/
} else { } else {
...@@ -1099,11 +1099,12 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr ...@@ -1099,11 +1099,12 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
int32_t numOfCols = 0; int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pPartNode->part.pTargets, NULL, &numOfCols); SExprInfo* pExprInfo = createExprInfo(pPartNode->part.pTargets, NULL, &numOfCols);
setOperatorInfo(pOperator, "StreamPartitionOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION, false, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "StreamPartitionOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION, false, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamHashPartition, NULL, pOperator->fpSet =
destroyStreamPartitionOperatorInfo, NULL); createOperatorFpSet(operatorDummyOpenFn, doStreamHashPartition, NULL, destroyStreamPartitionOperatorInfo, NULL);
initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup); initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
......
...@@ -946,7 +946,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, ...@@ -946,7 +946,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pInfo->currentGroupId = -1; pInfo->currentGroupId = -1;
pInfo->assignBlockUid = pTableScanNode->assignBlockUid; pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
setOperatorInfo(pOperator, "TableScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "TableScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.numOfExprs = numOfCols;
pInfo->metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5); pInfo->metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
...@@ -980,7 +981,8 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* ...@@ -980,7 +981,8 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo*
pInfo->dataReader = pReadHandle; pInfo->dataReader = pReadHandle;
// pInfo->prevGroupId = -1; // pInfo->prevGroupId = -1;
setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL);
return pOperator; return pOperator;
} }
...@@ -1136,8 +1138,10 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi ...@@ -1136,8 +1138,10 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
goto _error; goto _error;
} }
setOperatorInfo(pOperator, "DataBlockDistScanOperator", QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "DataBlockDistScanOperator", QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, false,
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo, NULL); OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo, NULL);
return pOperator; return pOperator;
_error: _error:
...@@ -1581,7 +1585,7 @@ static void calBlockTbName(SExprSupp* pTbNameCalSup, SSDataBlock* pBlock) { ...@@ -1581,7 +1585,7 @@ static void calBlockTbName(SExprSupp* pTbNameCalSup, SSDataBlock* pBlock) {
// TODO check tbname validation // TODO check tbname validation
if (pData != (void*)-1 && pData != NULL) { if (pData != (void*)-1 && pData != NULL) {
memset(pBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN); memset(pBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN); int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1);
memcpy(pBlock->info.parTbName, varDataVal(pData), len); memcpy(pBlock->info.parTbName, varDataVal(pData), len);
/*pBlock->info.parTbName[len + 1] = 0;*/ /*pBlock->info.parTbName[len + 1] = 0;*/
} else { } else {
...@@ -1769,6 +1773,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ...@@ -1769,6 +1773,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer); tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer);
qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1); qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1);
if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1) < 0) { if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1) < 0) {
tqOffsetResetToLog(&pTaskInfo->streamInfo.lastStatus, pTaskInfo->streamInfo.snapshotVer);
return NULL; return NULL;
} }
ASSERT(pInfo->tqReader->pWalReader->curVersion == pTaskInfo->streamInfo.snapshotVer + 1); ASSERT(pInfo->tqReader->pWalReader->curVersion == pTaskInfo->streamInfo.snapshotVer + 1);
...@@ -2351,7 +2356,8 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT ...@@ -2351,7 +2356,8 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT
pInfo->vnode = pHandle->vnode; pInfo->vnode = pHandle->vnode;
pInfo->sContext = pHandle->sContext; pInfo->sContext = pHandle->sContext;
setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, NULL); pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, NULL);
return pOperator; return pOperator;
...@@ -2366,9 +2372,7 @@ _end: ...@@ -2366,9 +2372,7 @@ _end:
static void destroyStreamScanOperatorInfo(void* param) { static void destroyStreamScanOperatorInfo(void* param) {
SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param; SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param;
if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) { if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) {
STableScanInfo* pTableScanInfo = pStreamScan->pTableScanOp->info; destroyOperatorInfo(pStreamScan->pTableScanOp);
destroyTableScanOperatorInfo(pTableScanInfo);
taosMemoryFreeClear(pStreamScan->pTableScanOp);
} }
if (pStreamScan->tqReader) { if (pStreamScan->tqReader) {
tqCloseReader(pStreamScan->tqReader); tqCloseReader(pStreamScan->tqReader);
...@@ -2537,7 +2541,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ...@@ -2537,7 +2541,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->assignBlockUid = pTableScanNode->assignBlockUid; pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
pInfo->partitionSup.needCalc = false; pInfo->partitionSup.needCalc = false;
setOperatorInfo(pOperator, "StreamScanOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "StreamScanOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
__optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan; __optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan;
...@@ -4175,7 +4180,8 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan ...@@ -4175,7 +4180,8 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
pInfo->readHandle = *(SReadHandle*)readHandle; pInfo->readHandle = *(SReadHandle*)readHandle;
} }
setOperatorInfo(pOperator, "SysTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "SysTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, false, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, NULL); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, NULL);
return pOperator; return pOperator;
...@@ -4305,7 +4311,8 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi ...@@ -4305,7 +4311,8 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
pInfo->readHandle = *pReadHandle; pInfo->readHandle = *pReadHandle;
pInfo->curPos = 0; pInfo->curPos = 0;
setOperatorInfo(pOperator, "TagScanOperator", QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "TagScanOperator", QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, false, OP_NOT_OPENED, pInfo,
pTaskInfo);
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
...@@ -4815,11 +4822,12 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN ...@@ -4815,11 +4822,12 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
int32_t rowSize = pInfo->pResBlock->info.rowSize; int32_t rowSize = pInfo->pResBlock->info.rowSize;
pInfo->bufPageSize = getProperSortPageSize(rowSize); pInfo->bufPageSize = getProperSortPageSize(rowSize);
setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, destroyTableMergeScanOperatorInfo,
destroyTableMergeScanOperatorInfo, getTableMergeScanExplainExecInfo); getTableMergeScanExplainExecInfo);
pOperator->cost.openCost = 0; pOperator->cost.openCost = 0;
return pOperator; return pOperator;
......
...@@ -680,9 +680,9 @@ SResultCellData* getResultCell(SResultRowData* pRaw, int32_t index) { ...@@ -680,9 +680,9 @@ SResultCellData* getResultCell(SResultRowData* pRaw, int32_t index) {
void* destroyFillColumnInfo(SFillColInfo* pFillCol, int32_t start, int32_t end) { void* destroyFillColumnInfo(SFillColInfo* pFillCol, int32_t start, int32_t end) {
for (int32_t i = start; i < end; i++) { for (int32_t i = start; i < end; i++) {
destroyExprInfo(pFillCol[i].pExpr, 1); destroyExprInfo(pFillCol[i].pExpr, 1);
taosMemoryFreeClear(pFillCol[i].pExpr);
taosVariantDestroy(&pFillCol[i].fillVal); taosVariantDestroy(&pFillCol[i].fillVal);
} }
taosMemoryFreeClear(pFillCol[start].pExpr);
taosMemoryFree(pFillCol); taosMemoryFree(pFillCol);
return NULL; return NULL;
} }
......
...@@ -3580,6 +3580,11 @@ static void removeSessionResult(SSHashObj* pHashMap, SSHashObj* pResMap, SSessio ...@@ -3580,6 +3580,11 @@ static void removeSessionResult(SSHashObj* pHashMap, SSHashObj* pResMap, SSessio
tSimpleHashRemove(pResMap, &key, sizeof(SSessionKey)); tSimpleHashRemove(pResMap, &key, sizeof(SSessionKey));
} }
static void getSessionHashKey(const SSessionKey* pKey, SSessionKey* pHashKey) {
*pHashKey = *pKey;
pHashKey->win.ekey = pKey->win.skey;
}
static void removeSessionResults(SSHashObj* pHashMap, SArray* pWins) { static void removeSessionResults(SSHashObj* pHashMap, SArray* pWins) {
if (tSimpleHashGetSize(pHashMap) == 0) { if (tSimpleHashGetSize(pHashMap) == 0) {
return; return;
...@@ -3588,8 +3593,8 @@ static void removeSessionResults(SSHashObj* pHashMap, SArray* pWins) { ...@@ -3588,8 +3593,8 @@ static void removeSessionResults(SSHashObj* pHashMap, SArray* pWins) {
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
SSessionKey* pWin = taosArrayGet(pWins, i); SSessionKey* pWin = taosArrayGet(pWins, i);
if (!pWin) continue; if (!pWin) continue;
SSessionKey key = *pWin; SSessionKey key = {0};
key.win.ekey = key.win.skey; getSessionHashKey(pWin, &key);
tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey)); tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey));
} }
} }
...@@ -3642,7 +3647,9 @@ static int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindo ...@@ -3642,7 +3647,9 @@ static int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindo
static bool doDeleteSessionWindow(SStreamAggSupporter* pAggSup, SSessionKey* pKey) { static bool doDeleteSessionWindow(SStreamAggSupporter* pAggSup, SSessionKey* pKey) {
streamStateSessionDel(pAggSup->pState, pKey); streamStateSessionDel(pAggSup->pState, pKey);
tSimpleHashRemove(pAggSup->pResultRows, pKey, sizeof(SSessionKey)); SSessionKey hashKey = {0};
getSessionHashKey(pKey, &hashKey);
tSimpleHashRemove(pAggSup->pResultRows, &hashKey, sizeof(SSessionKey));
return true; return true;
} }
...@@ -3753,8 +3760,8 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData ...@@ -3753,8 +3760,8 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
} }
} }
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
SSessionKey key = winInfo.sessionWin; SSessionKey key = {0};
key.win.ekey = key.win.skey; getSessionHashKey(&winInfo.sessionWin, &key);
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo)); tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo));
} }
...@@ -3853,23 +3860,28 @@ void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlo ...@@ -3853,23 +3860,28 @@ void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlo
colDataAppendNULL(pCalEdCol, pBlock->info.rows); colDataAppendNULL(pCalEdCol, pBlock->info.rows);
SHashObj* pGroupIdTbNameMap = NULL; SHashObj* pGroupIdTbNameMap = NULL;
if (pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION || if (pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
SStreamSessionAggOperatorInfo* pInfo = pOp->info; SStreamSessionAggOperatorInfo* pInfo = pOp->info;
pGroupIdTbNameMap = pInfo->pGroupIdTbNameMap; pGroupIdTbNameMap = pInfo->pGroupIdTbNameMap;
} else if (pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) { } else if (pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
SStreamStateAggOperatorInfo* pInfo = pOp->info; SStreamStateAggOperatorInfo* pInfo = pOp->info;
pGroupIdTbNameMap = pInfo->pGroupIdTbNameMap; pGroupIdTbNameMap = pInfo->pGroupIdTbNameMap;
} else {
ASSERT(0);
} }
char* tbname = taosHashGet(pGroupIdTbNameMap, &res->groupId, sizeof(int64_t)); char* tbname = taosHashGet(pGroupIdTbNameMap, &res->groupId, sizeof(int64_t));
SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX); SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
if (tbname == NULL) { if (tbname == NULL) {
/*printf("\n\n no tbname for group id %" PRId64 "%p %p\n\n", res->groupId, pOp->info, pGroupIdTbNameMap);*/
colDataAppendNULL(pTableCol, pBlock->info.rows); colDataAppendNULL(pTableCol, pBlock->info.rows);
} else { } else {
char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN]; char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN];
STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName)); STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
colDataAppend(pTableCol, pBlock->info.rows, (const char*)parTbName, false); colDataAppend(pTableCol, pBlock->info.rows, (const char*)parTbName, false);
/*printf("\n\n get tbname %s group id %" PRId64 "\n\n", tbname, res->groupId);*/
} }
pBlock->info.rows += 1; pBlock->info.rows += 1;
} }
...@@ -3896,8 +3908,8 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS ...@@ -3896,8 +3908,8 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS
SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, j); SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, j);
SStreamSessionAggOperatorInfo* pChInfo = pChild->info; SStreamSessionAggOperatorInfo* pChInfo = pChild->info;
SStreamAggSupporter* pChAggSup = &pChInfo->streamAggSup; SStreamAggSupporter* pChAggSup = &pChInfo->streamAggSup;
SSessionKey chWinKey = *pWinKey; SSessionKey chWinKey = {0};
chWinKey.win.ekey = chWinKey.win.skey; getSessionHashKey(pWinKey, &chWinKey);
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pChAggSup->pState, &chWinKey); SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pChAggSup->pState, &chWinKey);
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
SResultRow* pChResult = NULL; SResultRow* pChResult = NULL;
...@@ -3978,8 +3990,8 @@ static void copyDeleteWindowInfo(SArray* pResWins, SSHashObj* pStDeleted) { ...@@ -3978,8 +3990,8 @@ static void copyDeleteWindowInfo(SArray* pResWins, SSHashObj* pStDeleted) {
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
SSessionKey* pWinKey = taosArrayGet(pResWins, i); SSessionKey* pWinKey = taosArrayGet(pResWins, i);
if (!pWinKey) continue; if (!pWinKey) continue;
SSessionKey winInfo = *pWinKey; SSessionKey winInfo = {0};
winInfo.win.ekey = winInfo.win.skey; getSessionHashKey(pWinKey, &winInfo);
tSimpleHashPut(pStDeleted, &winInfo, sizeof(SSessionKey), NULL, 0); tSimpleHashPut(pStDeleted, &winInfo, sizeof(SSessionKey), NULL, 0);
} }
} }
...@@ -4046,7 +4058,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -4046,7 +4058,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
if (pBlock->info.parTbName[0]) { if (pBlock->info.parTbName[0]) {
taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName, taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName,
TSDB_TABLE_NAME_LEN); TSDB_TABLE_NAME_LEN);
/*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/ /*printf("\n\n put tbname %s group id %" PRId64 "\n\n into %p %p", pBlock->info.parTbName, pBlock->info.groupId,*/
/*pInfo, pInfo->pGroupIdTbNameMap);*/
} }
if (pBlock->info.parTbName[0]) { if (pBlock->info.parTbName[0]) {
...@@ -4561,8 +4574,8 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl ...@@ -4561,8 +4574,8 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
} }
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
SSessionKey key = curWin.winInfo.sessionWin; SSessionKey key = {0};
key.win.ekey = key.win.skey; getSessionHashKey(&curWin.winInfo.sessionWin, &key);
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo)); tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
} }
} }
...@@ -4645,6 +4658,12 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { ...@@ -4645,6 +4658,12 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
#if 0
char* pBuf = streamStateSessionDump(pInfo->streamAggSup.pState);
qDebug("===stream===final session%s", pBuf);
taosMemoryFree(pBuf);
#endif
doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator); doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) { if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "single state delete"); printDataBlock(pInfo->pDelRes, "single state delete");
......
...@@ -1878,6 +1878,7 @@ static int32_t rewriteIsTrue(SNode* pSrc, SNode** pIsTrue) { ...@@ -1878,6 +1878,7 @@ static int32_t rewriteIsTrue(SNode* pSrc, SNode** pIsTrue) {
static EDealRes translateCaseWhen(STranslateContext* pCxt, SCaseWhenNode* pCaseWhen) { static EDealRes translateCaseWhen(STranslateContext* pCxt, SCaseWhenNode* pCaseWhen) {
bool first = true; bool first = true;
bool allNullThen = true;
SNode* pNode = NULL; SNode* pNode = NULL;
FOREACH(pNode, pCaseWhen->pWhenThenList) { FOREACH(pNode, pCaseWhen->pWhenThenList) {
SWhenThenNode* pWhenThen = (SWhenThenNode*)pNode; SWhenThenNode* pWhenThen = (SWhenThenNode*)pNode;
...@@ -1889,12 +1890,28 @@ static EDealRes translateCaseWhen(STranslateContext* pCxt, SCaseWhenNode* pCaseW ...@@ -1889,12 +1890,28 @@ static EDealRes translateCaseWhen(STranslateContext* pCxt, SCaseWhenNode* pCaseW
} }
pWhenThen->pWhen = pIsTrue; pWhenThen->pWhen = pIsTrue;
} }
if (first || dataTypeComp(&pCaseWhen->node.resType, &((SExprNode*)pNode)->resType) < 0) {
pCaseWhen->node.resType = ((SExprNode*)pNode)->resType; SExprNode* pThenExpr = (SExprNode*)pNode;
if (TSDB_DATA_TYPE_NULL == pThenExpr->resType.type) {
continue;
}
allNullThen = false;
if (first || dataTypeComp(&pCaseWhen->node.resType, &pThenExpr->resType) < 0) {
pCaseWhen->node.resType = pThenExpr->resType;
} }
first = false; first = false;
} }
if (allNullThen) {
if (NULL != pCaseWhen->pElse) {
pCaseWhen->node.resType = ((SExprNode*)pCaseWhen->pElse)->resType;
} else {
pCaseWhen->node.resType.type = TSDB_DATA_TYPE_NULL;
pCaseWhen->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_NULL].bytes;
return DEAL_RES_CONTINUE;
}
}
FOREACH(pNode, pCaseWhen->pWhenThenList) { FOREACH(pNode, pCaseWhen->pWhenThenList) {
SWhenThenNode* pWhenThen = (SWhenThenNode*)pNode; SWhenThenNode* pWhenThen = (SWhenThenNode*)pNode;
if (!dataTypeEqual(&pCaseWhen->node.resType, &((SExprNode*)pNode)->resType)) { if (!dataTypeEqual(&pCaseWhen->node.resType, &((SExprNode*)pNode)->resType)) {
......
...@@ -138,7 +138,7 @@ static char* getSyntaxErrFormat(int32_t errCode) { ...@@ -138,7 +138,7 @@ static char* getSyntaxErrFormat(int32_t errCode) {
case TSDB_CODE_PAR_CANNOT_DROP_PRIMARY_KEY: case TSDB_CODE_PAR_CANNOT_DROP_PRIMARY_KEY:
return "Primary timestamp column cannot be dropped"; return "Primary timestamp column cannot be dropped";
case TSDB_CODE_PAR_INVALID_MODIFY_COL: case TSDB_CODE_PAR_INVALID_MODIFY_COL:
return "Only binary/nchar column length could be modified"; return "Only binary/nchar column length could be modified, and the length can only be increased, not decreased";
case TSDB_CODE_PAR_INVALID_TBNAME: case TSDB_CODE_PAR_INVALID_TBNAME:
return "Invalid tbname pseudo column"; return "Invalid tbname pseudo column";
case TSDB_CODE_PAR_INVALID_FUNCTION_NAME: case TSDB_CODE_PAR_INVALID_FUNCTION_NAME:
......
...@@ -521,9 +521,13 @@ int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVa ...@@ -521,9 +521,13 @@ int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVa
void* tmp = NULL; void* tmp = NULL;
int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, &tmp, pVLen); int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, &tmp, pVLen);
if (code == 0) { if (code == 0) {
*key = resKey; if (key->win.skey != resKey.win.skey) {
*pVal = tdbRealloc(NULL, *pVLen); code = -1;
memcpy(*pVal, tmp, *pVLen); } else {
*key = resKey;
*pVal = tdbRealloc(NULL, *pVLen);
memcpy(*pVal, tmp, *pVLen);
}
} }
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return code; return code;
......
...@@ -96,7 +96,7 @@ typedef void* queue[2]; ...@@ -96,7 +96,7 @@ typedef void* queue[2];
//#define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit //#define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit
//#define TRANS_RETRY_INTERVAL 15 // retry interval (ms) //#define TRANS_RETRY_INTERVAL 15 // retry interval (ms)
#define TRANS_CONN_TIMEOUT 3 // connect timeout (s) #define TRANS_CONN_TIMEOUT 3000 // connect timeout (ms)
#define TRANS_READ_TIMEOUT 3000 // read timeout (ms) #define TRANS_READ_TIMEOUT 3000 // read timeout (ms)
#define TRANS_PACKET_LIMIT 1024 * 1024 * 512 #define TRANS_PACKET_LIMIT 1024 * 1024 * 512
......
...@@ -25,7 +25,8 @@ typedef struct SCliConn { ...@@ -25,7 +25,8 @@ typedef struct SCliConn {
uv_connect_t connReq; uv_connect_t connReq;
uv_stream_t* stream; uv_stream_t* stream;
queue wreqQueue; queue wreqQueue;
uv_timer_t* timer;
uv_timer_t* timer; // read timer, forbidden
void* hostThrd; void* hostThrd;
...@@ -79,6 +80,7 @@ typedef struct SCliThrd { ...@@ -79,6 +80,7 @@ typedef struct SCliThrd {
uint64_t nextTimeout; // next timeout uint64_t nextTimeout; // next timeout
void* pTransInst; // void* pTransInst; //
void (*destroyAhandleFp)(void* ahandle);
SHashObj* fqdn2ipCache; SHashObj* fqdn2ipCache;
SCvtAddr cvtAddr; SCvtAddr cvtAddr;
...@@ -102,6 +104,8 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port); ...@@ -102,6 +104,8 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port);
static void addConnToPool(void* pool, SCliConn* conn); static void addConnToPool(void* pool, SCliConn* conn);
static void doCloseIdleConn(void* param); static void doCloseIdleConn(void* param);
// register conn timer
static void cliConnTimeout(uv_timer_t* handle);
// register timer for read // register timer for read
static void cliReadTimeoutCb(uv_timer_t* handle); static void cliReadTimeoutCb(uv_timer_t* handle);
// register timer in each thread to clear expire conn // register timer in each thread to clear expire conn
...@@ -155,6 +159,7 @@ static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, ...@@ -155,6 +159,7 @@ static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq,
static FORCE_INLINE void destroyUserdata(STransMsg* userdata); static FORCE_INLINE void destroyUserdata(STransMsg* userdata);
static FORCE_INLINE void destroyCmsg(void* cmsg); static FORCE_INLINE void destroyCmsg(void* cmsg);
static FORCE_INLINE void destroyCmsgAndAhandle(void* cmsg);
static FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst); static FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst);
static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx); static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx);
...@@ -258,16 +263,15 @@ static void* cliWorkThread(void* arg); ...@@ -258,16 +263,15 @@ static void* cliWorkThread(void* arg);
static void cliReleaseUnfinishedMsg(SCliConn* conn) { static void cliReleaseUnfinishedMsg(SCliConn* conn) {
SCliThrd* pThrd = conn->hostThrd; SCliThrd* pThrd = conn->hostThrd;
STrans* pTransInst = pThrd->pTransInst;
for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) { for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) {
SCliMsg* msg = transQueueGet(&conn->cliMsgs, i); SCliMsg* msg = transQueueGet(&conn->cliMsgs, i);
if (msg != NULL && msg->ctx != NULL && msg->ctx->ahandle != (void*)0x9527) { if (msg != NULL && msg->ctx != NULL && msg->ctx->ahandle != (void*)0x9527) {
if (conn->ctx.freeFunc != NULL && msg->ctx->ahandle != NULL) { if (conn->ctx.freeFunc != NULL && msg->ctx->ahandle != NULL) {
conn->ctx.freeFunc(msg->ctx->ahandle); conn->ctx.freeFunc(msg->ctx->ahandle);
} else if (msg->ctx->ahandle != NULL && pTransInst->destroyFp != NULL) { } else if (msg->ctx->ahandle != NULL && pThrd->destroyAhandleFp != NULL) {
tDebug("%s conn %p destroy unfinished ahandle %p", CONN_GET_INST_LABEL(conn), conn, msg->ctx->ahandle); tDebug("%s conn %p destroy unfinished ahandle %p", CONN_GET_INST_LABEL(conn), conn, msg->ctx->ahandle);
pTransInst->destroyFp(msg->ctx->ahandle); pThrd->destroyAhandleFp(msg->ctx->ahandle);
} }
} }
destroyCmsg(msg); destroyCmsg(msg);
...@@ -407,9 +411,16 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { ...@@ -407,9 +411,16 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) {
bool once = false; bool once = false;
do { do {
SCliMsg* pMsg = transQueuePop(&pConn->cliMsgs); SCliMsg* pMsg = transQueuePop(&pConn->cliMsgs);
if (pMsg == NULL && once) { if (pMsg == NULL && once) {
break; break;
} }
if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg)) {
destroyCmsg(pMsg);
break;
}
STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;
STransMsg transMsg = {0}; STransMsg transMsg = {0};
...@@ -439,6 +450,7 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { ...@@ -439,6 +450,7 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) {
continue; continue;
} }
} }
if (pMsg == NULL || (pMsg && pMsg->type != Release)) { if (pMsg == NULL || (pMsg && pMsg->type != Release)) {
if (cliAppCb(pConn, &transMsg, pMsg) != 0) { if (cliAppCb(pConn, &transMsg, pMsg) != 0) {
return; return;
...@@ -454,6 +466,19 @@ void cliHandleExcept(SCliConn* conn) { ...@@ -454,6 +466,19 @@ void cliHandleExcept(SCliConn* conn) {
cliHandleExceptImpl(conn, -1); cliHandleExceptImpl(conn, -1);
} }
void cliConnTimeout(uv_timer_t* handle) {
SCliConn* conn = handle->data;
SCliThrd* pThrd = conn->hostThrd;
tTrace("%s conn %p conn timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn));
uv_timer_stop(handle);
handle->data = NULL;
taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer = NULL;
cliHandleExceptImpl(conn, -1);
}
void cliReadTimeoutCb(uv_timer_t* handle) { void cliReadTimeoutCb(uv_timer_t* handle) {
// set up timeout cb // set up timeout cb
SCliConn* conn = handle->data; SCliConn* conn = handle->data;
...@@ -545,7 +570,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { ...@@ -545,7 +570,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
if (conn->list->size >= 50) { if (conn->list->size >= 50) {
STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg)); STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
arg->param1 = conn; arg->param1 = conn;
arg->param2 = NULL; arg->param2 = thrd;
STrans* pTransInst = thrd->pTransInst; STrans* pTransInst = thrd->pTransInst;
conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime)); conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime));
...@@ -630,8 +655,16 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { ...@@ -630,8 +655,16 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
conn->stream->data = conn; conn->stream->data = conn;
conn->connReq.data = conn; uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL;
if (timer == NULL) {
timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
tDebug("no available timer, create a timer %p", timer);
uv_timer_init(pThrd->loop, timer);
}
timer->data = conn;
conn->timer = timer;
conn->connReq.data = conn;
transReqQueueInit(&conn->wreqQueue); transReqQueueInit(&conn->wreqQueue);
transQueueInit(&conn->cliMsgs, NULL); transQueueInit(&conn->cliMsgs, NULL);
...@@ -661,8 +694,8 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { ...@@ -661,8 +694,8 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
} }
if (conn->timer != NULL) { if (conn->timer != NULL) {
uv_timer_stop(conn->timer); uv_timer_stop(conn->timer);
taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer->data = NULL; conn->timer->data = NULL;
taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer = NULL; conn->timer = NULL;
} }
...@@ -811,6 +844,15 @@ _RETURN: ...@@ -811,6 +844,15 @@ _RETURN:
void cliConnCb(uv_connect_t* req, int status) { void cliConnCb(uv_connect_t* req, int status) {
// impl later // impl later
SCliConn* pConn = req->data; SCliConn* pConn = req->data;
SCliThrd* pThrd = pConn->hostThrd;
if (pConn->timer != NULL) {
uv_timer_stop(pConn->timer);
pConn->timer->data = NULL;
taosArrayPush(pThrd->timerList, &pConn->timer);
pConn->timer = NULL;
}
if (status != 0) { if (status != 0) {
tError("%s conn %p failed to connect server:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_strerror(status)); tError("%s conn %p failed to connect server:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_strerror(status));
cliHandleExcept(pConn); cliHandleExcept(pConn);
...@@ -989,31 +1031,26 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { ...@@ -989,31 +1031,26 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
conn->ip = strdup(EPSET_GET_INUSE_IP(&pCtx->epSet)); conn->ip = strdup(EPSET_GET_INUSE_IP(&pCtx->epSet));
conn->port = EPSET_GET_INUSE_PORT(&pCtx->epSet); conn->port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
int ret = transSetConnOption((uv_tcp_t*)conn->stream);
if (ret) {
tError("%s conn %p failed to set conn option, errmsg %s", transLabel(pTransInst), conn, uv_err_name(ret));
}
int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT);
if (fd == -1) {
tTrace("%s conn %p failed to create socket", transLabel(pTransInst), conn);
cliHandleExcept(conn);
return;
}
uv_tcp_open((uv_tcp_t*)conn->stream, fd);
struct sockaddr_in addr; struct sockaddr_in addr;
addr.sin_family = AF_INET; addr.sin_family = AF_INET;
addr.sin_addr.s_addr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, conn->ip); addr.sin_addr.s_addr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, conn->ip);
addr.sin_port = (uint16_t)htons((uint16_t)conn->port); addr.sin_port = (uint16_t)htons((uint16_t)conn->port);
tTrace("%s conn %p try to connect to %s:%d", pTransInst->label, conn, conn->ip, conn->port); tTrace("%s conn %p try to connect to %s:%d", pTransInst->label, conn, conn->ip, conn->port);
ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
int ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
if (ret != 0) { if (ret != 0) {
tTrace("%s conn %p failed to connect to %s:%d, reason:%s", pTransInst->label, conn, conn->ip, conn->port, tTrace("%s conn %p failed to connect to %s:%d, reason:%s", pTransInst->label, conn, conn->ip, conn->port,
uv_err_name(ret)); uv_err_name(ret));
uv_timer_stop(conn->timer);
conn->timer->data = NULL;
taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer = NULL;
cliHandleExcept(conn); cliHandleExcept(conn);
return; return;
} }
uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
} }
STraceId* trace = &pMsg->msg.info.traceId; STraceId* trace = &pMsg->msg.info.traceId;
tGTrace("%s conn %p ready", pTransInst->label, conn); tGTrace("%s conn %p ready", pTransInst->label, conn);
...@@ -1136,6 +1173,8 @@ static void* cliWorkThread(void* arg) { ...@@ -1136,6 +1173,8 @@ static void* cliWorkThread(void* arg) {
pThrd->pid = taosGetSelfPthreadId(); pThrd->pid = taosGetSelfPthreadId();
setThreadName("trans-cli-work"); setThreadName("trans-cli-work");
uv_run(pThrd->loop, UV_RUN_DEFAULT); uv_run(pThrd->loop, UV_RUN_DEFAULT);
tDebug("thread quit-thread:%08" PRId64, pThrd->pid);
return NULL; return NULL;
} }
...@@ -1177,6 +1216,25 @@ static FORCE_INLINE void destroyCmsg(void* arg) { ...@@ -1177,6 +1216,25 @@ static FORCE_INLINE void destroyCmsg(void* arg) {
taosMemoryFree(pMsg); taosMemoryFree(pMsg);
} }
static FORCE_INLINE void destroyCmsgAndAhandle(void* param) {
if (param == NULL) return;
STaskArg* arg = param;
SCliMsg* pMsg = arg->param1;
SCliThrd* pThrd = arg->param2;
tDebug("destroy Ahandle A");
if (pThrd != NULL && pThrd->destroyAhandleFp != NULL) {
tDebug("destroy Ahandle B");
pThrd->destroyAhandleFp(pMsg->ctx->ahandle);
}
tDebug("destroy Ahandle C");
transDestroyConnCtx(pMsg->ctx);
destroyUserdata(&pMsg->msg);
taosMemoryFree(pMsg);
}
static SCliThrd* createThrdObj(void* trans) { static SCliThrd* createThrdObj(void* trans) {
STrans* pTransInst = trans; STrans* pTransInst = trans;
...@@ -1195,7 +1253,7 @@ static SCliThrd* createThrdObj(void* trans) { ...@@ -1195,7 +1253,7 @@ static SCliThrd* createThrdObj(void* trans) {
pThrd->prepare->data = pThrd; pThrd->prepare->data = pThrd;
// uv_prepare_start(pThrd->prepare, cliPrepareCb); // uv_prepare_start(pThrd->prepare, cliPrepareCb);
int32_t timerSize = 512; int32_t timerSize = 64;
pThrd->timerList = taosArrayInit(timerSize, sizeof(void*)); pThrd->timerList = taosArrayInit(timerSize, sizeof(void*));
for (int i = 0; i < timerSize; i++) { for (int i = 0; i < timerSize; i++) {
uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
...@@ -1211,6 +1269,7 @@ static SCliThrd* createThrdObj(void* trans) { ...@@ -1211,6 +1269,7 @@ static SCliThrd* createThrdObj(void* trans) {
pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
pThrd->pTransInst = trans; pThrd->pTransInst = trans;
pThrd->destroyAhandleFp = pTransInst->destroyFp;
pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pThrd->quit = false; pThrd->quit = false;
return pThrd; return pThrd;
...@@ -1226,9 +1285,10 @@ static void destroyThrdObj(SCliThrd* pThrd) { ...@@ -1226,9 +1285,10 @@ static void destroyThrdObj(SCliThrd* pThrd) {
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyCmsg); TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyCmsg);
transAsyncPoolDestroy(pThrd->asyncPool); transAsyncPoolDestroy(pThrd->asyncPool);
transDQDestroy(pThrd->delayQueue, destroyCmsg); transDQDestroy(pThrd->delayQueue, destroyCmsgAndAhandle);
transDQDestroy(pThrd->timeoutQueue, NULL); transDQDestroy(pThrd->timeoutQueue, NULL);
tDebug("thread destroy %" PRId64, pThrd->pid);
for (int i = 0; i < taosArrayGetSize(pThrd->timerList); i++) { for (int i = 0; i < taosArrayGetSize(pThrd->timerList); i++) {
uv_timer_t* timer = taosArrayGetP(pThrd->timerList, i); uv_timer_t* timer = taosArrayGetP(pThrd->timerList, i);
taosMemoryFree(timer); taosMemoryFree(timer);
...@@ -1254,7 +1314,18 @@ void cliSendQuit(SCliThrd* thrd) { ...@@ -1254,7 +1314,18 @@ void cliSendQuit(SCliThrd* thrd) {
} }
void cliWalkCb(uv_handle_t* handle, void* arg) { void cliWalkCb(uv_handle_t* handle, void* arg) {
if (!uv_is_closing(handle)) { if (!uv_is_closing(handle)) {
uv_read_stop((uv_stream_t*)handle); if (uv_handle_get_type(handle) == UV_TIMER) {
// SCliConn* pConn = handle->data;
// if (pConn != NULL && pConn->timer != NULL) {
// SCliThrd* pThrd = pConn->hostThrd;
// uv_timer_stop((uv_timer_t*)handle);
// handle->data = NULL;
// taosArrayPush(pThrd->timerList, &pConn->timer);
// pConn->timer = NULL;
// }
} else {
uv_read_stop((uv_stream_t*)handle);
}
uv_close(handle, cliDestroy); uv_close(handle, cliDestroy);
} }
} }
......
...@@ -497,7 +497,7 @@ void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) { ...@@ -497,7 +497,7 @@ void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) {
SDelayTask* task = container_of(minNode, SDelayTask, node); SDelayTask* task = container_of(minNode, SDelayTask, node);
STaskArg* arg = task->arg; STaskArg* arg = task->arg;
if (freeFunc) freeFunc(arg->param1); if (freeFunc) freeFunc(arg);
taosMemoryFree(arg); taosMemoryFree(arg);
taosMemoryFree(task); taosMemoryFree(task);
......
...@@ -902,9 +902,11 @@ void taosSetCoreDump(bool enable) { ...@@ -902,9 +902,11 @@ void taosSetCoreDump(bool enable) {
old_len = sizeof(old_usespid); old_len = sizeof(old_usespid);
#ifndef __loongarch64
if (syscall(SYS__sysctl, &args) == -1) { if (syscall(SYS__sysctl, &args) == -1) {
// printf("_sysctl(kern_core_uses_pid) set fail: %s", strerror(errno)); // printf("_sysctl(kern_core_uses_pid) set fail: %s", strerror(errno));
} }
#endif
// printf("The old core_uses_pid[%" PRIu64 "]: %d", old_len, old_usespid); // printf("The old core_uses_pid[%" PRIu64 "]: %d", old_len, old_usespid);
...@@ -918,9 +920,11 @@ void taosSetCoreDump(bool enable) { ...@@ -918,9 +920,11 @@ void taosSetCoreDump(bool enable) {
old_len = sizeof(old_usespid); old_len = sizeof(old_usespid);
#ifndef __loongarch64
if (syscall(SYS__sysctl, &args) == -1) { if (syscall(SYS__sysctl, &args) == -1) {
// printf("_sysctl(kern_core_uses_pid) get fail: %s", strerror(errno)); // printf("_sysctl(kern_core_uses_pid) get fail: %s", strerror(errno));
} }
#endif
// printf("The new core_uses_pid[%" PRIu64 "]: %d", old_len, old_usespid); // printf("The new core_uses_pid[%" PRIu64 "]: %d", old_len, old_usespid);
#endif #endif
...@@ -989,4 +993,4 @@ bool taosCheckCurrentInDll() { ...@@ -989,4 +993,4 @@ bool taosCheckCurrentInDll() {
#else #else
return false; return false;
#endif #endif
} }
\ No newline at end of file
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#if !defined(_TD_ARM_) && !defined(_TD_MIPS_) #if !defined(_TD_ARM_) && !defined(_TD_MIPS_) && !defined(_TD_LOONGARCH_)
#include <nmmintrin.h> #include <nmmintrin.h>
#endif #endif
...@@ -512,7 +512,7 @@ static uint32_t table[16][256] = { ...@@ -512,7 +512,7 @@ static uint32_t table[16][256] = {
0x9c221d09, 0x6e2e10f7, 0x7dd67004, 0x8fda7dfa} 0x9c221d09, 0x6e2e10f7, 0x7dd67004, 0x8fda7dfa}
}; };
#if !defined(_TD_ARM_) && !defined(_TD_MIPS_) #if !defined(_TD_ARM_) && !defined(_TD_MIPS_) && !defined(_TD_LOONGARCH_)
static uint32_t long_shifts[4][256] = { static uint32_t long_shifts[4][256] = {
{0x00000000, 0xe040e0ac, 0xc56db7a9, 0x252d5705, 0x8f3719a3, 0x6f77f90f, 0x4a5aae0a, 0xaa1a4ea6, 0x1b8245b7, {0x00000000, 0xe040e0ac, 0xc56db7a9, 0x252d5705, 0x8f3719a3, 0x6f77f90f, 0x4a5aae0a, 0xaa1a4ea6, 0x1b8245b7,
0xfbc2a51b, 0xdeeff21e, 0x3eaf12b2, 0x94b55c14, 0x74f5bcb8, 0x51d8ebbd, 0xb1980b11, 0x37048b6e, 0xd7446bc2, 0xfbc2a51b, 0xdeeff21e, 0x3eaf12b2, 0x94b55c14, 0x74f5bcb8, 0x51d8ebbd, 0xb1980b11, 0x37048b6e, 0xd7446bc2,
...@@ -846,7 +846,7 @@ uint32_t crc32c_sf(uint32_t crci, crc_stream input, size_t length) { ...@@ -846,7 +846,7 @@ uint32_t crc32c_sf(uint32_t crci, crc_stream input, size_t length) {
} }
return (uint32_t)crc ^ 0xffffffff; return (uint32_t)crc ^ 0xffffffff;
} }
#if !defined(_TD_ARM_) && !defined(_TD_MIPS_) #if !defined(_TD_ARM_) && !defined(_TD_MIPS_) && !defined(_TD_LOONGARCH_)
/* Apply the zeros operator table to crc. */ /* Apply the zeros operator table to crc. */
static uint32_t shift_crc(uint32_t shift_table[][256], uint32_t crc) { static uint32_t shift_crc(uint32_t shift_table[][256], uint32_t crc) {
return shift_table[0][crc & 0xff] ^ shift_table[1][(crc >> 8) & 0xff] ^ shift_table[2][(crc >> 16) & 0xff] ^ return shift_table[0][crc & 0xff] ^ shift_table[1][(crc >> 8) & 0xff] ^ shift_table[2][(crc >> 16) & 0xff] ^
...@@ -857,7 +857,7 @@ static uint32_t shift_crc(uint32_t shift_table[][256], uint32_t crc) { ...@@ -857,7 +857,7 @@ static uint32_t shift_crc(uint32_t shift_table[][256], uint32_t crc) {
version. Otherwise, use the software version. */ version. Otherwise, use the software version. */
uint32_t (*crc32c)(uint32_t crci, crc_stream bytes, size_t len) = crc32c_sf; uint32_t (*crc32c)(uint32_t crci, crc_stream bytes, size_t len) = crc32c_sf;
#if !defined(_TD_ARM_) && !defined(_TD_MIPS_) #if !defined(_TD_ARM_) && !defined(_TD_MIPS_) && !defined(_TD_LOONGARCH_)
/* Compute CRC-32C using the Intel hardware instruction. */ /* Compute CRC-32C using the Intel hardware instruction. */
uint32_t crc32c_hw(uint32_t crc, crc_stream buf, size_t len) { uint32_t crc32c_hw(uint32_t crc, crc_stream buf, size_t len) {
crc_stream next = buf; crc_stream next = buf;
...@@ -1012,7 +1012,7 @@ uint32_t crc32c_hw(uint32_t crc, crc_stream buf, size_t len) { ...@@ -1012,7 +1012,7 @@ uint32_t crc32c_hw(uint32_t crc, crc_stream buf, size_t len) {
#endif // #ifndef _TD_ARM_ #endif // #ifndef _TD_ARM_
void taosResolveCRC() { void taosResolveCRC() {
#if defined _TD_ARM_ || defined _TD_MIPS_ || defined WINDOWS #if defined _TD_ARM_ || defined _TD_MIPS_ || defined WINDOWS || defined _TD_LOONGARCH_
crc32c = crc32c_sf; crc32c = crc32c_sf;
#else #else
int32_t sse42; int32_t sse42;
......
...@@ -521,7 +521,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_TIMELINE_FUNC, "Invalid timeline fu ...@@ -521,7 +521,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_TIMELINE_FUNC, "Invalid timeline fu
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_PASSWD, "Invalid password") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_PASSWD, "Invalid password")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_ALTER_TABLE, "Invalid alter table statement") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_ALTER_TABLE, "Invalid alter table statement")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_CANNOT_DROP_PRIMARY_KEY, "Primary timestamp column cannot be dropped") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_CANNOT_DROP_PRIMARY_KEY, "Primary timestamp column cannot be dropped")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_MODIFY_COL, "Only binary/nchar column length could be modified") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_MODIFY_COL, "Only binary/nchar column length could be modified, and the length can only be increased, not decreased")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_TBNAME, "Invalid tbname pseudo column") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_TBNAME, "Invalid tbname pseudo column")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_FUNCTION_NAME, "Invalid function name") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_FUNCTION_NAME, "Invalid function name")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_COMMENT_TOO_LONG, "Comment too long") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_COMMENT_TOO_LONG, "Comment too long")
......
...@@ -435,7 +435,7 @@ static inline int32_t taosBuildLogHead(char *buffer, const char *flags) { ...@@ -435,7 +435,7 @@ static inline int32_t taosBuildLogHead(char *buffer, const char *flags) {
taosGetTimeOfDay(&timeSecs); taosGetTimeOfDay(&timeSecs);
time_t curTime = timeSecs.tv_sec; time_t curTime = timeSecs.tv_sec;
ptm = taosLocalTimeNolock(&Tm, &curTime, taosGetDaylight()); ptm = taosLocalTime(&curTime, &Tm);
return sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d %08" PRId64 " %s", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, return sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d %08" PRId64 " %s", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour,
ptm->tm_min, ptm->tm_sec, (int32_t)timeSecs.tv_usec, taosGetSelfPthreadId(), flags); ptm->tm_min, ptm->tm_sec, (int32_t)timeSecs.tv_usec, taosGetSelfPthreadId(), flags);
......
...@@ -714,6 +714,7 @@ ...@@ -714,6 +714,7 @@
,,,system-test,python3 ./test.py -f 7-tmq/tmq_taosx.py ,,,system-test,python3 ./test.py -f 7-tmq/tmq_taosx.py
,,,system-test,python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py ,,,system-test,python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py
,,,system-test,python3 ./test.py -f 99-TDcase/TD-19201.py ,,,system-test,python3 ./test.py -f 99-TDcase/TD-19201.py
,,,system-test,python3 ./test.py -f 7-tmq/tmqSubscribeStb-r3.py -N 5
,,,system-test,python3 ./test.py -f 2-query/between.py -Q 2 ,,,system-test,python3 ./test.py -f 2-query/between.py -Q 2
,,,system-test,python3 ./test.py -f 2-query/distinct.py -Q 2 ,,,system-test,python3 ./test.py -f 2-query/distinct.py -Q 2
,,,system-test,python3 ./test.py -f 2-query/varchar.py -Q 2 ,,,system-test,python3 ./test.py -f 2-query/varchar.py -Q 2
......
...@@ -216,6 +216,60 @@ sql insert into scalar_tb values (1656668180503+1s, -50, 50.1, "beiJing", "TDeng ...@@ -216,6 +216,60 @@ sql insert into scalar_tb values (1656668180503+1s, -50, 50.1, "beiJing", "TDeng
print ========== step6 repeat print ========== step6 repeat
sql drop database test; sql drop database test;
print ========== interval\session\state window
sql CREATE DATABASE test1 BUFFER 96 CACHESIZE 1 CACHEMODEL 'none' COMP 2 DURATION 14400m WAL_FSYNC_PERIOD 3000 MAXROWS 4096 MINROWS 100 KEEP 5256000m,5256000m,5256000m PAGES 256 PAGESIZE 4 PRECISION 'ms' REPLICA 1 STRICT 'off' WAL_LEVEL 1 VGROUPS 2 SINGLE_STABLE 0;
sql use test1;
sql CREATE STABLE st (time TIMESTAMP, ca DOUBLE, cb DOUBLE, cc int) TAGS (ta VARCHAR(10) );
print ========== create table before stream
sql CREATE TABLE t1 using st TAGS ('aaa');
sql CREATE TABLE t2 using st TAGS ('bbb');
sql CREATE TABLE t3 using st TAGS ('ccc');
sql CREATE TABLE t4 using st TAGS ('ddd');
print ========== stable
sql create stream streamd1 into streamt1 as select ca, _wstart,_wend, count(*) as total from st where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ca interval(60m) fill(null);
sql create stream streamd2 into streamt2 as select ca, _wstart,_wend, count(*), max(ca), max(cb) from st where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ca interval(60m) fill(linear);
sql create stream streamd3 into streamt3 as select ca, _wstart,_wend, count(*) as total from st where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ca session(time, 60m);
sql create stream streamd4 into streamt4 as select ta, _wstart,_wend, count(*) as total from st where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ta session(time, 60m);
sql_error create stream streamd5 into streamt5 as select ca, _wstart,_wend, count(*) as total from st where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ca state_window(cc);
sql_error create stream streamd6 into streamt6 as select ta, _wstart,_wend, count(*) as total from st where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ta state_window(cc);
print ========== table
sql create stream streamd7 into streamt7 as select ca, _wstart,_wend, count(*) as total from t1 where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ca interval(60m) fill(null);
sql create stream streamd8 into streamt8 as select ca, _wstart,_wend, count(*), max(ca), max(cb) from t1 where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ca interval(60m) fill(linear);
sql create stream streamd9 into streamt9 as select ca, _wstart,_wend, count(*) as total from t1 where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ca session(time, 60m);
sql create stream streamd10 into streamt10 as select ta, _wstart,_wend, count(*) as total from t1 where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ta session(time, 60m);
sql create stream streamd11 into streamt11 as select ca, _wstart,_wend, count(*) as total from t1 where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ca state_window(cc);
sql create stream streamd12 into streamt12 as select ta, _wstart,_wend, count(*) as total from t1 where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ta state_window(cc);
print ========== create table after stream
sql CREATE TABLE t5 using st TAGS ('eee');
sql CREATE TABLE t6 using st TAGS ('fff');
sql CREATE TABLE t7 using st TAGS ('ggg');
sql CREATE TABLE t8 using st TAGS ('fff');
sleep 1000
print ========== drop stream
sql drop stream if exists streamd1;
sql drop stream if exists streamd2;
sql drop stream if exists streamd3;
sql drop stream if exists streamd4;
#sql drop stream if exists streamd5;
#sql drop stream if exists streamd6;
sql drop stream if exists streamd7;
sql drop stream if exists streamd8;
sql drop stream if exists streamd9;
sql drop stream if exists streamd10;
sql drop stream if exists streamd11;
sql drop stream if exists streamd12;
print ========== step7
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT system sh/exec.sh -n dnode3 -s stop -x SIGINT
......
...@@ -544,4 +544,192 @@ if $rows != 10 then ...@@ -544,4 +544,192 @@ if $rows != 10 then
endi endi
sql drop stream if exists streams4;
sql drop database if exists test4;
sql drop stable if exists streamt4;
sql create database if not exists test4 vgroups 10 precision "ms" ;
sql use test4;
sql create table st (ts timestamp, c1 tinyint, c2 smallint) tags (t1 tinyint) ;
sql create table t1 using st tags (-81) ;
sql create table t2 using st tags (-81) ;
sql create stream if not exists streams4 trigger window_close into streamt4 as select _wstart AS start, min(c1),count(c1) from t1 state_window(c1);
sql insert into t1 (ts, c1) values (1668073288209, 11);
sql insert into t1 (ts, c1) values (1668073288210, 11);
sql insert into t1 (ts, c1) values (1668073288211, 11);
sql insert into t1 (ts, c1) values (1668073288212, 11);
sql insert into t1 (ts, c1) values (1668073288213, 11);
sql insert into t1 (ts, c1) values (1668073288214, 11);
sql insert into t1 (ts, c1) values (1668073288215, 29);
$loop_count = 0
loop7:
sleep 200
sql select * from streamt4 order by start;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
if $rows != 1 then
print =====rows=$rows
goto loop7
endi
if $data01 != 11 then
print =====data01=$data01
goto loop7
endi
if $data02 != 6 then
print =====data02=$data02
goto loop7
endi
sql delete from t1 where ts = cast(1668073288214 as timestamp);
sql insert into t1 (ts, c1) values (1668073288216, 29);
sql delete from t1 where ts = cast(1668073288215 as timestamp);
sql insert into t1 (ts, c1) values (1668073288217, 29);
sql delete from t1 where ts = cast(1668073288216 as timestamp);
sql insert into t1 (ts, c1) values (1668073288218, 29);
sql delete from t1 where ts = cast(1668073288217 as timestamp);
sql insert into t1 (ts, c1) values (1668073288219, 29);
sql delete from t1 where ts = cast(1668073288218 as timestamp);
sql insert into t1 (ts, c1) values (1668073288220, 29);
sql delete from t1 where ts = cast(1668073288219 as timestamp);
$loop_count = 0
loop8:
sleep 200
sql select * from streamt4 order by start;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
if $rows != 1 then
print =====rows=$rows
goto loop8
endi
if $data01 != 11 then
print =====data01=$data01
goto loop8
endi
if $data02 != 5 then
print =====data02=$data02
goto loop8
endi
sql insert into t1 (ts, c1) values (1668073288221, 65);
sql insert into t1 (ts, c1) values (1668073288222, 65);
sql insert into t1 (ts, c1) values (1668073288223, 65);
sql insert into t1 (ts, c1) values (1668073288224, 65);
sql insert into t1 (ts, c1) values (1668073288225, 65);
sql insert into t1 (ts, c1) values (1668073288226, 65);
$loop_count = 0
loop8:
sleep 200
sql select * from streamt4 order by start;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
if $rows != 2 then
print =====rows=$rows
goto loop8
endi
if $data01 != 11 then
print =====data01=$data01
goto loop8
endi
if $data02 != 5 then
print =====data02=$data02
goto loop8
endi
if $data11 != 29 then
print =====data11=$data11
goto loop8
endi
if $data12 != 1 then
print =====data12=$data12
goto loop8
endi
sql insert into t1 (ts, c1) values (1668073288224, 64);
$loop_count = 0
loop9:
sleep 200
sql select * from streamt4 order by start;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
if $rows != 4 then
print =====rows=$rows
goto loop9
endi
if $data01 != 11 then
print =====data01=$data01
goto loop9
endi
if $data02 != 5 then
print =====data02=$data02
goto loop9
endi
if $data11 != 29 then
print =====data11=$data11
goto loop9
endi
if $data12 != 1 then
print =====data12=$data12
goto loop9
endi
if $data21 != 65 then
print =====data21=$data21
goto loop9
endi
if $data22 != 3 then
print =====data22=$data22
goto loop9
endi
if $data31 != 64 then
print =====data31=$data31
goto loop9
endi
if $data32 != 1 then
print =====data32=$data32
goto loop9
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
...@@ -137,7 +137,7 @@ class TDTestCase: ...@@ -137,7 +137,7 @@ class TDTestCase:
tdLog.info("check show subscriptions") tdLog.info("check show subscriptions")
tdSql.query("show subscriptions") tdSql.query("show subscriptions")
# tdLog.debug(tdSql.queryResult) tdLog.debug(tdSql.queryResult)
rows = tdSql.getRows() rows = tdSql.getRows()
expectSubscriptions = paraDict['vgroups'] * len(topicNameList) expectSubscriptions = paraDict['vgroups'] * len(topicNameList)
tdLog.info("show subscriptions rows: %d, expect Subscriptions: %d"%(rows,expectSubscriptions)) tdLog.info("show subscriptions rows: %d, expect Subscriptions: %d"%(rows,expectSubscriptions))
......
from distutils.log import error
import taos
import sys
import time
import socket
import os import os
import threading
import subprocess
import platform import platform
import socket
import subprocess
import sys
import threading
import time
from distutils.log import error
from util.log import * import taos
from util.sql import *
from util.cases import * from util.cases import *
from util.dnodes import *
from util.dnodes import TDDnodes
from util.dnodes import TDDnode
from util.cluster import * from util.cluster import *
from util.common import * from util.common import *
from util.dnodes import *
from util.dnodes import TDDnode, TDDnodes
from util.log import *
from util.sql import *
sys.path.append("./6-cluster") sys.path.append("./6-cluster")
sys.path.append("./7-tmq") sys.path.append("./7-tmq")
from tmqCommon import *
from clusterCommonCreate import *
from clusterCommonCheck import clusterComCheck from clusterCommonCheck import clusterComCheck
from clusterCommonCreate import *
from tmqCommon import *
class TDTestCase: class TDTestCase:
def __init__(self): def __init__(self):
...@@ -265,6 +266,7 @@ class TDTestCase: ...@@ -265,6 +266,7 @@ class TDTestCase:
tdLog.info("start consume processor") tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tmqCom.getStartConsumeNotifyFromTmqsim()
tdLog.info("================= restart dnode 2===========================") tdLog.info("================= restart dnode 2===========================")
cluster.dnodes[1].stoptaosd() cluster.dnodes[1].stoptaosd()
cluster.dnodes[1].starttaosd() cluster.dnodes[1].starttaosd()
......
...@@ -540,11 +540,20 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t ...@@ -540,11 +540,20 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t
} }
} }
bool shellIsLimitQuery(const char *sql) { // show whole result for this query return true, like limit or describe
// todo refactor bool shellIsShowWhole(const char *sql) {
// limit
if (taosStrCaseStr(sql, " limit ") != NULL) { if (taosStrCaseStr(sql, " limit ") != NULL) {
return true; return true;
} }
// describe
if (taosStrCaseStr(sql, "describe ") != NULL) {
return true;
}
// show
if (taosStrCaseStr(sql, "show ") != NULL) {
return true;
}
return false; return false;
} }
...@@ -578,7 +587,7 @@ int32_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql) { ...@@ -578,7 +587,7 @@ int32_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql) {
uint64_t resShowMaxNum = UINT64_MAX; uint64_t resShowMaxNum = UINT64_MAX;
if (shell.args.commands == NULL && shell.args.file[0] == 0 && !shellIsLimitQuery(sql)) { if (shell.args.commands == NULL && shell.args.file[0] == 0 && !shellIsShowWhole(sql)) {
resShowMaxNum = SHELL_DEFAULT_RES_SHOW_NUM; resShowMaxNum = SHELL_DEFAULT_RES_SHOW_NUM;
} }
...@@ -723,7 +732,7 @@ int32_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql) { ...@@ -723,7 +732,7 @@ int32_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql) {
uint64_t resShowMaxNum = UINT64_MAX; uint64_t resShowMaxNum = UINT64_MAX;
if (shell.args.commands == NULL && shell.args.file[0] == 0 && !shellIsLimitQuery(sql)) { if (shell.args.commands == NULL && shell.args.file[0] == 0 && !shellIsShowWhole(sql)) {
resShowMaxNum = SHELL_DEFAULT_RES_SHOW_NUM; resShowMaxNum = SHELL_DEFAULT_RES_SHOW_NUM;
} }
......
...@@ -1097,18 +1097,12 @@ int sml_time_Test() { ...@@ -1097,18 +1097,12 @@ int sml_time_Test() {
pRes = taos_query(taos, "use sml_db"); pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes); taos_free_result(pRes);
char* tmp = (char*)taosMemoryCalloc(1024, 1); pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
memcpy(tmp, sql[0], strlen(sql[0]));
*(char*)(tmp+44) = 0;
int32_t totalRows = 0;
pRes = taos_schemaless_insert_raw(taos, tmp, strlen(sql[0]), &totalRows, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT(totalRows == 3);
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes)); printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
int code = taos_errno(pRes); int code = taos_errno(pRes);
taos_free_result(pRes); taos_free_result(pRes);
taos_close(taos); taos_close(taos);
taosMemoryFree(tmp);
return code; return code;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册