diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 125e28f17778baead6b2be8d8409f2259d7d77e5..0750abdf6ff991d8c19eb63b77c5ba9ff9ddfb03 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -762,35 +762,32 @@ void tscSortRemoveDataBlockDupRowsRaw(STableDataBlocks *dataBuf) { if (!dataBuf->ordered) { char *pBlockData = pBlocks->data; qsort(pBlockData, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar); + dataBuf->ordered = true; - int32_t i = 0; - int32_t j = 1; - - while (j < pBlocks->numOfRows) { - TSKEY ti = *(TSKEY *)(pBlockData + dataBuf->rowSize * i); - TSKEY tj = *(TSKEY *)(pBlockData + dataBuf->rowSize * j); + if(tsClientMerge) { + int32_t i = 0; + int32_t j = 1; + while (j < pBlocks->numOfRows) { + TSKEY ti = *(TSKEY *)(pBlockData + dataBuf->rowSize * i); + TSKEY tj = *(TSKEY *)(pBlockData + dataBuf->rowSize * j); - if (ti == tj) { - if (dataBuf->pTableMeta && dataBuf->pTableMeta->tableInfo.update != TD_ROW_DISCARD_UPDATE) { - memmove(pBlockData + dataBuf->rowSize * i, pBlockData + dataBuf->rowSize * j, dataBuf->rowSize); + if (ti == tj) { + if (dataBuf->pTableMeta && dataBuf->pTableMeta->tableInfo.update != TD_ROW_DISCARD_UPDATE) { + memmove(pBlockData + dataBuf->rowSize * i, pBlockData + dataBuf->rowSize * j, dataBuf->rowSize); + } + ++j; + continue; } + int32_t nextPos = (++i); + if (nextPos != j) { + memmove(pBlockData + dataBuf->rowSize * nextPos, pBlockData + dataBuf->rowSize * j, dataBuf->rowSize); + } ++j; - continue; - } - - int32_t nextPos = (++i); - if (nextPos != j) { - memmove(pBlockData + dataBuf->rowSize * nextPos, pBlockData + dataBuf->rowSize * j, dataBuf->rowSize); - } - - ++j; + } + pBlocks->numOfRows = i + 1; + dataBuf->size = sizeof(SSubmitBlk) + dataBuf->rowSize * pBlocks->numOfRows; } - - dataBuf->ordered = true; - - pBlocks->numOfRows = i + 1; - dataBuf->size = sizeof(SSubmitBlk) + dataBuf->rowSize * pBlocks->numOfRows; } dataBuf->prevTS = INT64_MIN; @@ -836,32 +833,33 @@ int tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlk if (!dataBuf->ordered) { pBlkKeyTuple = pBlkKeyInfo->pKeyTuple; qsort(pBlkKeyTuple, nRows, sizeof(SBlockKeyTuple), rowDataCompar); + dataBuf->ordered = true; - pBlkKeyTuple = pBlkKeyInfo->pKeyTuple; - int32_t i = 0; - int32_t j = 1; - while (j < nRows) { - TSKEY ti = (pBlkKeyTuple + i)->skey; - TSKEY tj = (pBlkKeyTuple + j)->skey; - - if (ti == tj) { - if (dataBuf->pTableMeta && dataBuf->pTableMeta->tableInfo.update != TD_ROW_DISCARD_UPDATE) { - memmove(pBlkKeyTuple + i, pBlkKeyTuple + j, sizeof(SBlockKeyTuple)); + if(tsClientMerge) { + pBlkKeyTuple = pBlkKeyInfo->pKeyTuple; + int32_t i = 0; + int32_t j = 1; + while (j < nRows) { + TSKEY ti = (pBlkKeyTuple + i)->skey; + TSKEY tj = (pBlkKeyTuple + j)->skey; + + if (ti == tj) { + if (dataBuf->pTableMeta && dataBuf->pTableMeta->tableInfo.update != TD_ROW_DISCARD_UPDATE) { + memmove(pBlkKeyTuple + i, pBlkKeyTuple + j, sizeof(SBlockKeyTuple)); + } + + ++j; + continue; } + int32_t nextPos = (++i); + if (nextPos != j) { + memmove(pBlkKeyTuple + nextPos, pBlkKeyTuple + j, sizeof(SBlockKeyTuple)); + } ++j; - continue; - } - - int32_t nextPos = (++i); - if (nextPos != j) { - memmove(pBlkKeyTuple + nextPos, pBlkKeyTuple + j, sizeof(SBlockKeyTuple)); } - ++j; - } - - dataBuf->ordered = true; - pBlocks->numOfRows = i + 1; + pBlocks->numOfRows = i + 1; + } } dataBuf->size = sizeof(SSubmitBlk) + pBlocks->numOfRows * extendedRowSize; diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index a384cf6e70485b8f7d1b06b7f7e86ba92776b547..1e1e2617a953e6d4156e3032266bc8b1bdf8090d 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -209,6 +209,8 @@ extern int32_t wDebugFlag; extern int32_t cqDebugFlag; extern int32_t debugFlag; +int8_t tsClientMerge; + #ifdef TD_TSZ // lossy extern char lossyColumns[]; @@ -219,6 +221,7 @@ extern uint32_t curRange; extern char Compressor[]; #endif + typedef struct { char dir[TSDB_FILENAME_LEN]; int level; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 876eb4fa92bada1d22e0e43b6d6532d31a0b913c..f1a7e4f7fd12a85ed1b8b503db35fba7211fc788 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -254,6 +254,8 @@ int32_t tsdbDebugFlag = 131; int32_t cqDebugFlag = 131; int32_t fsDebugFlag = 135; +int8_t tsClientMerge = 0; + #ifdef TD_TSZ // // lossy compress 6 @@ -1581,6 +1583,16 @@ static void doInitGlobalConfig(void) { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); + cfg.option = "clientMerge"; + cfg.ptr = &tsClientMerge; + cfg.valType = TAOS_CFG_VTYPE_INT8; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; + cfg.minValue = 0; + cfg.maxValue = 1; + cfg.ptrLength = 1; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + #ifdef TD_TSZ // lossy compress cfg.option = "lossyColumns"; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 42787f22190e3fd7685af6598b51fc9c9699d0c2..73c93dba83530b0bb2cfc79552ba953f309a77b1 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -1296,9 +1296,6 @@ static void projectApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx if (pCtx[k].currentStage == MERGE_STAGE) { pCtx[k].order = TSDB_ORDER_ASC; } - - pCtx[k].startTs = pQueryAttr->window.skey; - if (pCtx[k].functionId < 0) { // load the script and exec SUdfInfo* pUdfInfo = pRuntimeEnv->pUdfInfo; diff --git a/src/util/inc/tconfig.h b/src/util/inc/tconfig.h index 5bd69aece2ffa2166c268b437549c95a30608383..d6a1cb113a43874ee77b096e75959e02e4aa6747 100644 --- a/src/util/inc/tconfig.h +++ b/src/util/inc/tconfig.h @@ -20,7 +20,7 @@ extern "C" { #endif -#define TSDB_CFG_MAX_NUM 119 +#define TSDB_CFG_MAX_NUM 120 #define TSDB_CFG_PRINT_LEN 23 #define TSDB_CFG_OPTION_LEN 24 #define TSDB_CFG_VALUE_LEN 41 diff --git a/src/util/src/tskiplist.c b/src/util/src/tskiplist.c index 98fd9c094cba3e779c9f203fdacc548a3bda5ef4..9f00b1750bdba64d06c51a9eb6d7de81bf725d1a 100644 --- a/src/util/src/tskiplist.c +++ b/src/util/src/tskiplist.c @@ -163,6 +163,10 @@ void tSkipListPutBatchByIter(SSkipList *pSkipList, void *iter, iter_next_fn_t it for (int i = 0; i < pSkipList->maxLevel; i++) { forward[i] = SL_NODE_GET_BACKWARD_POINTER(pSkipList->pTail, i); } + } else if(compare == 0) { + // same need special deal + forward[0] = SL_NODE_GET_BACKWARD_POINTER(SL_NODE_GET_BACKWARD_POINTER(pSkipList->pTail,0),0); + hasDup = true; } else { SSkipListNode *px = pSkipList->pHead; for (int i = pSkipList->maxLevel - 1; i >= 0; --i) {