diff --git a/docs/zh/12-taos-sql/10-function.md b/docs/zh/12-taos-sql/10-function.md index 33ac4538945cf5d5a636726ab16c2e2a26364752..f4d45558329f381e4fef2c6953bb6145d3cd3649 100644 --- a/docs/zh/12-taos-sql/10-function.md +++ b/docs/zh/12-taos-sql/10-function.md @@ -520,7 +520,7 @@ SELECT TIMEDIFF(ts | datetime_string1, ts | datetime_string2 [, time_unit]) FROM **应用字段**:表示 UNIX 时间戳的 BIGINT, TIMESTAMP 类型,或符合日期时间格式的 VARCHAR, NCHAR 类型。 -**适用于**:表、超级表。 +**适用于**:表和超级表。 **嵌套子查询支持**:适用于内层查询和外层查询。 @@ -542,7 +542,7 @@ SELECT TIMETRUNCATE(ts | datetime_string , time_unit) FROM { tb_name | stb_name **应用字段**:表示 UNIX 时间戳的 BIGINT, TIMESTAMP 类型,或符合日期时间格式的 VARCHAR, NCHAR 类型。 -**适用于**:表、超级表。 +**适用于**:表和超级表。 **使用说明**: - 支持的时间单位 time_unit 如下: @@ -562,7 +562,7 @@ SELECT TIMEZONE() FROM { tb_name | stb_name } [WHERE clause]; **应用字段**:无 -**适用于**:表、超级表。 +**适用于**:表和超级表。 #### TODAY @@ -579,7 +579,7 @@ INSERT INTO tb_name VALUES (TODAY(), ...); **应用字段**:在 WHERE 或 INSERT 语句中使用时只能作用于 TIMESTAMP 类型的字段。 -**适用于**:表、超级表。 +**适用于**:表和超级表。 **使用说明**: @@ -600,13 +600,13 @@ TDengine 支持针对数据的聚合查询。提供如下聚合函数。 SELECT AVG(field_name) FROM tb_name [WHERE clause]; ``` -**功能说明**:统计表/超级表中某列的平均值。 +**功能说明**:统计指定字段的平均值。 -**返回数据类型**:双精度浮点数 Double。 +**返回数据类型**:DOUBLE。 **适用数据类型**:数值类型。 -**适用于**:表、超级表。 +**适用于**:表和超级表。 ### COUNT @@ -615,19 +615,18 @@ SELECT AVG(field_name) FROM tb_name [WHERE clause]; SELECT COUNT([*|field_name]) FROM tb_name [WHERE clause]; ``` -**功能说明**:统计表/超级表中记录行数或某列的非空值个数。 +**功能说明**:统计指定字段的记录行数。 -**返回数据类型**:长整型 INT64。 +**返回数据类型**:BIGINT。 -**适用数据类型**:应用全部字段。 +**适用数据类型**:全部类型字段。 -**适用于**:表、超级表。 +**适用于**:表和超级表。 **使用说明**: - 可以使用星号(\*)来替代具体的字段,使用星号(\*)返回全部记录数量。 -- 针对同一表的(不包含 NULL 值)字段查询结果均相同。 -- 如果统计对象是具体的列,则返回该列中非 NULL 值的记录数量。 +- 如果统计字段是具体的列,则返回该列中非 NULL 值的记录数量。 ### ELAPSED @@ -638,17 +637,18 @@ SELECT ELAPSED(ts_primary_key [, time_unit]) FROM { tb_name | stb_name } [WHERE **功能说明**:elapsed函数表达了统计周期内连续的时间长度,和twa函数配合使用可以计算统计曲线下的面积。在通过INTERVAL子句指定窗口的情况下,统计在给定时间范围内的每个窗口内有数据覆盖的时间范围;如果没有INTERVAL子句,则返回整个给定时间范围内的有数据覆盖的时间范围。注意,ELAPSED返回的并不是时间范围的绝对值,而是绝对值除以time_unit所得到的单位个数。 -**返回结果类型**:Double +**返回结果类型**:DOUBLE。 -**适用数据类型**:Timestamp类型 +**适用数据类型**:TIMESTAMP。 **支持的版本**:2.6.0.0 及以后的版本。 **适用于**: 表,超级表,嵌套查询的外层查询 **说明**: -- field_name参数只能是表的第一列,即timestamp主键列。 -- 按time_unit参数指定的时间单位返回,最小是数据库的时间分辨率。time_unit参数未指定时,以数据库的时间分辨率为时间单位。 +- field_name参数只能是表的第一列,即 TIMESTAMP 类型的主键列。 +- 按time_unit参数指定的时间单位返回,最小是数据库的时间分辨率。time_unit 参数未指定时,以数据库的时间分辨率为时间单位。支持的时间单位 time_unit 如下: + 1b(纳秒), 1u(微秒),1a(毫秒),1s(秒),1m(分),1h(小时),1d(天), 1w(周)。 - 可以和interval组合使用,返回每个时间窗口的时间戳差值。需要特别注意的是,除第一个时间窗口和最后一个时间窗口外,中间窗口的时间戳差值均为窗口长度。 - order by asc/desc不影响差值的计算结果。 - 对于超级表,需要和group by tbname子句组合使用,不可以直接使用。 @@ -677,11 +677,11 @@ SELECT LEASTSQUARES(field_name, start_val, step_val) FROM tb_name [WHERE clause] SELECT MODE(field_name) FROM tb_name [WHERE clause]; ``` -**功能说明**:返回出现频率最高的值,若存在多个频率相同的最高值,输出空。 +**功能说明**:返回出现频率最高的值,若存在多个频率相同的最高值,输出NULL。 -**返回数据类型**:同应用的字段。 +**返回数据类型**:与输入数据类型一致。 -**适用数据类型**: 数值类型。 +**适用数据类型**:全部类型字段。 **适用于**:表和超级表。 @@ -692,11 +692,11 @@ SELECT MODE(field_name) FROM tb_name [WHERE clause]; SELECT SPREAD(field_name) FROM { tb_name | stb_name } [WHERE clause]; ``` -**功能说明**:统计表/超级表中某列的最大值和最小值之差。 +**功能说明**:统计表中某列的最大值和最小值之差。 -**返回数据类型**:双精度浮点数。 +**返回数据类型**:DOUBLE。 -**适用数据类型**:数值类型或TIMESTAMP类型。 +**适用数据类型**:INTEGER, TIMESTAMP。 **适用于**:表和超级表。 @@ -709,7 +709,7 @@ SELECT STDDEV(field_name) FROM tb_name [WHERE clause]; **功能说明**:统计表中某列的均方差。 -**返回数据类型**:双精度浮点数 Double。 +**返回数据类型**:DOUBLE。 **适用数据类型**:数值类型。 @@ -724,7 +724,7 @@ SELECT SUM(field_name) FROM tb_name [WHERE clause]; **功能说明**:统计表/超级表中某列的和。 -**返回数据类型**:双精度浮点数 Double 和长整型 INT64。 +**返回数据类型**:DOUBLE, BIGINT。 **适用数据类型**:数值类型。 @@ -738,10 +738,10 @@ SELECT HYPERLOGLOG(field_name) FROM { tb_name | stb_name } [WHERE clause]; ``` **功能说明**: - - 采用 hyperloglog 算法,返回某列的基数。该算法在数据量很大的情况下,可以明显降低内存的占用,但是求出来的基数是个估算值,标准误差(标准误差是多次实验,每次的平均数的标准差,不是与真实结果的误差)为 0.81%。 + - 采用 hyperloglog 算法,返回某列的基数。该算法在数据量很大的情况下,可以明显降低内存的占用,求出来的基数是个估算值,标准误差(标准误差是多次实验,每次的平均数的标准差,不是与真实结果的误差)为 0.81%。 - 在数据量较少的时候该算法不是很准确,可以使用 select count(data) from (select unique(col) as data from table) 的方法。 -**返回结果类型**:整形。 +**返回结果类型**:INTEGER。 **适用数据类型**:任何类型。 @@ -756,7 +756,7 @@ SELECT HISTOGRAM(field_name,bin_type, bin_description, normalized) FROM tb_nam **功能说明**:统计数据按照用户指定区间的分布。 -**返回结果类型**:如归一化参数 normalized 设置为 1,返回结果为双精度浮点类型 DOUBLE,否则为长整形 INT64。 +**返回结果类型**:如归一化参数 normalized 设置为 1,返回结果为 DOUBLE 类型,否则为 BIGINT 类型。 **适用数据类型**:数值型字段。 @@ -791,11 +791,15 @@ FROM { tb_name | stb_name } [WHERE clause] **功能说明**:统计表/超级表中指定列的值的近似百分比分位数,与 PERCENTILE 函数相似,但是返回近似结果。 -**返回数据类型**: 双精度浮点数 Double。 +**返回数据类型**: DOUBLE。 -**适用数据类型**:数值类型。P值范围是[0,100],当为0时等同于MIN,为100时等同于MAX。如果不指定 algo_type 则使用默认算法 。 +**适用数据类型**:数值类型。 -**适用于**:表、超级表。 +**适用于**:表和超级表。 + +**说明**: +- P值范围是[0,100],当为0时等同于MIN,为100时等同于MAX。 +- algo_type 取值为 "default" 或 "t-digest"。 输入为 "default" 时函数使用基于直方图算法进行计算。输入为 "t-digest" 时使用t-digest算法计算分位数的近似结果。如果不指定 algo_type 则使用 "default" 算法。 ### BOTTOM @@ -939,7 +943,7 @@ SELECT PERCENTILE(field_name, P) FROM { tb_name } [WHERE clause]; **功能说明**:统计表中某列的值百分比分位数。 -**返回数据类型**: 双精度浮点数 Double。 +**返回数据类型**: DOUBLE。 **应用字段**:数值类型。 @@ -960,7 +964,7 @@ SELECT TAIL(field_name, k, offset_val) FROM {tb_name | stb_name} [WHERE clause]; **返回数据类型**:同应用的字段。 -**适用数据类型**:适合于除时间主列外的任何类型。 +**适用数据类型**:适合于除时间主键列外的任何类型。 **适用于**:表、超级表。 @@ -977,7 +981,7 @@ SELECT TOP(field_name, K) FROM { tb_name | stb_name } [WHERE clause]; **适用数据类型**:数值类型。 -**适用于**:表、超级表。 +**适用于**:表和超级表。 **使用说明**: @@ -1018,13 +1022,13 @@ SELECT CSUM(field_name) FROM { tb_name | stb_name } [WHERE clause] **嵌套子查询支持**: 适用于内层查询和外层查询。 -**适用于**:表和超级表 +**适用于**:表和超级表。 **使用说明**: - 不支持 +、-、*、/ 运算,如 csum(col1) + csum(col2)。 - 只能与聚合(Aggregation)函数一起使用。 该函数可以应用在普通表和超级表上。 - - 使用在超级表上的时候,需要搭配 Group by tbname使用,将结果强制规约到单个时间线。 + - 使用在超级表上的时候,需要搭配 PARTITION BY tbname使用,将结果强制规约到单个时间线。 ### DERIVATIVE @@ -1035,13 +1039,13 @@ SELECT DERIVATIVE(field_name, time_interval, ignore_negative) FROM tb_name [WHER **功能说明**:统计表中某列数值的单位变化率。其中单位时间区间的长度可以通过 time_interval 参数指定,最小可以是 1 秒(1s);ignore_negative 参数的值可以是 0 或 1,为 1 时表示忽略负值。 -**返回数据类型**:双精度浮点数。 +**返回数据类型**:DOUBLE。 **适用数据类型**:数值类型。 -**适用于**:表、超级表 +**适用于**:表和超级表。 -**使用说明**: DERIVATIVE 函数可以在由 GROUP BY 划分出单独时间线的情况下用于超级表(也即 GROUP BY tbname)。 +**使用说明**: DERIVATIVE 函数可以在由 PARTITION BY 划分出单独时间线的情况下用于超级表(也即 PARTITION BY tbname)。 ### DIFF @@ -1056,7 +1060,7 @@ SELECT {DIFF(field_name, ignore_negative) | DIFF(field_name)} FROM tb_name [WHER **适用数据类型**:数值类型。 -**适用于**:表、超级表。 +**适用于**:表和超级表。 **使用说明**: 输出结果行数是范围内总行数减一,第一行没有结果输出。 @@ -1069,11 +1073,12 @@ SELECT IRATE(field_name) FROM tb_name WHERE clause; **功能说明**:计算瞬时增长率。使用时间区间中最后两个样本数据来计算瞬时增长速率;如果这两个值呈递减关系,那么只取最后一个数用于计算,而不是使用二者差值。 -**返回数据类型**:双精度浮点数 Double。 +**返回数据类型**:DOUBLE。 **适用数据类型**:数值类型。 -**适用于**:表、超级表。 +**适用于**:表和超级表。 + ### MAVG @@ -1083,19 +1088,19 @@ SELECT MAVG(field_name, K) FROM { tb_name | stb_name } [WHERE clause] **功能说明**: 计算连续 k 个值的移动平均数(moving average)。如果输入行数小于 k,则无结果输出。参数 k 的合法输入范围是 1≤ k ≤ 1000。 - **返回结果类型**: 返回双精度浮点数类型。 + **返回结果类型**: DOUBLE。 **适用数据类型**: 数值类型。 **嵌套子查询支持**: 适用于内层查询和外层查询。 - **适用于**:表和超级表 + **适用于**:表和超级表。 **使用说明**: - 不支持 +、-、*、/ 运算,如 mavg(col1, k1) + mavg(col2, k1); - 只能与普通列,选择(Selection)、投影(Projection)函数一起使用,不能与聚合(Aggregation)函数一起使用; - - 使用在超级表上的时候,需要搭配 Group by tbname使用,将结果强制规约到单个时间线。 + - 使用在超级表上的时候,需要搭配 PARTITION BY tbname使用,将结果强制规约到单个时间线。 ### SAMPLE @@ -1111,12 +1116,12 @@ SELECT SAMPLE(field_name, K) FROM { tb_name | stb_name } [WHERE clause] **嵌套子查询支持**: 适用于内层查询和外层查询。 - **适用于**:表和超级表 + **适用于**:表和超级表。 **使用说明**: - 不能参与表达式计算;该函数可以应用在普通表和超级表上; - - 使用在超级表上的时候,需要搭配 Group by tbname 使用,将结果强制规约到单个时间线。 + - 使用在超级表上的时候,需要搭配 PARTITION by tbname 使用,将结果强制规约到单个时间线。 ### STATECOUNT @@ -1128,10 +1133,10 @@ SELECT STATECOUNT(field_name, oper, val) FROM { tb_name | stb_name } [WHERE clau **参数范围**: -- oper : LT (小于)、GT(大于)、LE(小于等于)、GE(大于等于)、NE(不等于)、EQ(等于),不区分大小写。 +- oper : "LT" (小于)、"GT"(大于)、"LE"(小于等于)、"GE"(大于等于)、"NE"(不等于)、"EQ"(等于),不区分大小写。 - val : 数值型 -**返回结果类型**:整形。 +**返回结果类型**:INTEGER。 **适用数据类型**:数值类型。 @@ -1141,7 +1146,7 @@ SELECT STATECOUNT(field_name, oper, val) FROM { tb_name | stb_name } [WHERE clau **使用说明**: -- 该函数可以应用在普通表上,在由 GROUP BY 划分出单独时间线的情况下用于超级表(也即 GROUP BY tbname) +- 该函数可以应用在普通表上,在由 PARTITION BY 划分出单独时间线的情况下用于超级表(也即 PARTITION BY tbname) - 不能和窗口操作一起使用,例如 interval/state_window/session_window。 @@ -1155,11 +1160,11 @@ SELECT stateDuration(field_name, oper, val, unit) FROM { tb_name | stb_name } [W **参数范围**: -- oper : LT (小于)、GT(大于)、LE(小于等于)、GE(大于等于)、NE(不等于)、EQ(等于),不区分大小写。 +- oper : "LT" (小于)、"GT"(大于)、"LE"(小于等于)、"GE"(大于等于)、"NE"(不等于)、"EQ"(等于),不区分大小写。 - val : 数值型 - unit : 时间长度的单位,范围[1s、1m、1h ],不足一个单位舍去。默认为 1s。 -**返回结果类型**:整形。 +**返回结果类型**:INTEGER。 **适用数据类型**:数值类型。 @@ -1169,7 +1174,7 @@ SELECT stateDuration(field_name, oper, val, unit) FROM { tb_name | stb_name } [W **使用说明**: -- 该函数可以应用在普通表上,在由 GROUP BY 划分出单独时间线的情况下用于超级表(也即 GROUP BY tbname) +- 该函数可以应用在普通表上,在由 PARTITION BY 划分出单独时间线的情况下用于超级表(也即 PARTITION BY tbname) - 不能和窗口操作一起使用,例如 interval/state_window/session_window。 @@ -1181,13 +1186,13 @@ SELECT TWA(field_name) FROM tb_name WHERE clause; **功能说明**:时间加权平均函数。统计表中某列在一段时间内的时间加权平均。 -**返回数据类型**:双精度浮点数 Double。 +**返回数据类型**:DOUBLE。 **适用数据类型**:数值类型。 -**适用于**:表、超级表。 +**适用于**:表和超级表。 -**使用说明**: TWA 函数可以在由 GROUP BY 划分出单独时间线的情况下用于超级表(也即 GROUP BY tbname)。 +**使用说明**: TWA 函数可以在由 PARTITION BY 划分出单独时间线的情况下用于超级表(也即 PARTITION BY tbname)。 ## 系统信息函数 diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 46de219035a36ae6c40c4e8e1ac8bb4cc777d6f7..4e6a450d35d01d7c41a80bbb7ae1fc5d1c21b127 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -46,11 +46,6 @@ void tsdbCloseCache(SLRUCache *pCache) { } } -/* static void getTableCacheKeyS(tb_uid_t uid, const char *cacheType, char *key, int *len) { */ -/* snprintf(key, 30, "%" PRIi64 "%s", uid, cacheType); */ -/* *len = strlen(key); */ -/* } */ - static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) { if (cacheType == 0) { // last_row *(uint64_t *)key = (uint64_t)uid; @@ -649,44 +644,44 @@ _err: return code; } -static int32_t tsRowFromTsdbRow(STSchema *pTSchema, TSDBROW *pRow, STSRow **ppRow) { - int32_t code = 0; - - SColVal *pColVal = &(SColVal){0}; - - if (pRow->type == 0) { - *ppRow = tdRowDup(pRow->pTSRow); - } else { - SArray *pArray = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)); - if (pArray == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - - TSDBKEY key = TSDBROW_KEY(pRow); - STColumn *pTColumn = &pTSchema->columns[0]; - *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = key.ts}); - - if (taosArrayPush(pArray, pColVal) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - - for (int16_t iCol = 1; iCol < pTSchema->numOfCols; iCol++) { - tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal); - if (taosArrayPush(pArray, pColVal) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - } - - code = tdSTSRowNew(pArray, pTSchema, ppRow); - if (code) goto _exit; - } - -_exit: - return code; -} +/* static int32_t tsRowFromTsdbRow(STSchema *pTSchema, TSDBROW *pRow, STSRow **ppRow) { */ +/* int32_t code = 0; */ + +/* SColVal *pColVal = &(SColVal){0}; */ + +/* if (pRow->type == 0) { */ +/* *ppRow = tdRowDup(pRow->pTSRow); */ +/* } else { */ +/* SArray *pArray = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)); */ +/* if (pArray == NULL) { */ +/* code = TSDB_CODE_OUT_OF_MEMORY; */ +/* goto _exit; */ +/* } */ + +/* TSDBKEY key = TSDBROW_KEY(pRow); */ +/* STColumn *pTColumn = &pTSchema->columns[0]; */ +/* *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = key.ts}); */ + +/* if (taosArrayPush(pArray, pColVal) == NULL) { */ +/* code = TSDB_CODE_OUT_OF_MEMORY; */ +/* goto _exit; */ +/* } */ + +/* for (int16_t iCol = 1; iCol < pTSchema->numOfCols; iCol++) { */ +/* tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal); */ +/* if (taosArrayPush(pArray, pColVal) == NULL) { */ +/* code = TSDB_CODE_OUT_OF_MEMORY; */ +/* goto _exit; */ +/* } */ +/* } */ + +/* code = tdSTSRowNew(pArray, pTSchema, ppRow); */ +/* if (code) goto _exit; */ +/* } */ + +/* _exit: */ +/* return code; */ +/* } */ static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) { bool deleted = false; diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 7b3c590f07469009511987abf8f5075973657961..8902804fab478e906484be5d54d0cd636d18b814 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -116,7 +116,8 @@ void destroyMergeJoinOperator(void* param, int32_t numOfOutput) { } static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t currRow, - SSDataBlock* pLeftBlock, int32_t leftPos, SSDataBlock* pRightBlock, int32_t rightPos) { + SSDataBlock* pLeftBlock, int32_t leftPos, SSDataBlock* pRightBlock, + int32_t rightPos) { SJoinOperatorInfo* pJoinInfo = pOperator->info; for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { @@ -129,7 +130,7 @@ static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock* int32_t rowIndex = -1; SColumnInfoData* pSrc = NULL; - if (pJoinInfo->pLeft->info.blockId == blockId) { + if (pLeftBlock->info.blockId == blockId) { pSrc = taosArrayGet(pLeftBlock->pDataBlock, slotId); rowIndex = leftPos; } else { @@ -144,7 +145,128 @@ static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock* colDataAppend(pDst, currRow, p, false); } } +} +typedef struct SRowLocation { + SSDataBlock* pDataBlock; + int32_t pos; +} SRowLocation; + +// pBlock[tsSlotId][startPos, endPos) == timestamp, +static int32_t mergeJoinGetBlockRowsEqualTs(SSDataBlock* pBlock, int16_t tsSlotId, int32_t startPos, int64_t timestamp, + int32_t* pEndPos, SArray* rowLocations, SArray* createdBlocks) { + int32_t numRows = pBlock->info.rows; + ASSERT(startPos < numRows); + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsSlotId); + + int32_t i = startPos; + for (; i < numRows; ++i) { + char* pNextVal = colDataGetData(pCol, i); + if (timestamp != *(int64_t*)pNextVal) { + break; + } + } + int32_t endPos = i; + *pEndPos = endPos; + + if (endPos - startPos == 0) { + return 0; + } + + SSDataBlock* block = pBlock; + bool createdNewBlock = false; + if (endPos == numRows) { + block = blockDataExtractBlock(pBlock, startPos, endPos-startPos); + taosArrayPush(createdBlocks, &block); + createdNewBlock = true; + } + SRowLocation location = {0}; + for (int32_t j = startPos; j < endPos; ++j) { + location.pDataBlock = block; + location.pos = ( createdNewBlock ? j - startPos : j); + taosArrayPush(rowLocations, &location); + } + return 0; +} + +// whichChild == 0, left child of join; whichChild ==1, right child of join +static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator, int32_t whichChild, int16_t tsSlotId, + SSDataBlock* startDataBlock, int32_t startPos, + int64_t timestamp, SArray* rowLocations, + SArray* createdBlocks) { + ASSERT(whichChild == 0 || whichChild == 1); + + SJoinOperatorInfo* pJoinInfo = pOperator->info; + int32_t endPos = -1; + SSDataBlock* dataBlock = startDataBlock; + mergeJoinGetBlockRowsEqualTs(dataBlock, tsSlotId, startPos, timestamp, &endPos, rowLocations, createdBlocks); + while (endPos == dataBlock->info.rows) { + SOperatorInfo* ds = pOperator->pDownstream[whichChild]; + dataBlock = ds->fpSet.getNextFn(ds); + if (whichChild == 0) { + pJoinInfo->leftPos = 0; + pJoinInfo->pLeft = dataBlock; + } else if (whichChild == 1) { + pJoinInfo->rightPos = 0; + pJoinInfo->pRight = dataBlock; + } + + if (dataBlock == NULL) { + setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); + endPos = -1; + break; + } + + mergeJoinGetBlockRowsEqualTs(dataBlock, tsSlotId, 0, timestamp, &endPos, rowLocations, createdBlocks); + } + if (endPos != -1) { + if (whichChild == 0) { + pJoinInfo->leftPos = endPos; + } else if (whichChild == 1) { + pJoinInfo->rightPos = endPos; + } + } + return 0; +} + +static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes, + int32_t* nRows) { + SJoinOperatorInfo* pJoinInfo = pOperator->info; + SArray* leftRowLocations = taosArrayInit(8, sizeof(SRowLocation)); + SArray* leftCreatedBlocks = taosArrayInit(8, POINTER_BYTES); + + SArray* rightRowLocations = taosArrayInit(8, sizeof(SRowLocation)); + SArray* rightCreatedBlocks = taosArrayInit(8, POINTER_BYTES); + + mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 0, pJoinInfo->leftCol.slotId, pJoinInfo->pLeft, + pJoinInfo->leftPos, timestamp, leftRowLocations, leftCreatedBlocks); + mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight, + pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks); + + size_t leftNumJoin = taosArrayGetSize(leftRowLocations); + size_t rightNumJoin = taosArrayGetSize(rightRowLocations); + for (int32_t i = 0; i < leftNumJoin; ++i) { + for (int32_t j = 0; j < rightNumJoin; ++j) { + SRowLocation* leftRow = taosArrayGet(leftRowLocations, i); + SRowLocation* rightRow = taosArrayGet(rightRowLocations, j); + mergeJoinJoinLeftRight(pOperator, pRes, *nRows, leftRow->pDataBlock, leftRow->pos, rightRow->pDataBlock, + rightRow->pos); + ++*nRows; + } + } + for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) { + SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i); + blockDataDestroy(pBlock); + } + taosArrayDestroy(rightCreatedBlocks); + taosArrayDestroy(rightRowLocations); + for (int i = 0; i < taosArrayGetSize(leftCreatedBlocks); ++i) { + SSDataBlock* pBlock = taosArrayGetP(leftCreatedBlocks, i); + blockDataDestroy(pBlock); + } + taosArrayDestroy(leftCreatedBlocks); + taosArrayDestroy(leftRowLocations); + return TSDB_CODE_SUCCESS; } static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs, int64_t* pRightTs) { @@ -195,18 +317,15 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) while (1) { int64_t leftTs = 0; int64_t rightTs = 0; - bool hasNextTs = mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs); + bool hasNextTs = mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs); if (!hasNextTs) { break; } if (leftTs == rightTs) { - mergeJoinJoinLeftRight(pOperator, pRes, nrows, - pJoinInfo->pLeft, pJoinInfo->leftPos, pJoinInfo->pRight, pJoinInfo->rightPos); - pJoinInfo->leftPos += 1; - pJoinInfo->rightPos += 1; - - nrows += 1; + mergeJoinJoinLeftRight(pOperator, pRes, nrows, pJoinInfo->pLeft, pJoinInfo->leftPos, pJoinInfo->pRight, + pJoinInfo->rightPos); + mergeJoinJoinDownstreamTsRanges(pOperator, leftTs, pRes, &nrows); } else if (asc && leftTs < rightTs || !asc && leftTs > rightTs) { pJoinInfo->leftPos += 1; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 9a82b194a983b7d3ea188046924298cf1563738e..ed1580ed911e107dc4a8c8dcdb6179c8b1d466e5 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2098,9 +2098,11 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot); switch (pSliceInfo->fillType) { - case TSDB_FILL_NULL: + case TSDB_FILL_NULL: { colDataAppendNULL(pDst, rows); + pResBlock->info.rows += 1; break; + } case TSDB_FILL_SET_VALUE: { SVariant* pVar = &pSliceInfo->pFillColInfo[j].fillVal; @@ -2118,9 +2120,11 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i); colDataAppend(pDst, rows, (char*)&v, false); } - } break; + pResBlock->info.rows += 1; + break; + } - case TSDB_FILL_LINEAR: + case TSDB_FILL_LINEAR: { #if 0 if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs || pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) { @@ -2151,17 +2155,22 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp } } #endif + // TODO: pResBlock->info.rows += 1; break; - + } case TSDB_FILL_PREV: { SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, srcSlot); colDataAppend(pDst, rows, pkey->pData, false); - } break; + pResBlock->info.rows += 1; + break; + } case TSDB_FILL_NEXT: { char* p = colDataGetData(pSrc, rowIndex); colDataAppend(pDst, rows, p, colDataIsNull_s(pSrc, rowIndex)); - } break; + pResBlock->info.rows += 1; + break; + } case TSDB_FILL_NONE: default: @@ -2169,7 +2178,6 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp } } - pResBlock->info.rows += 1; } static int32_t initPrevRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) { @@ -2221,6 +2229,8 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { SInterval* pInterval = &pSliceInfo->interval; SOperatorInfo* downstream = pOperator->pDownstream[0]; + blockDataCleanup(pResBlock); + int32_t numOfRows = 0; while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); diff --git a/tests/script/tsim/parser/join_multivnode.sim b/tests/script/tsim/parser/join_multivnode.sim index c33fa85fa255c732e7b358e2d9014d520a6beaac..f1204326d3c9de769b1fa68b4ce6c725478a18bf 100644 --- a/tests/script/tsim/parser/join_multivnode.sim +++ b/tests/script/tsim/parser/join_multivnode.sim @@ -98,6 +98,11 @@ while $i < $tbNum endw print ===============multivnode projection join.sim +sql select join_mt0.ts,join_mt0.ts,join_mt0.t1 from join_mt0, join_mt1 where join_mt0.ts=join_mt1.ts; +print ===> rows $row +if $row != 9000 then + print expect 9000, actual: $row +endi sql select join_mt0.ts,join_mt0.ts,join_mt0.t1 from join_mt0, join_mt1 where join_mt0.ts=join_mt1.ts and join_mt0.t1=join_mt1.t1; print ===> rows $row if $row != 3000 then diff --git a/tests/system-test/2-query/join.py b/tests/system-test/2-query/join.py index 2348873a34283572116e6eb97760733d400c6914..9d30e1946a16c487f22e43fe03461d559dc7c945 100644 --- a/tests/system-test/2-query/join.py +++ b/tests/system-test/2-query/join.py @@ -377,11 +377,11 @@ class TDTestCase: tdSql.query("select ct1.c_int from db.ct1 as ct1 join db1.ct1 as cy1 on ct1.ts=cy1.ts") tdSql.checkRows(self.rows) tdSql.query("select ct1.c_int from db.stb1 as ct1 join db1.ct1 as cy1 on ct1.ts=cy1.ts") - tdSql.checkRows(self.rows) + tdSql.checkRows(self.rows + int(self.rows * 0.6 //3)+ int(self.rows * 0.8 // 4)) tdSql.query("select ct1.c_int from db.nt1 as ct1 join db1.nt1 as cy1 on ct1.ts=cy1.ts") tdSql.checkRows(self.rows + 3) tdSql.query("select ct1.c_int from db.stb1 as ct1 join db1.stb1 as cy1 on ct1.ts=cy1.ts") - tdSql.checkRows(self.rows * 3 + 6) + tdSql.checkRows(50) tdSql.query("select count(*) from db.ct1") tdSql.checkData(0, 0, self.rows) diff --git a/tests/system-test/7-tmq/tmq_taosx.py b/tests/system-test/7-tmq/tmq_taosx.py index a4b662efcbd302cce4f78522d67bc3e90df058c0..7c69f13228c4b2d029bc50890ce97d5fef3a3524 100644 --- a/tests/system-test/7-tmq/tmq_taosx.py +++ b/tests/system-test/7-tmq/tmq_taosx.py @@ -43,9 +43,13 @@ class TDTestCase: tdLog.exit("compare error: %s != %s"%src, dst) else: break - + tdSql.execute('use db_taosx') +<<<<<<< HEAD tdSql.query("select * from ct3 order by c1 desc") +======= + tdSql.query("select * from ct3 order by c1 desc") +>>>>>>> 425801f210e696ba6e8bd82f1b3a462d394a6fed tdSql.checkRows(2) tdSql.checkData(0, 1, 51) tdSql.checkData(0, 4, 940) @@ -58,17 +62,25 @@ class TDTestCase: tdSql.query("select * from ct2") tdSql.checkRows(0) +<<<<<<< HEAD tdSql.query("select * from ct0 order by c1 ") +======= + tdSql.query("select * from ct0 order by c1") +>>>>>>> 425801f210e696ba6e8bd82f1b3a462d394a6fed tdSql.checkRows(2) tdSql.checkData(0, 3, "a") tdSql.checkData(1, 4, None) - tdSql.query("select * from n1") + tdSql.query("select * from n1 order by cc3 desc") tdSql.checkRows(2) tdSql.checkData(0, 1, "eeee") tdSql.checkData(1, 2, 940) +<<<<<<< HEAD tdSql.query("select * from jt order by i desc;") +======= + tdSql.query("select * from jt order by i desc") +>>>>>>> 425801f210e696ba6e8bd82f1b3a462d394a6fed tdSql.checkRows(2) tdSql.checkData(0, 1, 11) tdSql.checkData(0, 2, None)