diff --git a/source/libs/parser/inc/parInsertUtil.h b/source/libs/parser/inc/parInsertUtil.h index 61fbf1504bf132aff62ce4f0e9dc551914e67366..b74ba8d2993f5eca7859029abfe987bbcd6bc54d 100644 --- a/source/libs/parser/inc/parInsertUtil.h +++ b/source/libs/parser/inc/parInsertUtil.h @@ -155,6 +155,7 @@ typedef struct STableDataCxt { SSubmitTbData *pData; TSKEY lastTs; bool ordered; + bool duplicateTs; } STableDataCxt; typedef struct SVgroupDataCxt { diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index b8158ea7ede7124da231514cf1daac23276119a3..b5309e16bf972f22152fd1dff6fb7c1231c5d9a2 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -982,15 +982,19 @@ int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo* pInfo) { } void insCheckTableDataOrder(STableDataCxt* pTableCxt, TSKEY tsKey) { - // once the data block is disordered, we do NOT keep previous timestamp any more + // once the data block is disordered, we do NOT keep last timestamp any more if (!pTableCxt->ordered) { return; } - if (tsKey <= pTableCxt->lastTs) { + if (tsKey < pTableCxt->lastTs) { pTableCxt->ordered = false; } + if (tsKey == pTableCxt->lastTs) { + pTableCxt->duplicateTs = true; + } + pTableCxt->lastTs = tsKey; return; } @@ -1005,8 +1009,9 @@ static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreat int32_t code = TSDB_CODE_SUCCESS; - pTableCxt->ordered = true; pTableCxt->lastTs = 0; + pTableCxt->ordered = true; + pTableCxt->duplicateTs = false; pTableCxt->pMeta = tableMetaDup(pTableMeta); if (NULL == pTableCxt->pMeta) { @@ -1196,10 +1201,12 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) { void* p = taosHashIterate(pTableHash, NULL); while (TSDB_CODE_SUCCESS == code && NULL != p) { STableDataCxt* pTableCxt = *(STableDataCxt**)p; - if (pTableCxt->ordered) { + if (!pTableCxt->ordered) { tRowSort(pTableCxt->pData->aRowP); } - code = tRowMerge(pTableCxt->pData->aRowP, pTableCxt->pSchema, 0); + if (!pTableCxt->ordered || pTableCxt->duplicateTs) { + code = tRowMerge(pTableCxt->pData->aRowP, pTableCxt->pSchema, 0); + } if (TSDB_CODE_SUCCESS == code) { SVgroupDataCxt* pVgCxt = NULL; int32_t vgId = pTableCxt->pMeta->vgId;