未验证 提交 347c9cee 编写于 作者: A Alex Duan 提交者: GitHub

Merge pull request #8242 from taosdata/fix/TD-10576

Fix/td 10576 Join query not support fill
...@@ -762,10 +762,11 @@ void tscSortRemoveDataBlockDupRowsRaw(STableDataBlocks *dataBuf) { ...@@ -762,10 +762,11 @@ void tscSortRemoveDataBlockDupRowsRaw(STableDataBlocks *dataBuf) {
if (!dataBuf->ordered) { if (!dataBuf->ordered) {
char *pBlockData = pBlocks->data; char *pBlockData = pBlocks->data;
qsort(pBlockData, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar); qsort(pBlockData, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar);
dataBuf->ordered = true;
if(tsClientMerge) {
int32_t i = 0; int32_t i = 0;
int32_t j = 1; int32_t j = 1;
while (j < pBlocks->numOfRows) { while (j < pBlocks->numOfRows) {
TSKEY ti = *(TSKEY *)(pBlockData + dataBuf->rowSize * i); TSKEY ti = *(TSKEY *)(pBlockData + dataBuf->rowSize * i);
TSKEY tj = *(TSKEY *)(pBlockData + dataBuf->rowSize * j); TSKEY tj = *(TSKEY *)(pBlockData + dataBuf->rowSize * j);
...@@ -774,7 +775,6 @@ void tscSortRemoveDataBlockDupRowsRaw(STableDataBlocks *dataBuf) { ...@@ -774,7 +775,6 @@ void tscSortRemoveDataBlockDupRowsRaw(STableDataBlocks *dataBuf) {
if (dataBuf->pTableMeta && dataBuf->pTableMeta->tableInfo.update != TD_ROW_DISCARD_UPDATE) { if (dataBuf->pTableMeta && dataBuf->pTableMeta->tableInfo.update != TD_ROW_DISCARD_UPDATE) {
memmove(pBlockData + dataBuf->rowSize * i, pBlockData + dataBuf->rowSize * j, dataBuf->rowSize); memmove(pBlockData + dataBuf->rowSize * i, pBlockData + dataBuf->rowSize * j, dataBuf->rowSize);
} }
++j; ++j;
continue; continue;
} }
...@@ -783,15 +783,12 @@ void tscSortRemoveDataBlockDupRowsRaw(STableDataBlocks *dataBuf) { ...@@ -783,15 +783,12 @@ void tscSortRemoveDataBlockDupRowsRaw(STableDataBlocks *dataBuf) {
if (nextPos != j) { if (nextPos != j) {
memmove(pBlockData + dataBuf->rowSize * nextPos, pBlockData + dataBuf->rowSize * j, dataBuf->rowSize); memmove(pBlockData + dataBuf->rowSize * nextPos, pBlockData + dataBuf->rowSize * j, dataBuf->rowSize);
} }
++j; ++j;
} }
dataBuf->ordered = true;
pBlocks->numOfRows = i + 1; pBlocks->numOfRows = i + 1;
dataBuf->size = sizeof(SSubmitBlk) + dataBuf->rowSize * pBlocks->numOfRows; dataBuf->size = sizeof(SSubmitBlk) + dataBuf->rowSize * pBlocks->numOfRows;
} }
}
dataBuf->prevTS = INT64_MIN; dataBuf->prevTS = INT64_MIN;
} }
...@@ -836,7 +833,9 @@ int tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlk ...@@ -836,7 +833,9 @@ int tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlk
if (!dataBuf->ordered) { if (!dataBuf->ordered) {
pBlkKeyTuple = pBlkKeyInfo->pKeyTuple; pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
qsort(pBlkKeyTuple, nRows, sizeof(SBlockKeyTuple), rowDataCompar); qsort(pBlkKeyTuple, nRows, sizeof(SBlockKeyTuple), rowDataCompar);
dataBuf->ordered = true;
if(tsClientMerge) {
pBlkKeyTuple = pBlkKeyInfo->pKeyTuple; pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
int32_t i = 0; int32_t i = 0;
int32_t j = 1; int32_t j = 1;
...@@ -859,10 +858,9 @@ int tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlk ...@@ -859,10 +858,9 @@ int tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlk
} }
++j; ++j;
} }
dataBuf->ordered = true;
pBlocks->numOfRows = i + 1; pBlocks->numOfRows = i + 1;
} }
}
dataBuf->size = sizeof(SSubmitBlk) + pBlocks->numOfRows * extendedRowSize; dataBuf->size = sizeof(SSubmitBlk) + pBlocks->numOfRows * extendedRowSize;
dataBuf->prevTS = INT64_MIN; dataBuf->prevTS = INT64_MIN;
......
...@@ -5423,10 +5423,14 @@ int32_t validateFillNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNo ...@@ -5423,10 +5423,14 @@ int32_t validateFillNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNo
const char* msg4 = "illegal value or data overflow"; const char* msg4 = "illegal value or data overflow";
const char* msg5 = "fill only available for interval query"; const char* msg5 = "fill only available for interval query";
const char* msg6 = "not supported function now"; const char* msg6 = "not supported function now";
const char* msg7 = "join query not supported fill operation";
if ((!isTimeWindowQuery(pQueryInfo)) && (!tscIsPointInterpQuery(pQueryInfo))) { if ((!isTimeWindowQuery(pQueryInfo)) && (!tscIsPointInterpQuery(pQueryInfo))) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5);
} }
if(QUERY_IS_JOIN_QUERY(pQueryInfo->type)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7);
}
/* /*
* fill options are set at the end position, when all columns are set properly * fill options are set at the end position, when all columns are set properly
......
...@@ -209,6 +209,8 @@ extern int32_t wDebugFlag; ...@@ -209,6 +209,8 @@ extern int32_t wDebugFlag;
extern int32_t cqDebugFlag; extern int32_t cqDebugFlag;
extern int32_t debugFlag; extern int32_t debugFlag;
extern int8_t tsClientMerge;
#ifdef TD_TSZ #ifdef TD_TSZ
// lossy // lossy
extern char lossyColumns[]; extern char lossyColumns[];
...@@ -219,6 +221,7 @@ extern uint32_t curRange; ...@@ -219,6 +221,7 @@ extern uint32_t curRange;
extern char Compressor[]; extern char Compressor[];
#endif #endif
typedef struct { typedef struct {
char dir[TSDB_FILENAME_LEN]; char dir[TSDB_FILENAME_LEN];
int level; int level;
......
...@@ -254,6 +254,8 @@ int32_t tsdbDebugFlag = 131; ...@@ -254,6 +254,8 @@ int32_t tsdbDebugFlag = 131;
int32_t cqDebugFlag = 131; int32_t cqDebugFlag = 131;
int32_t fsDebugFlag = 135; int32_t fsDebugFlag = 135;
int8_t tsClientMerge = 0;
#ifdef TD_TSZ #ifdef TD_TSZ
// //
// lossy compress 6 // lossy compress 6
...@@ -1581,6 +1583,16 @@ static void doInitGlobalConfig(void) { ...@@ -1581,6 +1583,16 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
cfg.option = "clientMerge";
cfg.ptr = &tsClientMerge;
cfg.valType = TAOS_CFG_VTYPE_INT8;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 0;
cfg.maxValue = 1;
cfg.ptrLength = 1;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
#ifdef TD_TSZ #ifdef TD_TSZ
// lossy compress // lossy compress
cfg.option = "lossyColumns"; cfg.option = "lossyColumns";
......
...@@ -1296,9 +1296,6 @@ static void projectApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx ...@@ -1296,9 +1296,6 @@ static void projectApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
if (pCtx[k].currentStage == MERGE_STAGE) { if (pCtx[k].currentStage == MERGE_STAGE) {
pCtx[k].order = TSDB_ORDER_ASC; pCtx[k].order = TSDB_ORDER_ASC;
} }
pCtx[k].startTs = pQueryAttr->window.skey;
if (pCtx[k].functionId < 0) { if (pCtx[k].functionId < 0) {
// load the script and exec // load the script and exec
SUdfInfo* pUdfInfo = pRuntimeEnv->pUdfInfo; SUdfInfo* pUdfInfo = pRuntimeEnv->pUdfInfo;
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
extern "C" { extern "C" {
#endif #endif
#define TSDB_CFG_MAX_NUM 119 #define TSDB_CFG_MAX_NUM 120
#define TSDB_CFG_PRINT_LEN 23 #define TSDB_CFG_PRINT_LEN 23
#define TSDB_CFG_OPTION_LEN 24 #define TSDB_CFG_OPTION_LEN 24
#define TSDB_CFG_VALUE_LEN 41 #define TSDB_CFG_VALUE_LEN 41
......
...@@ -144,7 +144,6 @@ void tSkipListPutBatchByIter(SSkipList *pSkipList, void *iter, iter_next_fn_t it ...@@ -144,7 +144,6 @@ void tSkipListPutBatchByIter(SSkipList *pSkipList, void *iter, iter_next_fn_t it
// backward to put the first data // backward to put the first data
hasDup = tSkipListGetPosToPut(pSkipList, backward, pData); hasDup = tSkipListGetPosToPut(pSkipList, backward, pData);
tSkipListPutImpl(pSkipList, pData, backward, false, hasDup); tSkipListPutImpl(pSkipList, pData, backward, false, hasDup);
for (int level = 0; level < pSkipList->maxLevel; level++) { for (int level = 0; level < pSkipList->maxLevel; level++) {
...@@ -163,7 +162,12 @@ void tSkipListPutBatchByIter(SSkipList *pSkipList, void *iter, iter_next_fn_t it ...@@ -163,7 +162,12 @@ void tSkipListPutBatchByIter(SSkipList *pSkipList, void *iter, iter_next_fn_t it
for (int i = 0; i < pSkipList->maxLevel; i++) { for (int i = 0; i < pSkipList->maxLevel; i++) {
forward[i] = SL_NODE_GET_BACKWARD_POINTER(pSkipList->pTail, i); forward[i] = SL_NODE_GET_BACKWARD_POINTER(pSkipList->pTail, i);
} }
} else if(compare == 0) {
// same need special deal
forward[0] = SL_NODE_GET_BACKWARD_POINTER(SL_NODE_GET_BACKWARD_POINTER(pSkipList->pTail,0),0);
hasDup = true;
} else { } else {
SSkipListNode *p = NULL;
SSkipListNode *px = pSkipList->pHead; SSkipListNode *px = pSkipList->pHead;
for (int i = pSkipList->maxLevel - 1; i >= 0; --i) { for (int i = pSkipList->maxLevel - 1; i >= 0; --i) {
if (i < pSkipList->level) { if (i < pSkipList->level) {
...@@ -175,19 +179,29 @@ void tSkipListPutBatchByIter(SSkipList *pSkipList, void *iter, iter_next_fn_t it ...@@ -175,19 +179,29 @@ void tSkipListPutBatchByIter(SSkipList *pSkipList, void *iter, iter_next_fn_t it
} }
} }
SSkipListNode *p = SL_NODE_GET_FORWARD_POINTER(px, i); // if px not head , must compare with px
if(px == pSkipList->pHead) {
p = SL_NODE_GET_FORWARD_POINTER(px, i);
} else {
p = px;
}
while (p != pSkipList->pTail) { while (p != pSkipList->pTail) {
pKey = SL_GET_NODE_KEY(pSkipList, p); pKey = SL_GET_NODE_KEY(pSkipList, p);
compare = pSkipList->comparFn(pKey, pDataKey); compare = pSkipList->comparFn(pKey, pDataKey);
if (compare >= 0) { if (compare >= 0) {
if (compare == 0 && !hasDup) hasDup = true; if (compare == 0) {
hasDup = true;
forward[0] = SL_NODE_GET_BACKWARD_POINTER(p, 0);
}
break; break;
} else { } else {
px = p; px = p;
p = SL_NODE_GET_FORWARD_POINTER(px, i); p = SL_NODE_GET_FORWARD_POINTER(px, i);
} }
} }
// if found duplicate, immediately break, needn't continue to loop set rest forward[i] value
if(hasDup) break;
} }
forward[i] = px; forward[i] = px;
......
...@@ -1714,7 +1714,6 @@ class TDTestCase: ...@@ -1714,7 +1714,6 @@ class TDTestCase:
sql += " from regular_table_1 t1, regular_table_2 t2 where t1.ts = t2.ts and " sql += " from regular_table_1 t1, regular_table_2 t2 where t1.ts = t2.ts and "
sql += "%s " % random.choice(q_u_where) sql += "%s " % random.choice(q_u_where)
sql += "%s " % random.choice(session_u_where) sql += "%s " % random.choice(session_u_where)
sql += "%s " % random.choice(fill_where)
sql += "%s " % random.choice(order_u_where) sql += "%s " % random.choice(order_u_where)
sql += "%s " % random.choice(limit_u_where) sql += "%s " % random.choice(limit_u_where)
sql += ") " sql += ") "
...@@ -1731,7 +1730,6 @@ class TDTestCase: ...@@ -1731,7 +1730,6 @@ class TDTestCase:
sql += " from regular_table_1 t1, regular_table_2 t2 where t1.ts = t2.ts and " sql += " from regular_table_1 t1, regular_table_2 t2 where t1.ts = t2.ts and "
sql += "%s " % random.choice(q_u_or_where) sql += "%s " % random.choice(q_u_or_where)
sql += "%s " % random.choice(session_u_where) sql += "%s " % random.choice(session_u_where)
sql += "%s " % random.choice(fill_where)
sql += "%s " % random.choice(order_u_where) sql += "%s " % random.choice(order_u_where)
sql += "%s " % random.choice(limit_u_where) sql += "%s " % random.choice(limit_u_where)
sql += ") " sql += ") "
...@@ -1767,7 +1765,6 @@ class TDTestCase: ...@@ -1767,7 +1765,6 @@ class TDTestCase:
sql += " from table_1 t1, regular_table_2 t2 where t1.ts = t2.ts and " sql += " from table_1 t1, regular_table_2 t2 where t1.ts = t2.ts and "
sql += "%s " % random.choice(q_u_where) sql += "%s " % random.choice(q_u_where)
sql += "%s " % random.choice(session_u_where) sql += "%s " % random.choice(session_u_where)
sql += "%s " % random.choice(fill_where)
sql += "%s " % random.choice(order_u_where) sql += "%s " % random.choice(order_u_where)
sql += "%s " % random.choice(limit_u_where) sql += "%s " % random.choice(limit_u_where)
sql += ") " sql += ") "
...@@ -1784,7 +1781,6 @@ class TDTestCase: ...@@ -1784,7 +1781,6 @@ class TDTestCase:
sql += " from table_1 t1, regular_table_2 t2 where t1.ts = t2.ts and " sql += " from table_1 t1, regular_table_2 t2 where t1.ts = t2.ts and "
sql += "%s " % random.choice(q_u_or_where) sql += "%s " % random.choice(q_u_or_where)
sql += "%s " % random.choice(session_u_where) sql += "%s " % random.choice(session_u_where)
sql += "%s " % random.choice(fill_where)
sql += "%s " % random.choice(order_u_where) sql += "%s " % random.choice(order_u_where)
sql += "%s " % random.choice(limit_u_where) sql += "%s " % random.choice(limit_u_where)
sql += ") " sql += ") "
...@@ -1818,7 +1814,6 @@ class TDTestCase: ...@@ -1818,7 +1814,6 @@ class TDTestCase:
sql += " from stable_1 t1, stable_2 t2 where t1.ts = t2.ts and " sql += " from stable_1 t1, stable_2 t2 where t1.ts = t2.ts and "
sql += "%s " % random.choice(t_join_where) sql += "%s " % random.choice(t_join_where)
sql += "%s " % random.choice(session_u_where) sql += "%s " % random.choice(session_u_where)
sql += "%s " % random.choice(fill_where)
sql += "%s " % random.choice(order_u_where) sql += "%s " % random.choice(order_u_where)
sql += "%s " % random.choice(limit_u_where) sql += "%s " % random.choice(limit_u_where)
sql += ") " sql += ") "
...@@ -1835,7 +1830,6 @@ class TDTestCase: ...@@ -1835,7 +1830,6 @@ class TDTestCase:
sql += " from stable_1 t1, stable_2 t2 where t1.ts = t2.ts and " sql += " from stable_1 t1, stable_2 t2 where t1.ts = t2.ts and "
sql += "%s " % random.choice(qt_u_or_where) sql += "%s " % random.choice(qt_u_or_where)
sql += "%s " % random.choice(session_u_where) sql += "%s " % random.choice(session_u_where)
sql += "%s " % random.choice(fill_where)
sql += "%s " % random.choice(order_u_where) sql += "%s " % random.choice(order_u_where)
sql += "%s " % random.choice(limit_u_where) sql += "%s " % random.choice(limit_u_where)
sql += ") " sql += ") "
...@@ -2015,7 +2009,6 @@ class TDTestCase: ...@@ -2015,7 +2009,6 @@ class TDTestCase:
sql += " from stable_1 t1 , stable_2 t2 where t1.ts = t2.ts and " sql += " from stable_1 t1 , stable_2 t2 where t1.ts = t2.ts and "
sql += "%s and " % random.choice(t_join_where) sql += "%s and " % random.choice(t_join_where)
sql += "%s " % random.choice(interp_where_j) sql += "%s " % random.choice(interp_where_j)
sql += "%s " % random.choice(fill_where)
sql += "%s " % random.choice(order_u_where) sql += "%s " % random.choice(order_u_where)
sql += "%s " % random.choice(limit_u_where) sql += "%s " % random.choice(limit_u_where)
sql += ") " sql += ") "
...@@ -2032,7 +2025,6 @@ class TDTestCase: ...@@ -2032,7 +2025,6 @@ class TDTestCase:
sql += " from stable_1 t1 , stable_2 t2 where t1.ts = t2.ts and " sql += " from stable_1 t1 , stable_2 t2 where t1.ts = t2.ts and "
sql += "%s and " % random.choice(qt_u_or_where) sql += "%s and " % random.choice(qt_u_or_where)
sql += "%s " % random.choice(interp_where_j) sql += "%s " % random.choice(interp_where_j)
sql += "%s " % random.choice(fill_where)
sql += "%s " % random.choice(order_u_where) sql += "%s " % random.choice(order_u_where)
sql += "%s " % random.choice(limit_u_where) sql += "%s " % random.choice(limit_u_where)
sql += ") " sql += ") "
...@@ -2065,7 +2057,6 @@ class TDTestCase: ...@@ -2065,7 +2057,6 @@ class TDTestCase:
sql += " from table_0 t1, table_1 t2 where t1.ts = t2.ts and " sql += " from table_0 t1, table_1 t2 where t1.ts = t2.ts and "
#sql += "%s and " % random.choice(t_join_where) #sql += "%s and " % random.choice(t_join_where)
sql += "%s " % interp_where_j[random.randint(0,5)] sql += "%s " % interp_where_j[random.randint(0,5)]
sql += "%s " % random.choice(fill_where)
sql += "%s " % random.choice(order_u_where) sql += "%s " % random.choice(order_u_where)
sql += "%s " % random.choice(limit_u_where) sql += "%s " % random.choice(limit_u_where)
sql += ") " sql += ") "
...@@ -2116,7 +2107,6 @@ class TDTestCase: ...@@ -2116,7 +2107,6 @@ class TDTestCase:
sql += " from regular_table_1 t1, regular_table_2 t2 where t1.ts = t2.ts and " sql += " from regular_table_1 t1, regular_table_2 t2 where t1.ts = t2.ts and "
#sql += "%s " % random.choice(interp_where_j) #sql += "%s " % random.choice(interp_where_j)
sql += "%s " % interp_where_j[random.randint(0,5)] sql += "%s " % interp_where_j[random.randint(0,5)]
sql += "%s " % random.choice(fill_where)
sql += "%s " % random.choice(order_u_where) sql += "%s " % random.choice(order_u_where)
sql += "%s " % random.choice(limit_u_where) sql += "%s " % random.choice(limit_u_where)
sql += ") " sql += ") "
......
...@@ -445,7 +445,7 @@ if $rows != $val then ...@@ -445,7 +445,7 @@ if $rows != $val then
endi endi
print ================>TD-5600 print ================>TD-5600
sql select first(join_tb0.c8),first(join_tb0.c9) from join_tb1 , join_tb0 where join_tb1.ts = join_tb0.ts and join_tb1.ts <= 100002 and join_tb1.ts>=100000 interval(1s) fill(linear); sql select first(join_tb0.c8),first(join_tb0.c9) from join_tb1 , join_tb0 where join_tb1.ts = join_tb0.ts and join_tb1.ts <= 100002 and join_tb1.ts>=100000 interval(1s);
#=============================================================== #===============================================================
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册