提交 1967ed14 编写于 作者: A AlexDuan

add tsClientMerge option to enable or disable client merge row and fix server...

add tsClientMerge option to enable or disable client merge row and fix server duplicate row merge bug in skiplist
上级 3514fb66
......@@ -762,35 +762,32 @@ void tscSortRemoveDataBlockDupRowsRaw(STableDataBlocks *dataBuf) {
if (!dataBuf->ordered) {
char *pBlockData = pBlocks->data;
qsort(pBlockData, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar);
dataBuf->ordered = true;
int32_t i = 0;
int32_t j = 1;
while (j < pBlocks->numOfRows) {
TSKEY ti = *(TSKEY *)(pBlockData + dataBuf->rowSize * i);
TSKEY tj = *(TSKEY *)(pBlockData + dataBuf->rowSize * j);
if(tsClientMerge) {
int32_t i = 0;
int32_t j = 1;
while (j < pBlocks->numOfRows) {
TSKEY ti = *(TSKEY *)(pBlockData + dataBuf->rowSize * i);
TSKEY tj = *(TSKEY *)(pBlockData + dataBuf->rowSize * j);
if (ti == tj) {
if (dataBuf->pTableMeta && dataBuf->pTableMeta->tableInfo.update != TD_ROW_DISCARD_UPDATE) {
memmove(pBlockData + dataBuf->rowSize * i, pBlockData + dataBuf->rowSize * j, dataBuf->rowSize);
if (ti == tj) {
if (dataBuf->pTableMeta && dataBuf->pTableMeta->tableInfo.update != TD_ROW_DISCARD_UPDATE) {
memmove(pBlockData + dataBuf->rowSize * i, pBlockData + dataBuf->rowSize * j, dataBuf->rowSize);
}
++j;
continue;
}
int32_t nextPos = (++i);
if (nextPos != j) {
memmove(pBlockData + dataBuf->rowSize * nextPos, pBlockData + dataBuf->rowSize * j, dataBuf->rowSize);
}
++j;
continue;
}
int32_t nextPos = (++i);
if (nextPos != j) {
memmove(pBlockData + dataBuf->rowSize * nextPos, pBlockData + dataBuf->rowSize * j, dataBuf->rowSize);
}
++j;
}
pBlocks->numOfRows = i + 1;
dataBuf->size = sizeof(SSubmitBlk) + dataBuf->rowSize * pBlocks->numOfRows;
}
dataBuf->ordered = true;
pBlocks->numOfRows = i + 1;
dataBuf->size = sizeof(SSubmitBlk) + dataBuf->rowSize * pBlocks->numOfRows;
}
dataBuf->prevTS = INT64_MIN;
......@@ -836,32 +833,33 @@ int tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlk
if (!dataBuf->ordered) {
pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
qsort(pBlkKeyTuple, nRows, sizeof(SBlockKeyTuple), rowDataCompar);
dataBuf->ordered = true;
pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
int32_t i = 0;
int32_t j = 1;
while (j < nRows) {
TSKEY ti = (pBlkKeyTuple + i)->skey;
TSKEY tj = (pBlkKeyTuple + j)->skey;
if (ti == tj) {
if (dataBuf->pTableMeta && dataBuf->pTableMeta->tableInfo.update != TD_ROW_DISCARD_UPDATE) {
memmove(pBlkKeyTuple + i, pBlkKeyTuple + j, sizeof(SBlockKeyTuple));
if(tsClientMerge) {
pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
int32_t i = 0;
int32_t j = 1;
while (j < nRows) {
TSKEY ti = (pBlkKeyTuple + i)->skey;
TSKEY tj = (pBlkKeyTuple + j)->skey;
if (ti == tj) {
if (dataBuf->pTableMeta && dataBuf->pTableMeta->tableInfo.update != TD_ROW_DISCARD_UPDATE) {
memmove(pBlkKeyTuple + i, pBlkKeyTuple + j, sizeof(SBlockKeyTuple));
}
++j;
continue;
}
int32_t nextPos = (++i);
if (nextPos != j) {
memmove(pBlkKeyTuple + nextPos, pBlkKeyTuple + j, sizeof(SBlockKeyTuple));
}
++j;
continue;
}
int32_t nextPos = (++i);
if (nextPos != j) {
memmove(pBlkKeyTuple + nextPos, pBlkKeyTuple + j, sizeof(SBlockKeyTuple));
}
++j;
}
dataBuf->ordered = true;
pBlocks->numOfRows = i + 1;
pBlocks->numOfRows = i + 1;
}
}
dataBuf->size = sizeof(SSubmitBlk) + pBlocks->numOfRows * extendedRowSize;
......
......@@ -209,6 +209,8 @@ extern int32_t wDebugFlag;
extern int32_t cqDebugFlag;
extern int32_t debugFlag;
int8_t tsClientMerge;
#ifdef TD_TSZ
// lossy
extern char lossyColumns[];
......@@ -219,6 +221,7 @@ extern uint32_t curRange;
extern char Compressor[];
#endif
typedef struct {
char dir[TSDB_FILENAME_LEN];
int level;
......
......@@ -254,6 +254,8 @@ int32_t tsdbDebugFlag = 131;
int32_t cqDebugFlag = 131;
int32_t fsDebugFlag = 135;
int8_t tsClientMerge = 0;
#ifdef TD_TSZ
//
// lossy compress 6
......@@ -1581,6 +1583,16 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "clientMerge";
cfg.ptr = &tsClientMerge;
cfg.valType = TAOS_CFG_VTYPE_INT8;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 0;
cfg.maxValue = 1;
cfg.ptrLength = 1;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
#ifdef TD_TSZ
// lossy compress
cfg.option = "lossyColumns";
......
......@@ -1296,9 +1296,6 @@ static void projectApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
if (pCtx[k].currentStage == MERGE_STAGE) {
pCtx[k].order = TSDB_ORDER_ASC;
}
pCtx[k].startTs = pQueryAttr->window.skey;
if (pCtx[k].functionId < 0) {
// load the script and exec
SUdfInfo* pUdfInfo = pRuntimeEnv->pUdfInfo;
......
......@@ -20,7 +20,7 @@
extern "C" {
#endif
#define TSDB_CFG_MAX_NUM 119
#define TSDB_CFG_MAX_NUM 120
#define TSDB_CFG_PRINT_LEN 23
#define TSDB_CFG_OPTION_LEN 24
#define TSDB_CFG_VALUE_LEN 41
......
......@@ -163,6 +163,10 @@ void tSkipListPutBatchByIter(SSkipList *pSkipList, void *iter, iter_next_fn_t it
for (int i = 0; i < pSkipList->maxLevel; i++) {
forward[i] = SL_NODE_GET_BACKWARD_POINTER(pSkipList->pTail, i);
}
} else if(compare == 0) {
// same need special deal
forward[0] = SL_NODE_GET_BACKWARD_POINTER(SL_NODE_GET_BACKWARD_POINTER(pSkipList->pTail,0),0);
hasDup = true;
} else {
SSkipListNode *px = pSkipList->pHead;
for (int i = pSkipList->maxLevel - 1; i >= 0; --i) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册