未验证 提交 72be4272 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #18559 from taosdata/refact/submit_req_wxy

enh: add table blokc order check
...@@ -153,6 +153,9 @@ typedef struct STableDataCxt { ...@@ -153,6 +153,9 @@ typedef struct STableDataCxt {
SBoundColInfo boundColsInfo; SBoundColInfo boundColsInfo;
SArray *pValues; SArray *pValues;
SSubmitTbData *pData; SSubmitTbData *pData;
TSKEY lastTs;
bool ordered;
bool duplicateTs;
} STableDataCxt; } STableDataCxt;
typedef struct SVgroupDataCxt { typedef struct SVgroupDataCxt {
...@@ -161,6 +164,7 @@ typedef struct SVgroupDataCxt { ...@@ -161,6 +164,7 @@ typedef struct SVgroupDataCxt {
} SVgroupDataCxt; } SVgroupDataCxt;
int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo *pInfo); int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo *pInfo);
void insCheckTableDataOrder(STableDataCxt *pTableCxt, TSKEY tsKey);
int32_t insGetTableDataCxt(SHashObj *pHash, void *id, int32_t idLen, STableMeta *pTableMeta, int32_t insGetTableDataCxt(SHashObj *pHash, void *id, int32_t idLen, STableMeta *pTableMeta,
SVCreateTbReq **pCreateTbReq, STableDataCxt **pTableCxt); SVCreateTbReq **pCreateTbReq, STableDataCxt **pTableCxt);
int32_t insMergeTableDataCxt(SHashObj *pTableHash, SArray **pVgDataBlocks); int32_t insMergeTableDataCxt(SHashObj *pTableHash, SArray **pVgDataBlocks);
......
...@@ -1175,6 +1175,9 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataC ...@@ -1175,6 +1175,9 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataC
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1); SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1);
code = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow); code = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow);
if (TSDB_CODE_SUCCESS == code) {
insCheckTableDataOrder(pTableCxt, TD_ROW_KEY(*pRow));
}
} }
if (TSDB_CODE_SUCCESS == code && !isParseBindParam) { if (TSDB_CODE_SUCCESS == code && !isParseBindParam) {
......
...@@ -981,6 +981,24 @@ int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo* pInfo) { ...@@ -981,6 +981,24 @@ int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo* pInfo) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void insCheckTableDataOrder(STableDataCxt* pTableCxt, TSKEY tsKey) {
// once the data block is disordered, we do NOT keep last timestamp any more
if (!pTableCxt->ordered) {
return;
}
if (tsKey < pTableCxt->lastTs) {
pTableCxt->ordered = false;
}
if (tsKey == pTableCxt->lastTs) {
pTableCxt->duplicateTs = true;
}
pTableCxt->lastTs = tsKey;
return;
}
static void destroyBoundColInfo(SBoundColInfo* pInfo) { taosMemoryFreeClear(pInfo->pColIndex); } static void destroyBoundColInfo(SBoundColInfo* pInfo) { taosMemoryFreeClear(pInfo->pColIndex); }
static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreateTbReq, STableDataCxt** pOutput) { static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreateTbReq, STableDataCxt** pOutput) {
...@@ -991,6 +1009,10 @@ static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreat ...@@ -991,6 +1009,10 @@ static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreat
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
pTableCxt->lastTs = 0;
pTableCxt->ordered = true;
pTableCxt->duplicateTs = false;
pTableCxt->pMeta = tableMetaDup(pTableMeta); pTableCxt->pMeta = tableMetaDup(pTableMeta);
if (NULL == pTableCxt->pMeta) { if (NULL == pTableCxt->pMeta) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
...@@ -1179,7 +1201,12 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) { ...@@ -1179,7 +1201,12 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) {
void* p = taosHashIterate(pTableHash, NULL); void* p = taosHashIterate(pTableHash, NULL);
while (TSDB_CODE_SUCCESS == code && NULL != p) { while (TSDB_CODE_SUCCESS == code && NULL != p) {
STableDataCxt* pTableCxt = *(STableDataCxt**)p; STableDataCxt* pTableCxt = *(STableDataCxt**)p;
code = tRowMerge(pTableCxt->pData->aRowP, pTableCxt->pSchema, 0); if (!pTableCxt->ordered) {
tRowSort(pTableCxt->pData->aRowP);
}
if (!pTableCxt->ordered || pTableCxt->duplicateTs) {
code = tRowMerge(pTableCxt->pData->aRowP, pTableCxt->pSchema, 0);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
SVgroupDataCxt* pVgCxt = NULL; SVgroupDataCxt* pVgCxt = NULL;
int32_t vgId = pTableCxt->pMeta->vgId; int32_t vgId = pTableCxt->pMeta->vgId;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册