提交 166cdcec 编写于 作者: H Haojun Liao

fix(query): fix bug in fill.

上级 3fb91c6a
......@@ -624,6 +624,7 @@ typedef struct SIndefOperatorInfo {
typedef struct SFillOperatorInfo {
struct SFillInfo* pFillInfo;
SSDataBlock* pRes;
SSDataBlock* pFinalRes;
int64_t totalInputRows;
void** p;
SSDataBlock* existNewGroupBlock;
......@@ -631,6 +632,7 @@ typedef struct SFillOperatorInfo {
SNode* pCondition;
SArray* pColMatchColInfo;
int32_t primaryTsCol;
int32_t primarySrcSlotId;
uint64_t curGroupId; // current handled group id
SExprInfo* pExprInfo;
int32_t numOfExpr;
......
......@@ -28,9 +28,7 @@ struct SSDataBlock;
typedef struct SFillColInfo {
SExprInfo *pExpr;
// int16_t flag; // column flag: TAG COLUMN|NORMAL COLUMN
bool notFillCol; // denote if this column needs fill operation
// int16_t tagIndex; // index of current tag in SFillTagColInfo array list
SVariant fillVal;
} SFillColInfo;
......@@ -39,6 +37,11 @@ typedef struct {
char* tagVal;
} SFillTagColInfo;
typedef struct {
int64_t key;
SArray* pRowVal;
} SRowVal;
typedef struct SFillInfo {
TSKEY start; // start timestamp
TSKEY end; // endKey for fill
......@@ -53,9 +56,8 @@ typedef struct SFillInfo {
int32_t numOfCurrent; // number of filled rows in current results
int32_t numOfCols; // number of columns, including the tags columns
SInterval interval;
SArray *prev;
SArray *next;
SRowVal prev;
SRowVal next;
SSDataBlock *pSrcBlock;
int32_t alloc; // data buffer size in rows
......
......@@ -3212,8 +3212,8 @@ static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResult
static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo,
SExecTaskInfo* pTaskInfo) {
if (taosFillHasMoreResults(pInfo->pFillInfo)) {
int32_t numOfResultRows = pResultInfo->capacity - pInfo->pRes->info.rows;
taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pRes, numOfResultRows);
int32_t numOfResultRows = pResultInfo->capacity - pInfo->pFinalRes->info.rows;
taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pFinalRes, numOfResultRows);
pInfo->pRes->info.groupId = pInfo->curGroupId;
return;
}
......@@ -3229,9 +3229,13 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SResultInfo* pResultInfo = &pOperator->resultInfo;
SSDataBlock* pResBlock = pInfo->pRes;
SSDataBlock* pResBlock = pInfo->pFinalRes;
blockDataCleanup(pResBlock);
blockDataCleanup(pInfo->pRes);
int32_t order = TSDB_ORDER_ASC;
int32_t scanFlag = MAIN_SCAN;
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, pTaskInfo);
if (pResBlock->info.rows > 0) {
......@@ -3251,17 +3255,23 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
} else {
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsCol);
SExprSupp* pSup = &pOperator->exprSupp;
getTableScanInfo(pOperator, &order, &scanFlag);
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false);
projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs, NULL);
pInfo->pRes->info.groupId = pBlock->info.groupId;
if (pInfo->curGroupId == 0 || pInfo->curGroupId == pBlock->info.groupId) {
pInfo->curGroupId = pBlock->info.groupId; // the first data block
SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pInfo->primaryTsCol);
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, pInfo->primarySrcSlotId);
colDataAssign(pDst, pSrc, pInfo->pRes->info.rows, &pResBlock->info);
pInfo->totalInputRows += pBlock->info.rows;
if (pInfo->curGroupId == 0 || pInfo->curGroupId == pInfo->pRes->info.groupId) {
pInfo->curGroupId = pInfo->pRes->info.groupId; // the first data block
pInfo->totalInputRows += pInfo->pRes->info.rows;
taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, pBlock->info.window.ekey);
taosFillSetInputDataBlock(pInfo->pFillInfo, pBlock);
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, pBlock->info.window.ekey);
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes);
} else if (pInfo->curGroupId != pBlock->info.groupId) { // the new group data block
pInfo->existNewGroupBlock = pBlock;
......@@ -3649,7 +3659,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity);
initExprSupp(&pOperator->exprSupp, pExprInfo, pInfo->numOfExpr);
pInfo->primaryTsCol = ((SColumnNode*)pPhyFillNode->pWStartTs)->slotId;
pInfo->primaryTsCol = ((STargetNode*)pPhyFillNode->pWStartTs)->slotId;
pInfo->primarySrcSlotId = ((SColumnNode*)((STargetNode*)pPhyFillNode->pWStartTs)->pExpr)->slotId;
int32_t numOfOutputCols = 0;
SArray* pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc,
......@@ -3663,6 +3674,9 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
}
pInfo->pRes = pResBlock;
pInfo->pFinalRes = createOneDataBlock(pResBlock, false);
blockDataEnsureCapacity(pInfo->pFinalRes, pOperator->resultInfo.capacity);
pInfo->pCondition = pPhyFillNode->node.pConditions;
pInfo->pColMatchColInfo = pColMatchColInfo;
pOperator->name = "FillOperator";
......
......@@ -65,7 +65,7 @@ static void setNullRow(SSDataBlock* pBlock, SFillInfo* pFillInfo, int32_t rowInd
if (pDstColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
colDataAppend(pDstColInfo, rowIndex, (const char*)&pFillInfo->currentKey, false);
} else {
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev : pFillInfo->next;
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev.pRowVal : pFillInfo->next.pRowVal;
SGroupKeys* pKey = taosArrayGet(p, i);
doSetVal(pDstColInfo, rowIndex, pKey);
}
......@@ -105,7 +105,7 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock*
// set the other values
if (pFillInfo->type == TSDB_FILL_PREV) {
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev : pFillInfo->next;
SArray* p = FILL_IS_ASC_FILL(pFillInfo)? pFillInfo->prev.pRowVal : pFillInfo->next.pRowVal;
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
......@@ -120,7 +120,7 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock*
}
}
} else if (pFillInfo->type == TSDB_FILL_NEXT) {
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->next : pFillInfo->prev;
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->next.pRowVal : pFillInfo->prev.pRowVal;
// todo refactor: start from 0 not 1
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
......@@ -149,21 +149,21 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock*
if (type == TSDB_DATA_TYPE_TIMESTAMP) {
colDataAppend(pDstCol, index, (const char*)&pFillInfo->currentKey, false);
} else {
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev : pFillInfo->next;
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev.pRowVal : pFillInfo->next.pRowVal;
SGroupKeys* pKey = taosArrayGet(p, i);
doSetVal(pDstCol, index, pKey);
}
} else {
SGroupKeys* pKey = taosArrayGet(pFillInfo->prev, i);
SGroupKeys* pKey = taosArrayGet(pFillInfo->prev.pRowVal, i);
if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pKey->isNull) {
colDataAppendNULL(pDstCol, index);
continue;
}
SGroupKeys* pKey1 = taosArrayGet(pFillInfo->prev, pFillInfo->tsSlotId);
SGroupKeys* pKey1 = taosArrayGet(pFillInfo->prev.pRowVal, pFillInfo->tsSlotId);
int64_t prevTs = *(int64_t*)pKey1->pData;
int32_t srcSlotId = GET_SRC_SLOT_ID(pCol);
int32_t srcSlotId = GET_DEST_SLOT_ID(pCol);
SColumnInfoData* pSrcCol = taosArrayGet(pSrcBlock->pDataBlock, srcSlotId);
char* data = colDataGetData(pSrcCol, pFillInfo->index);
......@@ -192,7 +192,7 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock*
if (pDst->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
colDataAppend(pDst, index, (const char*)&pFillInfo->currentKey, false);
} else {
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev : pFillInfo->next;
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev.pRowVal : pFillInfo->next.pRowVal;
SGroupKeys* pKey = taosArrayGet(p, i);
doSetVal(pDst, index, pKey);
}
......@@ -220,7 +220,7 @@ void doSetVal(SColumnInfoData* pDstCol, int32_t rowIndex, const SGroupKeys* pKey
}
static void initBeforeAfterDataBuf(SFillInfo* pFillInfo) {
if (taosArrayGetSize(pFillInfo->next) > 0) {
if (taosArrayGetSize(pFillInfo->next.pRowVal) > 0) {
return;
}
......@@ -234,10 +234,10 @@ static void initBeforeAfterDataBuf(SFillInfo* pFillInfo) {
key.bytes = pSchema->bytes;
key.type = pSchema->type;
taosArrayPush(pFillInfo->next, &key);
taosArrayPush(pFillInfo->next.pRowVal, &key);
key.pData = taosMemoryMalloc(pSchema->bytes);
taosArrayPush(pFillInfo->prev, &key);
taosArrayPush(pFillInfo->prev.pRowVal, &key);
}
}
......@@ -245,13 +245,24 @@ static void saveColData(SArray* rowBuf, int32_t columnIndex, const char* src, bo
static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, int32_t rowIndex, SArray* pRow) {
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
int32_t srcSlotId = GET_SRC_SLOT_ID(&pFillInfo->pFillCol[i]);
int32_t type = pFillInfo->pFillCol[i].pExpr->pExpr->nodeType;
if (type == QUERY_NODE_COLUMN) {
int32_t srcSlotId = GET_DEST_SLOT_ID(&pFillInfo->pFillCol[i]);
SColumnInfoData* pSrcCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId);
bool isNull = colDataIsNull_s(pSrcCol, rowIndex);
char* p = colDataGetData(pSrcCol, rowIndex);
saveColData(pRow, i, p, isNull);
} else if (type == QUERY_NODE_OPERATOR) {
SColumnInfoData* pSrcCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, i);
bool isNull = colDataIsNull_s(pSrcCol, rowIndex);
char* p = colDataGetData(pSrcCol, rowIndex);
saveColData(pRow, i, p, isNull);
} else {
ASSERT(0);
}
}
}
......@@ -272,7 +283,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
// set the next value for interpolation
if ((pFillInfo->currentKey < ts && ascFill) || (pFillInfo->currentKey > ts && !ascFill)) {
copyCurrentRowIntoBuf(pFillInfo, pFillInfo->index, pFillInfo->next);
copyCurrentRowIntoBuf(pFillInfo, pFillInfo->index, pFillInfo->next.pRowVal);
}
if (((pFillInfo->currentKey < ts && ascFill) || (pFillInfo->currentKey > ts && !ascFill)) &&
......@@ -294,39 +305,38 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
if (pFillInfo->type == TSDB_FILL_NEXT && (pFillInfo->index + 1) < pFillInfo->numOfRows) {
int32_t nextRowIndex = pFillInfo->index + 1;
copyCurrentRowIntoBuf(pFillInfo, nextRowIndex, pFillInfo->next);
copyCurrentRowIntoBuf(pFillInfo, nextRowIndex, pFillInfo->next.pRowVal);
}
// copy rows to dst buffer
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
int32_t srcSlotId = GET_SRC_SLOT_ID(pCol);
int32_t dstSlotId = GET_DEST_SLOT_ID(pCol);
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, dstSlotId);
SColumnInfoData* pSrc = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId);
SColumnInfoData* pSrc = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, dstSlotId);
char* src = colDataGetData(pSrc, pFillInfo->index);
if (!colDataIsNull_s(pSrc, pFillInfo->index)) {
colDataAppend(pDst, index, src, false);
saveColData(pFillInfo->prev, i, src, false);
saveColData(pFillInfo->prev.pRowVal, i, src, false);
} else { // the value is null
if (pDst->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
colDataAppend(pDst, index, (const char*)&pFillInfo->currentKey, false);
} else { // i > 0 and data is null , do interpolation
if (pFillInfo->type == TSDB_FILL_PREV) {
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev : pFillInfo->next;
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev.pRowVal : pFillInfo->next.pRowVal;
SGroupKeys* pKey = taosArrayGet(p, i);
doSetVal(pDst, index, pKey);
} else if (pFillInfo->type == TSDB_FILL_LINEAR) {
bool isNull = colDataIsNull_s(pSrc, pFillInfo->index);
colDataAppend(pDst, index, src, isNull);
saveColData(pFillInfo->prev, i, src, isNull); // todo:
saveColData(pFillInfo->prev.pRowVal, i, src, isNull); // todo:
} else if (pFillInfo->type == TSDB_FILL_NULL) {
colDataAppendNULL(pDst, index);
} else if (pFillInfo->type == TSDB_FILL_NEXT) {
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->next : pFillInfo->prev;
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->next.pRowVal : pFillInfo->prev.pRowVal;
SGroupKeys* pKey = taosArrayGet(p, i);
doSetVal(pDst, index, pKey);
} else {
......@@ -413,7 +423,7 @@ struct SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t
for(int32_t i = 0; i < numOfNotFillCols; ++i) {
SFillColInfo* p = &pCol[i + numOfFillCols];
int32_t srcSlotId = GET_SRC_SLOT_ID(p);
int32_t srcSlotId = GET_DEST_SLOT_ID(p);
if (srcSlotId == primaryTsSlotId) {
pFillInfo->tsSlotId = i + numOfFillCols;
break;
......@@ -453,8 +463,8 @@ struct SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t
pFillInfo->id = id;
pFillInfo->interval = *pInterval;
pFillInfo->next = taosArrayInit(pFillInfo->numOfCols, sizeof(SGroupKeys));
pFillInfo->prev = taosArrayInit(pFillInfo->numOfCols, sizeof(SGroupKeys));
pFillInfo->next.pRowVal = taosArrayInit(pFillInfo->numOfCols, sizeof(SGroupKeys));
pFillInfo->prev.pRowVal = taosArrayInit(pFillInfo->numOfCols, sizeof(SGroupKeys));
initBeforeAfterDataBuf(pFillInfo);
return pFillInfo;
......@@ -474,16 +484,16 @@ void* taosDestroyFillInfo(SFillInfo* pFillInfo) {
if (pFillInfo == NULL) {
return NULL;
}
for (int32_t i = 0; i < taosArrayGetSize(pFillInfo->prev); ++i) {
SGroupKeys* pKey = taosArrayGet(pFillInfo->prev, i);
for (int32_t i = 0; i < taosArrayGetSize(pFillInfo->prev.pRowVal); ++i) {
SGroupKeys* pKey = taosArrayGet(pFillInfo->prev.pRowVal, i);
taosMemoryFree(pKey->pData);
}
taosArrayDestroy(pFillInfo->prev);
for (int32_t i = 0; i < taosArrayGetSize(pFillInfo->next); ++i) {
SGroupKeys* pKey = taosArrayGet(pFillInfo->next, i);
taosArrayDestroy(pFillInfo->prev.pRowVal);
for (int32_t i = 0; i < taosArrayGetSize(pFillInfo->next.pRowVal); ++i) {
SGroupKeys* pKey = taosArrayGet(pFillInfo->next.pRowVal, i);
taosMemoryFree(pKey->pData);
}
taosArrayDestroy(pFillInfo->next);
taosArrayDestroy(pFillInfo->next.pRowVal);
// for (int32_t i = 0; i < pFillInfo->numOfTags; ++i) {
// taosMemoryFreeClear(pFillInfo->pTags[i].tagVal);
......@@ -624,10 +634,6 @@ SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprIn
SValueNode* pv = (SValueNode*)nodesListGetNode(pValNode->pNodeList, index);
nodesValueNodeToVariant(pv, &pFillCol[i].fillVal);
}
// if (pExprInfo->base.numOfParams > 0) {
// pFillCol[i].flag = pExprInfo->base.pParam[0].pCol->flag; // always be the normal column for table query
// }
}
for(int32_t i = 0; i < numOfNotFillExpr; ++i) {
......
......@@ -885,15 +885,15 @@ if $data10 != @20-01-01 01:01:10.000@ then
return -1
endi
if $data11 != 1.000000000 then
if $data11 != 99.000000000 then
return -1
endi
if $data12 != 1.000000000 then
if $data12 != 91.000000000 then
return -1
endi
if $data13 != -87.000000000 then
if $data13 != 90.000000000 then
return -1
endi
......@@ -917,15 +917,15 @@ if $data70 != @20-01-01 01:02:10.000@ then
return -1
endi
if $data71 != 1.000000000 then
if $data71 != 99.000000000 then
return -1
endi
if $data72 != 1.000000000 then
if $data72 != 91.000000000 then
return -1
endi
if $data73 != -87.000000000 then
if $data73 != 90.000000000 then
return -1
endi
......@@ -994,19 +994,19 @@ if $data10 != @20-01-01 01:01:10.000@ then
return -1
endi
if $data11 != 1.000000000 then
if $data11 != 99.000000000 then
return -1
endi
if $data12 != 1.000000000 then
if $data12 != 91.000000000 then
return -1
endi
if $data13 != -87.000000000 then
if $data13 != 90.000000000 then
return -1
endi
if $data14 != 86 then
if $data14 != 89 then
return -1
endi
......
......@@ -111,13 +111,15 @@ endi
if $data12 != -2 then
return -1
endi
if $data13 != -3.00000 then
if $data13 != -3 then
return -1
endi
if $data14 != -4.000000000 then
if $data14 != -4.00000 then
print expect -4.00000, actual: $data14
return -1
endi
if $data15 != -5 then
if $data15 != -5.000000000 then
print expect -5.000000000, actual: $data15
return -1
endi
if $data31 != -1 then
......@@ -126,10 +128,10 @@ endi
if $data52 != -2 then
return -1
endi
if $data73 != -3.00000 then
if $data73 != -3 then
return -1
endi
if $data74 != -4.000000000 then
if $data74 != -4.00000 then
return -1
endi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册