提交 a9f0a30d 编写于 作者: H Haojun Liao

[TD-1870]<enhance>: optimize the memory consumption.

上级 2cd97574
...@@ -64,9 +64,8 @@ typedef struct SLocalReducer { ...@@ -64,9 +64,8 @@ typedef struct SLocalReducer {
SColumnModel * resColModel; SColumnModel * resColModel;
tExtMemBuffer ** pExtMemBuffer; // disk-based buffer tExtMemBuffer ** pExtMemBuffer; // disk-based buffer
SFillInfo* pFillInfo; // interpolation support structure SFillInfo* pFillInfo; // interpolation support structure
char * pFinalRes; // result data after interpo char* pFinalRes; // result data after interpo
tFilePage * discardData; tFilePage* discardData;
SResultRowCellInfo * pResInfo;
bool discard; bool discard;
int32_t offset; // limit offset value int32_t offset; // limit offset value
bool orderPrjOnSTable; // projection query on stable bool orderPrjOnSTable; // projection query on stable
......
...@@ -99,7 +99,7 @@ typedef struct SSumInfo { ...@@ -99,7 +99,7 @@ typedef struct SSumInfo {
// the attribute of hasResult is not needed since the num attribute would server as this purpose // the attribute of hasResult is not needed since the num attribute would server as this purpose
typedef struct SAvgInfo { typedef struct SAvgInfo {
double sum; double sum;
int64_t num; // num servers as the hasResult attribute in other struct int64_t num;
} SAvgInfo; } SAvgInfo;
typedef struct SStddevInfo { typedef struct SStddevInfo {
...@@ -167,7 +167,13 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI ...@@ -167,7 +167,13 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_INTERP) { functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_INTERP) {
*type = (int16_t)dataType; *type = (int16_t)dataType;
*bytes = (int16_t)dataBytes; *bytes = (int16_t)dataBytes;
*interBytes = 0;//*bytes;
if (functionId == TSDB_FUNC_INTERP) {
*interBytes = sizeof(SInterpInfoDetail);
} else {
*interBytes = 0;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -175,21 +181,21 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI ...@@ -175,21 +181,21 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
if (functionId == TSDB_FUNC_TID_TAG) { // todo use struct if (functionId == TSDB_FUNC_TID_TAG) { // todo use struct
*type = TSDB_DATA_TYPE_BINARY; *type = TSDB_DATA_TYPE_BINARY;
*bytes = (int16_t)(dataBytes + sizeof(int16_t) + sizeof(int64_t) + sizeof(int32_t) + sizeof(int32_t) + VARSTR_HEADER_SIZE); *bytes = (int16_t)(dataBytes + sizeof(int16_t) + sizeof(int64_t) + sizeof(int32_t) + sizeof(int32_t) + VARSTR_HEADER_SIZE);
*interBytes = 0;//*bytes; *interBytes = 0;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (functionId == TSDB_FUNC_COUNT) { if (functionId == TSDB_FUNC_COUNT) {
*type = TSDB_DATA_TYPE_BIGINT; *type = TSDB_DATA_TYPE_BIGINT;
*bytes = sizeof(int64_t); *bytes = sizeof(int64_t);
*interBytes = 0;//*bytes; *interBytes = 0;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (functionId == TSDB_FUNC_ARITHM) { if (functionId == TSDB_FUNC_ARITHM) {
*type = TSDB_DATA_TYPE_DOUBLE; *type = TSDB_DATA_TYPE_DOUBLE;
*bytes = sizeof(double); *bytes = sizeof(double);
*interBytes = 0;//*bytes; *interBytes = 0;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -334,11 +340,6 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI ...@@ -334,11 +340,6 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
//void setResultInfoBuf(SResultRowCellInfo *pResInfo, char* buf) {
// assert(GET_ROWCELL_INTERBUF(pResInfo) == NULL);
// GET_ROWCELL_INTERBUF(pResInfo) = buf;
//}
// set the query flag to denote that query is completed // set the query flag to denote that query is completed
static void no_next_step(SQLFunctionCtx *pCtx) { static void no_next_step(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
...@@ -3998,7 +3999,6 @@ static void interp_function(SQLFunctionCtx *pCtx) { ...@@ -3998,7 +3999,6 @@ static void interp_function(SQLFunctionCtx *pCtx) {
} }
} }
} }
} }
SET_VAL(pCtx, pCtx->size, 1); SET_VAL(pCtx, pCtx->size, 1);
......
...@@ -99,11 +99,8 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalReducer *pReducer, tOrderDesc ...@@ -99,11 +99,8 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalReducer *pReducer, tOrderDesc
pCtx->param[1].i64Key = pQueryInfo->order.orderColId; pCtx->param[1].i64Key = pQueryInfo->order.orderColId;
} }
// SResultRowCellInfo *pResInfo = &pReducer->pResInfo[i]; pCtx->interBufBytes = pExpr->interBytes;
pCtx->interBufBytes = pExpr->interBytes; pCtx->resultInfo = calloc(1, pCtx->interBufBytes + sizeof(SResultRowCellInfo));
// pResInfo->interResultBuf = calloc(1, (size_t) pCtx->interBufBytes);
pCtx->resultInfo = &pReducer->pResInfo[i];
pCtx->stableQuery = true; pCtx->stableQuery = true;
} }
...@@ -345,7 +342,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd ...@@ -345,7 +342,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
size_t numOfCols = tscSqlExprNumOfExprs(pQueryInfo); size_t numOfCols = tscSqlExprNumOfExprs(pQueryInfo);
pReducer->pTempBuffer->num = 0; pReducer->pTempBuffer->num = 0;
pReducer->pResInfo = calloc(numOfCols, sizeof(SResultRowCellInfo));
tscCreateResPointerInfo(pRes, pQueryInfo); tscCreateResPointerInfo(pRes, pQueryInfo);
tscInitSqlContext(pCmd, pReducer, pDesc); tscInitSqlContext(pCmd, pReducer, pDesc);
...@@ -496,6 +492,8 @@ void tscDestroyLocalReducer(SSqlObj *pSql) { ...@@ -496,6 +492,8 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[i]; SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[i];
tVariantDestroy(&pCtx->tag); tVariantDestroy(&pCtx->tag);
taosTFree(pCtx->resultInfo);
if (pCtx->tagInfo.pTagCtxList != NULL) { if (pCtx->tagInfo.pTagCtxList != NULL) {
taosTFree(pCtx->tagInfo.pTagCtxList); taosTFree(pCtx->tagInfo.pTagCtxList);
} }
...@@ -509,15 +507,6 @@ void tscDestroyLocalReducer(SSqlObj *pSql) { ...@@ -509,15 +507,6 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
taosTFree(pLocalReducer->pTempBuffer); taosTFree(pLocalReducer->pTempBuffer);
taosTFree(pLocalReducer->pResultBuf); taosTFree(pLocalReducer->pResultBuf);
if (pLocalReducer->pResInfo != NULL) {
size_t num = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < num; ++i) {
// taosTFree(pLocalReducer->pResInfo[i].interResultBuf);
}
taosTFree(pLocalReducer->pResInfo);
}
if (pLocalReducer->pLoserTree) { if (pLocalReducer->pLoserTree) {
taosTFree(pLocalReducer->pLoserTree->param); taosTFree(pLocalReducer->pLoserTree->param);
taosTFree(pLocalReducer->pLoserTree); taosTFree(pLocalReducer->pLoserTree);
......
...@@ -168,7 +168,7 @@ typedef struct SQuery { ...@@ -168,7 +168,7 @@ typedef struct SQuery {
typedef struct SQueryRuntimeEnv { typedef struct SQueryRuntimeEnv {
jmp_buf env; jmp_buf env;
SResultRowCellInfo* resultInfo; // todo refactor to merge with SWindowResInfo SResultRow* pResultRow; // todo refactor to merge with SWindowResInfo
SQuery* pQuery; SQuery* pQuery;
SQLFunctionCtx* pCtx; SQLFunctionCtx* pCtx;
int32_t numOfRowsPerPage; int32_t numOfRowsPerPage;
...@@ -190,7 +190,7 @@ typedef struct SQueryRuntimeEnv { ...@@ -190,7 +190,7 @@ typedef struct SQueryRuntimeEnv {
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
SHashObj* pResultRowHashTable; // quick locate the window object for each result SHashObj* pResultRowHashTable; // quick locate the window object for each result
char* keyBuf; // window key buffer char* keyBuf; // window key buffer
SResultRowPool* pool; // window result object pool SResultRowPool* pool; // window result object pool
int32_t* rowCellInfoOffset;// offset value for each row result cell info int32_t* rowCellInfoOffset;// offset value for each row result cell info
} SQueryRuntimeEnv; } SQueryRuntimeEnv;
......
...@@ -42,7 +42,7 @@ void closeTimeWindow(SWindowResInfo* pWindowResInfo, int32_t slot); ...@@ -42,7 +42,7 @@ void closeTimeWindow(SWindowResInfo* pWindowResInfo, int32_t slot);
void closeAllTimeWindow(SWindowResInfo* pWindowResInfo); void closeAllTimeWindow(SWindowResInfo* pWindowResInfo);
void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_t order); void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_t order);
static FORCE_INLINE SResultRow *getWindowResult(SWindowResInfo *pWindowResInfo, int32_t slot) { static FORCE_INLINE SResultRow *getResultRow(SWindowResInfo *pWindowResInfo, int32_t slot) {
assert(pWindowResInfo != NULL && slot >= 0 && slot < pWindowResInfo->size); assert(pWindowResInfo != NULL && slot >= 0 && slot < pWindowResInfo->size);
return pWindowResInfo->pResult[slot]; return pWindowResInfo->pResult[slot];
} }
...@@ -52,7 +52,7 @@ static FORCE_INLINE SResultRow *getWindowResult(SWindowResInfo *pWindowResInfo, ...@@ -52,7 +52,7 @@ static FORCE_INLINE SResultRow *getWindowResult(SWindowResInfo *pWindowResInfo,
bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot); bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot);
int32_t createQueryResultInfo(SQuery *pQuery, SResultRow *pResultRow); int32_t initResultRow(SResultRow *pResultRow);
static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SResultRow *pResult, static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SResultRow *pResult,
tFilePage* page) { tFilePage* page) {
......
...@@ -178,7 +178,7 @@ static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) { ...@@ -178,7 +178,7 @@ static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) {
// todo move to utility // todo move to utility
static int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *group); static int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *group);
static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult); static void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult);
static void setWindowResOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult); static void setWindowResOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult);
static void resetMergeResultBuf(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx *pCtx, SResultRow *pRow); static void resetMergeResultBuf(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx *pCtx, SResultRow *pRow);
static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId); static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId);
...@@ -449,10 +449,9 @@ static bool hasNullValue(SColIndex* pColIndex, SDataStatis *pStatis, SDataStatis ...@@ -449,10 +449,9 @@ static bool hasNullValue(SColIndex* pColIndex, SDataStatis *pStatis, SDataStatis
static SResultRow *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo, char *pData, static SResultRow *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo, char *pData,
int16_t bytes, bool masterscan, uint64_t uid) { int16_t bytes, bool masterscan, uint64_t uid) {
SQuery *pQuery = pRuntimeEnv->pQuery;
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid); SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid);
int32_t *p1 = (int32_t *) taosHashGet(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); int32_t *p1 =
(int32_t *)taosHashGet(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
if (p1 != NULL) { if (p1 != NULL) {
pWindowResInfo->curIndex = *p1; pWindowResInfo->curIndex = *p1;
} else { } else {
...@@ -470,9 +469,6 @@ static SResultRow *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWindow ...@@ -470,9 +469,6 @@ static SResultRow *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWindow
} }
char *t = realloc(pWindowResInfo->pResult, (size_t)(newCapacity * POINTER_BYTES)); char *t = realloc(pWindowResInfo->pResult, (size_t)(newCapacity * POINTER_BYTES));
// pRuntimeEnv->summary.winInfoSize += (newCapacity - pWindowResInfo->capacity) * sizeof(SResultRow);
// pRuntimeEnv->summary.numOfTimeWindows += (newCapacity - pWindowResInfo->capacity);
if (t == NULL) { if (t == NULL) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
...@@ -484,17 +480,18 @@ static SResultRow *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWindow ...@@ -484,17 +480,18 @@ static SResultRow *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWindow
pWindowResInfo->capacity = (int32_t)newCapacity; pWindowResInfo->capacity = (int32_t)newCapacity;
} }
// pRuntimeEnv->summary.winInfoSize += (pQuery->numOfOutput * sizeof(SResultRowCellInfo) + pRuntimeEnv->interBufSize) * inc;
SResultRow* pResult = getNewResultRow(pRuntimeEnv->pool); SResultRow *pResult = getNewResultRow(pRuntimeEnv->pool);
pWindowResInfo->pResult[pWindowResInfo->size] = pResult; pWindowResInfo->pResult[pWindowResInfo->size] = pResult;
int32_t ret = createQueryResultInfo(pQuery, pResult); int32_t ret = initResultRow(pResult);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
// add a new result set for a new group // add a new result set for a new group
pWindowResInfo->curIndex = pWindowResInfo->size++; pWindowResInfo->curIndex = pWindowResInfo->size++;
taosHashPut(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), (char *)&pWindowResInfo->curIndex, sizeof(int32_t)); taosHashPut(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes),
(char *)&pWindowResInfo->curIndex, sizeof(int32_t));
} }
// too many time window in query // too many time window in query
...@@ -502,7 +499,7 @@ static SResultRow *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWindow ...@@ -502,7 +499,7 @@ static SResultRow *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWindow
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
} }
return getWindowResult(pWindowResInfo, pWindowResInfo->curIndex); return getResultRow(pWindowResInfo, pWindowResInfo->curIndex);
} }
// get the correct time window according to the handled timestamp // get the correct time window according to the handled timestamp
...@@ -518,7 +515,7 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t ...@@ -518,7 +515,7 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t
} }
} else { } else {
int32_t slot = curTimeWindowIndex(pWindowResInfo); int32_t slot = curTimeWindowIndex(pWindowResInfo);
SResultRow* pWindowRes = getWindowResult(pWindowResInfo, slot); SResultRow* pWindowRes = getResultRow(pWindowResInfo, slot);
w = pWindowRes->win; w = pWindowRes->win;
} }
...@@ -1091,7 +1088,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * ...@@ -1091,7 +1088,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
taosTFree(sasArray); taosTFree(sasArray);
} }
static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pData, int16_t type, int16_t bytes) { static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pData, int16_t type, int16_t bytes, int32_t groupIndex) {
if (isNull(pData, type)) { // ignore the null value if (isNull(pData, type)) { // ignore the null value
return -1; return -1;
} }
...@@ -1113,7 +1110,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat ...@@ -1113,7 +1110,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
} }
uint64_t uid = 0; // uid is always set to be 0. uint64_t uid = groupIndex; // uid is always set to be 0.
SResultRow *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, d, len, true, uid); SResultRow *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, d, len, true, uid);
if (pWindowRes == NULL) { if (pWindowRes == NULL) {
return -1; return -1;
...@@ -1143,7 +1140,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat ...@@ -1143,7 +1140,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat
} }
} }
setWindowResOutputBuf(pRuntimeEnv, pWindowRes); setResultOutputBuf(pRuntimeEnv, pWindowRes);
initCtxOutputBuf(pRuntimeEnv); initCtxOutputBuf(pRuntimeEnv);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1374,7 +1371,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS ...@@ -1374,7 +1371,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
if (groupbyColumnValue) { if (groupbyColumnValue) {
char *val = groupbyColumnData + bytes * offset; char *val = groupbyColumnData + bytes * offset;
int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, val, type, bytes); int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, val, type, bytes, item->groupIndex);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
continue; continue;
} }
...@@ -1607,29 +1604,15 @@ static int32_t setCtxTagColumnInfo(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx ...@@ -1607,29 +1604,15 @@ static int32_t setCtxTagColumnInfo(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static FORCE_INLINE void setResultRowCellInfo(SQueryRuntimeEnv* pRuntimeEnv, SResultRow *pRow, char* buf) {
// SQuery* pQuery = pRuntimeEnv->pQuery;
//
// char* p = buf;
// for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
// int32_t size = pQuery->pSelectExpr[i].interBytes;
// SResultRowCellInfo* pInfo = getResultCell(pRuntimeEnv, pRow, i);
// setResultInfoBuf(pInfo, p);
// p += size;
// }
}
static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order) { static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order) {
qDebug("QInfo:%p setup runtime env", GET_QINFO_ADDR(pRuntimeEnv)); qDebug("QInfo:%p setup runtime env", GET_QINFO_ADDR(pRuntimeEnv));
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
size_t size = pRuntimeEnv->interBufSize + pQuery->numOfOutput * sizeof(SResultRowCellInfo);
pRuntimeEnv->resultInfo = calloc(1, size);
pRuntimeEnv->pCtx = (SQLFunctionCtx *)calloc(pQuery->numOfOutput, sizeof(SQLFunctionCtx)); pRuntimeEnv->pCtx = (SQLFunctionCtx *)calloc(pQuery->numOfOutput, sizeof(SQLFunctionCtx));
pRuntimeEnv->rowCellInfoOffset = calloc(pQuery->numOfOutput, sizeof(int32_t)); pRuntimeEnv->rowCellInfoOffset = calloc(pQuery->numOfOutput, sizeof(int32_t));
pRuntimeEnv->pResultRow = getNewResultRow(pRuntimeEnv->pool);//calloc(1, sizeof(SResultRow));
if (pRuntimeEnv->resultInfo == NULL || pRuntimeEnv->pCtx == NULL || pRuntimeEnv->rowCellInfoOffset == NULL) { if (pRuntimeEnv->pResultRow == NULL || pRuntimeEnv->pCtx == NULL || pRuntimeEnv->rowCellInfoOffset == NULL) {
goto _clean; goto _clean;
} }
...@@ -1666,7 +1649,6 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order ...@@ -1666,7 +1649,6 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
pCtx->inputType = pQuery->colList[index].type; pCtx->inputType = pQuery->colList[index].type;
} }
assert(isValidDataType(pCtx->inputType)); assert(isValidDataType(pCtx->inputType));
pCtx->ptsOutputBuf = NULL; pCtx->ptsOutputBuf = NULL;
...@@ -1711,13 +1693,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order ...@@ -1711,13 +1693,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
} }
// char* buf = (char*) pRuntimeEnv->resultInfo + sizeof(SResultRowCellInfo) * pQuery->numOfOutput;
// set the intermediate result output buffer
// setResultRowCellInfo(pRuntimeEnv, pRuntimeEnv->resultInfo, NULL);
// if it is group by normal column, do not set output buffer, the output buffer is pResult // if it is group by normal column, do not set output buffer, the output buffer is pResult
if (!pRuntimeEnv->groupbyNormalCol && !pRuntimeEnv->stableQuery) { // fixed output query/multi-output query for normal table
if (!pRuntimeEnv->groupbyNormalCol && !pRuntimeEnv->stableQuery && !QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQuery)) {
resetCtxOutputBuf(pRuntimeEnv); resetCtxOutputBuf(pRuntimeEnv);
} }
...@@ -1729,7 +1707,6 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order ...@@ -1729,7 +1707,6 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_clean: _clean:
taosTFree(pRuntimeEnv->resultInfo);
taosTFree(pRuntimeEnv->pCtx); taosTFree(pRuntimeEnv->pCtx);
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
...@@ -1758,7 +1735,6 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -1758,7 +1735,6 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
taosTFree(pCtx->tagInfo.pTagCtxList); taosTFree(pCtx->tagInfo.pTagCtxList);
} }
taosTFree(pRuntimeEnv->resultInfo);
taosTFree(pRuntimeEnv->pCtx); taosTFree(pRuntimeEnv->pCtx);
} }
...@@ -2833,14 +2809,14 @@ int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param) ...@@ -2833,14 +2809,14 @@ int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param)
} }
SWindowResInfo *pWindowResInfo1 = &supporter->pTableQueryInfo[left]->windowResInfo; SWindowResInfo *pWindowResInfo1 = &supporter->pTableQueryInfo[left]->windowResInfo;
SResultRow * pWindowRes1 = getWindowResult(pWindowResInfo1, leftPos); SResultRow * pWindowRes1 = getResultRow(pWindowResInfo1, leftPos);
tFilePage *page1 = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes1->pageId); tFilePage *page1 = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes1->pageId);
char *b1 = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes1, page1); char *b1 = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes1, page1);
TSKEY leftTimestamp = GET_INT64_VAL(b1); TSKEY leftTimestamp = GET_INT64_VAL(b1);
SWindowResInfo *pWindowResInfo2 = &supporter->pTableQueryInfo[right]->windowResInfo; SWindowResInfo *pWindowResInfo2 = &supporter->pTableQueryInfo[right]->windowResInfo;
SResultRow * pWindowRes2 = getWindowResult(pWindowResInfo2, rightPos); SResultRow * pWindowRes2 = getResultRow(pWindowResInfo2, rightPos);
tFilePage *page2 = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes2->pageId); tFilePage *page2 = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes2->pageId);
char *b2 = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes2, page2); char *b2 = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes2, page2);
...@@ -2961,7 +2937,9 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { ...@@ -2961,7 +2937,9 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
pQuery->rec.rows += offset; pQuery->rec.rows += offset;
} }
int64_t getNumOfResultWindowRes(SQuery *pQuery, SResultRow *pResultRow) { int64_t getNumOfResultWindowRes(SQueryRuntimeEnv* pRuntimeEnv, SResultRow *pResultRow) {
SQuery* pQuery = pRuntimeEnv->pQuery;
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t functionId = pQuery->pSelectExpr[j].base.functionId; int32_t functionId = pQuery->pSelectExpr[j].base.functionId;
...@@ -2973,7 +2951,7 @@ int64_t getNumOfResultWindowRes(SQuery *pQuery, SResultRow *pResultRow) { ...@@ -2973,7 +2951,7 @@ int64_t getNumOfResultWindowRes(SQuery *pQuery, SResultRow *pResultRow) {
continue; continue;
} }
SResultRowCellInfo *pResultInfo = &pResultRow->pCellInfo[j]; SResultRowCellInfo *pResultInfo = getResultCell(pRuntimeEnv, pResultRow, j);
assert(pResultInfo != NULL); assert(pResultInfo != NULL);
if (pResultInfo->numOfRes > 0) { if (pResultInfo->numOfRes > 0) {
...@@ -3042,18 +3020,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { ...@@ -3042,18 +3020,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
SLoserTreeInfo *pTree = NULL; SLoserTreeInfo *pTree = NULL;
tLoserTreeCreate(&pTree, numOfTables, &cs, tableResultComparFn); tLoserTreeCreate(&pTree, numOfTables, &cs, tableResultComparFn);
SResultRow* pRow = calloc(1, getWindowResultSize(pRuntimeEnv)); SResultRow* pRow = getNewResultRow(pRuntimeEnv->pool);
if (pRow == NULL) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pRow->pCellInfo = (SResultRowCellInfo*) ((char*) pRow + sizeof(SResultRow));
// char* buf = (char*) pRow + sizeof(SResultRowCellInfo)*;
// if (buf == NULL) {
// longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
// }
setResultRowCellInfo(pRuntimeEnv, pRow, NULL);
resetMergeResultBuf(pRuntimeEnv, pRuntimeEnv->pCtx, pRow); resetMergeResultBuf(pRuntimeEnv, pRuntimeEnv->pCtx, pRow);
pQInfo->groupResInfo.groupId = getGroupResultId(pQInfo->groupIndex); pQInfo->groupResInfo.groupId = getGroupResultId(pQInfo->groupIndex);
...@@ -3069,23 +3036,20 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { ...@@ -3069,23 +3036,20 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
taosTFree(pTableList); taosTFree(pTableList);
taosTFree(posList); taosTFree(posList);
taosTFree(pTree); taosTFree(pTree);
// taosTFree(pResultInfo);
// taosTFree(buf);
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
} }
int32_t pos = pTree->pNode[0].index; int32_t pos = pTree->pNode[0].index;
SWindowResInfo *pWindowResInfo = &pTableList[pos]->windowResInfo; SWindowResInfo *pWindowResInfo = &pTableList[pos]->windowResInfo;
SResultRow *pWindowRes = getWindowResult(pWindowResInfo, cs.position[pos]); SResultRow *pWindowRes = getResultRow(pWindowResInfo, cs.position[pos]);
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pageId); tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pageId);
char *b = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes, page); char *b = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes, page);
TSKEY ts = GET_INT64_VAL(b); TSKEY ts = GET_INT64_VAL(b);
assert(ts == pWindowRes->win.skey); assert(ts == pWindowRes->win.skey);
int64_t num = getNumOfResultWindowRes(pQuery, pWindowRes); int64_t num = getNumOfResultWindowRes(pRuntimeEnv, pWindowRes);
if (num <= 0) { if (num <= 0) {
cs.position[pos] += 1; cs.position[pos] += 1;
...@@ -3128,7 +3092,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { ...@@ -3128,7 +3092,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
} }
} else { } else {
// current page is not needed anymore // current page is not needed anymore
SResultRow *pNextWindowRes = getWindowResult(pWindowResInfo, cs.position[pos]); SResultRow *pNextWindowRes = getResultRow(pWindowResInfo, cs.position[pos]);
if (pNextWindowRes->pageId != currentPageId) { if (pNextWindowRes->pageId != currentPageId) {
releaseResBufPage(pRuntimeEnv->pResultBuf, page); releaseResBufPage(pRuntimeEnv->pResultBuf, page);
} }
...@@ -3145,8 +3109,6 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { ...@@ -3145,8 +3109,6 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
taosTFree(pTree); taosTFree(pTree);
taosTFree(pTableList); taosTFree(pTableList);
taosTFree(posList); taosTFree(posList);
// taosTFree(pResultInfo);
return -1; return -1;
} }
} }
...@@ -3251,8 +3213,8 @@ static void updateTableQueryInfoForReverseScan(SQuery *pQuery, STableQueryInfo * ...@@ -3251,8 +3213,8 @@ static void updateTableQueryInfoForReverseScan(SQuery *pQuery, STableQueryInfo *
pTableQueryInfo->windowResInfo.curIndex = pTableQueryInfo->windowResInfo.size - 1; pTableQueryInfo->windowResInfo.curIndex = pTableQueryInfo->windowResInfo.size - 1;
} }
static void disableFuncInReverseScanImpl(SQInfo* pQInfo, SWindowResInfo *pWindowResInfo, int32_t order) { static void disableFuncInReverseScanImpl(SQueryRuntimeEnv* pRuntimeEnv, SWindowResInfo *pWindowResInfo, int32_t order) {
SQuery* pQuery = pQInfo->runtimeEnv.pQuery; SQuery* pQuery = pRuntimeEnv->pQuery;
for (int32_t i = 0; i < pWindowResInfo->size; ++i) { for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
bool closed = getTimeWindowResStatus(pWindowResInfo, i); bool closed = getTimeWindowResStatus(pWindowResInfo, i);
...@@ -3260,17 +3222,18 @@ static void disableFuncInReverseScanImpl(SQInfo* pQInfo, SWindowResInfo *pWindow ...@@ -3260,17 +3222,18 @@ static void disableFuncInReverseScanImpl(SQInfo* pQInfo, SWindowResInfo *pWindow
continue; continue;
} }
SResultRow *buf = getWindowResult(pWindowResInfo, i); SResultRow *pRow = getResultRow(pWindowResInfo, i);
// open/close the specified query for each group result // open/close the specified query for each group result
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t functId = pQuery->pSelectExpr[j].base.functionId; int32_t functId = pQuery->pSelectExpr[j].base.functionId;
SResultRowCellInfo* pInfo = getResultCell(pRuntimeEnv, pRow, j);
if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_ASC) || if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_ASC) ||
((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSDB_ORDER_DESC)) { ((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSDB_ORDER_DESC)) {
buf->pCellInfo[j].complete = false; pInfo->complete = false;
} else if (functId != TSDB_FUNC_TS && functId != TSDB_FUNC_TAG) { } else if (functId != TSDB_FUNC_TS && functId != TSDB_FUNC_TAG) {
buf->pCellInfo[j].complete = true; pInfo->complete = true;
} }
} }
} }
...@@ -3284,7 +3247,7 @@ void disableFuncInReverseScan(SQInfo *pQInfo) { ...@@ -3284,7 +3247,7 @@ void disableFuncInReverseScan(SQInfo *pQInfo) {
// group by normal columns and interval query on normal table // group by normal columns and interval query on normal table
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) { if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) {
disableFuncInReverseScanImpl(pQInfo, pWindowResInfo, order); disableFuncInReverseScanImpl(pRuntimeEnv, pWindowResInfo, order);
} else { // for simple result of table query, } else { // for simple result of table query,
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { // todo refactor for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { // todo refactor
int32_t functId = pQuery->pSelectExpr[j].base.functionId; int32_t functId = pQuery->pSelectExpr[j].base.functionId;
...@@ -3334,22 +3297,16 @@ void switchCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -3334,22 +3297,16 @@ void switchCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) {
} }
} }
int32_t createQueryResultInfo(SQuery *pQuery, SResultRow *pResultRow) { int32_t initResultRow(SResultRow *pResultRow) {
// int32_t numOfCols = pQuery->numOfOutput;
pResultRow->pCellInfo = (SResultRowCellInfo*)((char*)pResultRow + sizeof(SResultRow)); pResultRow->pCellInfo = (SResultRowCellInfo*)((char*)pResultRow + sizeof(SResultRow));
pResultRow->pageId = -1; pResultRow->pageId = -1;
pResultRow->rowId = -1; pResultRow->rowId = -1;
// char* buf = (char*) pResultRow->pCellInfo + numOfCols * sizeof(SResultRowCellInfo);
// set the intermediate result output buffer
// setResultRowCellInfo(pRunimeEnv, pResultRow, buf);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
SResultRow* pRow = pRuntimeEnv->pResultRow;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
...@@ -3359,8 +3316,9 @@ void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -3359,8 +3316,9 @@ void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
* set the output buffer information and intermediate buffer * set the output buffer information and intermediate buffer
* not all queries require the interResultBuf, such as COUNT/TAGPRJ/PRJ/TAG etc. * not all queries require the interResultBuf, such as COUNT/TAGPRJ/PRJ/TAG etc.
*/ */
RESET_RESULT_INFO(&pRuntimeEnv->resultInfo[i]); SResultRowCellInfo* pCellInfo = getResultCell(pRuntimeEnv, pRow, i);
pCtx->resultInfo = &pRuntimeEnv->resultInfo[i]; RESET_RESULT_INFO(pCellInfo);
pCtx->resultInfo = pCellInfo;
// set the timestamp output buffer for top/bottom/diff query // set the timestamp output buffer for top/bottom/diff query
int32_t functionId = pQuery->pSelectExpr[i].base.functionId; int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
...@@ -3478,12 +3436,12 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -3478,12 +3436,12 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
for (int32_t i = 0; i < pWindowResInfo->size; ++i) { for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
SResultRow *pResult = getWindowResult(pWindowResInfo, i); SResultRow *pResult = getResultRow(pWindowResInfo, i);
if (!pResult->closed) { if (!pResult->closed) {
continue; continue;
} }
setWindowResOutputBuf(pRuntimeEnv, pResult); setResultOutputBuf(pRuntimeEnv, pResult);
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int16_t functId = pQuery->pSelectExpr[j].base.functionId; int16_t functId = pQuery->pSelectExpr[j].base.functionId;
...@@ -3709,7 +3667,7 @@ void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -3709,7 +3667,7 @@ void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) {
continue; continue;
} }
setWindowResOutputBuf(pRuntimeEnv, buf); setResultOutputBuf(pRuntimeEnv, buf);
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
aAggs[pQuery->pSelectExpr[j].base.functionId].xFinalize(&pRuntimeEnv->pCtx[j]); aAggs[pQuery->pSelectExpr[j].base.functionId].xFinalize(&pRuntimeEnv->pCtx[j]);
...@@ -3816,11 +3774,11 @@ void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) { ...@@ -3816,11 +3774,11 @@ void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) {
// record the current active group id // record the current active group id
pRuntimeEnv->prevGroupId = groupIndex; pRuntimeEnv->prevGroupId = groupIndex;
setWindowResOutputBuf(pRuntimeEnv, pWindowRes); setResultOutputBuf(pRuntimeEnv, pWindowRes);
initCtxOutputBuf(pRuntimeEnv); initCtxOutputBuf(pRuntimeEnv);
} }
void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult) { void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
// Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group // Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group
...@@ -3836,10 +3794,10 @@ void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult) { ...@@ -3836,10 +3794,10 @@ void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult) {
} }
/* /*
* set the output buffer information and intermediate buffer * set the output buffer information and intermediate buffer,
* not all queries require the interResultBuf, such as COUNT * not all queries require the interResultBuf, such as COUNT
*/ */
pCtx->resultInfo = &pResult->pCellInfo[i]; pCtx->resultInfo = getResultCell(pRuntimeEnv, pResult, i);
} }
} }
...@@ -3852,7 +3810,7 @@ void setWindowResOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe ...@@ -3852,7 +3810,7 @@ void setWindowResOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
pCtx->resultInfo = &pResult->pCellInfo[i]; pCtx->resultInfo = getResultCell(pRuntimeEnv, pResult, i);
if (pCtx->resultInfo->initialized && pCtx->resultInfo->complete) { if (pCtx->resultInfo->initialized && pCtx->resultInfo->complete) {
continue; continue;
} }
...@@ -4100,7 +4058,8 @@ static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -4100,7 +4058,8 @@ static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv) {
continue; continue;
} }
pResult->numOfRows = (uint16_t)(MAX(pResult->numOfRows, pResult->pCellInfo[j].numOfRes)); SResultRowCellInfo* pCell = getResultCell(pRuntimeEnv, pResult, j);
pResult->numOfRows = (uint16_t)(MAX(pResult->numOfRows, pCell->numOfRes));
} }
} }
} }
...@@ -4891,7 +4850,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -4891,7 +4850,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
size_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo); size_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo);
if (isPointInterpoQuery(pQuery) || isFirstLastRowQuery(pQuery)) { if (isPointInterpoQuery(pQuery)) {
resetCtxOutputBuf(pRuntimeEnv); resetCtxOutputBuf(pRuntimeEnv);
assert(pQuery->limit.offset == 0 && pQuery->limit.limit != 0); assert(pQuery->limit.offset == 0 && pQuery->limit.limit != 0);
...@@ -4921,11 +4880,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -4921,11 +4880,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
pRuntimeEnv->pQueryHandle = NULL; pRuntimeEnv->pQueryHandle = NULL;
} }
if (isFirstLastRowQuery(pQuery)) { pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(pQInfo->tsdb, &cond, &gp, pQInfo);
assert(0); // last_row query switch to other routine to handle
} else {
pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(pQInfo->tsdb, &cond, &gp, pQInfo);
}
taosArrayDestroy(tx); taosArrayDestroy(tx);
taosArrayDestroy(g1); taosArrayDestroy(g1);
...@@ -4939,10 +4894,6 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -4939,10 +4894,6 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
assert(taosArrayGetSize(s) >= 1); assert(taosArrayGetSize(s) >= 1);
setTagVal(pRuntimeEnv, taosArrayGetP(s, 0), pQInfo->tsdb); setTagVal(pRuntimeEnv, taosArrayGetP(s, 0), pQInfo->tsdb);
if (isFirstLastRowQuery(pQuery)) {
assert(taosArrayGetSize(s) == 1);
}
taosArrayDestroy(s); taosArrayDestroy(s);
// here we simply set the first table as current table // here we simply set the first table as current table
...@@ -5025,7 +4976,8 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -5025,7 +4976,8 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
SResultRow *pResult = pWindowResInfo->pResult[i]; SResultRow *pResult = pWindowResInfo->pResult[i];
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
pResult->numOfRows = (uint16_t)(MAX(pResult->numOfRows, pResult->pCellInfo[j].numOfRes)); SResultRowCellInfo* pCell = getResultCell(pRuntimeEnv, pResult, j);
pResult->numOfRows = (uint16_t)(MAX(pResult->numOfRows, pCell->numOfRes));
} }
} }
...@@ -5296,7 +5248,6 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { ...@@ -5296,7 +5248,6 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
if (pQInfo->code != TSDB_CODE_SUCCESS || IS_QUERY_KILLED(pQInfo)) { if (pQInfo->code != TSDB_CODE_SUCCESS || IS_QUERY_KILLED(pQInfo)) {
qDebug("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code)); qDebug("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code));
//TODO finalizeQueryResult may cause SEGSEV, since the memory may not allocated yet, add a cleanup function instead //TODO finalizeQueryResult may cause SEGSEV, since the memory may not allocated yet, add a cleanup function instead
// finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
} }
...@@ -6287,8 +6238,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou ...@@ -6287,8 +6238,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
calResultBufSize(pQuery); calResultBufSize(pQuery);
for (int32_t col = 0; col < pQuery->numOfOutput; ++col) { for (int32_t col = 0; col < pQuery->numOfOutput; ++col) {
assert(pExprs[col].interBytes >= pExprs[col].bytes || pExprs[col].interBytes == 0);
// allocate additional memory for interResults that are usually larger then final results // allocate additional memory for interResults that are usually larger then final results
size_t size = (size_t)((pQuery->rec.capacity + 1) * pExprs[col].bytes + pExprs[col].interBytes + sizeof(tFilePage)); size_t size = (size_t)((pQuery->rec.capacity + 1) * pExprs[col].bytes + pExprs[col].interBytes + sizeof(tFilePage));
pQuery->sdata[col] = (tFilePage *)calloc(1, size); pQuery->sdata[col] = (tFilePage *)calloc(1, size);
......
...@@ -87,7 +87,7 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { ...@@ -87,7 +87,7 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
assert(num >= 0 && num <= numOfClosed); assert(num >= 0 && num <= numOfClosed);
int16_t type = pWindowResInfo->type; int16_t type = pWindowResInfo->type;
uint64_t uid = 0; // uid is always set to be 0. STableId* id = TSDB_TABLEID(pRuntimeEnv->pQuery->current->pTable); // uid is always set to be 0.
char *key = NULL; char *key = NULL;
int16_t bytes = -1; int16_t bytes = -1;
...@@ -104,7 +104,7 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { ...@@ -104,7 +104,7 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
bytes = tDataTypeDesc[pWindowResInfo->type].nSize; bytes = tDataTypeDesc[pWindowResInfo->type].nSize;
} }
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid); SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, id->uid);
taosHashRemove(pRuntimeEnv->pResultRowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); taosHashRemove(pRuntimeEnv->pResultRowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
} else { } else {
break; break;
...@@ -137,14 +137,14 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { ...@@ -137,14 +137,14 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
bytes = tDataTypeDesc[pWindowResInfo->type].nSize; bytes = tDataTypeDesc[pWindowResInfo->type].nSize;
} }
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid); SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, id->uid);
int32_t *p = (int32_t *)taosHashGet(pRuntimeEnv->pResultRowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); int32_t *p = (int32_t *)taosHashGet(pRuntimeEnv->pResultRowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
assert(p != NULL); assert(p != NULL);
int32_t v = (*p - num); int32_t v = (*p - num);
assert(v >= 0 && v <= pWindowResInfo->size); assert(v >= 0 && v <= pWindowResInfo->size);
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid); SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, id->uid);
taosHashPut(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), (char *)&v, sizeof(int32_t)); taosHashPut(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), (char *)&v, sizeof(int32_t));
} }
...@@ -217,11 +217,11 @@ void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_ ...@@ -217,11 +217,11 @@ void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_
} }
bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot) { bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot) {
return (getWindowResult(pWindowResInfo, slot)->closed == true); return (getResultRow(pWindowResInfo, slot)->closed == true);
} }
void closeTimeWindow(SWindowResInfo *pWindowResInfo, int32_t slot) { void closeTimeWindow(SWindowResInfo *pWindowResInfo, int32_t slot) {
getWindowResult(pWindowResInfo, slot)->closed = true; getResultRow(pWindowResInfo, slot)->closed = true;
} }
void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pWindowRes) { void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pWindowRes) {
...@@ -229,18 +229,21 @@ void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pWindowRes) { ...@@ -229,18 +229,21 @@ void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pWindowRes) {
return; return;
} }
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pageId); // the result does not put into the SDiskbasedResultBuf, ignore it.
if (pWindowRes->pageId >= 0) {
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pageId);
for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutput; ++i) {
SResultRowCellInfo *pResultInfo = &pWindowRes->pCellInfo[i]; SResultRowCellInfo *pResultInfo = &pWindowRes->pCellInfo[i];
char * s = getPosInResultPage(pRuntimeEnv, i, pWindowRes, page); char * s = getPosInResultPage(pRuntimeEnv, i, pWindowRes, page);
size_t size = pRuntimeEnv->pQuery->pSelectExpr[i].bytes; size_t size = pRuntimeEnv->pQuery->pSelectExpr[i].bytes;
memset(s, 0, size); memset(s, 0, size);
RESET_RESULT_INFO(pResultInfo); RESET_RESULT_INFO(pResultInfo);
}
} }
pWindowRes->numOfRows = 0; pWindowRes->numOfRows = 0;
pWindowRes->pageId = -1; pWindowRes->pageId = -1;
pWindowRes->rowId = -1; pWindowRes->rowId = -1;
...@@ -327,6 +330,8 @@ SResultRow* getNewResultRow(SResultRowPool* p) { ...@@ -327,6 +330,8 @@ SResultRow* getNewResultRow(SResultRowPool* p) {
} }
p->position.pos = (p->position.pos + 1)%p->numOfElemPerBlock; p->position.pos = (p->position.pos + 1)%p->numOfElemPerBlock;
initResultRow(ptr);
return ptr; return ptr;
} }
......
...@@ -114,10 +114,12 @@ static SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *p ...@@ -114,10 +114,12 @@ static SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *p
* @param dsize size of actual data * @param dsize size of actual data
* @return hash node * @return hash node
*/ */
static FORCE_INLINE SHashNode *doUpdateHashNode(SHashNode* prev, SHashNode *pNode, SHashNode *pNewNode) { static FORCE_INLINE SHashNode *doUpdateHashNode(SHashEntry* pe, SHashNode* prev, SHashNode *pNode, SHashNode *pNewNode) {
assert(pNode->keyLen == pNewNode->keyLen); assert(pNode->keyLen == pNewNode->keyLen);
if (prev != NULL) { if (prev != NULL) {
prev->next = pNewNode; prev->next = pNewNode;
} else {
pe->next = pNewNode;
} }
pNewNode->next = pNode->next; pNewNode->next = pNode->next;
...@@ -242,7 +244,10 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da ...@@ -242,7 +244,10 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
} else { } else {
// not support the update operation, return error // not support the update operation, return error
if (pHashObj->enableUpdate) { if (pHashObj->enableUpdate) {
doUpdateHashNode(prev, pNode, pNewNode); doUpdateHashNode(pe, prev, pNode, pNewNode);
DO_FREE_HASH_NODE(pNode);
} else {
DO_FREE_HASH_NODE(pNewNode);
} }
if (pHashObj->type == HASH_ENTRY_LOCK) { if (pHashObj->type == HASH_ENTRY_LOCK) {
...@@ -252,7 +257,6 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da ...@@ -252,7 +257,6 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
// enable resize // enable resize
__rd_unlock(&pHashObj->lock, pHashObj->type); __rd_unlock(&pHashObj->lock, pHashObj->type);
DO_FREE_HASH_NODE(pNewNode);
return pHashObj->enableUpdate ? 0 : -1; return pHashObj->enableUpdate ? 0 : -1;
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册