提交 ef89a31e 编写于 作者: D dapan1121

feat: force fill function

上级 d5bc02c1
...@@ -269,74 +269,77 @@ ...@@ -269,74 +269,77 @@
#define TK_SLIDING 251 #define TK_SLIDING 251
#define TK_FILL 252 #define TK_FILL 252
#define TK_VALUE 253 #define TK_VALUE 253
#define TK_NONE 254 #define TK_VALUE_F 254
#define TK_PREV 255 #define TK_NONE 255
#define TK_LINEAR 256 #define TK_PREV 256
#define TK_NEXT 257 #define TK_NULL_F 257
#define TK_HAVING 258 #define TK_LINEAR 258
#define TK_RANGE 259 #define TK_NEXT 259
#define TK_EVERY 260 #define TK_HAVING 260
#define TK_ORDER 261 #define TK_RANGE 261
#define TK_SLIMIT 262 #define TK_EVERY 262
#define TK_SOFFSET 263 #define TK_ORDER 263
#define TK_LIMIT 264 #define TK_SLIMIT 264
#define TK_OFFSET 265 #define TK_SOFFSET 265
#define TK_ASC 266 #define TK_LIMIT 266
#define TK_NULLS 267 #define TK_OFFSET 267
#define TK_ABORT 268 #define TK_ASC 268
#define TK_AFTER 269 #define TK_NULLS 269
#define TK_ATTACH 270 #define TK_ABORT 270
#define TK_BEFORE 271 #define TK_AFTER 271
#define TK_BEGIN 272 #define TK_ATTACH 272
#define TK_BITAND 273 #define TK_BEFORE 273
#define TK_BITNOT 274 #define TK_BEGIN 274
#define TK_BITOR 275 #define TK_BITAND 275
#define TK_BLOCKS 276 #define TK_BITNOT 276
#define TK_CHANGE 277 #define TK_BITOR 277
#define TK_COMMA 278 #define TK_BLOCKS 278
#define TK_COMPACT 279 #define TK_CHANGE 279
#define TK_CONCAT 280 #define TK_COMMA 280
#define TK_CONFLICT 281 #define TK_COMPACT 281
#define TK_COPY 282 #define TK_CONCAT 282
#define TK_DEFERRED 283 #define TK_CONFLICT 283
#define TK_DELIMITERS 284 #define TK_COPY 284
#define TK_DETACH 285 #define TK_DEFERRED 285
#define TK_DIVIDE 286 #define TK_DELIMITERS 286
#define TK_DOT 287 #define TK_DETACH 287
#define TK_EACH 288 #define TK_DIVIDE 288
#define TK_FAIL 289 #define TK_DOT 289
#define TK_FILE 290 #define TK_EACH 290
#define TK_FOR 291 #define TK_FAIL 291
#define TK_GLOB 292 #define TK_FILE 292
#define TK_ID 293 #define TK_FOR 293
#define TK_IMMEDIATE 294 #define TK_GLOB 294
#define TK_IMPORT 295 #define TK_ID 295
#define TK_INITIALLY 296 #define TK_IMMEDIATE 296
#define TK_INSTEAD 297 #define TK_IMPORT 297
#define TK_ISNULL 298 #define TK_INITIALLY 298
#define TK_KEY 299 #define TK_INSTEAD 299
#define TK_MODULES 300 #define TK_ISNULL 300
#define TK_NK_BITNOT 301 #define TK_KEY 301
#define TK_NK_SEMI 302 #define TK_MODULES 302
#define TK_NOTNULL 303 #define TK_NK_BITNOT 303
#define TK_OF 304 #define TK_NK_SEMI 304
#define TK_PLUS 305 #define TK_NOTNULL 305
#define TK_PRIVILEGE 306 #define TK_OF 306
#define TK_RAISE 307 #define TK_PLUS 307
#define TK_REPLACE 308 #define TK_PRIVILEGE 308
#define TK_RESTRICT 309 #define TK_RAISE 309
#define TK_ROW 310 #define TK_REPLACE 310
#define TK_SEMI 311 #define TK_RESTRICT 311
#define TK_STAR 312 #define TK_ROW 312
#define TK_STATEMENT 313 #define TK_SEMI 313
#define TK_STRICT 314 #define TK_STAR 314
#define TK_STRING 315 #define TK_STATEMENT 315
#define TK_TIMES 316 #define TK_STRICT 316
#define TK_UPDATE 317 #define TK_STRING 317
#define TK_VALUES 318 #define TK_TIMES 318
#define TK_VARIABLE 319 #define TK_UPDATE 319
#define TK_VIEW 320 #define TK_VALUES 320
#define TK_WAL 321 #define TK_VARIABLE 321
#define TK_VIEW 322
#define TK_WAL 323
#define TK_NK_SPACE 600 #define TK_NK_SPACE 600
#define TK_NK_COMMENT 601 #define TK_NK_COMMENT 601
......
...@@ -62,7 +62,6 @@ typedef struct SFillInfo { ...@@ -62,7 +62,6 @@ typedef struct SFillInfo {
int32_t srcTsSlotId; // timestamp column id in the source data block. int32_t srcTsSlotId; // timestamp column id in the source data block.
int32_t order; // order [TSDB_ORDER_ASC|TSDB_ORDER_DESC] int32_t order; // order [TSDB_ORDER_ASC|TSDB_ORDER_DESC]
int32_t type; // fill type int32_t type; // fill type
bool forceFill; // force fill values
int32_t numOfRows; // number of rows in the input data block int32_t numOfRows; // number of rows in the input data block
int32_t index; // active row index int32_t index; // active row index
int32_t numOfTotal; // number of filled rows in one round int32_t numOfTotal; // number of filled rows in one round
......
...@@ -140,7 +140,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { ...@@ -140,7 +140,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
while (1) { while (1) {
SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream); SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream);
if (pBlock == NULL) { if (pBlock == NULL) {
if (pInfo->totalInputRows == 0 && !pInfo->pFillInfo->forceFill) { if (pInfo->totalInputRows == 0 && (pInfo->pFillInfo->type != TSDB_FILL_NULL_F && pInfo->pFillInfo->type != TSDB_FILL_SET_VALUE_F)) {
setOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
...@@ -456,7 +456,7 @@ void* destroyStreamFillLinearInfo(SStreamFillLinearInfo* pFillLinear) { ...@@ -456,7 +456,7 @@ void* destroyStreamFillLinearInfo(SStreamFillLinearInfo* pFillLinear) {
return NULL; return NULL;
} }
void* destroyStreamFillInfo(SStreamFillInfo* pFillInfo) { void* destroyStreamFillInfo(SStreamFillInfo* pFillInfo) {
if (pFillInfo->type == TSDB_FILL_SET_VALUE || pFillInfo->type == TSDB_FILL_SET_VALUE_F if (pFillInfo->type == TSDB_FILL_SET_VALUE || pFillInfo->type == TSDB_FILL_SET_VALUE_F ||
pFillInfo->type == TSDB_FILL_NULL || pFillInfo->type == TSDB_FILL_NULL_F) { pFillInfo->type == TSDB_FILL_NULL || pFillInfo->type == TSDB_FILL_NULL_F) {
taosMemoryFreeClear(pFillInfo->pResRow->pRowVal); taosMemoryFreeClear(pFillInfo->pResRow->pRowVal);
taosMemoryFreeClear(pFillInfo->pResRow); taosMemoryFreeClear(pFillInfo->pResRow);
...@@ -1433,7 +1433,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi ...@@ -1433,7 +1433,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
pCell->isNull = true; pCell->isNull = true;
} }
} }
} else if (pInfo->pFillInfo->type == TSDB_FILL_NULL) { } else if (pInfo->pFillInfo->type == TSDB_FILL_NULL || pInfo->pFillInfo->type == TSDB_FILL_NULL_F) {
for (int32_t i = 0; i < pInfo->pFillSup->numOfAllCols; ++i) { for (int32_t i = 0; i < pInfo->pFillSup->numOfAllCols; ++i) {
SFillColInfo* pFillCol = pInfo->pFillSup->pAllColInfo + i; SFillColInfo* pFillCol = pInfo->pFillSup->pAllColInfo + i;
int32_t slotId = GET_DEST_SLOT_ID(pFillCol); int32_t slotId = GET_DEST_SLOT_ID(pFillCol);
......
...@@ -186,7 +186,7 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock* ...@@ -186,7 +186,7 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock*
} }
} }
} }
} else if (pFillInfo->type == TSDB_FILL_NULL) { // fill with NULL } else if (pFillInfo->type == TSDB_FILL_NULL || pFillInfo->type == TSDB_FILL_NULL_F) { // fill with NULL
setNullRow(pBlock, pFillInfo, index); setNullRow(pBlock, pFillInfo, index);
} else { // fill with user specified value for each column } else { // fill with user specified value for each column
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
...@@ -349,7 +349,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t ...@@ -349,7 +349,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
bool isNull = colDataIsNull_s(pSrc, pFillInfo->index); bool isNull = colDataIsNull_s(pSrc, pFillInfo->index);
colDataAppend(pDst, index, src, isNull); colDataAppend(pDst, index, src, isNull);
saveColData(pFillInfo->prev.pRowVal, i, src, isNull); // todo: saveColData(pFillInfo->prev.pRowVal, i, src, isNull); // todo:
} else if (pFillInfo->type == TSDB_FILL_NULL) { } else if (pFillInfo->type == TSDB_FILL_NULL || pFillInfo->type == TSDB_FILL_NULL_F) {
colDataAppendNULL(pDst, index); colDataAppendNULL(pDst, index);
} else if (pFillInfo->type == TSDB_FILL_NEXT) { } else if (pFillInfo->type == TSDB_FILL_NEXT) {
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->next.pRowVal : pFillInfo->prev.pRowVal; SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->next.pRowVal : pFillInfo->prev.pRowVal;
...@@ -578,15 +578,14 @@ bool taosFillHasMoreResults(SFillInfo* pFillInfo) { ...@@ -578,15 +578,14 @@ bool taosFillHasMoreResults(SFillInfo* pFillInfo) {
} }
int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t maxNumOfRows) { int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t maxNumOfRows) {
SColumnInfoData* pCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, pFillInfo->srcTsSlotId);
int64_t* tsList = (int64_t*)pCol->pData;
int32_t numOfRows = taosNumOfRemainRows(pFillInfo); int32_t numOfRows = taosNumOfRemainRows(pFillInfo);
TSKEY ekey1 = ekey; TSKEY ekey1 = ekey;
int64_t numOfRes = -1; int64_t numOfRes = -1;
if (numOfRows > 0) { // still fill gap within current data block, not generating data after the result set. if (numOfRows > 0) { // still fill gap within current data block, not generating data after the result set.
SColumnInfoData* pCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, pFillInfo->srcTsSlotId);
int64_t* tsList = (int64_t*)pCol->pData;
TSKEY lastKey = tsList[pFillInfo->numOfRows - 1]; TSKEY lastKey = tsList[pFillInfo->numOfRows - 1];
numOfRes = taosTimeCountInterval(lastKey, pFillInfo->currentKey, pFillInfo->interval.sliding, numOfRes = taosTimeCountInterval(lastKey, pFillInfo->currentKey, pFillInfo->interval.sliding,
pFillInfo->interval.slidingUnit, pFillInfo->interval.precision); pFillInfo->interval.slidingUnit, pFillInfo->interval.precision);
......
...@@ -181,12 +181,14 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp ...@@ -181,12 +181,14 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
switch (pSliceInfo->fillType) { switch (pSliceInfo->fillType) {
case TSDB_FILL_NULL: { case TSDB_FILL_NULL:
case TSDB_FILL_NULL_F: {
colDataAppendNULL(pDst, rows); colDataAppendNULL(pDst, rows);
break; break;
} }
case TSDB_FILL_SET_VALUE: { case TSDB_FILL_SET_VALUE:
case TSDB_FILL_SET_VALUE_F: {
SVariant* pVar = &pSliceInfo->pFillColInfo[j].fillVal; SVariant* pVar = &pSliceInfo->pFillColInfo[j].fillVal;
if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) { if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) {
......
...@@ -148,6 +148,7 @@ static SKeyword keywordTable[] = { ...@@ -148,6 +148,7 @@ static SKeyword keywordTable[] = {
{"NOT", TK_NOT}, {"NOT", TK_NOT},
{"NOW", TK_NOW}, {"NOW", TK_NOW},
{"NULL", TK_NULL}, {"NULL", TK_NULL},
{"NULL_F", TK_NULL_F},
{"NULLS", TK_NULLS}, {"NULLS", TK_NULLS},
{"OFFSET", TK_OFFSET}, {"OFFSET", TK_OFFSET},
{"ON", TK_ON}, {"ON", TK_ON},
...@@ -237,6 +238,7 @@ static SKeyword keywordTable[] = { ...@@ -237,6 +238,7 @@ static SKeyword keywordTable[] = {
{"USERS", TK_USERS}, {"USERS", TK_USERS},
{"USING", TK_USING}, {"USING", TK_USING},
{"VALUE", TK_VALUE}, {"VALUE", TK_VALUE},
{"VALUE_F", TK_VALUE_F},
{"VALUES", TK_VALUES}, {"VALUES", TK_VALUES},
{"VARCHAR", TK_VARCHAR}, {"VARCHAR", TK_VARCHAR},
{"VARIABLES", TK_VARIABLES}, {"VARIABLES", TK_VARIABLES},
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册