未验证 提交 27c84845 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #20625 from taosdata/fix/TD-23220

fix: modify stream fill linear
...@@ -92,8 +92,8 @@ typedef struct SResultRowData { ...@@ -92,8 +92,8 @@ typedef struct SResultRowData {
typedef struct SStreamFillLinearInfo { typedef struct SStreamFillLinearInfo {
TSKEY nextEnd; TSKEY nextEnd;
SArray* pDeltaVal; // double. value for Fill(linear). SArray* pEndPoints;
SArray* pNextDeltaVal; // double. value for Fill(linear). SArray* pNextEndPoints;
int64_t winIndex; int64_t winIndex;
bool hasNext; bool hasNext;
} SStreamFillLinearInfo; } SStreamFillLinearInfo;
......
...@@ -447,9 +447,14 @@ void* destroyStreamFillSupporter(SStreamFillSupporter* pFillSup) { ...@@ -447,9 +447,14 @@ void* destroyStreamFillSupporter(SStreamFillSupporter* pFillSup) {
return NULL; return NULL;
} }
void destroySPoint(void* ptr) {
SPoint* point = (SPoint*) ptr;
taosMemoryFreeClear(point->val);
}
void* destroyStreamFillLinearInfo(SStreamFillLinearInfo* pFillLinear) { void* destroyStreamFillLinearInfo(SStreamFillLinearInfo* pFillLinear) {
taosArrayDestroy(pFillLinear->pDeltaVal); taosArrayDestroyEx(pFillLinear->pEndPoints, destroySPoint);
taosArrayDestroy(pFillLinear->pNextDeltaVal); taosArrayDestroyEx(pFillLinear->pNextEndPoints, destroySPoint);
taosMemoryFree(pFillLinear); taosMemoryFree(pFillLinear);
return NULL; return NULL;
} }
...@@ -611,19 +616,15 @@ static void calcDeltaData(SSDataBlock* pBlock, int32_t rowId, SResultRowData* pR ...@@ -611,19 +616,15 @@ static void calcDeltaData(SSDataBlock* pBlock, int32_t rowId, SResultRowData* pR
} }
} }
static void calcRowDeltaData(SResultRowData* pStartRow, SResultRowData* pEndRow, SArray* pDelta, SFillColInfo* pFillCol, static void calcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillColInfo* pFillCol,
int32_t numOfCol, int32_t winCount) { int32_t numOfCol) {
for (int32_t i = 0; i < numOfCol; i++) { for (int32_t i = 0; i < numOfCol; i++) {
if (!pFillCol[i].notFillCol) { if (!pFillCol[i].notFillCol) {
int32_t slotId = GET_DEST_SLOT_ID(pFillCol + i); int32_t slotId = GET_DEST_SLOT_ID(pFillCol + i);
SResultCellData* pSCell = getResultCell(pStartRow, slotId);
double start = 0.0;
GET_TYPED_DATA(start, double, pSCell->type, pSCell->pData);
SResultCellData* pECell = getResultCell(pEndRow, slotId); SResultCellData* pECell = getResultCell(pEndRow, slotId);
double end = 0.0; SPoint* pPoint = taosArrayGet(pEndPoins, slotId);
GET_TYPED_DATA(end, double, pECell->type, pECell->pData); pPoint->key = pEndRow->key;
double delta = (end - start) / winCount; memcpy(pPoint->val, pECell->pData, pECell->bytes);
taosArraySet(pDelta, slotId, &delta);
} }
} }
} }
...@@ -674,10 +675,8 @@ void setDeleteFillValueInfo(TSKEY start, TSKEY end, SStreamFillSupporter* pFillS ...@@ -674,10 +675,8 @@ void setDeleteFillValueInfo(TSKEY start, TSKEY end, SStreamFillSupporter* pFillS
setFillKeyInfo(pFillSup->prev.key, pFillSup->next.key, &pFillSup->interval, pFillInfo); setFillKeyInfo(pFillSup->prev.key, pFillSup->next.key, &pFillSup->interval, pFillInfo);
pFillInfo->pLinearInfo->hasNext = false; pFillInfo->pLinearInfo->hasNext = false;
pFillInfo->pLinearInfo->nextEnd = INT64_MIN; pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
int32_t numOfWins = taosTimeCountInterval(pFillSup->prev.key, pFillSup->next.key, pFillSup->interval.sliding, calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
pFillSup->interval.slidingUnit, pFillSup->interval.precision); pFillSup->numOfAllCols);
calcRowDeltaData(&pFillSup->prev, &pFillSup->next, pFillInfo->pLinearInfo->pDeltaVal, pFillSup->pAllColInfo,
pFillSup->numOfAllCols, numOfWins);
pFillInfo->pResRow = &pFillSup->prev; pFillInfo->pResRow = &pFillSup->prev;
pFillInfo->pLinearInfo->winIndex = 0; pFillInfo->pLinearInfo->winIndex = 0;
} break; } break;
...@@ -780,25 +779,19 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS ...@@ -780,25 +779,19 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS
setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo); setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_MID; pFillInfo->pos = FILL_POS_MID;
pFillInfo->pLinearInfo->nextEnd = nextWKey; pFillInfo->pLinearInfo->nextEnd = nextWKey;
int32_t numOfWins = taosTimeCountInterval(prevWKey, ts, pFillSup->interval.sliding, calcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
pFillSup->interval.slidingUnit, pFillSup->interval.precision); pFillSup->numOfAllCols);
calcRowDeltaData(&pFillSup->prev, &pFillSup->cur, pFillInfo->pLinearInfo->pDeltaVal, pFillSup->pAllColInfo,
pFillSup->numOfAllCols, numOfWins);
pFillInfo->pResRow = &pFillSup->prev; pFillInfo->pResRow = &pFillSup->prev;
numOfWins = taosTimeCountInterval(ts, nextWKey, pFillSup->interval.sliding, pFillSup->interval.slidingUnit, calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pNextEndPoints, pFillSup->pAllColInfo,
pFillSup->interval.precision); pFillSup->numOfAllCols);
calcRowDeltaData(&pFillSup->cur, &pFillSup->next, pFillInfo->pLinearInfo->pNextDeltaVal, pFillSup->pAllColInfo,
pFillSup->numOfAllCols, numOfWins);
pFillInfo->pLinearInfo->hasNext = true; pFillInfo->pLinearInfo->hasNext = true;
} else if (hasPrevWindow(pFillSup)) { } else if (hasPrevWindow(pFillSup)) {
setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo); setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_END; pFillInfo->pos = FILL_POS_END;
pFillInfo->pLinearInfo->nextEnd = INT64_MIN; pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
int32_t numOfWins = taosTimeCountInterval(prevWKey, ts, pFillSup->interval.sliding, calcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
pFillSup->interval.slidingUnit, pFillSup->interval.precision); pFillSup->numOfAllCols);
calcRowDeltaData(&pFillSup->prev, &pFillSup->cur, pFillInfo->pLinearInfo->pDeltaVal, pFillSup->pAllColInfo,
pFillSup->numOfAllCols, numOfWins);
pFillInfo->pResRow = &pFillSup->prev; pFillInfo->pResRow = &pFillSup->prev;
pFillInfo->pLinearInfo->hasNext = false; pFillInfo->pLinearInfo->hasNext = false;
} else { } else {
...@@ -806,10 +799,8 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS ...@@ -806,10 +799,8 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS
setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo); setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_START; pFillInfo->pos = FILL_POS_START;
pFillInfo->pLinearInfo->nextEnd = INT64_MIN; pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
int32_t numOfWins = taosTimeCountInterval(ts, nextWKey, pFillSup->interval.sliding, calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
pFillSup->interval.slidingUnit, pFillSup->interval.precision); pFillSup->numOfAllCols);
calcRowDeltaData(&pFillSup->cur, &pFillSup->next, pFillInfo->pLinearInfo->pDeltaVal, pFillSup->pAllColInfo,
pFillSup->numOfAllCols, numOfWins);
pFillInfo->pResRow = &pFillSup->cur; pFillInfo->pResRow = &pFillSup->cur;
pFillInfo->pLinearInfo->hasNext = false; pFillInfo->pLinearInfo->hasNext = false;
} }
...@@ -906,13 +897,18 @@ static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo* ...@@ -906,13 +897,18 @@ static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo*
colDataSetNULL(pColData, index); colDataSetNULL(pColData, index);
continue; continue;
} }
double* pDelta = taosArrayGet(pFillInfo->pLinearInfo->pDeltaVal, slotId); SPoint* pEnd = taosArrayGet(pFillInfo->pLinearInfo->pEndPoints, slotId);
double vCell = 0; double vCell = 0;
GET_TYPED_DATA(vCell, double, pCell->type, pCell->pData); SPoint start = {0};
vCell += (*pDelta) * pFillInfo->pLinearInfo->winIndex; start.key = pFillInfo->pResRow->key;
int64_t result = 0; start.val = pCell->pData;
SET_TYPED_DATA(&result, pCell->type, vCell);
colDataSetVal(pColData, index, (const char*)&result, false); SPoint cur = {0};
cur.key = pFillInfo->current;
cur.val = taosMemoryCalloc(1, pCell->bytes);
taosGetLinearInterpolationVal(&cur, pCell->type, &start, pEnd, pCell->type);
colDataSetVal(pColData, index, (const char*)cur.val, false);
destroySPoint(&cur);
} }
} }
pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit, pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
...@@ -953,8 +949,7 @@ static void doStreamFillRange(SStreamFillInfo* pFillInfo, SStreamFillSupporter* ...@@ -953,8 +949,7 @@ static void doStreamFillRange(SStreamFillInfo* pFillInfo, SStreamFillSupporter*
if (pFillInfo->current > pFillInfo->end && pFillInfo->pLinearInfo->hasNext) { if (pFillInfo->current > pFillInfo->end && pFillInfo->pLinearInfo->hasNext) {
pFillInfo->pLinearInfo->hasNext = false; pFillInfo->pLinearInfo->hasNext = false;
pFillInfo->pLinearInfo->winIndex = 0; pFillInfo->pLinearInfo->winIndex = 0;
taosArrayClear(pFillInfo->pLinearInfo->pDeltaVal); taosArraySwap(pFillInfo->pLinearInfo->pEndPoints, pFillInfo->pLinearInfo->pNextEndPoints);
taosArrayAddAll(pFillInfo->pLinearInfo->pDeltaVal, pFillInfo->pLinearInfo->pNextDeltaVal);
pFillInfo->pResRow = &pFillSup->cur; pFillInfo->pResRow = &pFillSup->cur;
setFillKeyInfo(pFillSup->cur.key, pFillInfo->pLinearInfo->nextEnd, &pFillSup->interval, pFillInfo); setFillKeyInfo(pFillSup->cur.key, pFillInfo->pLinearInfo->nextEnd, &pFillSup->interval, pFillInfo);
doStreamFillLinear(pFillSup, pFillInfo, pRes); doStreamFillLinear(pFillSup, pFillInfo, pRes);
...@@ -1359,15 +1354,19 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock* ...@@ -1359,15 +1354,19 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock*
pFillInfo->pLinearInfo = taosMemoryCalloc(1, sizeof(SStreamFillLinearInfo)); pFillInfo->pLinearInfo = taosMemoryCalloc(1, sizeof(SStreamFillLinearInfo));
pFillInfo->pLinearInfo->hasNext = false; pFillInfo->pLinearInfo->hasNext = false;
pFillInfo->pLinearInfo->nextEnd = INT64_MIN; pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
pFillInfo->pLinearInfo->pDeltaVal = NULL; pFillInfo->pLinearInfo->pEndPoints = NULL;
pFillInfo->pLinearInfo->pNextDeltaVal = NULL; pFillInfo->pLinearInfo->pNextEndPoints = NULL;
if (pFillSup->type == TSDB_FILL_LINEAR) { if (pFillSup->type == TSDB_FILL_LINEAR) {
pFillInfo->pLinearInfo->pDeltaVal = taosArrayInit(pFillSup->numOfAllCols, sizeof(double)); pFillInfo->pLinearInfo->pEndPoints = taosArrayInit(pFillSup->numOfAllCols, sizeof(SPoint));
pFillInfo->pLinearInfo->pNextDeltaVal = taosArrayInit(pFillSup->numOfAllCols, sizeof(double)); pFillInfo->pLinearInfo->pNextEndPoints = taosArrayInit(pFillSup->numOfAllCols, sizeof(SPoint));
for (int32_t i = 0; i < pFillSup->numOfAllCols; i++) { for (int32_t i = 0; i < pFillSup->numOfAllCols; i++) {
double value = 0.0; SColumnInfoData* pColData = taosArrayGet(pRes->pDataBlock, i);
taosArrayPush(pFillInfo->pLinearInfo->pDeltaVal, &value); SPoint value = {0};
taosArrayPush(pFillInfo->pLinearInfo->pNextDeltaVal, &value); value.val = taosMemoryCalloc(1, pColData->info.bytes);
taosArrayPush(pFillInfo->pLinearInfo->pEndPoints, &value);
value.val = taosMemoryCalloc(1, pColData->info.bytes);
taosArrayPush(pFillInfo->pLinearInfo->pNextEndPoints, &value);
} }
} }
pFillInfo->pLinearInfo->winIndex = 0; pFillInfo->pLinearInfo->winIndex = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册