提交 1d14c057 编写于 作者: H hjxilinx

[TD-28] refactor some codes

上级 cb00cc8c
...@@ -592,7 +592,7 @@ bool doRevisedResultsByLimit(SQInfo *pQInfo) { ...@@ -592,7 +592,7 @@ bool doRevisedResultsByLimit(SQInfo *pQInfo) {
static void setExecParams(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int64_t StartQueryTimestamp, void *inputData, static void setExecParams(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int64_t StartQueryTimestamp, void *inputData,
char *primaryColumnData, int32_t size, int32_t functionId, SField *pField, bool hasNull, char *primaryColumnData, int32_t size, int32_t functionId, SField *pField, bool hasNull,
void *param, SBlockInfo *pBlockInfo, int32_t index); void *param);
void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo); void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo);
...@@ -1407,7 +1407,7 @@ static char *doGetDataBlocks(SQuery *pQuery, SData **data, int32_t colIdx) { ...@@ -1407,7 +1407,7 @@ static char *doGetDataBlocks(SQuery *pQuery, SData **data, int32_t colIdx) {
return pData; return pData;
} }
static char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int32_t col, int32_t size, int32_t* index) { static char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int32_t col, int32_t size) {
SQuery * pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
...@@ -1446,7 +1446,6 @@ static char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sa ...@@ -1446,7 +1446,6 @@ static char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sa
* So, the validation of required column in cache with the corresponding meter schema is reinforced. * So, the validation of required column in cache with the corresponding meter schema is reinforced.
*/ */
dataBlock = doGetDataBlocks(pQuery, pRuntimeEnv->colDataBuffer, pCol->colIdxInBuf); dataBlock = doGetDataBlocks(pQuery, pRuntimeEnv->colDataBuffer, pCol->colIdxInBuf);
*index = pCol->colIdxInBuf;
} }
} }
...@@ -1994,8 +1993,8 @@ static void interpolateStartKeyValue(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo* ...@@ -1994,8 +1993,8 @@ static void interpolateStartKeyValue(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo*
* 2. time window start exactly from a timestamp with data * 2. time window start exactly from a timestamp with data
*/ */
if (skey == win->skey || win->skey < pWindowResInfo->startTime || if (skey == win->skey || win->skey < pWindowResInfo->startTime ||
(win->skey < pQuery->skey && QUERY_IS_ASC_QUERY(pQuery)) || (win->skey <= pQuery->skey && QUERY_IS_ASC_QUERY(pQuery)) ||
(win->skey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { (win->skey <= pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
pCtx->prev.key = -1; pCtx->prev.key = -1;
return; return;
} }
...@@ -2089,6 +2088,40 @@ static void doSetInterpolationDataForTimeWindow(SQueryRuntimeEnv* pRuntimeEnv, S ...@@ -2089,6 +2088,40 @@ static void doSetInterpolationDataForTimeWindow(SQueryRuntimeEnv* pRuntimeEnv, S
} }
} }
static void doInterpolatePrevTimeWindow(SQueryRuntimeEnv* pRuntimeEnv, SWindowResInfo* pWindowResInfo, SBlockInfo* pBlockInfo,
TSKEY ts, int32_t offset, STimeWindow* win) {
// get current not closed time window
SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t slot = pWindowResInfo->curIndex;
if (slot == -1) {
return;
}
while (slot < pWindowResInfo->size) {
STimeWindow w = getWindowResult(pWindowResInfo, slot)->window;
if (w.skey == win->skey) {
assert(w.ekey == win->ekey);
break;
}
// if current active window locates before current data block, do interpolate the result and close it
assert((w.skey < win->skey && w.ekey < ts && QUERY_IS_ASC_QUERY(pQuery)) ||
(w.skey > win->skey && w.skey > ts && !QUERY_IS_ASC_QUERY(pQuery)));
int32_t forwardStep = 0;
doSetInterpolationDataForTimeWindow(pRuntimeEnv, pWindowResInfo, pBlockInfo, &w, offset, forwardStep);
SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &w, pQuery->pos, forwardStep);
closeTimeWindow(pWindowResInfo, curTimeWindow(pWindowResInfo));
// try next time window
slot += 1;
}
}
/** /**
* *
* @param pRuntimeEnv * @param pRuntimeEnv
...@@ -2116,9 +2149,7 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t ...@@ -2116,9 +2149,7 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
SField dummyField = {0}; SField dummyField = {0};
bool hasNull = hasNullVal(pQuery, k, pBlockInfo, pFields, isDiskFileBlock); bool hasNull = hasNullVal(pQuery, k, pBlockInfo, pFields, isDiskFileBlock);
char *dataBlock = getDataBlocks(pRuntimeEnv, &sasArray[k], k, forwardStep);
int32_t index = 0;
char *dataBlock = getDataBlocks(pRuntimeEnv, &sasArray[k], k, forwardStep, &index);
SField *tpField = NULL; SField *tpField = NULL;
...@@ -2135,9 +2166,10 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t ...@@ -2135,9 +2166,10 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
} }
setExecParams(pRuntimeEnv, &pCtx[k], pQuery->skey, dataBlock, (char *)primaryKeyCol, forwardStep, functionId, tpField, setExecParams(pRuntimeEnv, &pCtx[k], pQuery->skey, dataBlock, (char *)primaryKeyCol, forwardStep, functionId, tpField,
hasNull, &sasArray[k], pBlockInfo, index); hasNull, &sasArray[k]);
} }
// save the last row in current data block
for(int32_t i = 0; i < pQuery->numOfCols; ++i) { for(int32_t i = 0; i < pQuery->numOfCols; ++i) {
SColumnInfo* pColInfo = &pQuery->colList[i].data; SColumnInfo* pColInfo = &pQuery->colList[i].data;
int32_t s = (QUERY_IS_ASC_QUERY(pQuery))? pColInfo->bytes * (pBlockInfo->size - 1) : 0; int32_t s = (QUERY_IS_ASC_QUERY(pQuery))? pColInfo->bytes * (pBlockInfo->size - 1) : 0;
...@@ -2151,33 +2183,7 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t ...@@ -2151,33 +2183,7 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
TSKEY ts = primaryKeyCol[offset]; TSKEY ts = primaryKeyCol[offset];
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
doInterpolatePrevTimeWindow(pRuntimeEnv, pWindowResInfo, pBlockInfo, ts, offset, &win);
// get current not closed time window
int32_t slot = pWindowResInfo->curIndex;
if (slot != -1) {
while (slot < pWindowResInfo->size) {
STimeWindow w = getWindowResult(pWindowResInfo, slot)->window;
// if current active window locates before current data block, do interpolate the result and close it
if (w.skey != win.skey || w.ekey != win.ekey) {
assert((w.skey < win.skey && w.ekey < ts && QUERY_IS_ASC_QUERY(pQuery)) ||
(w.skey > win.skey && w.skey > ts && !QUERY_IS_ASC_QUERY(pQuery)));
forwardStep = 0;
doSetInterpolationDataForTimeWindow(pRuntimeEnv, pWindowResInfo, pBlockInfo, &w, offset, forwardStep);
SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &w, pQuery->pos, forwardStep);
closeTimeWindow(pWindowResInfo, curTimeWindow(pWindowResInfo));
} else {
break;
}
// try next time window
slot += 1;
}
}
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &win) != TSDB_CODE_SUCCESS) { if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &win) != TSDB_CODE_SUCCESS) {
return 0; return 0;
...@@ -2614,14 +2620,12 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * ...@@ -2614,14 +2620,12 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
int32_t index = -1;
bool hasNull = hasNullVal(pQuery, k, pBlockInfo, pFields, isDiskFileBlock); bool hasNull = hasNullVal(pQuery, k, pBlockInfo, pFields, isDiskFileBlock);
char *dataBlock = getDataBlocks(pRuntimeEnv, &sasArray[k], k, *forwardStep, &index); char *dataBlock = getDataBlocks(pRuntimeEnv, &sasArray[k], k, *forwardStep);
TSKEY ts = pQuery->skey; TSKEY ts = pQuery->skey;
// pRuntimeEnv->intervalWindow.ekey;
setExecParams(pRuntimeEnv, &pCtx[k], ts, dataBlock, (char *)primaryKeyCol, (*forwardStep), functionId, pFields, hasNull, setExecParams(pRuntimeEnv, &pCtx[k], ts, dataBlock, (char *)primaryKeyCol, (*forwardStep), functionId, pFields, hasNull,
&sasArray[k], pBlockInfo, 0); &sasArray[k]);
} }
// set the input column data // set the input column data
...@@ -2647,6 +2651,8 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * ...@@ -2647,6 +2651,8 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
int32_t j = 0; int32_t j = 0;
TSKEY lastKey = -1; TSKEY lastKey = -1;
int32_t lastIndex = -1;
bool firstAccessedPoint = true;
for (j = 0; j < (*forwardStep); ++j) { for (j = 0; j < (*forwardStep); ++j) {
int32_t offset = GET_COL_DATA_POS(pQuery, j, step); int32_t offset = GET_COL_DATA_POS(pQuery, j, step);
...@@ -2669,8 +2675,31 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * ...@@ -2669,8 +2675,31 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
// interval window query // interval window query
if (isIntervalQuery(pQuery)) { if (isIntervalQuery(pQuery)) {
// decide the time window according to the primary timestamp // decide the time window according to the primary timestamp
int64_t ts = primaryKeyCol[offset]; TSKEY ts = primaryKeyCol[offset];
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
// if (firstAccessedPoint) {
// doInterpolatePrevTimeWindow(pRuntimeEnv, pWindowResInfo, pBlockInfo, ts, offset, &win);
// firstAccessedPoint = false;
// } else {
// int32_t index = pWindowResInfo->curIndex;
// STimeWindow w = getWindowResult(pWindowResInfo, index)->window;
//
// if (w.skey == win.skey) { // do nothing
// assert(w.ekey == win.ekey);
// } else {
// assert((w.skey < win.skey && w.ekey < ts && QUERY_IS_ASC_QUERY(pQuery)) ||
// (w.skey > win.skey && w.skey > ts && !QUERY_IS_ASC_QUERY(pQuery)));
//
// // set the endkey interpolation for the previous
// for(int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
// SColIndexEx *pCol = &pQuery->pSelectExpr[i].pBase.colInfo;
//
// interpolateEndKeyValue(pRuntimeEnv, pBlockInfo, win, e, &pRuntimeEnv->pCtx[i], pCol->colIdxInBuf);
// }
//
// }
// }
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &win); int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &win);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
...@@ -2684,6 +2713,8 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * ...@@ -2684,6 +2713,8 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
doRowwiseApplyFunctions(pRuntimeEnv, pStatus, &win, offset); doRowwiseApplyFunctions(pRuntimeEnv, pStatus, &win, offset);
lastKey = ts; lastKey = ts;
lastIndex = j;
STimeWindow nextWin = win; STimeWindow nextWin = win;
int32_t index = pWindowResInfo->curIndex; int32_t index = pWindowResInfo->curIndex;
int32_t sid = pRuntimeEnv->pMeterObj->sid; int32_t sid = pRuntimeEnv->pMeterObj->sid;
...@@ -2728,6 +2759,9 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * ...@@ -2728,6 +2759,9 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
pCtx[k].next.key = -1;
pCtx[k].prev.key = -1;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunctionF(&pCtx[k], offset); aAggs[functionId].xFunctionF(&pCtx[k], offset);
} }
...@@ -2753,6 +2787,15 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * ...@@ -2753,6 +2787,15 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
} }
} }
// save the last accessed row of current data block for interpolation
int32_t index = GET_COL_DATA_POS(pQuery, lastIndex, step);
for(int32_t i = 0; i < pQuery->numOfCols; ++i) {
SColumnInfo* pColInfo = &pQuery->colList[i].data;
int32_t s = pColInfo->bytes * index;
memcpy(pRuntimeEnv->lastRowInBlock[i], pRuntimeEnv->colDataBuffer[i]->data + s, pColInfo->bytes);
}
free(sasArray); free(sasArray);
/* /*
...@@ -2975,7 +3018,7 @@ static void getOneRowFromDataBlock(SQueryRuntimeEnv *pRuntimeEnv, char **dst, in ...@@ -2975,7 +3018,7 @@ static void getOneRowFromDataBlock(SQueryRuntimeEnv *pRuntimeEnv, char **dst, in
void setExecParams(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int64_t startQueryTimestamp, void *inputData, void setExecParams(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int64_t startQueryTimestamp, void *inputData,
char *primaryColumnData, int32_t size, int32_t functionId, SField *pField, bool hasNull, char *primaryColumnData, int32_t size, int32_t functionId, SField *pField, bool hasNull,
void *param, SBlockInfo *pBlockInfo, int32_t index) { void *param) {
SQuery* pQuery = pRuntimeEnv->pQuery; SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? pQuery->pos : pQuery->pos - (size - 1); int32_t startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? pQuery->pos : pQuery->pos - (size - 1);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册