未验证 提交 6b5ca95d 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #12096 from taosdata/feature/3.0_liaohj

enh(query): time window interpolation is valid for interval query.
......@@ -20,6 +20,12 @@
extern "C" {
#endif
typedef struct {
char* pData;
bool isNull;
int16_t type;
int32_t bytes;
} SGroupKeys, SStateKeys;
#ifdef __cplusplus
}
......
......@@ -40,6 +40,7 @@ extern "C" {
#include "tpagedbuf.h"
#include "vnode.h"
#include "executorInt.h"
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
......@@ -478,16 +479,8 @@ typedef struct SFillOperatorInfo {
void** p;
SSDataBlock* existNewGroupBlock;
bool multigroupResult;
SInterval intervalInfo;
} SFillOperatorInfo;
typedef struct {
char* pData;
bool isNull;
int16_t type;
int32_t bytes;
} SGroupKeys, SStateKeys;
typedef struct SGroupbyOperatorInfo {
SOptrBasicInfo binfo;
SArray* pGroupCols; // group by columns, SArray<SColumn>
......@@ -676,7 +669,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
SArray* pTableIdList, SExecTaskInfo* pTaskInfo, SNode* pConditions);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
SInterval* pInterval, SSDataBlock* pResBlock, int32_t fillType, char* fillVal,
SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, int32_t fillType, SNodeListNode* fillVal,
bool multigroupResult, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
......
......@@ -27,13 +27,12 @@ extern "C" {
struct SSDataBlock;
typedef struct SFillColInfo {
// STColumn col; // column info
SResSchema col;
int16_t functionId; // sql function id
int16_t flag; // column flag: TAG COLUMN|NORMAL COLUMN
int16_t tagIndex; // index of current tag in SFillTagColInfo array list
int32_t offset;
union {int64_t i; double d;} val;
SExprInfo *pExpr;
// SResSchema schema;
// int16_t functionId; // sql function id
int16_t flag; // column flag: TAG COLUMN|NORMAL COLUMN
int16_t tagIndex; // index of current tag in SFillTagColInfo array list
SVariant fillVal;
} SFillColInfo;
typedef struct {
......@@ -56,9 +55,10 @@ typedef struct SFillInfo {
int32_t numOfCols; // number of columns, including the tags columns
int32_t rowSize; // size of each row
SInterval interval;
char * prevValues; // previous row of data, to generate the interpolation results
char * nextValues; // next row of data
char** pData; // original result data block involved in filling data
SArray *prev;
SArray *next;
SSDataBlock *pSrcBlock;
int32_t alloc; // data buffer size in rows
SFillColInfo* pFillCol; // column info for fill operations
......@@ -72,7 +72,7 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t
void taosFillSetStartInfo(struct SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
void taosResetFillInfo(struct SFillInfo* pFillInfo, TSKEY startTimestamp);
void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struct SSDataBlock* pInput);
struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const struct SValueNode* val);
struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const struct SNodeListNode* val);
bool taosFillHasMoreResults(struct SFillInfo* pFillInfo);
SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
......@@ -80,7 +80,7 @@ SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int3
struct SFillColInfo* pCol, const char* id);
void* taosDestroyFillInfo(struct SFillInfo *pFillInfo);
int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, void** output, int32_t capacity);
int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, SSDataBlock* p, int32_t capacity);
int64_t getFillInfoStart(struct SFillInfo *pFillInfo);
......
......@@ -1235,9 +1235,6 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
if (fmIsPseudoColumnFunc(pfCtx->functionId)) {
// do nothing
} else if (fmIsNonstandardSQLFunc(pfCtx->functionId)) {
// todo set the correct timestamp column
pfCtx->input.pPTS = taosArrayGet(pSrcBlock->pDataBlock, 1);
SResultRowEntryInfo* pResInfo = GET_RES_INFO(&pCtx[k]);
pfCtx->fpSet.init(&pCtx[k], pResInfo);
......@@ -2490,7 +2487,7 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc
// if (pQueryAttr->pFilters != NULL) {
// filterSetColFieldData(pQueryAttr->pFilters, pBlock->info.numOfCols, pBlock->pDataBlock);
// }
// if (pQueryAttr->pFilters != NULL || pRuntimeEnv->pTsBuf != NULL) {
// filterColRowsInDataBlock(pRuntimeEnv, pBlock, ascQuery);
// }
......@@ -3204,16 +3201,16 @@ static int32_t compressQueryColData(SColumnInfoData* pColRes, int32_t numOfRows,
colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
}
int32_t doFillTimeIntervalGapsInResults(struct SFillInfo* pFillInfo, SSDataBlock* pOutput, int32_t capacity, void** p) {
int32_t doFillTimeIntervalGapsInResults(struct SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t capacity) {
// for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
// SColumnInfoData* pColInfoData = taosArrayGet(pOutput->pDataBlock, i);
// p[i] = pColInfoData->pData + (pColInfoData->info.bytes * pOutput->info.rows);
// }
int32_t numOfRows = (int32_t)taosFillResultDataBlock(pFillInfo, p, capacity - pOutput->info.rows);
pOutput->info.rows += numOfRows;
int32_t numOfRows = (int32_t)taosFillResultDataBlock(pFillInfo, pBlock, capacity - pBlock->info.rows);
pBlock->info.rows += numOfRows;
return pOutput->info.rows;
return pBlock->info.rows;
}
void publishOperatorProfEvent(SOperatorInfo* pOperator, EQueryProfEventType eventType) {
......@@ -5562,7 +5559,7 @@ static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResult
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pResultInfo->capacity, pInfo->p);
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pResultInfo->capacity);
pInfo->existNewGroupBlock = NULL;
*newgroup = true;
}
......@@ -5571,7 +5568,7 @@ static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo* pInfo, SResultInf
SExecTaskInfo* pTaskInfo) {
if (taosFillHasMoreResults(pInfo->pFillInfo)) {
*newgroup = false;
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pResultInfo->capacity, pInfo->p);
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pResultInfo->capacity);
if (pInfo->pRes->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult)) {
return;
}
......@@ -5632,7 +5629,8 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator, bool* newgroup) {
}
}
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pResBlock, pOperator->resultInfo.capacity, pInfo->p);
blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity);
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pResBlock, pOperator->resultInfo.capacity);
// current group has no more result to return
if (pResBlock->info.rows > 0) {
......@@ -6206,19 +6204,17 @@ _error:
return NULL;
}
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, int64_t* fillVal,
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SNodeListNode* pValNode,
STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType) {
SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, NULL);
SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pValNode);
// TODO set correct time precision
STimeWindow w = TSWINDOW_INITIALIZER;
getAlignQueryTimeWindow(pInterval, TSDB_TIME_PRECISION_MILLI, win.skey, &w);
getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey, &w);
int32_t order = TSDB_ORDER_ASC;
pInfo->pFillInfo = taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval, fillType, pColInfo, id);
pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
if (pInfo->pFillInfo == NULL || pInfo->p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
} else {
......@@ -6226,15 +6222,29 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
}
}
static SArray* getFillValue(SNodeListNode* pNodeList) {
SArray* pList = taosArrayInit(4, sizeof(SVariant));
size_t len = LIST_LENGTH(pNodeList->pNodeList);
for(int32_t i = 0; i < len; ++i) {
SValueNode* pvalue = (SValueNode*)nodesListGetNode(pNodeList->pNodeList, i);
SVariant v = {0};
valueNodeToVariant(pvalue, &v);
taosArrayPush(pList, &v);
}
return pList;
}
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
SInterval* pInterval, SSDataBlock* pResBlock, int32_t fillType, char* fillVal,
SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, int32_t fillType, SNodeListNode* pValueNode,
bool multigroupResult, SExecTaskInfo* pTaskInfo) {
SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
pInfo->pRes = pResBlock;
pInfo->multigroupResult = multigroupResult;
pInfo->intervalInfo = *pInterval;
int32_t type = TSDB_FILL_NONE;
switch (fillType) {
......@@ -6263,7 +6273,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
SResultInfo* pResultInfo = &pOperator->resultInfo;
initResultSizeInfo(pOperator, 4096);
int32_t code = initFillInfo(pInfo, pExpr, numOfCols, (int64_t*)fillVal, pTaskInfo->window, pResultInfo->capacity,
int32_t code = initFillInfo(pInfo, pExpr, numOfCols, pValueNode, *pWindow, pResultInfo->capacity,
pTaskInfo->id.str, pInterval, type);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
......@@ -6272,7 +6282,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
pOperator->name = "FillOperator";
pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED;
// pOperator->operatorType = OP_Fill;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfCols;
pOperator->info = pInfo;
......@@ -6621,13 +6631,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t primaryTsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, primaryTsSlotId, &as,
pTableGroupInfo, pTaskInfo);
// if (pIntervalPhyNode->pFill != NULL) {
// pOptr = createFillOperatorInfo(pOptr, pExprInfo, num, &interval, pResBlock, pIntervalPhyNode->pFill->mode,
// NULL,
// false, pTaskInfo);
// }
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode;
......@@ -6665,6 +6668,13 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &num);
pOptr = createJoinOperatorInfo(ops, size, pExprInfo, num, pResBlock, pJoinNode->pOnConditions, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) {
SFillPhysiNode* pFillNode = (SFillPhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
SExprInfo* pExprInfo = createExprInfo(pFillNode->pTargets, NULL, &num);
SInterval* pInterval = &((STableIntervalOperatorInfo*)ops[0]->info)->interval;
pOptr = createFillOperatorInfo(ops[0], pExprInfo, num, pInterval, &pFillNode->timeRange, pResBlock, pFillNode->mode, (SNodeListNode*)pFillNode->pValues, false, pTaskInfo);
} else {
ASSERT(0);
}
......
......@@ -13,20 +13,21 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "function.h"
#include "os.h"
#include "querynodes.h"
#include "taosdef.h"
#include "tmsg.h"
#include "ttypes.h"
#include "tfill.h"
#include "function.h"
#include "tcommon.h"
#include "thash.h"
#include "ttime.h"
#include "function.h"
#include "tdatablock.h"
#include "executorInt.h"
#include "querynodes.h"
#include "tfill.h"
#define FILL_IS_ASC_FILL(_f) ((_f)->order == TSDB_ORDER_ASC)
#define DO_INTERPOLATION(_v1, _v2, _k1, _k2, _k) ((_v1) + ((_v2) - (_v1)) * (((double)(_k)) - ((double)(_k1))) / (((double)(_k2)) - ((double)(_k1))))
......@@ -37,168 +38,208 @@ static void setTagsValue(SFillInfo* pFillInfo, void** data, int32_t genRows) {
continue;
}
char* val1 = elePtrAt(data[j], pCol->col.bytes, genRows);
SResSchema* pSchema = &pCol->pExpr->base.resSchema;
char* val1 = elePtrAt(data[j], pSchema->bytes, genRows);
assert(pCol->tagIndex >= 0 && pCol->tagIndex < pFillInfo->numOfTags);
SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex];
// assert (pTag->col.colId == pCol->col.colId);
assignVal(val1, pTag->tagVal, pCol->col.bytes, pCol->col.type);
assignVal(val1, pTag->tagVal, pSchema->bytes, pSchema->type);
}
}
static void setNullValueForRow(SFillInfo* pFillInfo, void** data, int32_t numOfCol, int32_t rowIndex) {
static void setNullRow(SSDataBlock* pBlock, int32_t numOfCol, int32_t rowIndex) {
// the first are always the timestamp column, so start from the second column.
for (int32_t i = 1; i < numOfCol; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
char* output = elePtrAt(data[i], pCol->col.bytes, rowIndex);
setNull(output, pCol->col.type, pCol->col.bytes);
for (int32_t i = 1; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, i);
colDataAppendNULL(p, rowIndex);
}
}
static void doFillOneRowResult(SFillInfo* pFillInfo, void** data, char** srcData, int64_t ts, bool outOfBound) {
char* prev = pFillInfo->prevValues;
char* next = pFillInfo->nextValues;
#define GET_DEST_SLOT_ID(_p) ((_p)->pExpr->base.resSchema.slotId)
#define GET_SRC_SLOT_ID(_p) ((_p)->pExpr->base.pParam[0].pCol->slotId)
static void doSetVal(SColumnInfoData* pDstColInfoData, int32_t rowIndex, const SGroupKeys* pKey);
static void doFillOneRowResult(SFillInfo* pFillInfo, SSDataBlock *pBlock, SSDataBlock* pSrcBlock, int64_t ts, bool outOfBound) {
SPoint point1, point2, point;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
// set the primary timestamp column value
int32_t index = pFillInfo->numOfCurrent;
char* val = elePtrAt(data[0], TSDB_KEYSIZE, index);
SColumnInfoData *pCol0 = taosArrayGet(pBlock->pDataBlock, 0);
char* val = colDataGetData(pCol0, index);
*(TSKEY*) val = pFillInfo->currentKey;
// set the other values
if (pFillInfo->type == TSDB_FILL_PREV) {
char* p = FILL_IS_ASC_FILL(pFillInfo) ? prev : next;
if (p != NULL) {
for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
if (TSDB_COL_IS_TAG(pCol->flag)) {
continue;
}
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev : pFillInfo->next;
char* output = elePtrAt(data[i], pCol->col.bytes, index);
// assignVal(output, p + pCol->offset, pCol->col.bytes, pCol->col.type);
for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
if (TSDB_COL_IS_TAG(pCol->flag)) {
continue;
}
} else { // no prev value yet, set the value for NULL
setNullValueForRow(pFillInfo, data, pFillInfo->numOfCols, index);
SGroupKeys* pKey = taosArrayGet(p, i);
SColumnInfoData* pDstColInfoData = taosArrayGet(pBlock->pDataBlock, GET_DEST_SLOT_ID(pCol));
doSetVal(pDstColInfoData, index, pKey);
}
} else if (pFillInfo->type == TSDB_FILL_NEXT) {
char* p = FILL_IS_ASC_FILL(pFillInfo)? next : prev;
if (p != NULL) {
for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
if (TSDB_COL_IS_TAG(pCol->flag)) {
continue;
}
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->next : pFillInfo->prev;
char* output = elePtrAt(data[i], pCol->col.bytes, index);
// assignVal(output, p + pCol->offset, pCol->col.bytes, pCol->col.type);
for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
if (TSDB_COL_IS_TAG(pCol->flag)) {
continue;
}
} else { // no prev value yet, set the value for NULL
setNullValueForRow(pFillInfo, data, pFillInfo->numOfCols, index);
SGroupKeys* pKey = taosArrayGet(p, i);
SColumnInfoData* pDstColInfoData = taosArrayGet(pBlock->pDataBlock, GET_DEST_SLOT_ID(pCol));
doSetVal(pDstColInfoData, index, pKey);
}
} else if (pFillInfo->type == TSDB_FILL_LINEAR) {
// TODO : linear interpolation supports NULL value
if (prev != NULL && !outOfBound) {
if (outOfBound) {
setNullRow(pBlock, pFillInfo->numOfCols, index);
} else {
for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
if (TSDB_COL_IS_TAG(pCol->flag)) {
continue;
}
int16_t type = pCol->col.type;
int16_t bytes = pCol->col.bytes;
int32_t srcSlotId = GET_SRC_SLOT_ID(pCol);
char *val1 = elePtrAt(data[i], pCol->col.bytes, index);
if (type == TSDB_DATA_TYPE_BINARY|| type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BOOL) {
setNull(val1, pCol->col.type, bytes);
int32_t dstSlotId = GET_DEST_SLOT_ID(pCol);
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
int16_t type = pCol->pExpr->base.resSchema.type;
SGroupKeys* pKey = taosArrayGet(pFillInfo->prev, i);
if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pKey->isNull) {
colDataAppendNULL(pDstCol, index);
continue;
}
point1 = (SPoint){.key = *(TSKEY*)(prev), .val = prev + pCol->offset};
point2 = (SPoint){.key = ts, .val = srcData[i] + pFillInfo->index * bytes};
point = (SPoint){.key = pFillInfo->currentKey, .val = val1};
SGroupKeys* pKey1 = taosArrayGet(pFillInfo->prev, 0);
int64_t prevTs = *(int64_t*)pKey1->pData;
SColumnInfoData* pSrcCol = taosArrayGet(pSrcBlock->pDataBlock, srcSlotId);
char* data = colDataGetData(pSrcCol, pFillInfo->index);
point1 = (SPoint){.key = prevTs, .val = pKey->pData};
point2 = (SPoint){.key = ts, .val = data};
int64_t out = 0;
point = (SPoint){.key = pFillInfo->currentKey, .val = &out};
taosGetLinearInterpolationVal(&point, type, &point1, &point2, type);
colDataAppend(pDstCol, index, (const char*)&out, false);
}
} else {
setNullValueForRow(pFillInfo, data, pFillInfo->numOfCols, index);
}
} else { // fill the default value */
} else if (pFillInfo->type == TSDB_FILL_NULL) { // fill with NULL
setNullRow(pBlock, pFillInfo->numOfCols, index);
} else { // fill with user specified value for each column
for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
if (TSDB_COL_IS_TAG(pCol->flag)/* || IS_VAR_DATA_TYPE(pCol->col.type)*/) {
if (TSDB_COL_IS_TAG(pCol->flag)/* || IS_VAR_DATA_TYPE(pCol->schema.type)*/) {
continue;
}
char* val1 = elePtrAt(data[i], pCol->col.bytes, index);
assignVal(val1, (char*)&pCol->val, pCol->col.bytes, pCol->col.type);
SVariant* pVar = &pFillInfo->pFillCol[i].fillVal;
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) {
float v = 0;
GET_TYPED_DATA(v, float, pVar->nType, &pVar->i);
colDataAppend(pDst, index, (char*)&v, false);
} else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) {
double v = 0;
GET_TYPED_DATA(v, double, pVar->nType, &pVar->i);
colDataAppend(pDst, index, (char*)&v, false);
} else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) {
int64_t v = 0;
GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i);
colDataAppend(pDst, index, (char*)&v, false);
}
}
}
setTagsValue(pFillInfo, data, index);
pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pFillInfo->interval.sliding * step, pFillInfo->interval.slidingUnit,
pFillInfo->interval.precision);
// setTagsValue(pFillInfo, data, index);
SInterval* pInterval = &pFillInfo->interval;
pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pInterval->sliding * step, pInterval->slidingUnit, pInterval->precision);
pFillInfo->numOfCurrent++;
}
static void initBeforeAfterDataBuf(SFillInfo* pFillInfo, char** next) {
if (*next != NULL) {
void doSetVal(SColumnInfoData* pDstCol, int32_t rowIndex, const SGroupKeys* pKey) {
if (pKey->isNull) {
colDataAppendNULL(pDstCol, rowIndex);
} else {
colDataAppend(pDstCol, rowIndex, pKey->pData, false);
}
}
static void initBeforeAfterDataBuf(SFillInfo* pFillInfo) {
if (taosArrayGetSize(pFillInfo->next) > 0) {
return;
}
*next = taosMemoryCalloc(1, pFillInfo->rowSize);
for (int i = 1; i < pFillInfo->numOfCols; i++) {
for (int i = 0; i < pFillInfo->numOfCols; i++) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
setNull(*next + pCol->offset, pCol->col.type, pCol->col.bytes);
SGroupKeys key = {0};
SResSchema* pSchema = &pCol->pExpr->base.resSchema;
key.pData = taosMemoryMalloc(pSchema->bytes);
key.isNull = true;
key.bytes = pSchema->bytes;
key.type = pSchema->type;
taosArrayPush(pFillInfo->next, &key);
key.pData = taosMemoryMalloc(pSchema->bytes);
taosArrayPush(pFillInfo->prev, &key);
}
}
static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, char** srcData, char* buf) {
int32_t rowIndex = pFillInfo->index;
static void saveColData(SArray* rowBuf, int32_t columnIndex, const char* src, bool isNull);
static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, int32_t rowIndex, SArray* pRow) {
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
memcpy(buf + pCol->offset, srcData[i] + rowIndex * pCol->col.bytes, pCol->col.bytes);
int32_t srcSlotId = GET_SRC_SLOT_ID(&pFillInfo->pFillCol[i]);
SColumnInfoData* pSrcCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId);
bool isNull = colDataIsNull_s(pSrcCol, rowIndex);
char* p = colDataGetData(pSrcCol, rowIndex);
saveColData(pRow, i, p, isNull);
}
}
static int32_t fillResultImpl(SFillInfo* pFillInfo, void** data, int32_t outputRows) {
static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t outputRows) {
pFillInfo->numOfCurrent = 0;
char** srcData = pFillInfo->pData;
char** prev = &pFillInfo->prevValues;
char** next = &pFillInfo->nextValues;
// todo make sure the first column is always the primary timestamp column?
SColumnInfoData* pTsCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, 0);
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
bool ascFill = FILL_IS_ASC_FILL(pFillInfo);
if (FILL_IS_ASC_FILL(pFillInfo)) {
assert(pFillInfo->currentKey >= pFillInfo->start);
} else {
assert(pFillInfo->currentKey <= pFillInfo->start);
}
#if 0
ASSERT(ascFill && (pFillInfo->currentKey >= pFillInfo->start) || (!ascFill && (pFillInfo->currentKey <= pFillInfo->start)));
#endif
while (pFillInfo->numOfCurrent < outputRows) {
int64_t ts = ((int64_t*)pFillInfo->pData[0])[pFillInfo->index];
int64_t ts = ((int64_t*)pTsCol->pData)[pFillInfo->index];
// set the next value for interpolation
if ((pFillInfo->currentKey < ts && FILL_IS_ASC_FILL(pFillInfo)) ||
(pFillInfo->currentKey > ts && !FILL_IS_ASC_FILL(pFillInfo))) {
initBeforeAfterDataBuf(pFillInfo, next);
copyCurrentRowIntoBuf(pFillInfo, srcData, *next);
if ((pFillInfo->currentKey < ts && ascFill) || (pFillInfo->currentKey > ts && !ascFill)) {
copyCurrentRowIntoBuf(pFillInfo, pFillInfo->index, pFillInfo->next);
}
if (((pFillInfo->currentKey < ts && FILL_IS_ASC_FILL(pFillInfo)) || (pFillInfo->currentKey > ts && !FILL_IS_ASC_FILL(pFillInfo))) &&
pFillInfo->numOfCurrent < outputRows) {
// fill the gap between two actual input rows
while (((pFillInfo->currentKey < ts && FILL_IS_ASC_FILL(pFillInfo)) ||
(pFillInfo->currentKey > ts && !FILL_IS_ASC_FILL(pFillInfo))) &&
pFillInfo->numOfCurrent < outputRows) {
doFillOneRowResult(pFillInfo, data, srcData, ts, false);
if (((pFillInfo->currentKey < ts && ascFill) || (pFillInfo->currentKey > ts && !ascFill)) && pFillInfo->numOfCurrent < outputRows) {
// fill the gap between two input rows
while (((pFillInfo->currentKey < ts && ascFill) || (pFillInfo->currentKey > ts && !ascFill)) && pFillInfo->numOfCurrent < outputRows) {
doFillOneRowResult(pFillInfo, pBlock, pFillInfo->pSrcBlock, ts, false);
}
// output buffer is full, abort
......@@ -208,61 +249,66 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, void** data, int32_t outputR
}
} else {
assert(pFillInfo->currentKey == ts);
initBeforeAfterDataBuf(pFillInfo, prev);
if (pFillInfo->type == TSDB_FILL_NEXT && (pFillInfo->index + 1) < pFillInfo->numOfRows) {
initBeforeAfterDataBuf(pFillInfo, next);
++pFillInfo->index;
copyCurrentRowIntoBuf(pFillInfo, srcData, *next);
copyCurrentRowIntoBuf(pFillInfo, pFillInfo->index, pFillInfo->next);
--pFillInfo->index;
}
// assign rows to dst buffer
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
if (TSDB_COL_IS_TAG(pCol->flag)/* || IS_VAR_DATA_TYPE(pCol->col.type)*/) {
if (TSDB_COL_IS_TAG(pCol->flag)/* || IS_VAR_DATA_TYPE(pCol->schema.type)*/) {
continue;
}
char* output = elePtrAt(data[i], pCol->col.bytes, pFillInfo->numOfCurrent);
char* src = elePtrAt(srcData[i], pCol->col.bytes, pFillInfo->index);
int32_t srcSlotId = GET_SRC_SLOT_ID(pCol);
int32_t dstSlotId = GET_DEST_SLOT_ID(pCol);
if (i == 0 || (pCol->functionId != FUNCTION_COUNT && !isNull(src, pCol->col.type)) ||
(pCol->functionId == FUNCTION_COUNT && GET_INT64_VAL(src) != 0)) {
assignVal(output, src, pCol->col.bytes, pCol->col.type);
memcpy(*prev + pCol->offset, src, pCol->col.bytes);
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, dstSlotId);
SColumnInfoData* pSrc = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId);
char* src = colDataGetData(pSrc, pFillInfo->index);
if (i == 0 || (/*pCol->functionId != FUNCTION_COUNT &&*/ !colDataIsNull_s(pSrc, pFillInfo->index)) /*||
(pCol->functionId == FUNCTION_COUNT && GET_INT64_VAL(src) != 0)*/) {
bool isNull = colDataIsNull_s(pSrc, pFillInfo->index);
colDataAppend(pDst, pFillInfo->numOfCurrent, src, isNull);
saveColData(pFillInfo->prev, i, src, isNull);
} else { // i > 0 and data is null , do interpolation
if (pFillInfo->type == TSDB_FILL_PREV) {
assignVal(output, *prev + pCol->offset, pCol->col.bytes, pCol->col.type);
SGroupKeys *pKey = taosArrayGet(pFillInfo->prev, i);
doSetVal(pDst, pFillInfo->numOfCurrent, pKey);
} else if (pFillInfo->type == TSDB_FILL_LINEAR) {
assignVal(output, src, pCol->col.bytes, pCol->col.type);
memcpy(*prev + pCol->offset, src, pCol->col.bytes);
bool isNull = colDataIsNull_s(pSrc, pFillInfo->index);
colDataAppend(pDst, pFillInfo->numOfCurrent, src, isNull);
saveColData(pFillInfo->prev, i, src, isNull);
} else if (pFillInfo->type == TSDB_FILL_NULL) {
colDataAppendNULL(pDst, pFillInfo->numOfCurrent);
} else if (pFillInfo->type == TSDB_FILL_NEXT) {
if (*next) {
assignVal(output, *next + pCol->offset, pCol->col.bytes, pCol->col.type);
} else {
setNull(output, pCol->col.type, pCol->col.bytes);
}
SGroupKeys *pKey = taosArrayGet(pFillInfo->next, i);
doSetVal(pDst, pFillInfo->numOfCurrent, pKey);
} else {
assignVal(output, (char*)&pCol->val, pCol->col.bytes, pCol->col.type);
SVariant* pVar = &pFillInfo->pFillCol[i].fillVal;
colDataAppend(pDst, pFillInfo->numOfCurrent, (char*)&pVar->i, false);
}
}
}
// set the tag value for final result
setTagsValue(pFillInfo, data, pFillInfo->numOfCurrent);
// setTagsValue(pFillInfo, data, pFillInfo->numOfCurrent);
SInterval *pInterval = &pFillInfo->interval;
pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pInterval->sliding * step, pInterval->slidingUnit, pInterval->precision);
pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pFillInfo->interval.sliding * step,
pFillInfo->interval.slidingUnit, pFillInfo->interval.precision);
pFillInfo->index += 1;
pFillInfo->numOfCurrent += 1;
}
if (pFillInfo->index >= pFillInfo->numOfRows || pFillInfo->numOfCurrent >= outputRows) {
/* the raw data block is exhausted, next value does not exists */
if (pFillInfo->index >= pFillInfo->numOfRows) {
taosMemoryFreeClear(*next);
}
// if (pFillInfo->index >= pFillInfo->numOfRows) {
// taosMemoryFreeClear(*next);
// }
pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
return pFillInfo->numOfCurrent;
}
......@@ -271,14 +317,24 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, void** data, int32_t outputR
return pFillInfo->numOfCurrent;
}
static int64_t appendFilledResult(SFillInfo* pFillInfo, void** output, int64_t resultCapacity) {
static void saveColData(SArray* rowBuf, int32_t columnIndex, const char* src, bool isNull) {
SGroupKeys *pKey = taosArrayGet(rowBuf, columnIndex);
if (isNull) {
pKey->isNull = true;
} else {
memcpy(pKey->pData, src, pKey->bytes);
pKey->isNull = false;
}
}
static int64_t appendFilledResult(SFillInfo* pFillInfo, SSDataBlock* pBlock, int64_t resultCapacity) {
/*
* These data are generated according to fill strategy, since the current timestamp is out of the time window of
* real result set. Note that we need to keep the direct previous result rows, to generated the filled data.
*/
pFillInfo->numOfCurrent = 0;
while (pFillInfo->numOfCurrent < resultCapacity) {
doFillOneRowResult(pFillInfo, output, pFillInfo->pData, pFillInfo->start, true);
doFillOneRowResult(pFillInfo, pBlock, pFillInfo->pSrcBlock, pFillInfo->start, true);
}
pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
......@@ -295,15 +351,15 @@ static int32_t setTagColumnInfo(SFillInfo* pFillInfo, int32_t numOfCols, int32_t
int32_t k = 0;
for (int32_t i = 0; i < numOfCols; ++i) {
SFillColInfo* pColInfo = &pFillInfo->pFillCol[i];
pFillInfo->pData[i] = NULL;
SResSchema* pSchema = &pColInfo->pExpr->base.resSchema;
if (TSDB_COL_IS_TAG(pColInfo->flag) || pColInfo->col.type == TSDB_DATA_TYPE_BINARY) {
if (TSDB_COL_IS_TAG(pColInfo->flag) || pSchema->type == TSDB_DATA_TYPE_BINARY) {
numOfTags += 1;
bool exists = false;
int32_t index = -1;
for (int32_t j = 0; j < k; ++j) {
if (pFillInfo->pTags[j].col.colId == pColInfo->col.slotId) {
if (pFillInfo->pTags[j].col.colId == pSchema->slotId) {
exists = true;
index = j;
break;
......@@ -311,12 +367,12 @@ static int32_t setTagColumnInfo(SFillInfo* pFillInfo, int32_t numOfCols, int32_t
}
if (!exists) {
SSchema* pSchema = &pFillInfo->pTags[k].col;
pSchema->colId = pColInfo->col.slotId;
pSchema->type = pColInfo->col.type;
pSchema->bytes = pColInfo->col.bytes;
SSchema* pSchema1 = &pFillInfo->pTags[k].col;
pSchema1->colId = pSchema->slotId;
pSchema1->type = pSchema->type;
pSchema1->bytes = pSchema->bytes;
pFillInfo->pTags[k].tagVal = taosMemoryCalloc(1, pColInfo->col.bytes);
pFillInfo->pTags[k].tagVal = taosMemoryCalloc(1, pSchema->bytes);
pColInfo->tagIndex = k;
k += 1;
......@@ -325,7 +381,7 @@ static int32_t setTagColumnInfo(SFillInfo* pFillInfo, int32_t numOfCols, int32_t
}
}
rowsize += pColInfo->col.bytes;
rowsize += pSchema->bytes;
}
pFillInfo->numOfTags = numOfTags;
......@@ -355,7 +411,6 @@ struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTag
}
taosResetFillInfo(pFillInfo, skey);
pFillInfo->order = order;
switch(fillType) {
......@@ -364,6 +419,7 @@ struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTag
case FILL_MODE_NULL: pFillInfo->type = TSDB_FILL_NULL; break;
case FILL_MODE_LINEAR: pFillInfo->type = TSDB_FILL_LINEAR;break;
case FILL_MODE_NEXT: pFillInfo->type = TSDB_FILL_NEXT; break;
case FILL_MODE_VALUE: pFillInfo->type = TSDB_FILL_SET_VALUE; break;
default:
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
......@@ -376,7 +432,6 @@ struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTag
pFillInfo->alloc = capacity;
pFillInfo->id = id;
pFillInfo->interval = *pInterval;
pFillInfo->pData = taosMemoryMalloc(POINTER_BYTES * numOfCols);
// if (numOfTags > 0) {
pFillInfo->pTags = taosMemoryCalloc(numOfCols, sizeof(SFillTagColInfo));
......@@ -385,6 +440,11 @@ struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTag
}
// }
pFillInfo->next = taosArrayInit(numOfCols, sizeof(SGroupKeys));
pFillInfo->prev = taosArrayInit(numOfCols, sizeof(SGroupKeys));
initBeforeAfterDataBuf(pFillInfo);
pFillInfo->rowSize = setTagColumnInfo(pFillInfo, pFillInfo->numOfCols, pFillInfo->alloc);
assert(pFillInfo->rowSize > 0);
return pFillInfo;
......@@ -405,18 +465,15 @@ void* taosDestroyFillInfo(SFillInfo* pFillInfo) {
return NULL;
}
taosMemoryFreeClear(pFillInfo->prevValues);
taosMemoryFreeClear(pFillInfo->nextValues);
taosArrayDestroy(pFillInfo->prev);
taosArrayDestroy(pFillInfo->next);
for(int32_t i = 0; i < pFillInfo->numOfTags; ++i) {
taosMemoryFreeClear(pFillInfo->pTags[i].tagVal);
}
taosMemoryFreeClear(pFillInfo->pTags);
taosMemoryFreeClear(pFillInfo->pData);
taosMemoryFreeClear(pFillInfo->pFillCol);
taosMemoryFreeClear(pFillInfo);
return NULL;
}
......@@ -436,18 +493,7 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey)
}
void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const SSDataBlock* pInput) {
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
SColumnInfoData* pColData = taosArrayGet(pInput->pDataBlock, i);
pFillInfo->pData[i] = pColData->pData;
if (TSDB_COL_IS_TAG(pCol->flag)) { // copy the tag value to tag value buffer
SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex];
assert (pTag->col.colId == pCol->col.slotId);
memcpy(pTag->tagVal, pColData->pData, pCol->col.bytes); // TODO not memcpy??
}
}
pFillInfo->pSrcBlock = (SSDataBlock*) pInput;
}
bool taosFillHasMoreResults(SFillInfo* pFillInfo) {
......@@ -465,8 +511,9 @@ bool taosFillHasMoreResults(SFillInfo* pFillInfo) {
}
int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t maxNumOfRows) {
int64_t* tsList = (int64_t*) pFillInfo->pData[0];
SColumnInfoData* pCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, 0);
int64_t* tsList = (int64_t*) pCol->pData;
int32_t numOfRows = taosNumOfRemainRows(pFillInfo);
TSKEY ekey1 = ekey;
......@@ -513,7 +560,7 @@ int32_t taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint*
return TSDB_CODE_SUCCESS;
}
int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, void** output, int32_t capacity) {
int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, SSDataBlock* p, int32_t capacity) {
int32_t remain = taosNumOfRemainRows(pFillInfo);
int64_t numOfRes = getNumOfResultsAfterFillGap(pFillInfo, pFillInfo->end, capacity);
......@@ -521,9 +568,9 @@ int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, void** output, int32_t cap
// no data existed for fill operation now, append result according to the fill strategy
if (remain == 0) {
appendFilledResult(pFillInfo, output, numOfRes);
appendFilledResult(pFillInfo, p, numOfRes);
} else {
fillResultImpl(pFillInfo, output, (int32_t) numOfRes);
fillResultImpl(pFillInfo, p, (int32_t) numOfRes);
assert(numOfRes == pFillInfo->numOfCurrent);
}
......@@ -538,28 +585,30 @@ int64_t getFillInfoStart(struct SFillInfo *pFillInfo) {
return pFillInfo->start;
}
struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const struct SValueNode* val) {
int32_t offset = 0;
struct SFillColInfo* pFillCol = taosMemoryCalloc(numOfOutput, sizeof(SFillColInfo));
SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const struct SNodeListNode* pValNode) {
SFillColInfo* pFillCol = taosMemoryCalloc(numOfOutput, sizeof(SFillColInfo));
if (pFillCol == NULL) {
return NULL;
}
size_t len = (pValNode != NULL)? LIST_LENGTH(pValNode->pNodeList):0;
for(int32_t i = 0; i < numOfOutput; ++i) {
SExprInfo* pExprInfo = &pExpr[i];
SExprInfo* pExprInfo = &pExpr[i];
pFillCol[i].pExpr = pExprInfo;
pFillCol[i].tagIndex = -2;
pFillCol[i].col = pExprInfo->base.resSchema;
pFillCol[i].offset = offset;
pFillCol[i].tagIndex = -2;
// todo refactor
if (len > 0) {
// if the user specified value is less than the column, alway use the last one as the fill value
int32_t index = (i >= len)? (len - 1):i;
SValueNode* pv = (SValueNode*)nodesListGetNode(pValNode->pNodeList, index);
valueNodeToVariant(pv, &pFillCol[i].fillVal);
}
if (pExprInfo->base.numOfParams > 0) {
pFillCol[i].flag = pExprInfo->base.pParam[0].pCol->flag; // always be the normal column for table query
}
// pFillCol[i].functionId = pExprInfo->pExpr->_function.functionId;
// pFillCol[i].val.d = *val;
offset += pExprInfo->base.resSchema.bytes;
}
return pFillCol;
......
......@@ -78,6 +78,8 @@ typedef struct SDiffInfo {
int64_t i64;
double d64;
} prev;
int64_t prevTs;
} SDiffInfo;
typedef struct SSpreadInfo {
......@@ -1196,9 +1198,6 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
bool isFirstBlock = (pDiffInfo->hasPrev == false);
int32_t numOfElems = 0;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
// int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1;
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
......@@ -1206,44 +1205,86 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
switch (pInputCol->info.type) {
case TSDB_DATA_TYPE_INT: {
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += step) {
int32_t pos = startOffset + (isFirstBlock ? (numOfElems - 1) : numOfElems);
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
if (pDiffInfo->includeNull) {
colDataSetNull_f(pOutput->nullbitmap, pos);
if (tsList != NULL) {
colDataAppendInt64(pTsOutput, pos, &tsList[i]);
if (pCtx->order == TSDB_ORDER_ASC) {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
int32_t pos = startOffset + (isFirstBlock ? (numOfElems - 1) : numOfElems);
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
if (pDiffInfo->includeNull) {
colDataSetNull_f(pOutput->nullbitmap, pos);
if (tsList != NULL) {
colDataAppendInt64(pTsOutput, pos, &tsList[i]);
}
numOfElems += 1;
}
continue;
}
numOfElems += 1;
int32_t v = *(int32_t*)colDataGetData(pInputCol, i);
if (pDiffInfo->hasPrev) {
int32_t delta = (int32_t)(v - pDiffInfo->prev.i64); // direct previous may be null
if (delta < 0 && pDiffInfo->ignoreNegative) {
colDataSetNull_f(pOutput->nullbitmap, pos);
} else {
colDataAppendInt32(pOutput, pos, &delta);
}
if (pTsOutput != NULL) {
colDataAppendInt64(pTsOutput, pos, &tsList[i]);
}
}
continue;
pDiffInfo->prev.i64 = v;
pDiffInfo->hasPrev = true;
numOfElems++;
}
} else {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
int32_t v = *(int32_t*)colDataGetData(pInputCol, i);
int32_t pos = startOffset + numOfElems;
// there is a row of previous data block to be handled in the first place.
if (pDiffInfo->hasPrev) {
int32_t delta = (int32_t)(pDiffInfo->prev.i64 - v); // direct previous may be null
if (delta < 0 && pDiffInfo->ignoreNegative) {
colDataSetNull_f(pOutput->nullbitmap, pos);
} else {
colDataAppendInt32(pOutput, pos, &delta);
}
int32_t v = *(int32_t*)colDataGetData(pInputCol, i);
if (pDiffInfo->hasPrev) {
int32_t delta = (int32_t)(v - pDiffInfo->prev.i64); // direct previous may be null
if (delta < 0 && pDiffInfo->ignoreNegative) {
colDataSetNull_f(pOutput->nullbitmap, pos);
} else {
colDataAppendInt32(pOutput, pos, &delta);
if (pTsOutput != NULL) {
colDataAppendInt64(pTsOutput, pos, &pDiffInfo->prevTs);
}
pDiffInfo->hasPrev = false;
}
if (pTsOutput != NULL) {
colDataAppendInt64(pTsOutput, pos, &tsList[i]);
// it is not the last row of current block
if (i < pInput->numOfRows + pInput->startRowIndex - 1) {
int32_t next = *(int32_t*)colDataGetData(pInputCol, i + 1);
int32_t delta = v - next; // direct previous may be null
colDataAppendInt32(pOutput, pos, &delta);
if (pTsOutput != NULL) {
colDataAppendInt64(pTsOutput, pos, &tsList[i]);
}
} else {
pDiffInfo->prev.i64 = v;
if (pTsOutput != NULL) {
pDiffInfo->prevTs = tsList[i];
}
pDiffInfo->hasPrev = true;
}
numOfElems++;
}
pDiffInfo->prev.i64 = v;
pDiffInfo->hasPrev = true;
numOfElems++;
}
break;
}
case TSDB_DATA_TYPE_BIGINT: {
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += step) {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue;
}
......@@ -1378,7 +1419,7 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
}
// initial value is not set yet
if (!pDiffInfo->hasPrev || numOfElems <= 0) {
if (numOfElems <= 0) {
/*
* 1. current block and blocks before are full of null
* 2. current block may be null value
......@@ -1386,15 +1427,7 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
assert(pCtx->hasNull);
return 0;
} else {
// for (int t = 0; t < pCtx->tagInfo.numOfTagCols; ++t) {
// SqlFunctionCtx* tagCtx = pCtx->tagInfo.pTagCtxList[t];
// if (tagCtx->functionId == TSDB_FUNC_TAG_DUMMY) {
// aAggs[TSDB_FUNC_TAGPRJ].xFunction(tagCtx);
// }
// }
int32_t forwardStep = (isFirstBlock) ? numOfElems - 1 : numOfElems;
return forwardStep;
return (isFirstBlock) ? numOfElems - 1 : numOfElems;
}
}
......
......@@ -1521,7 +1521,7 @@ static int32_t physiFillNodeToJson(const void* pObj, SJson* pJson) {
static int32_t jsonToPhysiFillNode(const SJson* pJson, void* pObj) {
SFillPhysiNode* pNode = (SFillPhysiNode*)pObj;
int32_t code = jsonToPhysiWindowNode(pJson, pObj);
int32_t code = jsonToPhysicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetNumberValue(pJson, jkFillPhysiPlanMode, pNode->mode);
}
......
......@@ -945,7 +945,8 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
code = qStringToSubplan(qwMsg->msg, &plan);
if (TSDB_CODE_SUCCESS != code) {
QW_TASK_ELOG("task string to subplan failed, code:%x - %s", code, tstrerror(code));
code = TSDB_CODE_INVALID_MSG;
QW_TASK_ELOG("task physical plan to subplan failed, code:%x - %s", code, tstrerror(code));
QW_ERR_JRET(code);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册