未验证 提交 b1b4a180 编写于 作者: haoranc's avatar haoranc 提交者: GitHub

Merge branch '3.0' into test/chr/TD-14699

...@@ -111,7 +111,7 @@ TDengine REST API 详情请参考[官方文档](/reference/rest-api/)。 ...@@ -111,7 +111,7 @@ TDengine REST API 详情请参考[官方文档](/reference/rest-api/)。
这条命令很快完成 1 亿条记录的插入。具体时间取决于硬件性能。 这条命令很快完成 1 亿条记录的插入。具体时间取决于硬件性能。
taosBenchmark 命令本身带有很多选项,配置表的数目、记录条数等等,您可以设置不同参数进行体验,请执行 `taosBenchmark --help` 详细列出。taosBenchmark 详细使用方法请参照 [taosBenchmark 参考手册](../reference/taosbenchmark) taosBenchmark 命令本身带有很多选项,配置表的数目、记录条数等等,您可以设置不同参数进行体验,请执行 `taosBenchmark --help` 详细列出。taosBenchmark 详细使用方法请参照 [taosBenchmark 参考手册](../../reference/taosbenchmark)
## 体验查询 ## 体验查询
......
...@@ -245,7 +245,7 @@ select * from t; ...@@ -245,7 +245,7 @@ select * from t;
Query OK, 2 row(s) in set (0.003128s) Query OK, 2 row(s) in set (0.003128s)
``` ```
除执行 SQL 语句外,系统管理员还可以从 TDengine CLI 进行检查系统运行状态、添加删除用户账号等操作。TDengine CLI 连同应用驱动也可以独立安装在 Linux 或 Windows 机器上运行,更多细节请参考 [这里](../reference/taos-shell/) 除执行 SQL 语句外,系统管理员还可以从 TDengine CLI 进行检查系统运行状态、添加删除用户账号等操作。TDengine CLI 连同应用驱动也可以独立安装在 Linux 或 Windows 机器上运行,更多细节请参考 [这里](../../reference/taos-shell/)
## 使用 taosBenchmark 体验写入速度 ## 使用 taosBenchmark 体验写入速度
......
...@@ -218,7 +218,7 @@ GROUP BY 子句中的表达式可以包含表或视图中的任何列,这些 ...@@ -218,7 +218,7 @@ GROUP BY 子句中的表达式可以包含表或视图中的任何列,这些
PARTITION BY 子句是 TDengine 特色语法,按 part_list 对数据进行切分,在每个切分的分片中进行计算。 PARTITION BY 子句是 TDengine 特色语法,按 part_list 对数据进行切分,在每个切分的分片中进行计算。
详见 [TDengine 特色查询](./distinguished) 详见 [TDengine 特色查询](../distinguished)
## ORDER BY ## ORDER BY
......
此差异已折叠。
...@@ -19,7 +19,7 @@ library_path:包含UDF函数实现的动态链接库的绝对路径,是在 ...@@ -19,7 +19,7 @@ library_path:包含UDF函数实现的动态链接库的绝对路径,是在
OUTPUTTYPE:标识此函数的返回类型。 OUTPUTTYPE:标识此函数的返回类型。
BUFSIZE:中间结果的缓冲区大小,单位是字节。不设置则默认为0。最大不可超过512字节。 BUFSIZE:中间结果的缓冲区大小,单位是字节。不设置则默认为0。最大不可超过512字节。
关于如何开发自定义函数,请参考 [UDF使用说明](../develop/udf) 关于如何开发自定义函数,请参考 [UDF使用说明](../../develop/udf)
## 删除自定义函数 ## 删除自定义函数
......
...@@ -10,7 +10,7 @@ import TabItem from "@theme/TabItem"; ...@@ -10,7 +10,7 @@ import TabItem from "@theme/TabItem";
## 安装 ## 安装
关于安装,请参考 [使用安装包立即开始](../get-started/package) 关于安装,请参考 [使用安装包立即开始](../../get-started/package)
......
...@@ -16,7 +16,7 @@ title: 容量规划 ...@@ -16,7 +16,7 @@ title: 容量规划
- pagesize - pagesize
- cachesize - cachesize
关于这些参数的详细说明请参考 [数据库管理](../taos-sql/database)。 关于这些参数的详细说明请参考 [数据库管理](../../taos-sql/database)。
一个数据库所需要的内存大小等于 一个数据库所需要的内存大小等于
...@@ -24,7 +24,7 @@ title: 容量规划 ...@@ -24,7 +24,7 @@ title: 容量规划
vgroups * replica * (buffer + pages * pagesize + cachesize) vgroups * replica * (buffer + pages * pagesize + cachesize)
``` ```
但要注意的是这些内存并不需要由单一服务器提供,而是由整个集群中所有数据节点共同负担,相当于由这些数据节点所在的服务器共同负担。如果集群中有不止一个数据库,则所需内存要累加,并由集群中所有服务器共同负担。更复杂的场景是如果集群中的数据节点并非在最初就一次性全部建立,而是随着使用中系统负载的增加逐步增加服务器并增加数据节点,则新创建的数据库会导致新旧数据节点上的负载并不均衡,此时简单的理论计算并不能直接使用,要结合各数据节点的负载情况。 但要注意的是这些内存并不需要由单一服务器提供,而是由整个集群中所有数据节点共同负担,相当于由这些数据节点所在的服务器共同负担。如果集群中有不止一个数据库,则所需内存要累加。更复杂的场景是如果集群中的数据节点并非在最初就一次性全部建立,而是随着使用中系统负载的增加逐步增加服务器并增加数据节点,则新创建的数据库会导致新旧数据节点上的负载并不均衡,此时简单的理论计算并不能直接使用,要结合各数据节点的负载情况。
## 客户端内存需求 ## 客户端内存需求
...@@ -51,7 +51,7 @@ CPU 的需求取决于如下两方面: ...@@ -51,7 +51,7 @@ CPU 的需求取决于如下两方面:
- **数据插入** TDengine 单核每秒能至少处理一万个插入请求。每个插入请求可以带多条记录,一次插入一条记录与插入 10 条记录,消耗的计算资源差别很小。因此每次插入,条数越大,插入效率越高。如果一个插入请求带 200 条以上记录,单核就能达到每秒插入 100 万条记录的速度。但对前端数据采集的要求越高,因为需要缓存记录,然后一批插入。 - **数据插入** TDengine 单核每秒能至少处理一万个插入请求。每个插入请求可以带多条记录,一次插入一条记录与插入 10 条记录,消耗的计算资源差别很小。因此每次插入,条数越大,插入效率越高。如果一个插入请求带 200 条以上记录,单核就能达到每秒插入 100 万条记录的速度。但对前端数据采集的要求越高,因为需要缓存记录,然后一批插入。
- **查询需求** TDengine 提供高效的查询,但是每个场景的查询差异很大,查询频次变化也很大,难以给出客观数字。需要用户针对自己的场景,写一些查询语句,才能确定。 - **查询需求** TDengine 提供高效的查询,但是每个场景的查询差异很大,查询频次变化也很大,难以给出客观数字。需要用户针对自己的场景,写一些查询语句,才能确定。
因此仅对数据插入而言,CPU 是可以估算出来的,但查询所耗的计算资源无法估算。在实际运过程中,不建议 CPU 使用率超过 50%,超过后,需要增加新的节点,以获得更多计算资源。 因此仅对数据插入而言,CPU 是可以估算出来的,但查询所耗的计算资源无法估算。在实际运过程中,不建议 CPU 使用率超过 50%,超过后,需要增加新的节点,以获得更多计算资源。
## 存储需求 ## 存储需求
......
...@@ -27,4 +27,4 @@ TDengine 集群的节点数必须大于等于副本数,否则创建表时将 ...@@ -27,4 +27,4 @@ TDengine 集群的节点数必须大于等于副本数,否则创建表时将
当 TDengine 集群中的节点部署在不同的物理机上,并设置多个副本数时,就实现了系统的高可靠性,无需再使用其他软件或工具。TDengine 企业版还可以将副本部署在不同机房,从而实现异地容灾。 当 TDengine 集群中的节点部署在不同的物理机上,并设置多个副本数时,就实现了系统的高可靠性,无需再使用其他软件或工具。TDengine 企业版还可以将副本部署在不同机房,从而实现异地容灾。
另外一种灾备方式是通过 `taosX` 将一个 TDengine 集群的数据同步复制到物理上位于不同数据中心的另一个 TDengine 集群。其详细使用方法请参考 [taosX 参考手册](../reference/taosX) 另外一种灾备方式是通过 `taosX` 将一个 TDengine 集群的数据同步复制到物理上位于不同数据中心的另一个 TDengine 集群。其详细使用方法请参考 [taosX 参考手册](../../reference/taosX)
...@@ -102,11 +102,6 @@ extern int32_t tsQuerySmaOptimize; ...@@ -102,11 +102,6 @@ extern int32_t tsQuerySmaOptimize;
// client // client
extern int32_t tsMinSlidingTime; extern int32_t tsMinSlidingTime;
extern int32_t tsMinIntervalTime; extern int32_t tsMinIntervalTime;
extern int32_t tsMaxStreamComputDelay;
extern int32_t tsStreamCompStartDelay;
extern int32_t tsRetryStreamCompDelay;
extern float tsStreamComputDelayRatio; // the delayed computing ration of the whole time window
extern int64_t tsMaxRetentWindow;
// build info // build info
extern char version[]; extern char version[];
......
...@@ -118,20 +118,6 @@ int32_t tsMaxNumOfDistinctResults = 1000 * 10000; ...@@ -118,20 +118,6 @@ int32_t tsMaxNumOfDistinctResults = 1000 * 10000;
// 1 database precision unit for interval time range, changed accordingly // 1 database precision unit for interval time range, changed accordingly
int32_t tsMinIntervalTime = 1; int32_t tsMinIntervalTime = 1;
// 20sec, the maximum value of stream computing delay, changed accordingly
int32_t tsMaxStreamComputDelay = 20000;
// 10sec, the first stream computing delay time after system launched successfully, changed accordingly
int32_t tsStreamCompStartDelay = 10000;
// the stream computing delay time after executing failed, change accordingly
int32_t tsRetryStreamCompDelay = 10 * 1000;
// The delayed computing ration. 10% of the whole computing time window by default.
float tsStreamComputDelayRatio = 0.1f;
int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance
// the maximum allowed query buffer size during query processing for each data node. // the maximum allowed query buffer size during query processing for each data node.
// -1 no limit (default) // -1 no limit (default)
// 0 no query allowed, queries are disabled // 0 no query allowed, queries are disabled
...@@ -330,7 +316,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { ...@@ -330,7 +316,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddString(pCfg, "fqdn", defaultFqdn, 1) != 0) return -1; if (cfgAddString(pCfg, "fqdn", defaultFqdn, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "serverPort", defaultServerPort, 1, 65056, 1) != 0) return -1; if (cfgAddInt32(pCfg, "serverPort", defaultServerPort, 1, 65056, 1) != 0) return -1;
if (cfgAddDir(pCfg, "tempDir", tsTempDir, 1) != 0) return -1; if (cfgAddDir(pCfg, "tempDir", tsTempDir, 1) != 0) return -1;
if (cfgAddFloat(pCfg, "minimalTempDirGB", 1.0f, 0.001f, 10000000, 1) != 0) return -1; if (cfgAddFloat(pCfg, "minimalTmpDirGB", 1.0f, 0.001f, 10000000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "shellActivityTimer", tsShellActivityTimer, 1, 120, 1) != 0) return -1; if (cfgAddInt32(pCfg, "shellActivityTimer", tsShellActivityTimer, 1, 120, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "compressMsgSize", tsCompressMsgSize, -1, 100000000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "compressMsgSize", tsCompressMsgSize, -1, 100000000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "compressColData", tsCompressColData, -1, 100000000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "compressColData", tsCompressColData, -1, 100000000, 1) != 0) return -1;
...@@ -383,10 +369,6 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { ...@@ -383,10 +369,6 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "minIntervalTime", tsMinIntervalTime, 1, 1000000, 0) != 0) return -1; if (cfgAddInt32(pCfg, "minIntervalTime", tsMinIntervalTime, 1, 1000000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "maxNumOfDistinctRes", tsMaxNumOfDistinctResults, 10 * 10000, 10000 * 10000, 0) != 0) return -1; if (cfgAddInt32(pCfg, "maxNumOfDistinctRes", tsMaxNumOfDistinctResults, 10 * 10000, 10000 * 10000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "countAlwaysReturnValue", tsCountAlwaysReturnValue, 0, 1, 0) != 0) return -1; if (cfgAddInt32(pCfg, "countAlwaysReturnValue", tsCountAlwaysReturnValue, 0, 1, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "maxStreamCompDelay", tsMaxStreamComputDelay, 10, 1000000000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "maxFirstStreamCompDelay", tsStreamCompStartDelay, 1000, 1000000000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "retryStreamCompDelay", tsRetryStreamCompDelay, 10, 1000000000, 0) != 0) return -1;
if (cfgAddFloat(pCfg, "streamCompDelayRatio", tsStreamComputDelayRatio, 0.1, 0.9, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "queryBufferSize", tsQueryBufferSize, -1, 500000000000, 0) != 0) return -1; if (cfgAddInt32(pCfg, "queryBufferSize", tsQueryBufferSize, -1, 500000000000, 0) != 0) return -1;
if (cfgAddBool(pCfg, "retrieveBlockingModel", tsRetrieveBlockingModel, 0) != 0) return -1; if (cfgAddBool(pCfg, "retrieveBlockingModel", tsRetrieveBlockingModel, 0) != 0) return -1;
if (cfgAddBool(pCfg, "printAuth", tsPrintAuth, 0) != 0) return -1; if (cfgAddBool(pCfg, "printAuth", tsPrintAuth, 0) != 0) return -1;
...@@ -532,7 +514,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { ...@@ -532,7 +514,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tstrncpy(tsTempDir, cfgGetItem(pCfg, "tempDir")->str, PATH_MAX); tstrncpy(tsTempDir, cfgGetItem(pCfg, "tempDir")->str, PATH_MAX);
taosExpandDir(tsTempDir, tsTempDir, PATH_MAX); taosExpandDir(tsTempDir, tsTempDir, PATH_MAX);
tsTempSpace.reserved = cfgGetItem(pCfg, "minimalTempDirGB")->fval; tsTempSpace.reserved = cfgGetItem(pCfg, "minimalTmpDirGB")->fval;
if (taosMulMkDir(tsTempDir) != 0) { if (taosMulMkDir(tsTempDir) != 0) {
uError("failed to create tempDir:%s since %s", tsTempDir, terrstr()); uError("failed to create tempDir:%s since %s", tsTempDir, terrstr());
return -1; return -1;
...@@ -579,10 +561,6 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { ...@@ -579,10 +561,6 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsMinIntervalTime = cfgGetItem(pCfg, "minIntervalTime")->i32; tsMinIntervalTime = cfgGetItem(pCfg, "minIntervalTime")->i32;
tsMaxNumOfDistinctResults = cfgGetItem(pCfg, "maxNumOfDistinctRes")->i32; tsMaxNumOfDistinctResults = cfgGetItem(pCfg, "maxNumOfDistinctRes")->i32;
tsCountAlwaysReturnValue = cfgGetItem(pCfg, "countAlwaysReturnValue")->i32; tsCountAlwaysReturnValue = cfgGetItem(pCfg, "countAlwaysReturnValue")->i32;
tsMaxStreamComputDelay = cfgGetItem(pCfg, "maxStreamCompDelay")->i32;
tsStreamCompStartDelay = cfgGetItem(pCfg, "maxFirstStreamCompDelay")->i32;
tsRetryStreamCompDelay = cfgGetItem(pCfg, "retryStreamCompDelay")->i32;
tsStreamComputDelayRatio = cfgGetItem(pCfg, "streamCompDelayRatio")->fval;
tsQueryBufferSize = cfgGetItem(pCfg, "queryBufferSize")->i32; tsQueryBufferSize = cfgGetItem(pCfg, "queryBufferSize")->i32;
tsRetrieveBlockingModel = cfgGetItem(pCfg, "retrieveBlockingModel")->bval; tsRetrieveBlockingModel = cfgGetItem(pCfg, "retrieveBlockingModel")->bval;
tsPrintAuth = cfgGetItem(pCfg, "printAuth")->bval; tsPrintAuth = cfgGetItem(pCfg, "printAuth")->bval;
...@@ -758,10 +736,6 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { ...@@ -758,10 +736,6 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
tsMaxShellConns = cfgGetItem(pCfg, "maxShellConns")->i32; tsMaxShellConns = cfgGetItem(pCfg, "maxShellConns")->i32;
} else if (strcasecmp("maxNumOfDistinctRes", name) == 0) { } else if (strcasecmp("maxNumOfDistinctRes", name) == 0) {
tsMaxNumOfDistinctResults = cfgGetItem(pCfg, "maxNumOfDistinctRes")->i32; tsMaxNumOfDistinctResults = cfgGetItem(pCfg, "maxNumOfDistinctRes")->i32;
} else if (strcasecmp("maxStreamCompDelay", name) == 0) {
tsMaxStreamComputDelay = cfgGetItem(pCfg, "maxStreamCompDelay")->i32;
} else if (strcasecmp("maxFirstStreamCompDelay", name) == 0) {
tsStreamCompStartDelay = cfgGetItem(pCfg, "maxFirstStreamCompDelay")->i32;
} }
break; break;
} }
...@@ -772,8 +746,8 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { ...@@ -772,8 +746,8 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
break; break;
} }
case 'i': { case 'i': {
if (strcasecmp("minimalTempDirGB", name) == 0) { if (strcasecmp("minimalTmpDirGB", name) == 0) {
tsTempSpace.reserved = cfgGetItem(pCfg, "minimalTempDirGB")->fval; tsTempSpace.reserved = cfgGetItem(pCfg, "minimalTmpDirGB")->fval;
} else if (strcasecmp("minimalDataDirGB", name) == 0) { } else if (strcasecmp("minimalDataDirGB", name) == 0) {
tsDataSpace.reserved = cfgGetItem(pCfg, "minimalDataDirGB")->fval; tsDataSpace.reserved = cfgGetItem(pCfg, "minimalDataDirGB")->fval;
} else if (strcasecmp("minSlidingTime", name) == 0) { } else if (strcasecmp("minSlidingTime", name) == 0) {
...@@ -883,9 +857,7 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { ...@@ -883,9 +857,7 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
break; break;
} }
case 'r': { case 'r': {
if (strcasecmp("retryStreamCompDelay", name) == 0) { if (strcasecmp("retrieveBlockingModel", name) == 0) {
tsRetryStreamCompDelay = cfgGetItem(pCfg, "retryStreamCompDelay")->i32;
} else if (strcasecmp("retrieveBlockingModel", name) == 0) {
tsRetrieveBlockingModel = cfgGetItem(pCfg, "retrieveBlockingModel")->bval; tsRetrieveBlockingModel = cfgGetItem(pCfg, "retrieveBlockingModel")->bval;
} else if (strcasecmp("rpcQueueMemoryAllowed", name) == 0) { } else if (strcasecmp("rpcQueueMemoryAllowed", name) == 0) {
tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64; tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64;
...@@ -913,8 +885,6 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { ...@@ -913,8 +885,6 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
tsNumOfSupportVnodes = cfgGetItem(pCfg, "supportVnodes")->i32; tsNumOfSupportVnodes = cfgGetItem(pCfg, "supportVnodes")->i32;
} else if (strcasecmp("statusInterval", name) == 0) { } else if (strcasecmp("statusInterval", name) == 0) {
tsStatusInterval = cfgGetItem(pCfg, "statusInterval")->i32; tsStatusInterval = cfgGetItem(pCfg, "statusInterval")->i32;
} else if (strcasecmp("streamCompDelayRatio", name) == 0) {
tsStreamComputDelayRatio = cfgGetItem(pCfg, "streamCompDelayRatio")->fval;
} else if (strcasecmp("slaveQuery", name) == 0) { } else if (strcasecmp("slaveQuery", name) == 0) {
tsEnableSlaveQuery = cfgGetItem(pCfg, "slaveQuery")->bval; tsEnableSlaveQuery = cfgGetItem(pCfg, "slaveQuery")->bval;
} else if (strcasecmp("snodeShmSize", name) == 0) { } else if (strcasecmp("snodeShmSize", name) == 0) {
......
...@@ -116,7 +116,8 @@ void destroyMergeJoinOperator(void* param, int32_t numOfOutput) { ...@@ -116,7 +116,8 @@ void destroyMergeJoinOperator(void* param, int32_t numOfOutput) {
} }
static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t currRow, static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t currRow,
SSDataBlock* pLeftBlock, int32_t leftPos, SSDataBlock* pRightBlock, int32_t rightPos) { SSDataBlock* pLeftBlock, int32_t leftPos, SSDataBlock* pRightBlock,
int32_t rightPos) {
SJoinOperatorInfo* pJoinInfo = pOperator->info; SJoinOperatorInfo* pJoinInfo = pOperator->info;
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
...@@ -129,7 +130,7 @@ static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock* ...@@ -129,7 +130,7 @@ static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock*
int32_t rowIndex = -1; int32_t rowIndex = -1;
SColumnInfoData* pSrc = NULL; SColumnInfoData* pSrc = NULL;
if (pJoinInfo->pLeft->info.blockId == blockId) { if (pLeftBlock->info.blockId == blockId) {
pSrc = taosArrayGet(pLeftBlock->pDataBlock, slotId); pSrc = taosArrayGet(pLeftBlock->pDataBlock, slotId);
rowIndex = leftPos; rowIndex = leftPos;
} else { } else {
...@@ -144,7 +145,128 @@ static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock* ...@@ -144,7 +145,128 @@ static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock*
colDataAppend(pDst, currRow, p, false); colDataAppend(pDst, currRow, p, false);
} }
} }
}
typedef struct SRowLocation {
SSDataBlock* pDataBlock;
int32_t pos;
} SRowLocation;
// pBlock[tsSlotId][startPos, endPos) == timestamp,
static int32_t mergeJoinGetBlockRowsEqualTs(SSDataBlock* pBlock, int16_t tsSlotId, int32_t startPos, int64_t timestamp,
int32_t* pEndPos, SArray* rowLocations, SArray* createdBlocks) {
int32_t numRows = pBlock->info.rows;
ASSERT(startPos < numRows);
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsSlotId);
int32_t i = startPos;
for (; i < numRows; ++i) {
char* pNextVal = colDataGetData(pCol, i);
if (timestamp != *(int64_t*)pNextVal) {
break;
}
}
int32_t endPos = i;
*pEndPos = endPos;
if (endPos - startPos == 0) {
return 0;
}
SSDataBlock* block = pBlock;
bool createdNewBlock = false;
if (endPos == numRows) {
block = blockDataExtractBlock(pBlock, startPos, endPos-startPos);
taosArrayPush(createdBlocks, &block);
createdNewBlock = true;
}
SRowLocation location = {0};
for (int32_t j = startPos; j < endPos; ++j) {
location.pDataBlock = block;
location.pos = ( createdNewBlock ? j - startPos : j);
taosArrayPush(rowLocations, &location);
}
return 0;
}
// whichChild == 0, left child of join; whichChild ==1, right child of join
static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator, int32_t whichChild, int16_t tsSlotId,
SSDataBlock* startDataBlock, int32_t startPos,
int64_t timestamp, SArray* rowLocations,
SArray* createdBlocks) {
ASSERT(whichChild == 0 || whichChild == 1);
SJoinOperatorInfo* pJoinInfo = pOperator->info;
int32_t endPos = -1;
SSDataBlock* dataBlock = startDataBlock;
mergeJoinGetBlockRowsEqualTs(dataBlock, tsSlotId, startPos, timestamp, &endPos, rowLocations, createdBlocks);
while (endPos == dataBlock->info.rows) {
SOperatorInfo* ds = pOperator->pDownstream[whichChild];
dataBlock = ds->fpSet.getNextFn(ds);
if (whichChild == 0) {
pJoinInfo->leftPos = 0;
pJoinInfo->pLeft = dataBlock;
} else if (whichChild == 1) {
pJoinInfo->rightPos = 0;
pJoinInfo->pRight = dataBlock;
}
if (dataBlock == NULL) {
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
endPos = -1;
break;
}
mergeJoinGetBlockRowsEqualTs(dataBlock, tsSlotId, 0, timestamp, &endPos, rowLocations, createdBlocks);
}
if (endPos != -1) {
if (whichChild == 0) {
pJoinInfo->leftPos = endPos;
} else if (whichChild == 1) {
pJoinInfo->rightPos = endPos;
}
}
return 0;
}
static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes,
int32_t* nRows) {
SJoinOperatorInfo* pJoinInfo = pOperator->info;
SArray* leftRowLocations = taosArrayInit(8, sizeof(SRowLocation));
SArray* leftCreatedBlocks = taosArrayInit(8, POINTER_BYTES);
SArray* rightRowLocations = taosArrayInit(8, sizeof(SRowLocation));
SArray* rightCreatedBlocks = taosArrayInit(8, POINTER_BYTES);
mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 0, pJoinInfo->leftCol.slotId, pJoinInfo->pLeft,
pJoinInfo->leftPos, timestamp, leftRowLocations, leftCreatedBlocks);
mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight,
pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks);
size_t leftNumJoin = taosArrayGetSize(leftRowLocations);
size_t rightNumJoin = taosArrayGetSize(rightRowLocations);
for (int32_t i = 0; i < leftNumJoin; ++i) {
for (int32_t j = 0; j < rightNumJoin; ++j) {
SRowLocation* leftRow = taosArrayGet(leftRowLocations, i);
SRowLocation* rightRow = taosArrayGet(rightRowLocations, j);
mergeJoinJoinLeftRight(pOperator, pRes, *nRows, leftRow->pDataBlock, leftRow->pos, rightRow->pDataBlock,
rightRow->pos);
++*nRows;
}
}
for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) {
SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i);
blockDataDestroy(pBlock);
}
taosArrayDestroy(rightCreatedBlocks);
taosArrayDestroy(rightRowLocations);
for (int i = 0; i < taosArrayGetSize(leftCreatedBlocks); ++i) {
SSDataBlock* pBlock = taosArrayGetP(leftCreatedBlocks, i);
blockDataDestroy(pBlock);
}
taosArrayDestroy(leftCreatedBlocks);
taosArrayDestroy(leftRowLocations);
return TSDB_CODE_SUCCESS;
} }
static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs, int64_t* pRightTs) { static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs, int64_t* pRightTs) {
...@@ -195,18 +317,15 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) ...@@ -195,18 +317,15 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes)
while (1) { while (1) {
int64_t leftTs = 0; int64_t leftTs = 0;
int64_t rightTs = 0; int64_t rightTs = 0;
bool hasNextTs = mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs); bool hasNextTs = mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs);
if (!hasNextTs) { if (!hasNextTs) {
break; break;
} }
if (leftTs == rightTs) { if (leftTs == rightTs) {
mergeJoinJoinLeftRight(pOperator, pRes, nrows, mergeJoinJoinLeftRight(pOperator, pRes, nrows, pJoinInfo->pLeft, pJoinInfo->leftPos, pJoinInfo->pRight,
pJoinInfo->pLeft, pJoinInfo->leftPos, pJoinInfo->pRight, pJoinInfo->rightPos); pJoinInfo->rightPos);
pJoinInfo->leftPos += 1; mergeJoinJoinDownstreamTsRanges(pOperator, leftTs, pRes, &nrows);
pJoinInfo->rightPos += 1;
nrows += 1;
} else if (asc && leftTs < rightTs || !asc && leftTs > rightTs) { } else if (asc && leftTs < rightTs || !asc && leftTs > rightTs) {
pJoinInfo->leftPos += 1; pJoinInfo->leftPos += 1;
......
...@@ -2098,9 +2098,11 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp ...@@ -2098,9 +2098,11 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot); SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot);
switch (pSliceInfo->fillType) { switch (pSliceInfo->fillType) {
case TSDB_FILL_NULL: case TSDB_FILL_NULL: {
colDataAppendNULL(pDst, rows); colDataAppendNULL(pDst, rows);
pResBlock->info.rows += 1;
break; break;
}
case TSDB_FILL_SET_VALUE: { case TSDB_FILL_SET_VALUE: {
SVariant* pVar = &pSliceInfo->pFillColInfo[j].fillVal; SVariant* pVar = &pSliceInfo->pFillColInfo[j].fillVal;
...@@ -2118,9 +2120,11 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp ...@@ -2118,9 +2120,11 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i); GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i);
colDataAppend(pDst, rows, (char*)&v, false); colDataAppend(pDst, rows, (char*)&v, false);
} }
} break; pResBlock->info.rows += 1;
break;
}
case TSDB_FILL_LINEAR: case TSDB_FILL_LINEAR: {
#if 0 #if 0
if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs
|| pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) { || pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) {
...@@ -2151,17 +2155,22 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp ...@@ -2151,17 +2155,22 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
} }
} }
#endif #endif
// TODO: pResBlock->info.rows += 1;
break; break;
}
case TSDB_FILL_PREV: { case TSDB_FILL_PREV: {
SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, srcSlot); SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, srcSlot);
colDataAppend(pDst, rows, pkey->pData, false); colDataAppend(pDst, rows, pkey->pData, false);
} break; pResBlock->info.rows += 1;
break;
}
case TSDB_FILL_NEXT: { case TSDB_FILL_NEXT: {
char* p = colDataGetData(pSrc, rowIndex); char* p = colDataGetData(pSrc, rowIndex);
colDataAppend(pDst, rows, p, colDataIsNull_s(pSrc, rowIndex)); colDataAppend(pDst, rows, p, colDataIsNull_s(pSrc, rowIndex));
} break; pResBlock->info.rows += 1;
break;
}
case TSDB_FILL_NONE: case TSDB_FILL_NONE:
default: default:
...@@ -2169,7 +2178,6 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp ...@@ -2169,7 +2178,6 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
} }
} }
pResBlock->info.rows += 1;
} }
static int32_t initPrevRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) { static int32_t initPrevRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) {
...@@ -2221,6 +2229,8 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2221,6 +2229,8 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
SInterval* pInterval = &pSliceInfo->interval; SInterval* pInterval = &pSliceInfo->interval;
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
blockDataCleanup(pResBlock);
int32_t numOfRows = 0; int32_t numOfRows = 0;
while (1) { while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
......
...@@ -98,6 +98,11 @@ while $i < $tbNum ...@@ -98,6 +98,11 @@ while $i < $tbNum
endw endw
print ===============multivnode projection join.sim print ===============multivnode projection join.sim
sql select join_mt0.ts,join_mt0.ts,join_mt0.t1 from join_mt0, join_mt1 where join_mt0.ts=join_mt1.ts;
print ===> rows $row
if $row != 9000 then
print expect 9000, actual: $row
endi
sql select join_mt0.ts,join_mt0.ts,join_mt0.t1 from join_mt0, join_mt1 where join_mt0.ts=join_mt1.ts and join_mt0.t1=join_mt1.t1; sql select join_mt0.ts,join_mt0.ts,join_mt0.t1 from join_mt0, join_mt1 where join_mt0.ts=join_mt1.ts and join_mt0.t1=join_mt1.t1;
print ===> rows $row print ===> rows $row
if $row != 3000 then if $row != 3000 then
......
...@@ -377,11 +377,11 @@ class TDTestCase: ...@@ -377,11 +377,11 @@ class TDTestCase:
tdSql.query("select ct1.c_int from db.ct1 as ct1 join db1.ct1 as cy1 on ct1.ts=cy1.ts") tdSql.query("select ct1.c_int from db.ct1 as ct1 join db1.ct1 as cy1 on ct1.ts=cy1.ts")
tdSql.checkRows(self.rows) tdSql.checkRows(self.rows)
tdSql.query("select ct1.c_int from db.stb1 as ct1 join db1.ct1 as cy1 on ct1.ts=cy1.ts") tdSql.query("select ct1.c_int from db.stb1 as ct1 join db1.ct1 as cy1 on ct1.ts=cy1.ts")
tdSql.checkRows(self.rows) tdSql.checkRows(self.rows + int(self.rows * 0.6 //3)+ int(self.rows * 0.8 // 4))
tdSql.query("select ct1.c_int from db.nt1 as ct1 join db1.nt1 as cy1 on ct1.ts=cy1.ts") tdSql.query("select ct1.c_int from db.nt1 as ct1 join db1.nt1 as cy1 on ct1.ts=cy1.ts")
tdSql.checkRows(self.rows + 3) tdSql.checkRows(self.rows + 3)
tdSql.query("select ct1.c_int from db.stb1 as ct1 join db1.stb1 as cy1 on ct1.ts=cy1.ts") tdSql.query("select ct1.c_int from db.stb1 as ct1 join db1.stb1 as cy1 on ct1.ts=cy1.ts")
tdSql.checkRows(self.rows * 3 + 6) tdSql.checkRows(50)
tdSql.query("select count(*) from db.ct1") tdSql.query("select count(*) from db.ct1")
tdSql.checkData(0, 0, self.rows) tdSql.checkData(0, 0, self.rows)
......
...@@ -43,9 +43,10 @@ class TDTestCase: ...@@ -43,9 +43,10 @@ class TDTestCase:
tdLog.exit("compare error: %s != %s"%src, dst) tdLog.exit("compare error: %s != %s"%src, dst)
else: else:
break break
tdSql.execute('use db_taosx') tdSql.execute('use db_taosx')
tdSql.query("select * from ct3 order by c1 desc")
tdSql.query("select * from ct3 order by c1 desc")
tdSql.checkRows(2) tdSql.checkRows(2)
tdSql.checkData(0, 1, 51) tdSql.checkData(0, 1, 51)
tdSql.checkData(0, 4, 940) tdSql.checkData(0, 4, 940)
...@@ -63,12 +64,12 @@ class TDTestCase: ...@@ -63,12 +64,12 @@ class TDTestCase:
tdSql.checkData(0, 3, "a") tdSql.checkData(0, 3, "a")
tdSql.checkData(1, 4, None) tdSql.checkData(1, 4, None)
tdSql.query("select * from n1") tdSql.query("select * from n1 order by cc3 desc")
tdSql.checkRows(2) tdSql.checkRows(2)
tdSql.checkData(0, 1, "eeee") tdSql.checkData(0, 1, "eeee")
tdSql.checkData(1, 2, 940) tdSql.checkData(1, 2, 940)
tdSql.query("select * from jt order by i desc;") tdSql.query("select * from jt order by i desc")
tdSql.checkRows(2) tdSql.checkRows(2)
tdSql.checkData(0, 1, 11) tdSql.checkData(0, 1, 11)
tdSql.checkData(0, 2, None) tdSql.checkData(0, 2, None)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册