提交 bf08d7dd 编写于 作者: H Haojun Liao

fix(query): fix bug in fill

上级 f8bd7789
...@@ -28,8 +28,9 @@ struct SSDataBlock; ...@@ -28,8 +28,9 @@ struct SSDataBlock;
typedef struct SFillColInfo { typedef struct SFillColInfo {
SExprInfo *pExpr; SExprInfo *pExpr;
int16_t flag; // column flag: TAG COLUMN|NORMAL COLUMN // int16_t flag; // column flag: TAG COLUMN|NORMAL COLUMN
int16_t tagIndex; // index of current tag in SFillTagColInfo array list bool notFillCol; // denote if this column needs fill operation
// int16_t tagIndex; // index of current tag in SFillTagColInfo array list
SVariant fillVal; SVariant fillVal;
} SFillColInfo; } SFillColInfo;
...@@ -49,10 +50,7 @@ typedef struct SFillInfo { ...@@ -49,10 +50,7 @@ typedef struct SFillInfo {
int32_t index; // active row index int32_t index; // active row index
int32_t numOfTotal; // number of filled rows in one round int32_t numOfTotal; // number of filled rows in one round
int32_t numOfCurrent; // number of filled rows in current results int32_t numOfCurrent; // number of filled rows in current results
int32_t numOfTags; // number of tags
int32_t numOfCols; // number of columns, including the tags columns int32_t numOfCols; // number of columns, including the tags columns
int32_t rowSize; // size of each row
SInterval interval; SInterval interval;
SArray *prev; SArray *prev;
...@@ -71,10 +69,10 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t ...@@ -71,10 +69,10 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t
void taosFillSetStartInfo(struct SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey); void taosFillSetStartInfo(struct SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
void taosResetFillInfo(struct SFillInfo* pFillInfo, TSKEY startTimestamp); void taosResetFillInfo(struct SFillInfo* pFillInfo, TSKEY startTimestamp);
void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struct SSDataBlock* pInput); void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struct SSDataBlock* pInput);
struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const struct SNodeListNode* val); struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprInfo* pNotFillExpr, int32_t numOfNotFillCols, const struct SNodeListNode* val);
bool taosFillHasMoreResults(struct SFillInfo* pFillInfo); bool taosFillHasMoreResults(struct SFillInfo* pFillInfo);
SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t capacity,
SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, int32_t slotId, SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, int32_t slotId,
int32_t order, const char* id); int32_t order, const char* id);
......
...@@ -3267,8 +3267,6 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { ...@@ -3267,8 +3267,6 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
} }
} }
blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity);
int32_t numOfResultRows = pOperator->resultInfo.capacity - pResBlock->info.rows; int32_t numOfResultRows = pOperator->resultInfo.capacity - pResBlock->info.rows;
taosFillResultDataBlock(pInfo->pFillInfo, pResBlock, numOfResultRows); taosFillResultDataBlock(pInfo->pFillInfo, pResBlock, numOfResultRows);
...@@ -3595,16 +3593,16 @@ void doDestroyExchangeOperatorInfo(void* param) { ...@@ -3595,16 +3593,16 @@ void doDestroyExchangeOperatorInfo(void* param) {
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SNodeListNode* pValNode, static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SExprInfo* pNotFillExpr,
STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType, int32_t numOfNotFillCols, SNodeListNode* pValNode, STimeWindow win, int32_t capacity,
int32_t order) { const char* id, SInterval* pInterval, int32_t fillType, int32_t order) {
SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pValNode); SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols, pValNode);
STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey); STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey);
w = getFirstQualifiedTimeWindow(win.skey, &w, pInterval, TSDB_ORDER_ASC); w = getFirstQualifiedTimeWindow(win.skey, &w, pInterval, TSDB_ORDER_ASC);
pInfo->pFillInfo = pInfo->pFillInfo =
taosCreateFillInfo(w.skey, 0, capacity, numOfCols, pInterval, fillType, pColInfo, pInfo->primaryTsCol, order, id); taosCreateFillInfo(w.skey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo, pInfo->primaryTsCol, order, id);
pInfo->win = win; pInfo->win = win;
pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES); pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
...@@ -3626,9 +3624,11 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* ...@@ -3626,9 +3624,11 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
goto _error; goto _error;
} }
int32_t num = 0; int32_t num = 0, num1 = 0;
SSDataBlock* pResBlock = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc); SSDataBlock* pResBlock = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc);
SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &num); SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &num);
SExprInfo* pCopyColumnExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &num1);
SInterval* pInterval = SInterval* pInterval =
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType
? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
...@@ -3639,13 +3639,16 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* ...@@ -3639,13 +3639,16 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
SResultInfo* pResultInfo = &pOperator->resultInfo; SResultInfo* pResultInfo = &pOperator->resultInfo;
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity);
pInfo->primaryTsCol = ((SColumnNode*)pPhyFillNode->pWStartTs)->slotId; pInfo->primaryTsCol = ((SColumnNode*)pPhyFillNode->pWStartTs)->slotId;
int32_t numOfOutputCols = 0; int32_t numOfOutputCols = 0;
SArray* pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, SArray* pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc,
&numOfOutputCols, COL_MATCH_FROM_SLOT_ID); &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
int32_t code = initFillInfo(pInfo, pExprInfo, num, (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, int32_t code = initFillInfo(pInfo, pExprInfo, num, pCopyColumnExprInfo, num1,
(SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange,
pResultInfo->capacity, pTaskInfo->id.str, pInterval, type, order); pResultInfo->capacity, pTaskInfo->id.str, pInterval, type, order);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
......
...@@ -33,7 +33,13 @@ ...@@ -33,7 +33,13 @@
#define DO_INTERPOLATION(_v1, _v2, _k1, _k2, _k) \ #define DO_INTERPOLATION(_v1, _v2, _k1, _k2, _k) \
((_v1) + ((_v2) - (_v1)) * (((double)(_k)) - ((double)(_k1))) / (((double)(_k2)) - ((double)(_k1)))) ((_v1) + ((_v2) - (_v1)) * (((double)(_k)) - ((double)(_k1))) / (((double)(_k2)) - ((double)(_k1))))
#define GET_DEST_SLOT_ID(_p) ((_p)->pExpr->base.resSchema.slotId)
#define GET_SRC_SLOT_ID(_p) ((_p)->pExpr->base.pParam[0].pCol->slotId)
static void doSetVal(SColumnInfoData* pDstColInfoData, int32_t rowIndex, const SGroupKeys* pKey);
static void setTagsValue(SFillInfo* pFillInfo, void** data, int32_t genRows) { static void setTagsValue(SFillInfo* pFillInfo, void** data, int32_t genRows) {
#if 0
for (int32_t j = 0; j < pFillInfo->numOfCols; ++j) { for (int32_t j = 0; j < pFillInfo->numOfCols; ++j) {
SFillColInfo* pCol = &pFillInfo->pFillCol[j]; SFillColInfo* pCol = &pFillInfo->pFillCol[j];
if (TSDB_COL_IS_NORMAL_COL(pCol->flag) || TSDB_COL_IS_UD_COL(pCol->flag)) { if (TSDB_COL_IS_NORMAL_COL(pCol->flag) || TSDB_COL_IS_UD_COL(pCol->flag)) {
...@@ -47,25 +53,28 @@ static void setTagsValue(SFillInfo* pFillInfo, void** data, int32_t genRows) { ...@@ -47,25 +53,28 @@ static void setTagsValue(SFillInfo* pFillInfo, void** data, int32_t genRows) {
SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex]; SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex];
assignVal(val1, pTag->tagVal, pSchema->bytes, pSchema->type); assignVal(val1, pTag->tagVal, pSchema->bytes, pSchema->type);
} }
#endif
} }
static void setNullRow(SSDataBlock* pBlock, int64_t ts, int32_t rowIndex) { static void setNullRow(SSDataBlock* pBlock, SFillInfo* pFillInfo, int32_t rowIndex) {
// the first are always the timestamp column, so start from the second column. for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) { SFillColInfo* pCol = &pFillInfo->pFillCol[i];
SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, i); int32_t dstSlotId = GET_DEST_SLOT_ID(pCol);
if (p->info.type == TSDB_DATA_TYPE_TIMESTAMP) { // handle timestamp SColumnInfoData* pDstColInfo = taosArrayGet(pBlock->pDataBlock, dstSlotId);
colDataAppend(p, rowIndex, (const char*)&ts, false); if (pCol->notFillCol) {
if (pDstColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
colDataAppend(pDstColInfo, rowIndex, (const char*)&pFillInfo->currentKey, false);
} else {
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev : pFillInfo->next;
SGroupKeys* pKey = taosArrayGet(p, i);
doSetVal(pDstColInfo, rowIndex, pKey);
}
} else { } else {
colDataAppendNULL(p, rowIndex); colDataAppendNULL(pDstColInfo, rowIndex);
} }
} }
} }
#define GET_DEST_SLOT_ID(_p) ((_p)->pExpr->base.resSchema.slotId)
#define GET_SRC_SLOT_ID(_p) ((_p)->pExpr->base.pParam[0].pCol->slotId)
static void doSetVal(SColumnInfoData* pDstColInfoData, int32_t rowIndex, const SGroupKeys* pKey);
static void doSetUserSpecifiedValue(SColumnInfoData* pDst, SVariant* pVar, int32_t rowIndex, int64_t currentKey) { static void doSetUserSpecifiedValue(SColumnInfoData* pDst, SVariant* pVar, int32_t rowIndex, int64_t currentKey) {
if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) { if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) {
float v = 0; float v = 0;
...@@ -100,9 +109,6 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock* ...@@ -100,9 +109,6 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock*
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i]; SFillColInfo* pCol = &pFillInfo->pFillCol[i];
if (TSDB_COL_IS_TAG(pCol->flag)) {
continue;
}
SColumnInfoData* pDstColInfoData = taosArrayGet(pBlock->pDataBlock, GET_DEST_SLOT_ID(pCol)); SColumnInfoData* pDstColInfoData = taosArrayGet(pBlock->pDataBlock, GET_DEST_SLOT_ID(pCol));
...@@ -118,10 +124,6 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock* ...@@ -118,10 +124,6 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock*
// todo refactor: start from 0 not 1 // todo refactor: start from 0 not 1
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i]; SFillColInfo* pCol = &pFillInfo->pFillCol[i];
if (TSDB_COL_IS_TAG(pCol->flag)) {
continue;
}
SColumnInfoData* pDstColInfoData = taosArrayGet(pBlock->pDataBlock, GET_DEST_SLOT_ID(pCol)); SColumnInfoData* pDstColInfoData = taosArrayGet(pBlock->pDataBlock, GET_DEST_SLOT_ID(pCol));
if (pDstColInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP) { if (pDstColInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
...@@ -134,59 +136,70 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock* ...@@ -134,59 +136,70 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock*
} else if (pFillInfo->type == TSDB_FILL_LINEAR) { } else if (pFillInfo->type == TSDB_FILL_LINEAR) {
// TODO : linear interpolation supports NULL value // TODO : linear interpolation supports NULL value
if (outOfBound) { if (outOfBound) {
setNullRow(pBlock, pFillInfo->currentKey, index); setNullRow(pBlock, pFillInfo, index);
} else { } else {
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i]; SFillColInfo* pCol = &pFillInfo->pFillCol[i];
if (TSDB_COL_IS_TAG(pCol->flag)) {
continue;
}
int32_t dstSlotId = GET_DEST_SLOT_ID(pCol); int32_t dstSlotId = GET_DEST_SLOT_ID(pCol);
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId); SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
int16_t type = pDstCol->info.type;
if (pCol->notFillCol) {
if (type == TSDB_DATA_TYPE_TIMESTAMP) {
colDataAppend(pDstCol, index, (const char*)&pFillInfo->currentKey, false);
} else {
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev : pFillInfo->next;
SGroupKeys* pKey = taosArrayGet(p, i);
doSetVal(pDstCol, index, pKey);
}
} else {
SGroupKeys* pKey = taosArrayGet(pFillInfo->prev, i);
if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pKey->isNull) {
colDataAppendNULL(pDstCol, index);
continue;
}
int16_t type = pDstCol->info.type; SGroupKeys* pKey1 = taosArrayGet(pFillInfo->prev, pFillInfo->tsSlotId);
if (type == TSDB_DATA_TYPE_TIMESTAMP) {
colDataAppend(pDstCol, index, (const char*)&pFillInfo->currentKey, false);
continue;
}
SGroupKeys* pKey = taosArrayGet(pFillInfo->prev, i);
if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pKey->isNull) {
colDataAppendNULL(pDstCol, index);
continue;
}
SGroupKeys* pKey1 = taosArrayGet(pFillInfo->prev, pFillInfo->tsSlotId);
int64_t prevTs = *(int64_t*)pKey1->pData; int64_t prevTs = *(int64_t*)pKey1->pData;
int32_t srcSlotId = GET_SRC_SLOT_ID(pCol); int32_t srcSlotId = GET_SRC_SLOT_ID(pCol);
SColumnInfoData* pSrcCol = taosArrayGet(pSrcBlock->pDataBlock, srcSlotId); SColumnInfoData* pSrcCol = taosArrayGet(pSrcBlock->pDataBlock, srcSlotId);
char* data = colDataGetData(pSrcCol, pFillInfo->index); char* data = colDataGetData(pSrcCol, pFillInfo->index);
point1 = (SPoint){.key = prevTs, .val = pKey->pData}; point1 = (SPoint){.key = prevTs, .val = pKey->pData};
point2 = (SPoint){.key = ts, .val = data}; point2 = (SPoint){.key = ts, .val = data};
int64_t out = 0; int64_t out = 0;
point = (SPoint){.key = pFillInfo->currentKey, .val = &out}; point = (SPoint){.key = pFillInfo->currentKey, .val = &out};
taosGetLinearInterpolationVal(&point, type, &point1, &point2, type); taosGetLinearInterpolationVal(&point, type, &point1, &point2, type);
colDataAppend(pDstCol, index, (const char*)&out, false); colDataAppend(pDstCol, index, (const char*)&out, false);
}
} }
} }
} else if (pFillInfo->type == TSDB_FILL_NULL) { // fill with NULL } else if (pFillInfo->type == TSDB_FILL_NULL) { // fill with NULL
setNullRow(pBlock, pFillInfo->currentKey, index); setNullRow(pBlock, pFillInfo, index);
} else { // fill with user specified value for each column } else { // fill with user specified value for each column
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i]; SFillColInfo* pCol = &pFillInfo->pFillCol[i];
if (TSDB_COL_IS_TAG(pCol->flag)) {
continue;
}
SVariant* pVar = &pFillInfo->pFillCol[i].fillVal; int32_t slotId = GET_DEST_SLOT_ID(pCol);
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, slotId);
doSetUserSpecifiedValue(pDst, pVar, index, pFillInfo->currentKey);
if (pCol->notFillCol) {
if (pDst->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
colDataAppend(pDst, index, (const char*)&pFillInfo->currentKey, false);
} else {
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev : pFillInfo->next;
SGroupKeys* pKey = taosArrayGet(p, i);
doSetVal(pDst, index, pKey);
}
} else {
SVariant* pVar = &pFillInfo->pFillCol[i].fillVal;
doSetUserSpecifiedValue(pDst, pVar, index, pFillInfo->currentKey);
}
} }
} }
...@@ -284,12 +297,9 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t ...@@ -284,12 +297,9 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
copyCurrentRowIntoBuf(pFillInfo, nextRowIndex, pFillInfo->next); copyCurrentRowIntoBuf(pFillInfo, nextRowIndex, pFillInfo->next);
} }
// assign rows to dst buffer // copy rows to dst buffer
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i]; SFillColInfo* pCol = &pFillInfo->pFillCol[i];
if (TSDB_COL_IS_TAG(pCol->flag) /* || IS_VAR_DATA_TYPE(pCol->schema.type)*/) {
continue;
}
int32_t srcSlotId = GET_SRC_SLOT_ID(pCol); int32_t srcSlotId = GET_SRC_SLOT_ID(pCol);
int32_t dstSlotId = GET_DEST_SLOT_ID(pCol); int32_t dstSlotId = GET_DEST_SLOT_ID(pCol);
...@@ -298,11 +308,10 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t ...@@ -298,11 +308,10 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
SColumnInfoData* pSrc = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId); SColumnInfoData* pSrc = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId);
char* src = colDataGetData(pSrc, pFillInfo->index); char* src = colDataGetData(pSrc, pFillInfo->index);
if (/*i == 0 || (*/ !colDataIsNull_s(pSrc, pFillInfo->index)) { if (!colDataIsNull_s(pSrc, pFillInfo->index)) {
bool isNull = colDataIsNull_s(pSrc, pFillInfo->index); colDataAppend(pDst, index, src, false);
colDataAppend(pDst, index, src, isNull); saveColData(pFillInfo->prev, i, src, false);
saveColData(pFillInfo->prev, i, src, isNull); } else { // the value is null
} else {
if (pDst->info.type == TSDB_DATA_TYPE_TIMESTAMP) { if (pDst->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
colDataAppend(pDst, index, (const char*)&pFillInfo->currentKey, false); colDataAppend(pDst, index, (const char*)&pFillInfo->currentKey, false);
} else { // i > 0 and data is null , do interpolation } else { // i > 0 and data is null , do interpolation
...@@ -357,7 +366,11 @@ static void saveColData(SArray* rowBuf, int32_t columnIndex, const char* src, bo ...@@ -357,7 +366,11 @@ static void saveColData(SArray* rowBuf, int32_t columnIndex, const char* src, bo
if (isNull) { if (isNull) {
pKey->isNull = true; pKey->isNull = true;
} else { } else {
memcpy(pKey->pData, src, pKey->bytes); if (IS_VAR_DATA_TYPE(pKey->type)) {
memcpy(pKey->pData, src, varDataTLen(src));
} else {
memcpy(pKey->pData, src, pKey->bytes);
}
pKey->isNull = false; pKey->isNull = false;
} }
} }
...@@ -378,53 +391,6 @@ static int64_t appendFilledResult(SFillInfo* pFillInfo, SSDataBlock* pBlock, int ...@@ -378,53 +391,6 @@ static int64_t appendFilledResult(SFillInfo* pFillInfo, SSDataBlock* pBlock, int
return resultCapacity; return resultCapacity;
} }
// there are no duplicated tags in the SFillTagColInfo list
static int32_t setTagColumnInfo(SFillInfo* pFillInfo, int32_t numOfCols, int32_t capacity) {
int32_t rowsize = 0;
int32_t numOfTags = 0;
int32_t k = 0;
for (int32_t i = 0; i < numOfCols; ++i) {
SFillColInfo* pColInfo = &pFillInfo->pFillCol[i];
SResSchema* pSchema = &pColInfo->pExpr->base.resSchema;
if (TSDB_COL_IS_TAG(pColInfo->flag) || pSchema->type == TSDB_DATA_TYPE_BINARY) {
numOfTags += 1;
bool exists = false;
int32_t index = -1;
for (int32_t j = 0; j < k; ++j) {
if (pFillInfo->pTags[j].col.colId == pSchema->slotId) {
exists = true;
index = j;
break;
}
}
if (!exists) {
SSchema* pSchema1 = &pFillInfo->pTags[k].col;
pSchema1->colId = pSchema->slotId;
pSchema1->type = pSchema->type;
pSchema1->bytes = pSchema->bytes;
pFillInfo->pTags[k].tagVal = taosMemoryCalloc(1, pSchema->bytes);
pColInfo->tagIndex = k;
k += 1;
} else {
pColInfo->tagIndex = index;
}
}
rowsize += pSchema->bytes;
}
pFillInfo->numOfTags = numOfTags;
assert(k <= pFillInfo->numOfTags);
return rowsize;
}
static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) { static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) {
if (pFillInfo->numOfRows == 0 || (pFillInfo->numOfRows > 0 && pFillInfo->index >= pFillInfo->numOfRows)) { if (pFillInfo->numOfRows == 0 || (pFillInfo->numOfRows > 0 && pFillInfo->index >= pFillInfo->numOfRows)) {
return 0; return 0;
...@@ -433,7 +399,7 @@ static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) { ...@@ -433,7 +399,7 @@ static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) {
return pFillInfo->numOfRows - pFillInfo->index; return pFillInfo->numOfRows - pFillInfo->index;
} }
struct SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, struct SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t capacity,
SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol,
int32_t primaryTsSlotId, int32_t order, const char* id) { int32_t primaryTsSlotId, int32_t order, const char* id) {
if (fillType == TSDB_FILL_NONE) { if (fillType == TSDB_FILL_NONE) {
...@@ -476,26 +442,15 @@ struct SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfTags, int32_t capa ...@@ -476,26 +442,15 @@ struct SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfTags, int32_t capa
pFillInfo->type = fillType; pFillInfo->type = fillType;
pFillInfo->pFillCol = pCol; pFillInfo->pFillCol = pCol;
pFillInfo->numOfTags = numOfTags; pFillInfo->numOfCols = numOfFillCols + numOfNotFillCols;
pFillInfo->numOfCols = numOfCols;
pFillInfo->alloc = capacity; pFillInfo->alloc = capacity;
pFillInfo->id = id; pFillInfo->id = id;
pFillInfo->interval = *pInterval; pFillInfo->interval = *pInterval;
// if (numOfTags > 0) { pFillInfo->next = taosArrayInit(pFillInfo->numOfCols, sizeof(SGroupKeys));
pFillInfo->pTags = taosMemoryCalloc(numOfCols, sizeof(SFillTagColInfo)); pFillInfo->prev = taosArrayInit(pFillInfo->numOfCols, sizeof(SGroupKeys));
for (int32_t i = 0; i < numOfCols; ++i) {
pFillInfo->pTags[i].col.colId = -2; // TODO
}
// }
pFillInfo->next = taosArrayInit(numOfCols, sizeof(SGroupKeys));
pFillInfo->prev = taosArrayInit(numOfCols, sizeof(SGroupKeys));
initBeforeAfterDataBuf(pFillInfo); initBeforeAfterDataBuf(pFillInfo);
pFillInfo->rowSize = setTagColumnInfo(pFillInfo, pFillInfo->numOfCols, pFillInfo->alloc);
assert(pFillInfo->rowSize > 0);
return pFillInfo; return pFillInfo;
} }
...@@ -524,9 +479,9 @@ void* taosDestroyFillInfo(SFillInfo* pFillInfo) { ...@@ -524,9 +479,9 @@ void* taosDestroyFillInfo(SFillInfo* pFillInfo) {
} }
taosArrayDestroy(pFillInfo->next); taosArrayDestroy(pFillInfo->next);
for (int32_t i = 0; i < pFillInfo->numOfTags; ++i) { // for (int32_t i = 0; i < pFillInfo->numOfTags; ++i) {
taosMemoryFreeClear(pFillInfo->pTags[i].tagVal); // taosMemoryFreeClear(pFillInfo->pTags[i].tagVal);
} // }
taosMemoryFreeClear(pFillInfo->pTags); taosMemoryFreeClear(pFillInfo->pTags);
taosMemoryFreeClear(pFillInfo->pFillCol); taosMemoryFreeClear(pFillInfo->pFillCol);
...@@ -642,17 +597,18 @@ int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, SSDataBlock* p, int32_t ca ...@@ -642,17 +597,18 @@ int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, SSDataBlock* p, int32_t ca
int64_t getFillInfoStart(struct SFillInfo* pFillInfo) { return pFillInfo->start; } int64_t getFillInfoStart(struct SFillInfo* pFillInfo) { return pFillInfo->start; }
SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const struct SNodeListNode* pValNode) { SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprInfo* pNotFillExpr,
SFillColInfo* pFillCol = taosMemoryCalloc(numOfOutput, sizeof(SFillColInfo)); int32_t numOfNotFillExpr, const struct SNodeListNode* pValNode) {
SFillColInfo* pFillCol = taosMemoryCalloc(numOfFillExpr + numOfNotFillExpr, sizeof(SFillColInfo));
if (pFillCol == NULL) { if (pFillCol == NULL) {
return NULL; return NULL;
} }
size_t len = (pValNode != NULL) ? LIST_LENGTH(pValNode->pNodeList) : 0; size_t len = (pValNode != NULL) ? LIST_LENGTH(pValNode->pNodeList) : 0;
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfFillExpr; ++i) {
SExprInfo* pExprInfo = &pExpr[i]; SExprInfo* pExprInfo = &pExpr[i];
pFillCol[i].pExpr = pExprInfo; pFillCol[i].pExpr = pExprInfo;
pFillCol[i].tagIndex = -2; pFillCol[i].notFillCol = false;
// todo refactor // todo refactor
if (len > 0) { if (len > 0) {
...@@ -664,9 +620,15 @@ SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const str ...@@ -664,9 +620,15 @@ SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const str
} }
if (pExprInfo->base.numOfParams > 0) { if (pExprInfo->base.numOfParams > 0) {
pFillCol[i].flag = pExprInfo->base.pParam[0].pCol->flag; // always be the normal column for table query // pFillCol[i].flag = pExprInfo->base.pParam[0].pCol->flag; // always be the normal column for table query
} }
} }
for(int32_t i = 0; i < numOfNotFillExpr; ++i) {
SExprInfo* pExprInfo = &pNotFillExpr[i];
pFillCol[i + numOfFillExpr].pExpr = pExprInfo;
pFillCol[i + numOfFillExpr].notFillCol = true;
}
return pFillCol; return pFillCol;
} }
\ No newline at end of file
...@@ -2359,7 +2359,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode ...@@ -2359,7 +2359,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
pInfo->fillType = convertFillType(pInterpPhyNode->fillMode); pInfo->fillType = convertFillType(pInterpPhyNode->fillMode);
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, (SNodeListNode*)pInterpPhyNode->pFillValues); pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, NULL, 0, (SNodeListNode*)pInterpPhyNode->pFillValues);
pInfo->pRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc); pInfo->pRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
pInfo->win = pInterpPhyNode->timeRange; pInfo->win = pInterpPhyNode->timeRange;
pInfo->interval.interval = pInterpPhyNode->interval; pInfo->interval.interval = pInterpPhyNode->interval;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册