提交 d4c783b5 编写于 作者: G Ganlin Zhao

check duplicate timestamps

上级 0ec4bbdf
...@@ -36,6 +36,8 @@ typedef struct STimeSliceOperatorInfo { ...@@ -36,6 +36,8 @@ typedef struct STimeSliceOperatorInfo {
SColumn tsCol; // primary timestamp column SColumn tsCol; // primary timestamp column
SExprSupp scalarSup; // scalar calculation SExprSupp scalarSup; // scalar calculation
struct SFillColInfo* pFillColInfo; // fill column info struct SFillColInfo* pFillColInfo; // fill column info
int64_t prevTs;
bool prevTsSet;
} STimeSliceOperatorInfo; } STimeSliceOperatorInfo;
static void destroyTimeSliceOperatorInfo(void* param); static void destroyTimeSliceOperatorInfo(void* param);
...@@ -166,6 +168,28 @@ static bool isIsfilledPseudoColumn(SExprInfo* pExprInfo) { ...@@ -166,6 +168,28 @@ static bool isIsfilledPseudoColumn(SExprInfo* pExprInfo) {
return (IS_BOOLEAN_TYPE(pExprInfo->base.resSchema.type) && strcasecmp(name, "_isfilled") == 0); return (IS_BOOLEAN_TYPE(pExprInfo->base.resSchema.type) && strcasecmp(name, "_isfilled") == 0);
} }
static bool checkDuplicateTimestamps(STimeSliceOperatorInfo* pSliceInfo, SColumnInfoData* pTsCol,
int32_t curIndex, int32_t rows) {
int64_t currentTs = *(int64_t*)colDataGetData(pTsCol, curIndex);
if ((pSliceInfo->prevTsSet == true) && (currentTs == pSliceInfo->prevTs)) {
return true;
}
pSliceInfo->prevTsSet = true;
pSliceInfo->prevTs = currentTs;
if (curIndex < rows - 1) {
int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, curIndex + 1);
if (currentTs == nextTs) {
return true;
}
}
return false;
}
static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock, static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock,
bool beforeTs) { bool beforeTs) {
int32_t rows = pResBlock->info.rows; int32_t rows = pResBlock->info.rows;
...@@ -472,6 +496,11 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -472,6 +496,11 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
for (int32_t i = 0; i < pBlock->info.rows; ++i) { for (int32_t i = 0; i < pBlock->info.rows; ++i) {
int64_t ts = *(int64_t*)colDataGetData(pTsCol, i); int64_t ts = *(int64_t*)colDataGetData(pTsCol, i);
// check for duplicate timestamps
if (checkDuplicateTimestamps(pSliceInfo, pTsCol, i, pBlock->info.rows)) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_FUNC_DUP_TIMESTAMP);
}
if (pSliceInfo->current > pSliceInfo->win.ekey) { if (pSliceInfo->current > pSliceInfo->win.ekey) {
setOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
break; break;
...@@ -612,6 +641,8 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode ...@@ -612,6 +641,8 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
pInfo->win = pInterpPhyNode->timeRange; pInfo->win = pInterpPhyNode->timeRange;
pInfo->interval.interval = pInterpPhyNode->interval; pInfo->interval.interval = pInterpPhyNode->interval;
pInfo->current = pInfo->win.skey; pInfo->current = pInfo->win.skey;
pInfo->prevTsSet = false;
pInfo->prevTs = 0;
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info; STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册