From 70a02863a7a0606ac11d651f3f6f5ab3f3feb052 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 20 Apr 2021 15:58:28 +0800 Subject: [PATCH] [td-2859] refactor and fix memory leaks. --- src/client/inc/tscLocalMerge.h | 12 +- src/client/src/tscLocalMerge.c | 1036 +++++--------------------------- src/client/src/tscSQLParser.c | 159 ++--- src/client/src/tscServer.c | 2 +- src/client/src/tscSubquery.c | 2 + src/client/src/tscUtil.c | 76 ++- src/inc/tsdb.h | 2 +- src/query/inc/qExecutor.h | 2 + src/query/inc/qSqlparser.h | 56 +- src/query/inc/sql.y | 12 +- src/query/src/qExecutor.c | 2 +- src/query/src/qSqlParser.c | 134 ++--- src/query/src/sql.c | 51 +- src/tsdb/src/tsdbRead.c | 8 +- 14 files changed, 420 insertions(+), 1134 deletions(-) diff --git a/src/client/inc/tscLocalMerge.h b/src/client/inc/tscLocalMerge.h index 45fa11d143..143922bb1f 100644 --- a/src/client/inc/tscLocalMerge.h +++ b/src/client/inc/tscLocalMerge.h @@ -44,26 +44,16 @@ typedef struct SLocalMerger { int32_t numOfCompleted; int32_t numOfVnode; SLoserTreeInfo * pLoserTree; - char * prevRowOfInput; tFilePage * pResultBuf; int32_t nResultBufSize; tFilePage * pTempBuffer; struct SQLFunctionCtx *pCtx; int32_t rowSize; // size of each intermediate result. - bool hasPrevRow; // cannot be released - bool hasUnprocessedRow; tOrderDescriptor * pDesc; SColumnModel * resColModel; SColumnModel* finalModel; tExtMemBuffer ** pExtMemBuffer; // disk-based buffer - SFillInfo* pFillInfo; // interpolation support structure - char* pFinalRes; // result data after interpo - tFilePage* discardData; - bool discard; - int32_t offset; // limit offset value bool orderPrjOnSTable; // projection query on stable - char* tagBuf; // max tag buffer - int32_t tagBufLen; } SLocalMerger; typedef struct SRetrieveSupport { @@ -96,7 +86,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde void tscDestroyLocalMerger(SSqlObj *pSql); -int32_t tscDoLocalMerge(SSqlObj *pSql); +//int32_t tscDoLocalMerge(SSqlObj *pSql); #ifdef __cplusplus } diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index 86e5349a6f..180ef0a5f1 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -129,58 +129,39 @@ void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchem } } -static UNUSED_FUNC void setCtxInputOutputBuffer(SQueryInfo *pQueryInfo, SQLFunctionCtx *pCtx, SLocalMerger *pMerger, - tOrderDescriptor *pDesc) { - size_t size = tscSqlExprNumOfExprs(pQueryInfo); - - for (int32_t i = 0; i < size; ++i) { - SExprInfo *pExpr = tscSqlExprGet(pQueryInfo, i); - pCtx[i].pOutput = pMerger->pResultBuf->data + pExpr->base.offset * pMerger->resColModel->capacity; - - // input buffer hold only one point data - int16_t offset = getColumnModelOffset(pDesc->pColumnModel, i); - pCtx[i].pInput = pMerger->pTempBuffer->data + offset; - - int32_t functionId = pCtx[i].functionId; - if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { - pCtx[i].ptsOutputBuf = pCtx[0].pOutput; - } - } -} - -static SFillColInfo* createFillColInfo(SQueryInfo* pQueryInfo) { - int32_t numOfCols = (int32_t)tscNumOfFields(pQueryInfo); - int32_t offset = 0; - - SFillColInfo* pFillCol = calloc(numOfCols, sizeof(SFillColInfo)); - for(int32_t i = 0; i < numOfCols; ++i) { - SInternalField* pIField = taosArrayGet(pQueryInfo->fieldsInfo.internalField, i); - - if (pIField->pExpr->pExpr == NULL) { - SExprInfo* pExpr = pIField->pExpr; - - pFillCol[i].col.bytes = pExpr->base.resBytes; - pFillCol[i].col.type = (int8_t)pExpr->base.resType; - pFillCol[i].col.colId = pExpr->base.colInfo.colId; - pFillCol[i].flag = pExpr->base.colInfo.flag; - pFillCol[i].col.offset = offset; - pFillCol[i].functionId = pExpr->base.functionId; - pFillCol[i].fillVal.i = pQueryInfo->fillVal[i]; - } else { - pFillCol[i].col.bytes = pIField->field.bytes; - pFillCol[i].col.type = (int8_t)pIField->field.type; - pFillCol[i].col.colId = -100; - pFillCol[i].flag = TSDB_COL_NORMAL; - pFillCol[i].col.offset = offset; - pFillCol[i].functionId = -1; - pFillCol[i].fillVal.i = pQueryInfo->fillVal[i]; - } - - offset += pFillCol[i].col.bytes; - } - - return pFillCol; -} +//static SFillColInfo* createFillColInfo(SQueryInfo* pQueryInfo) { +// int32_t numOfCols = (int32_t)tscNumOfFields(pQueryInfo); +// int32_t offset = 0; +// +// SFillColInfo* pFillCol = calloc(numOfCols, sizeof(SFillColInfo)); +// for(int32_t i = 0; i < numOfCols; ++i) { +// SInternalField* pIField = taosArrayGet(pQueryInfo->fieldsInfo.internalField, i); +// +// if (pIField->pExpr->pExpr == NULL) { +// SExprInfo* pExpr = pIField->pExpr; +// +// pFillCol[i].col.bytes = pExpr->base.resBytes; +// pFillCol[i].col.type = (int8_t)pExpr->base.resType; +// pFillCol[i].col.colId = pExpr->base.colInfo.colId; +// pFillCol[i].flag = pExpr->base.colInfo.flag; +// pFillCol[i].col.offset = offset; +// pFillCol[i].functionId = pExpr->base.functionId; +// pFillCol[i].fillVal.i = pQueryInfo->fillVal[i]; +// } else { +// pFillCol[i].col.bytes = pIField->field.bytes; +// pFillCol[i].col.type = (int8_t)pIField->field.type; +// pFillCol[i].col.colId = -100; +// pFillCol[i].flag = TSDB_COL_NORMAL; +// pFillCol[i].col.offset = offset; +// pFillCol[i].functionId = -1; +// pFillCol[i].fillVal.i = pQueryInfo->fillVal[i]; +// } +// +// offset += pFillCol[i].col.bytes; +// } +// +// return pFillCol; +//} void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc, SColumnModel *finalmodel, SColumnModel *pFFModel, SSqlObj *pSql) { @@ -333,15 +314,8 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde assert(false); // todo fixed row size is larger than the minimum page size; } - pMerger->hasPrevRow = false; - pMerger->hasUnprocessedRow = false; - - pMerger->prevRowOfInput = (char *)calloc(1, pMerger->rowSize); - // used to keep the latest input row pMerger->pTempBuffer = (tFilePage *)calloc(1, pMerger->rowSize + sizeof(tFilePage)); - pMerger->discardData = (tFilePage *)calloc(1, pMerger->rowSize + sizeof(tFilePage)); - pMerger->discard = false; pMerger->nResultBufSize = pMemBuffer[0]->pageSize * 16; pMerger->pResultBuf = (tFilePage *)calloc(1, pMerger->nResultBufSize + sizeof(tFilePage)); @@ -355,15 +329,15 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde } assert(finalmodel->rowSize > 0 && finalmodel->rowSize <= pMerger->rowSize); - pMerger->pFinalRes = calloc(1, pMerger->rowSize * pMerger->resColModel->capacity); +// pMerger->pFinalRes = calloc(1, pMerger->rowSize * pMerger->resColModel->capacity); - if (pMerger->pTempBuffer == NULL || pMerger->discardData == NULL || pMerger->pResultBuf == NULL || - pMerger->pFinalRes == NULL || pMerger->prevRowOfInput == NULL) { + if (pMerger->pTempBuffer == NULL || pMerger->pLoserTree == NULL /*|| pMerger->pResultBuf == NULL || + pMerger->pFinalRes == NULL || pMerger->prevRowOfInput == NULL*/) { tfree(pMerger->pTempBuffer); - tfree(pMerger->discardData); - tfree(pMerger->pResultBuf); - tfree(pMerger->pFinalRes); - tfree(pMerger->prevRowOfInput); +// tfree(pMerger->discardData); +// tfree(pMerger->pResultBuf); +// tfree(pMerger->pFinalRes); +// tfree(pMerger->prevRowOfInput); tfree(pMerger->pLoserTree); tfree(param); tfree(pMerger); @@ -372,7 +346,6 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde } pMerger->pTempBuffer->num = 0; - tscCreateResPointerInfo(pRes, pQueryInfo); SSchema* pschema = calloc(pDesc->pColumnModel->numOfCols, sizeof(SSchema)); @@ -393,8 +366,6 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde } } - pMerger->tagBuf = calloc(1, maxBufSize); - // we change the capacity of schema to denote that there is only one row in temp buffer pMerger->pDesc->pColumnModel->capacity = 1; @@ -404,24 +375,22 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde pQueryInfo->limit.offset = pQueryInfo->prjOffset; } - pMerger->offset = (int32_t)pQueryInfo->limit.offset; - pRes->pLocalMerger = pMerger; pRes->numOfGroups = 0; - STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); - STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); +// STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); +// STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); - TSKEY stime = (pQueryInfo->order.order == TSDB_ORDER_ASC)? pQueryInfo->window.skey : pQueryInfo->window.ekey; - int64_t revisedSTime = taosTimeTruncate(stime, &pQueryInfo->interval, tinfo.precision); +// TSKEY stime = (pQueryInfo->order.order == TSDB_ORDER_ASC)? pQueryInfo->window.skey : pQueryInfo->window.ekey; +// int64_t revisedSTime = taosTimeTruncate(stime, &pQueryInfo->interval, tinfo.precision); - if (pQueryInfo->fillType != TSDB_FILL_NONE) { - SFillColInfo* pFillCol = createFillColInfo(pQueryInfo); - pMerger->pFillInfo = - taosCreateFillInfo(pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols, 4096, - (int32_t)pQueryInfo->fieldsInfo.numOfOutput, pQueryInfo->interval.sliding, - pQueryInfo->interval.slidingUnit, tinfo.precision, pQueryInfo->fillType, pFillCol, pSql); - } +// if (pQueryInfo->fillType != TSDB_FILL_NONE) { +// SFillColInfo* pFillCol = createFillColInfo(pQueryInfo); +// pMerger->pFillInfo = +// taosCreateFillInfo(pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols, 4096, +// (int32_t)pQueryInfo->fieldsInfo.numOfOutput, pQueryInfo->interval.sliding, +// pQueryInfo->interval.slidingUnit, tinfo.precision, pQueryInfo->fillType, pFillCol, pSql); +// } } static int32_t tscFlushTmpBufferImpl(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage, @@ -522,33 +491,30 @@ void tscDestroyLocalMerger(SSqlObj *pSql) { return; } - SSqlCmd * pCmd = &pSql->cmd; - SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); +// SSqlCmd * pCmd = &pSql->cmd; +// SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); // there is no more result, so we release all allocated resource SLocalMerger *pLocalMerge = (SLocalMerger *)atomic_exchange_ptr(&pRes->pLocalMerger, NULL); if (pLocalMerge != NULL) { - pLocalMerge->pFillInfo = taosDestroyFillInfo(pLocalMerge->pFillInfo); - - if (pLocalMerge->pCtx != NULL) { - int32_t numOfExprs = (int32_t) tscSqlExprNumOfExprs(pQueryInfo); - for (int32_t i = 0; i < numOfExprs; ++i) { - SQLFunctionCtx *pCtx = &pLocalMerge->pCtx[i]; - - tVariantDestroy(&pCtx->tag); - tfree(pCtx->resultInfo); +// pLocalMerge->pFillInfo = taosDestroyFillInfo(pLocalMerge->pFillInfo); - if (pCtx->tagInfo.pTagCtxList != NULL) { - tfree(pCtx->tagInfo.pTagCtxList); - } - } - - tfree(pLocalMerge->pCtx); - } - - tfree(pLocalMerge->prevRowOfInput); +// if (pLocalMerge->pCtx != NULL) { +// int32_t numOfExprs = (int32_t) tscSqlExprNumOfExprs(pQueryInfo); +// for (int32_t i = 0; i < numOfExprs; ++i) { +// SQLFunctionCtx *pCtx = &pLocalMerge->pCtx[i]; +// +// tVariantDestroy(&pCtx->tag); +// tfree(pCtx->resultInfo); +// +// if (pCtx->tagInfo.pTagCtxList != NULL) { +// tfree(pCtx->tagInfo.pTagCtxList); +// } +// } +// +// tfree(pLocalMerge->pCtx); +// } - tfree(pLocalMerge->pTempBuffer); tfree(pLocalMerge->pResultBuf); if (pLocalMerge->pLoserTree) { @@ -556,9 +522,6 @@ void tscDestroyLocalMerger(SSqlObj *pSql) { tfree(pLocalMerge->pLoserTree); } - tfree(pLocalMerge->pFinalRes); - tfree(pLocalMerge->discardData); - tscLocalReducerEnvDestroy(pLocalMerge->pExtMemBuffer, pLocalMerge->pDesc, pLocalMerge->resColModel, pLocalMerge->finalModel, pLocalMerge->numOfVnode); for (int32_t i = 0; i < pLocalMerge->numOfBuffer; ++i) { @@ -905,227 +868,6 @@ void adjustLoserTreeFromNewData(SLocalMerger *pLocalMerge, SLocalDataSource *pOn } } -void savePrevRecordAndSetupFillInfo(SLocalMerger *pLocalMerge, SQueryInfo *pQueryInfo, SFillInfo *pFillInfo) { - // discard following dataset in the same group and reset the interpolation information - STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - - STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); - - if (pFillInfo != NULL) { - int64_t stime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.skey : pQueryInfo->window.ekey; - int64_t revisedSTime = taosTimeTruncate(stime, &pQueryInfo->interval, tinfo.precision); - - taosResetFillInfo(pFillInfo, revisedSTime); - } - - pLocalMerge->discard = true; - pLocalMerge->discardData->num = 0; - - SColumnModel *pModel = pLocalMerge->pDesc->pColumnModel; - tColModelAppend(pModel, pLocalMerge->discardData, pLocalMerge->prevRowOfInput, 0, 1, 1); -} - -static void genFinalResWithoutFill(SSqlRes* pRes, SLocalMerger *pLocalMerge, SQueryInfo* pQueryInfo) { - assert(pQueryInfo->interval.interval == 0 || pQueryInfo->fillType == TSDB_FILL_NONE); - - tFilePage * pBeforeFillData = pLocalMerge->pResultBuf; - - pRes->data = pLocalMerge->pFinalRes; - pRes->numOfRows = (int32_t) pBeforeFillData->num; - - if (pQueryInfo->limit.offset > 0) { - if (pQueryInfo->limit.offset < pRes->numOfRows) { - int32_t prevSize = (int32_t) pBeforeFillData->num; - tColModelErase(pLocalMerge->finalModel, pBeforeFillData, prevSize, 0, (int32_t)pQueryInfo->limit.offset - 1); - - /* remove the hole in column model */ - tColModelCompact(pLocalMerge->finalModel, pBeforeFillData, prevSize); - - pRes->numOfRows -= (int32_t) pQueryInfo->limit.offset; - pQueryInfo->limit.offset = 0; - } else { - pQueryInfo->limit.offset -= pRes->numOfRows; - pRes->numOfRows = 0; - } - } - - if (pRes->numOfRowsGroup >= pQueryInfo->limit.limit && pQueryInfo->limit.limit > 0) { - pRes->numOfRows = 0; - pBeforeFillData->num = 0; - pLocalMerge->discard = true; - return; - } - - pRes->numOfRowsGroup += pRes->numOfRows; - - // impose the limitation of output rows on the final result - if (pQueryInfo->limit.limit >= 0 && pRes->numOfRowsGroup > pQueryInfo->limit.limit) { - int32_t prevSize = (int32_t)pBeforeFillData->num; - int32_t overflow = (int32_t)(pRes->numOfRowsGroup - pQueryInfo->limit.limit); - assert(overflow < pRes->numOfRows); - - pRes->numOfRowsGroup = pQueryInfo->limit.limit; - pRes->numOfRows -= overflow; - pBeforeFillData->num -= overflow; - - tColModelCompact(pLocalMerge->finalModel, pBeforeFillData, prevSize); - - // set remain data to be discarded, and reset the interpolation information - savePrevRecordAndSetupFillInfo(pLocalMerge, pQueryInfo, pLocalMerge->pFillInfo); - } - - memcpy(pRes->data, pBeforeFillData->data, (size_t)(pRes->numOfRows * pLocalMerge->finalModel->rowSize)); - - pRes->numOfClauseTotal += pRes->numOfRows; - pBeforeFillData->num = 0; -} - -/* - * Note: pRes->pLocalMerge may be null, due to the fact that "tscDestroyLocalMerger" is called - * by "interuptHandler" function in shell - */ -static void doFillResult(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool doneOutput) { - SSqlCmd *pCmd = &pSql->cmd; - SSqlRes *pRes = &pSql->res; - - tFilePage *pBeforeFillData = pLocalMerge->pResultBuf; - SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); - SFillInfo *pFillInfo = pLocalMerge->pFillInfo; - - // todo extract function - int64_t actualETime = (pQueryInfo->order.order == TSDB_ORDER_ASC)? pQueryInfo->window.ekey: pQueryInfo->window.skey; - - void** pResPages = malloc(POINTER_BYTES * pQueryInfo->fieldsInfo.numOfOutput); - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { - TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); - pResPages[i] = calloc(1, pField->bytes * pLocalMerge->resColModel->capacity); - } - - while (1) { - int64_t newRows = taosFillResultDataBlock(pFillInfo, pResPages, pLocalMerge->resColModel->capacity); - - if (pQueryInfo->limit.offset < newRows) { - newRows -= pQueryInfo->limit.offset; - - if (pQueryInfo->limit.offset > 0) { - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { - TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); - memmove(pResPages[i], ((char*)pResPages[i]) + pField->bytes * pQueryInfo->limit.offset, - (size_t)(newRows * pField->bytes)); - } - } - - pRes->data = pLocalMerge->pFinalRes; - pRes->numOfRows = (int32_t) newRows; - - pQueryInfo->limit.offset = 0; - break; - } else { - pQueryInfo->limit.offset -= newRows; - pRes->numOfRows = 0; - - if (!taosFillHasMoreResults(pFillInfo)) { - if (!doneOutput) { // reduce procedure has not completed yet, but current results for fill are exhausted - break; - } - - // all output in current group are completed - int32_t totalRemainRows = (int32_t)getNumOfResultsAfterFillGap(pFillInfo, actualETime, pLocalMerge->resColModel->capacity); - if (totalRemainRows <= 0) { - break; - } - } - } - } - - if (pRes->numOfRows > 0) { - int32_t currentTotal = (int32_t)(pRes->numOfRowsGroup + pRes->numOfRows); - - if (pQueryInfo->limit.limit >= 0 && currentTotal > pQueryInfo->limit.limit) { - int32_t overflow = (int32_t)(currentTotal - pQueryInfo->limit.limit); - - pRes->numOfRows -= overflow; - assert(pRes->numOfRows >= 0); - - /* set remain data to be discarded, and reset the interpolation information */ - savePrevRecordAndSetupFillInfo(pLocalMerge, pQueryInfo, pFillInfo); - } - - int32_t offset = 0; - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { - TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); - memcpy(pRes->data + offset * pRes->numOfRows, pResPages[i], (size_t)(pField->bytes * pRes->numOfRows)); - offset += pField->bytes; - } - - pRes->numOfRowsGroup += pRes->numOfRows; - pRes->numOfClauseTotal += pRes->numOfRows; - } - - pBeforeFillData->num = 0; - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { - tfree(pResPages[i]); - } - - tfree(pResPages); -} - -static void savePreviousRow(SLocalMerger *pLocalMerge, tFilePage *tmpBuffer) { - SColumnModel *pColumnModel = pLocalMerge->pDesc->pColumnModel; - assert(pColumnModel->capacity == 1 && tmpBuffer->num == 1); - - // copy to previous temp buffer - for (int32_t i = 0; i < pColumnModel->numOfCols; ++i) { - SSchema *pSchema = getColumnModelSchema(pColumnModel, i); - int16_t offset = getColumnModelOffset(pColumnModel, i); - - memcpy(pLocalMerge->prevRowOfInput + offset, tmpBuffer->data + offset, pSchema->bytes); - } - - tmpBuffer->num = 0; - pLocalMerge->hasPrevRow = true; -} - -static void doExecuteFinalMerge( SLocalMerger *pLocalMerge, int32_t numOfExpr, bool needInit) { - // the tag columns need to be set before all functions execution - for (int32_t j = 0; j < numOfExpr; ++j) { - SQLFunctionCtx *pCtx = &pLocalMerge->pCtx[j]; - - // tags/tags_dummy function, the tag field of SQLFunctionCtx is from the input buffer - int32_t functionId = pCtx->functionId; - if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TS_DUMMY) { - tVariantDestroy(&pCtx->tag); - char* input = pCtx->pInput; - - if (pCtx->inputType == TSDB_DATA_TYPE_BINARY || pCtx->inputType == TSDB_DATA_TYPE_NCHAR) { - assert(varDataLen(input) <= pCtx->inputBytes); - tVariantCreateFromBinary(&pCtx->tag, varDataVal(input), varDataLen(input), pCtx->inputType); - } else { - tVariantCreateFromBinary(&pCtx->tag, input, pCtx->inputBytes, pCtx->inputType); - } - - } else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) { -// SExprInfo *pExpr = tscSqlExprGet(pQueryInfo, j); // TODO this data is from -// pCtx->param[0].i64 = pExpr->base.param[0].i64; - } - - pCtx->currentStage = MERGE_STAGE; - - if (needInit) { - aAggs[pCtx->functionId].init(pCtx); - } - } - - for (int32_t j = 0; j < numOfExpr; ++j) { - int32_t functionId = pLocalMerge->pCtx[j].functionId; - if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { - continue; - } - - aAggs[functionId].mergeFunc(&pLocalMerge->pCtx[j]); - } -} - //TODO it is not ordered, fix it static void savePrevOrderColumns(char** prevRow, SArray* pColumnList, SSDataBlock* pBlock, int32_t rowIndex, bool* hasPrev) { int32_t size = (int32_t) taosArrayGetSize(pColumnList); @@ -1242,39 +984,27 @@ static void doExecuteFinalMergeRv(SOperatorInfo* pOperator, int32_t numOfExpr, S tfree(add); } -static void handleUnprocessedRow(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, tFilePage *tmpBuffer) { - if (pLocalMerge->hasUnprocessedRow) { - pLocalMerge->hasUnprocessedRow = false; - - SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); - size_t size = tscSqlExprNumOfExprs(pQueryInfo); - - doExecuteFinalMerge(pLocalMerge, size, true); - savePreviousRow(pLocalMerge, tmpBuffer); - } -} - -static int64_t getNumOfResultLocal(SQLFunctionCtx *pCtx, int32_t numOfExprs) { - int64_t maxOutput = 0; - - for (int32_t j = 0; j < numOfExprs; ++j) { - /* - * ts, tag, tagprj function can not decide the output number of current query - * the number of output result is decided by main output - */ - int32_t functionId = pCtx[j].functionId; - if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TAG) { - continue; - } - - SResultRowCellInfo* pResInfo = GET_RES_INFO(&pCtx[j]); - if (maxOutput < pResInfo->numOfRes) { - maxOutput = pResInfo->numOfRes; - } - } - - return maxOutput; -} +//static int64_t getNumOfResultLocal(SQLFunctionCtx *pCtx, int32_t numOfExprs) { +// int64_t maxOutput = 0; +// +// for (int32_t j = 0; j < numOfExprs; ++j) { +// /* +// * ts, tag, tagprj function can not decide the output number of current query +// * the number of output result is decided by main output +// */ +// int32_t functionId = pCtx[j].functionId; +// if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TAG) { +// continue; +// } +// +// SResultRowCellInfo* pResInfo = GET_RES_INFO(&pCtx[j]); +// if (maxOutput < pResInfo->numOfRes) { +// maxOutput = pResInfo->numOfRes; +// } +// } +// +// return maxOutput; +//} /* * in handling the top/bottom query, which produce more than one rows result, @@ -1282,38 +1012,38 @@ static int64_t getNumOfResultLocal(SQLFunctionCtx *pCtx, int32_t numOfExprs) { * filled with the same result, which is the tags, specified in group by clause * */ -static void fillMultiRowsOfTagsVal(SLocalMerger *pLocalMerge, int32_t numOfRes, int32_t numOfExprs) { - for (int32_t k = 0; k < numOfExprs; ++k) { - SQLFunctionCtx *pCtx = &pLocalMerge->pCtx[k]; - if (pCtx->functionId != TSDB_FUNC_TAG) { - continue; - } - - int32_t inc = numOfRes - 1; // tsdb_func_tag function only produce one row of result - memset(pLocalMerge->tagBuf, 0, (size_t)pLocalMerge->tagBufLen); - memcpy(pLocalMerge->tagBuf, pCtx->pOutput, (size_t)pCtx->outputBytes); - - for (int32_t i = 0; i < inc; ++i) { - pCtx->pOutput += pCtx->outputBytes; - memcpy(pCtx->pOutput, pLocalMerge->tagBuf, (size_t)pCtx->outputBytes); - } - } -} - -int32_t finalizeRes(SLocalMerger *pLocalMerge, int32_t numOfExprs) { - for (int32_t k = 0; k < numOfExprs; ++k) { - SQLFunctionCtx* pCtx = &pLocalMerge->pCtx[k]; - aAggs[pCtx->functionId].xFinalize(pCtx); - } - - pLocalMerge->hasPrevRow = false; - - int32_t numOfRes = (int32_t)getNumOfResultLocal(pLocalMerge->pCtx, numOfExprs); - pLocalMerge->pResultBuf->num += numOfRes; +//static void fillMultiRowsOfTagsVal(SLocalMerger *pLocalMerge, int32_t numOfRes, int32_t numOfExprs) { +// for (int32_t k = 0; k < numOfExprs; ++k) { +// SQLFunctionCtx *pCtx = &pLocalMerge->pCtx[k]; +// if (pCtx->functionId != TSDB_FUNC_TAG) { +// continue; +// } +// +// int32_t inc = numOfRes - 1; // tsdb_func_tag function only produce one row of result +// memset(pLocalMerge->tagBuf, 0, (size_t)pLocalMerge->tagBufLen); +// memcpy(pLocalMerge->tagBuf, pCtx->pOutput, (size_t)pCtx->outputBytes); +// +// for (int32_t i = 0; i < inc; ++i) { +// pCtx->pOutput += pCtx->outputBytes; +// memcpy(pCtx->pOutput, pLocalMerge->tagBuf, (size_t)pCtx->outputBytes); +// } +// } +//} - fillMultiRowsOfTagsVal(pLocalMerge, numOfRes, numOfExprs); - return numOfRes; -} +//int32_t finalizeRes(SLocalMerger *pLocalMerge, int32_t numOfExprs) { +// for (int32_t k = 0; k < numOfExprs; ++k) { +// SQLFunctionCtx* pCtx = &pLocalMerge->pCtx[k]; +// aAggs[pCtx->functionId].xFinalize(pCtx); +// } +// +// pLocalMerge->hasPrevRow = false; +// +// int32_t numOfRes = (int32_t)getNumOfResultLocal(pLocalMerge->pCtx, numOfExprs); +// pLocalMerge->pResultBuf->num += numOfRes; +// +// fillMultiRowsOfTagsVal(pLocalMerge, numOfRes, numOfExprs); +// return numOfRes; +//} /* * points merge: @@ -1322,29 +1052,29 @@ int32_t finalizeRes(SLocalMerger *pLocalMerge, int32_t numOfExprs) { * results generated by simple aggregation function, we merge them all into one points * *Exception*: column projection query, required no merge procedure */ -bool needToMerge(SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge, tFilePage *tmpBuffer) { - int32_t ret = 0; // merge all result by default - - int16_t functionId = pLocalMerge->pCtx[0].functionId; - - // todo opt performance - if ((/*functionId == TSDB_FUNC_PRJ || */functionId == TSDB_FUNC_ARITHM) || (tscIsProjectionQueryOnSTable(pQueryInfo, 0) && pQueryInfo->distinctTag == false)) { // column projection query - ret = 1; // disable merge procedure - } else { - tOrderDescriptor *pDesc = pLocalMerge->pDesc; - if (pDesc->orderInfo.numOfCols > 0) { - if (pDesc->tsOrder == TSDB_ORDER_ASC) { // asc - // todo refactor comparator - ret = compare_a(pLocalMerge->pDesc, 1, 0, pLocalMerge->prevRowOfInput, 1, 0, tmpBuffer->data); - } else { // desc - ret = compare_d(pLocalMerge->pDesc, 1, 0, pLocalMerge->prevRowOfInput, 1, 0, tmpBuffer->data); - } - } - } - - /* if ret == 0, means the result belongs to the same group */ - return (ret == 0); -} +//bool needToMerge(SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge, tFilePage *tmpBuffer) { +// int32_t ret = 0; // merge all result by default +// +// int16_t functionId = pLocalMerge->pCtx[0].functionId; +// +// // todo opt performance +// if ((/*functionId == TSDB_FUNC_PRJ || */functionId == TSDB_FUNC_ARITHM) || (tscIsProjectionQueryOnSTable(pQueryInfo, 0) && pQueryInfo->distinctTag == false)) { // column projection query +// ret = 1; // disable merge procedure +// } else { +// tOrderDescriptor *pDesc = pLocalMerge->pDesc; +// if (pDesc->orderInfo.numOfCols > 0) { +// if (pDesc->tsOrder == TSDB_ORDER_ASC) { // asc +// // todo refactor comparator +// ret = compare_a(pLocalMerge->pDesc, 1, 0, pLocalMerge->prevRowOfInput, 1, 0, tmpBuffer->data); +// } else { // desc +// ret = compare_d(pLocalMerge->pDesc, 1, 0, pLocalMerge->prevRowOfInput, 1, 0, tmpBuffer->data); +// } +// } +// } +// +// /* if ret == 0, means the result belongs to the same group */ +// return (ret == 0); +//} bool needToMergeRv(SSDataBlock* pBlock, SArray* columnIndexList, int32_t index, char **buf) { int32_t ret = 0; @@ -1356,90 +1086,6 @@ bool needToMergeRv(SSDataBlock* pBlock, SArray* columnIndexList, int32_t index, return (ret == 0); } -static bool reachGroupResultLimit(SQueryInfo *pQueryInfo, SSqlRes *pRes) { - return (pRes->numOfGroups >= pQueryInfo->slimit.limit && pQueryInfo->slimit.limit >= 0); -} - -static bool saveGroupResultInfo(SSqlObj *pSql) { - SSqlCmd *pCmd = &pSql->cmd; - SSqlRes *pRes = &pSql->res; - - SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); - - if (pRes->numOfRowsGroup > 0) { - pRes->numOfGroups += 1; - } - - // the output group is limited by the slimit clause - if (reachGroupResultLimit(pQueryInfo, pRes)) { - return true; - } - - // pRes->pGroupRec = realloc(pRes->pGroupRec, pRes->numOfGroups*sizeof(SResRec)); - // pRes->pGroupRec[pRes->numOfGroups-1].numOfRows = pRes->numOfRows; - // pRes->pGroupRec[pRes->numOfGroups-1].numOfClauseTotal = pRes->numOfClauseTotal; - - return false; -} - -/** - * - * @param pSql - * @param pLocalMerge - * @param noMoreCurrentGroupRes - * @return if current group is skipped, return false, and do NOT record it into pRes->numOfGroups - */ -bool genFinalResults(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool noMoreCurrentGroupRes) { - SSqlCmd *pCmd = &pSql->cmd; - SSqlRes *pRes = &pSql->res; - - SQueryInfo * pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); - tFilePage * pResBuf = pLocalMerge->pResultBuf; - SColumnModel *pModel = pLocalMerge->resColModel; - - pRes->code = TSDB_CODE_SUCCESS; - - /* - * Ignore the output of the current group since this group is skipped by user - * We set the numOfRows to be 0 and discard the possible remain results. - */ - if (pQueryInfo->slimit.offset > 0) { - pRes->numOfRows = 0; - pQueryInfo->slimit.offset -= 1; - pLocalMerge->discard = !noMoreCurrentGroupRes; - - if (pLocalMerge->discard) { - SColumnModel *pInternModel = pLocalMerge->pDesc->pColumnModel; - tColModelAppend(pInternModel, pLocalMerge->discardData, pLocalMerge->pTempBuffer->data, 0, 1, 1); - } - - return false; - } - - tColModelCompact(pModel, pResBuf, pModel->capacity); - - if (tscIsSecondStageQuery(pQueryInfo)) { - doArithmeticCalculate(pQueryInfo, pResBuf, pModel->rowSize, pLocalMerge->finalModel->rowSize); - } - - // no interval query, no fill operation - if (pQueryInfo->interval.interval == 0 || pQueryInfo->fillType == TSDB_FILL_NONE) { - genFinalResWithoutFill(pRes, pLocalMerge, pQueryInfo); - } else { - SFillInfo* pFillInfo = pLocalMerge->pFillInfo; - if (pFillInfo != NULL) { - TSKEY ekey = (pQueryInfo->order.order == TSDB_ORDER_ASC)? pQueryInfo->window.ekey: pQueryInfo->window.skey; - - taosFillSetStartInfo(pFillInfo, (int32_t)pResBuf->num, ekey); - taosFillCopyInputDataFromOneFilePage(pFillInfo, pResBuf); - } - - doFillResult(pSql, pLocalMerge, noMoreCurrentGroupRes); - } - - return true; -} - void resetOutputBuf(SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge) {// reset output buffer to the beginning size_t t = tscSqlExprNumOfExprs(pQueryInfo); for (int32_t i = 0; i < t; ++i) { @@ -1454,305 +1100,10 @@ void resetOutputBuf(SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge) {// reset memset(pLocalMerge->pResultBuf, 0, pLocalMerge->nResultBufSize + sizeof(tFilePage)); } -static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalMerger *pLocalMerge) { - // In handling data in other groups, we need to reset the interpolation information for a new group data - pRes->numOfRows = 0; - pRes->numOfRowsGroup = 0; - - SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); - - pQueryInfo->limit.offset = pLocalMerge->offset; - - STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); - STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); - - // for group result interpolation, do not return if not data is generated - if (pQueryInfo->fillType != TSDB_FILL_NONE) { - TSKEY skey = (pQueryInfo->order.order == TSDB_ORDER_ASC)? pQueryInfo->window.skey:pQueryInfo->window.ekey;//MIN(pQueryInfo->window.skey, pQueryInfo->window.ekey); - int64_t newTime = taosTimeTruncate(skey, &pQueryInfo->interval, tinfo.precision); - taosResetFillInfo(pLocalMerge->pFillInfo, newTime); - } -} - static bool isAllSourcesCompleted(SLocalMerger *pLocalMerge) { return (pLocalMerge->numOfBuffer == pLocalMerge->numOfCompleted); } -static bool doBuildFilledResultForGroup(SSqlObj *pSql) { - SSqlCmd *pCmd = &pSql->cmd; - SSqlRes *pRes = &pSql->res; - - SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); - SLocalMerger *pLocalMerge = pRes->pLocalMerger; - SFillInfo *pFillInfo = pLocalMerge->pFillInfo; - - if (pFillInfo != NULL && taosFillHasMoreResults(pFillInfo)) { - assert(pQueryInfo->fillType != TSDB_FILL_NONE); - - tFilePage *pFinalDataBuf = pLocalMerge->pResultBuf; - int64_t etime = *(int64_t *)(pFinalDataBuf->data + TSDB_KEYSIZE * (pFillInfo->numOfRows - 1)); - - // the first column must be the timestamp column - int32_t rows = (int32_t) getNumOfResultsAfterFillGap(pFillInfo, etime, pLocalMerge->resColModel->capacity); - if (rows > 0) { // do fill gap - doFillResult(pSql, pLocalMerge, false); - } - - return true; - } else { - return false; - } -} - -static bool doHandleLastRemainData(SSqlObj *pSql) { - SSqlCmd *pCmd = &pSql->cmd; - SSqlRes *pRes = &pSql->res; - - SLocalMerger *pLocalMerge = pRes->pLocalMerger; - SFillInfo *pFillInfo = pLocalMerge->pFillInfo; - - bool prevGroupCompleted = (!pLocalMerge->discard) && pLocalMerge->hasUnprocessedRow; - - SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); - - if ((isAllSourcesCompleted(pLocalMerge) && !pLocalMerge->hasPrevRow) || pLocalMerge->pLocalDataSrc[0] == NULL || - prevGroupCompleted) { - // if fillType == TSDB_FILL_NONE, return directly - if (pQueryInfo->fillType != TSDB_FILL_NONE && - ((pRes->numOfRowsGroup < pQueryInfo->limit.limit && pQueryInfo->limit.limit > 0) || (pQueryInfo->limit.limit < 0))) { - int64_t etime = (pQueryInfo->order.order == TSDB_ORDER_ASC)? pQueryInfo->window.ekey : pQueryInfo->window.skey; - - int32_t rows = (int32_t)getNumOfResultsAfterFillGap(pFillInfo, etime, pLocalMerge->resColModel->capacity); - if (rows > 0) { - doFillResult(pSql, pLocalMerge, true); - } - } - - /* - * 1. numOfRows == 0, means no interpolation results are generated. - * 2. if all local data sources are consumed, and no un-processed rows exist. - * - * No results will be generated and query completed. - */ - if (pRes->numOfRows > 0 || (isAllSourcesCompleted(pLocalMerge) && (!pLocalMerge->hasUnprocessedRow))) { - return true; - } - - // start to process result for a new group and save the result info of previous group - if (saveGroupResultInfo(pSql)) { - return true; - } - - resetEnvForNewResultset(pRes, pCmd, pLocalMerge); - } - - return false; -} - -static void doProcessResultInNextWindow(SSqlObj *pSql, int32_t numOfRes) { - SSqlCmd *pCmd = &pSql->cmd; - SSqlRes *pRes = &pSql->res; - - SLocalMerger *pLocalMerge = pRes->pLocalMerger; - SQueryInfo * pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); - size_t size = tscSqlExprNumOfExprs(pQueryInfo); - - for (int32_t k = 0; k < size; ++k) { - SQLFunctionCtx *pCtx = &pLocalMerge->pCtx[k]; - pCtx->pOutput += pCtx->outputBytes * numOfRes; - - // set the correct output timestamp column position - if (pCtx->functionId == TSDB_FUNC_TOP || pCtx->functionId == TSDB_FUNC_BOTTOM) { - pCtx->ptsOutputBuf = ((char *)pCtx->ptsOutputBuf + TSDB_KEYSIZE * numOfRes); - } - } - - doExecuteFinalMerge(pLocalMerge, size, true); -} - -int32_t tscDoLocalMerge(SSqlObj *pSql) { - SSqlCmd *pCmd = &pSql->cmd; - SSqlRes *pRes = &pSql->res; - - tscResetForNextRetrieve(pRes); - assert(pSql->signature == pSql); - - if (pRes->pLocalMerger == NULL) { // all data has been processed - if (pRes->code == TSDB_CODE_SUCCESS) { - return pRes->code; - } - - tscError("%p local merge abort due to error occurs, code:%s", pSql, tstrerror(pRes->code)); - return pRes->code; - } - - SLocalMerger *pLocalMerge = pRes->pLocalMerger; - SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); - tFilePage *tmpBuffer = pLocalMerge->pTempBuffer; - - int32_t numOfExprs = (int32_t) tscSqlExprNumOfExprs(pQueryInfo); - if (doHandleLastRemainData(pSql)) { - return TSDB_CODE_SUCCESS; - } - - if (doBuildFilledResultForGroup(pSql)) { - return TSDB_CODE_SUCCESS; - } - - SLoserTreeInfo *pTree = pLocalMerge->pLoserTree; - - // clear buffer - handleUnprocessedRow(pCmd, pLocalMerge, tmpBuffer); - SColumnModel *pModel = pLocalMerge->pDesc->pColumnModel; - - while (1) { - if (isAllSourcesCompleted(pLocalMerge)) { - break; - } - -#ifdef _DEBUG_VIEW - printf("chosen data in pTree[0] = %d\n", pTree->pNode[0].index); -#endif - - assert((pTree->pNode[0].index < pLocalMerge->numOfBuffer) && (pTree->pNode[0].index >= 0) && tmpBuffer->num == 0); - - // chosen from loser tree - SLocalDataSource *pOneDataSrc = pLocalMerge->pLocalDataSrc[pTree->pNode[0].index]; - - tColModelAppend(pModel, tmpBuffer, pOneDataSrc->filePage.data, pOneDataSrc->rowIdx, 1, - pOneDataSrc->pMemBuffer->pColumnModel->capacity); - -#if defined(_DEBUG_VIEW) - printf("chosen row:\t"); - SSrcColumnInfo colInfo[256] = {0}; - tscGetSrcColumnInfo(colInfo, pQueryInfo); - - tColModelDisplayEx(pModel, tmpBuffer->data, tmpBuffer->num, pModel->capacity, colInfo); -#endif - - if (pLocalMerge->discard) { - assert(pLocalMerge->hasUnprocessedRow == false); - - /* current record belongs to the same group of previous record, need to discard it */ - if (isSameGroup(pCmd, pLocalMerge, pLocalMerge->discardData->data, tmpBuffer)) { - tmpBuffer->num = 0; - pOneDataSrc->rowIdx += 1; - - adjustLoserTreeFromNewData(pLocalMerge, pOneDataSrc, pTree); - - // all inputs are exhausted, abort current process - if (isAllSourcesCompleted(pLocalMerge)) { - break; - } - - // data belongs to the same group needs to be discarded - continue; - } else { - pLocalMerge->discard = false; - pLocalMerge->discardData->num = 0; - - if (saveGroupResultInfo(pSql)) { - return TSDB_CODE_SUCCESS; - } - - resetEnvForNewResultset(pRes, pCmd, pLocalMerge); - } - } - - if (pLocalMerge->hasPrevRow) { - if (needToMerge(pQueryInfo, pLocalMerge, tmpBuffer)) { - // belong to the group of the previous row, continue process it - doExecuteFinalMerge(pLocalMerge, numOfExprs, false); - - // copy to buffer - savePreviousRow(pLocalMerge, tmpBuffer); - } else { - /* - * current row does not belong to the group of previous row. - * so the processing of previous group is completed. - */ - int32_t numOfRes = finalizeRes(pLocalMerge, numOfExprs); - bool sameGroup = isSameGroup(pCmd, pLocalMerge, pLocalMerge->prevRowOfInput, tmpBuffer); - - tFilePage *pResBuf = pLocalMerge->pResultBuf; - - /* - * if the previous group does NOT generate any result (pResBuf->num == 0), - * continue to process results instead of return results. - */ - if ((!sameGroup && pResBuf->num > 0) || (pResBuf->num == pLocalMerge->resColModel->capacity)) { - // does not belong to the same group - bool notSkipped = genFinalResults(pSql, pLocalMerge, !sameGroup); - - // this row needs to discard, since it belongs to the group of previous - if (pLocalMerge->discard && sameGroup) { - pLocalMerge->hasUnprocessedRow = false; - tmpBuffer->num = 0; - } else { // current row does not belongs to the previous group, so it is not be handled yet. - pLocalMerge->hasUnprocessedRow = true; - } - - resetOutputBuf(pQueryInfo, pLocalMerge); - pOneDataSrc->rowIdx += 1; - - // here we do not check the return value - adjustLoserTreeFromNewData(pLocalMerge, pOneDataSrc, pTree); - - if (pRes->numOfRows == 0) { - handleUnprocessedRow(pCmd, pLocalMerge, tmpBuffer); - - if (!sameGroup) { - /* - * previous group is done, prepare for the next group - * If previous group is not skipped, keep it in pRes->numOfGroups - */ - if (notSkipped && saveGroupResultInfo(pSql)) { - return TSDB_CODE_SUCCESS; - } - - resetEnvForNewResultset(pRes, pCmd, pLocalMerge); - } - } else { - /* - * if next record belongs to a new group, we do not handle this record here. - * We start the process in a new round. - */ - if (sameGroup) { - handleUnprocessedRow(pCmd, pLocalMerge, tmpBuffer); - } - } - - // current group has no result, - if (pRes->numOfRows == 0) { - continue; - } else { - return TSDB_CODE_SUCCESS; - } - } else { // result buffer is not full - doProcessResultInNextWindow(pSql, numOfRes); - savePreviousRow(pLocalMerge, tmpBuffer); - } - } - } else { - doExecuteFinalMerge(pLocalMerge, numOfExprs,true); - savePreviousRow(pLocalMerge, tmpBuffer); // copy the processed row to buffer - } - - pOneDataSrc->rowIdx += 1; - adjustLoserTreeFromNewData(pLocalMerge, pOneDataSrc, pTree); - } - - if (pLocalMerge->hasPrevRow) { - finalizeRes(pLocalMerge, numOfExprs); - } - - if (pLocalMerge->pResultBuf->num) { - genFinalResults(pSql, pLocalMerge, true); - } - - return TSDB_CODE_SUCCESS; -} - void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen) { SSqlRes *pRes = &pObj->res; if (pRes->pLocalMerger != NULL) { @@ -1823,7 +1174,6 @@ int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_ #define COLMODEL_GET_VAL(data, schema, allrow, rowId, colId) \ (data + (schema)->pFields[colId].offset * (allrow) + (rowId) * (schema)->pFields[colId].field.bytes) - static void appendOneRowToDataBlock(SSDataBlock *pBlock, char *buf, SColumnModel *pModel, int32_t rowIndex, int32_t maxRows) { for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { @@ -2116,14 +1466,14 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) { } SSDataBlock* doSLimit(void* param, bool* newgroup) { - SOperatorInfo* pOperator = (SOperatorInfo*)param; + SOperatorInfo *pOperator = (SOperatorInfo *)param; if (pOperator->status == OP_EXEC_DONE) { return NULL; } SSLimitOperatorInfo *pInfo = pOperator->info; - SSDataBlock* pBlock = NULL; + SSDataBlock *pBlock = NULL; while (1) { pBlock = skipGroupBlock(pOperator, newgroup); if (pBlock == NULL) { @@ -2133,8 +1483,8 @@ SSDataBlock* doSLimit(void* param, bool* newgroup) { } if (*newgroup) { // a new group arrives - pInfo->groupTotal += 1; - pInfo->rowsTotal = 0; + pInfo->groupTotal += 1; + pInfo->rowsTotal = 0; pInfo->currentOffset = pInfo->limit.offset; } @@ -2163,90 +1513,9 @@ SSDataBlock* doSLimit(void* param, bool* newgroup) { } } - /* - if (!pInfo->hasPrev) { - pInfo->groupTotal = 1; - savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, 0, &pInfo->hasPrev); - } else { - bool sameGroup = isSameGroupRv(pInfo->orderColumnList, pBlock, pInfo->prevRow); - if (!sameGroup) { // reset info for new group data - pInfo->rowsTotal = 0; - pInfo->currentOffset = pInfo->limit.offset; // reset the offset value for a new group - savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, 0, &pInfo->hasPrev); - } else { // data in current group has reached the limit, ignore the remain data of this group - if (pInfo->limit.limit > 0 && (pInfo->rowsTotal >= pInfo->limit.limit)) { - continue; - } - } - } -*/ -// if (pInfo->currentGroupOffset == 0) { -// if (pInfo->currentOffset == 0) { // TODO refactor -// break; -// } else if (pInfo->currentOffset >= pBlock->info.rows) { -// pInfo->currentOffset -= pBlock->info.rows; -// } else { -// int32_t remain = (int32_t)(pBlock->info.rows - pInfo->currentOffset); -// pBlock->info.rows = remain; -// -// // move the remain rows of this data block to the front. -// for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { -// SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); -// -// int16_t bytes = pColInfoData->info.bytes; -// memmove(pColInfoData->pData, pColInfoData->pData + bytes * pInfo->currentOffset, remain * bytes); -// } -// -// pInfo->currentOffset = 0; -// break; -// } -// } else { -// if (pInfo->hasPrev) { -// // Check if current data block belongs to current result group or not -// bool sameGroup = isSameGroupRv(pInfo->orderColumnList, pBlock, pInfo->prevRow); -// if (sameGroup) { -// continue; // ignore the data block of the same group and try next -// } else { -// //update the group column data by using the current group. -// savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, 0, &pInfo->hasPrev); -// -// pInfo->currentOffset = pInfo->limit.offset; // reset the offset value for a new group -// pInfo->rowsTotal = 0; -// -// if ((--pInfo->currentGroupOffset) == 0) { -// if (pInfo->currentOffset == 0) { // TODO refactor -// break; -// } else if (pInfo->currentOffset >= pBlock->info.rows) { -// pInfo->currentOffset -= pBlock->info.rows; -// } else { -// int32_t remain = (int32_t)(pBlock->info.rows - pInfo->currentOffset); -// pBlock->info.rows = remain; -// -// // move the remain rows of this data block to the front. -// for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { -// SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); -// -// int16_t bytes = pColInfoData->info.bytes; -// memmove(pColInfoData->pData, pColInfoData->pData + bytes * pInfo->currentOffset, remain * bytes); -// } -// -// pInfo->currentOffset = 0; -// break; -// } -// } -// } -// } else { -// savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, 0, &pInfo->hasPrev); -// } -// } -// } - -// if (!pInfo->hasPrev || !isSameGroupRv(pInfo->orderColumnList, pBlock, pInfo->prevRow)) { -// pInfo->groupTotal += 1; - if (pInfo->slimit.limit > 0 && pInfo->groupTotal > pInfo->slimit.limit) { // reach the group limit, abort - return NULL; - } -// } + if (pInfo->slimit.limit > 0 && pInfo->groupTotal > pInfo->slimit.limit) { // reach the group limit, abort + return NULL; + } if (pInfo->limit.limit > 0 && (pInfo->rowsTotal + pBlock->info.rows >= pInfo->limit.limit)) { pBlock->info.rows = (int32_t)(pInfo->limit.limit - pInfo->rowsTotal); @@ -2256,11 +1525,10 @@ SSDataBlock* doSLimit(void* param, bool* newgroup) { pOperator->status = OP_EXEC_DONE; } -// setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); + // setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); } else { pInfo->rowsTotal += pBlock->info.rows; } return pBlock; } - diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index ca293a8cf9..ef0f44d7e4 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -85,15 +85,15 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery, bool static int32_t validateGroupbyNode(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd); -static int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySqlNode* pQuerySqlNode); +static int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode); static int32_t parseIntervalOffset(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* offsetToken); static int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* pSliding); static int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExprItem* pItem); static int32_t validateWhereNode(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSql); -static int32_t validateFillNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode* pQuerySqlNode); -static int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode* pQuerySqlNode, SSchema* pSchema); +static int32_t validateFillNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode); +static int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, SSchema* pSchema); static int32_t tsRewriteFieldNameIfNecessary(SSqlCmd* pCmd, SQueryInfo* pQueryInfo); static int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo); @@ -110,14 +110,14 @@ static bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField); static bool hasTimestampForPointInterpQuery(SQueryInfo* pQueryInfo); static bool hasNormalColumnFilter(SQueryInfo* pQueryInfo); -static int32_t validateLimitNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t index, SQuerySqlNode* pQuerySqlNode, SSqlObj* pSql); +static int32_t validateLimitNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t index, SSqlNode* pSqlNode, SSqlObj* pSql); static int32_t parseCreateDBOptions(SSqlCmd* pCmd, SCreateDbInfo* pCreateDbSql); static int32_t getColumnIndexByName(SSqlCmd* pCmd, const SStrToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex); static int32_t getTableIndexByName(SStrToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex); static int32_t getTableIndexImpl(SStrToken* pTableToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex); static int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo); -static int32_t doLocalQueryProcess(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode* pQuerySqlNode); +static int32_t doLocalQueryProcess(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode); static int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCreateDbMsg* pCreate); static SColumnList createColumnList(int32_t num, int16_t tableIndex, int32_t columnIndex); @@ -125,7 +125,7 @@ static SColumnList createColumnList(int32_t num, int16_t tableIndex, int32_t col static int32_t doCheckForCreateTable(SSqlObj* pSql, int32_t subClauseIndex, SSqlInfo* pInfo); static int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo); static int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo); -static int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t index); +static int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, int32_t index); static int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSqlExpr* pSqlExpr, SQueryInfo* pQueryInfo, SArray* pCols, uint64_t *uid); static bool validateDebugFlag(int32_t v); static int32_t checkQueryRangeForFill(SSqlCmd* pCmd, SQueryInfo* pQueryInfo); @@ -617,7 +617,8 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { case TSDB_SQL_SELECT: { const char* msg1 = "columns in select clause not identical"; - for (int32_t i = pCmd->numOfClause; i < pInfo->subclauseInfo.numOfClause; ++i) { + size_t size = taosArrayGetSize(pInfo->list); + for (int32_t i = pCmd->numOfClause; i < size; ++i) { SQueryInfo* p = tscGetQueryInfoS(pCmd, i); if (p == NULL) { pRes->code = terrno; @@ -625,11 +626,11 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { } } - assert(pCmd->numOfClause == pInfo->subclauseInfo.numOfClause); - for (int32_t i = pCmd->clauseIndex; i < pInfo->subclauseInfo.numOfClause; ++i) { - SQuerySqlNode* pQuerySqlNode = pInfo->subclauseInfo.pClause[i]; - tscTrace("%p start to parse %dth subclause, total:%d", pSql, i, pInfo->subclauseInfo.numOfClause); - if ((code = validateSqlNode(pSql, pQuerySqlNode, i)) != TSDB_CODE_SUCCESS) { + assert(pCmd->numOfClause == size); + for (int32_t i = pCmd->clauseIndex; i < size; ++i) { + SSqlNode* pSqlNode = taosArrayGetP(pInfo->list, i); + tscTrace("%p start to parse %dth subclause, total:%"PRId64, pSql, i, size); + if ((code = validateSqlNode(pSql, pSqlNode, i)) != TSDB_CODE_SUCCESS) { return code; } @@ -784,7 +785,7 @@ static int32_t checkInvalidExprForTimeWindow(SSqlCmd* pCmd, SQueryInfo* pQueryIn return addPrimaryTsColumnForTimeWindowQuery(pQueryInfo); } -int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySqlNode* pQuerySqlNode) { +int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode) { const char* msg2 = "interval cannot be less than 10 ms"; const char* msg3 = "sliding cannot be used without interval"; @@ -793,8 +794,8 @@ int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySqlNod STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); - if (!TPARSER_HAS_TOKEN(pQuerySqlNode->interval.interval)) { - if (TPARSER_HAS_TOKEN(pQuerySqlNode->sliding)) { + if (!TPARSER_HAS_TOKEN(pSqlNode->interval.interval)) { + if (TPARSER_HAS_TOKEN(pSqlNode->sliding)) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); } @@ -807,7 +808,7 @@ int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySqlNod } // interval is not null - SStrToken *t = &pQuerySqlNode->interval.interval; + SStrToken *t = &pSqlNode->interval.interval; if (parseNatualDuration(t->z, t->n, &pQueryInfo->interval.interval, &pQueryInfo->interval.intervalUnit) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_SQL; } @@ -824,11 +825,11 @@ int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySqlNod } } - if (parseIntervalOffset(pCmd, pQueryInfo, &pQuerySqlNode->interval.offset) != TSDB_CODE_SUCCESS) { + if (parseIntervalOffset(pCmd, pQueryInfo, &pSqlNode->interval.offset) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_SQL; } - if (parseSlidingClause(pCmd, pQueryInfo, &pQuerySqlNode->sliding) != TSDB_CODE_SUCCESS) { + if (parseSlidingClause(pCmd, pQueryInfo, &pSqlNode->sliding) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_SQL; } @@ -836,19 +837,19 @@ int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySqlNod return checkInvalidExprForTimeWindow(pCmd, pQueryInfo); } -int32_t validateSessionNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode * pQuerySqlNode) { +int32_t validateSessionNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode * pSqlNode) { const char* msg1 = "gap should be fixed time window"; const char* msg2 = "only one type time window allowed"; const char* msg3 = "invalid column name"; const char* msg4 = "invalid time window"; // no session window - if (!TPARSER_HAS_TOKEN(pQuerySqlNode->sessionVal.gap)) { + if (!TPARSER_HAS_TOKEN(pSqlNode->sessionVal.gap)) { return TSDB_CODE_SUCCESS; } - SStrToken* col = &pQuerySqlNode->sessionVal.col; - SStrToken* gap = &pQuerySqlNode->sessionVal.gap; + SStrToken* col = &pSqlNode->sessionVal.col; + SStrToken* gap = &pSqlNode->sessionVal.gap; char timeUnit = 0; if (parseNatualDuration(gap->z, gap->n, &pQueryInfo->sessionWindow.gap, &timeUnit) != TSDB_CODE_SUCCESS) { @@ -4714,9 +4715,9 @@ int32_t tsRewriteFieldNameIfNecessary(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { return TSDB_CODE_SUCCESS; } -int32_t validateFillNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode* pQuerySqlNode) { - SArray* pFillToken = pQuerySqlNode->fillType; - if (pQuerySqlNode->fillType == NULL) { +int32_t validateFillNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode) { + SArray* pFillToken = pSqlNode->fillType; + if (pSqlNode->fillType == NULL) { return TSDB_CODE_SUCCESS; } @@ -4854,7 +4855,7 @@ static void setDefaultOrderInfo(SQueryInfo* pQueryInfo) { } } -int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode* pQuerySqlNode, SSchema* pSchema) { +int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, SSchema* pSchema) { const char* msg0 = "only support order by primary timestamp"; const char* msg1 = "invalid column name"; const char* msg2 = "order by primary timestamp or first tag in groupby clause allowed"; @@ -4869,11 +4870,11 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode pQueryInfo->order.orderColId = 0; return TSDB_CODE_SUCCESS; } - if (pQuerySqlNode->pSortOrder == NULL) { + if (pSqlNode->pSortOrder == NULL) { return TSDB_CODE_SUCCESS; } - SArray* pSortorder = pQuerySqlNode->pSortOrder; + SArray* pSortorder = pSqlNode->pSortOrder; /* * for table query, there is only one or none order option is allowed, which is the @@ -4941,7 +4942,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode if (orderByTags) { pQueryInfo->groupbyExpr.orderIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta); - tVariantListItem* p1 = taosArrayGet(pQuerySqlNode->pSortOrder, 0); + tVariantListItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0); pQueryInfo->groupbyExpr.orderType = p1->sortOrder; } else if (isTopBottomQuery(pQueryInfo)) { /* order of top/bottom query in interval is not valid */ @@ -4953,12 +4954,12 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); } - tVariantListItem* p1 = taosArrayGet(pQuerySqlNode->pSortOrder, 0); + tVariantListItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0); pQueryInfo->order.order = p1->sortOrder; pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId; return TSDB_CODE_SUCCESS; } else { - tVariantListItem* p1 = taosArrayGet(pQuerySqlNode->pSortOrder, 0); + tVariantListItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0); pQueryInfo->order.order = p1->sortOrder; pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX; @@ -4971,7 +4972,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode } if (s == 2) { - tVariantListItem *pItem = taosArrayGet(pQuerySqlNode->pSortOrder, 0); + tVariantListItem *pItem = taosArrayGet(pSqlNode->pSortOrder, 0); if (orderByTags) { pQueryInfo->groupbyExpr.orderIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta); pQueryInfo->groupbyExpr.orderType = pItem->sortOrder; @@ -4980,7 +4981,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX; } - pItem = taosArrayGet(pQuerySqlNode->pSortOrder, 1); + pItem = taosArrayGet(pSqlNode->pSortOrder, 1); tVariant* pVar2 = &pItem->pVar; SStrToken cname = {pVar2->nLen, pVar2->nType, pVar2->pz}; if (getColumnIndexByName(pCmd, &cname, pQueryInfo, &index) != TSDB_CODE_SUCCESS) { @@ -5015,13 +5016,13 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); } - tVariantListItem* pItem = taosArrayGet(pQuerySqlNode->pSortOrder, 0); + tVariantListItem* pItem = taosArrayGet(pSqlNode->pSortOrder, 0); pQueryInfo->order.order = pItem->sortOrder; pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId; return TSDB_CODE_SUCCESS; } - tVariantListItem* pItem = taosArrayGet(pQuerySqlNode->pSortOrder, 0); + tVariantListItem* pItem = taosArrayGet(pSqlNode->pSortOrder, 0); pQueryInfo->order.order = pItem->sortOrder; } @@ -5553,7 +5554,7 @@ bool hasTimestampForPointInterpQuery(SQueryInfo* pQueryInfo) { return !(pQueryInfo->window.skey != pQueryInfo->window.ekey && pQueryInfo->interval.interval == 0); } -int32_t validateLimitNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySqlNode* pQuerySqlNode, SSqlObj* pSql) { +int32_t validateLimitNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIndex, SSqlNode* pSqlNode, SSqlObj* pSql) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); const char* msg0 = "soffset/offset can not be less than 0"; @@ -5561,9 +5562,9 @@ int32_t validateLimitNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseI const char* msg2 = "slimit/soffset can not apply to projection query"; // handle the limit offset value, validate the limit - pQueryInfo->limit = pQuerySqlNode->limit; + pQueryInfo->limit = pSqlNode->limit; pQueryInfo->clauseLimit = pQueryInfo->limit.limit; - pQueryInfo->slimit = pQuerySqlNode->slimit; + pQueryInfo->slimit = pSqlNode->slimit; tscDebug("%p limit:%" PRId64 ", offset:%" PRId64 " slimit:%" PRId64 ", soffset:%" PRId64, pSql, pQueryInfo->limit.limit, pQueryInfo->limit.offset, pQueryInfo->slimit.limit, pQueryInfo->slimit.offset); @@ -6214,12 +6215,12 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { return checkUpdateTagPrjFunctions(pQueryInfo, pCmd); } } -int32_t doLocalQueryProcess(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode* pQuerySqlNode) { +int32_t doLocalQueryProcess(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode) { const char* msg1 = "only one expression allowed"; const char* msg2 = "invalid expression in select clause"; const char* msg3 = "invalid function"; - SArray* pExprList = pQuerySqlNode->pSelNodeList; + SArray* pExprList = pSqlNode->pSelNodeList; size_t size = taosArrayGetSize(pExprList); if (size != 1) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); @@ -6680,18 +6681,18 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { // if sql specifies db, use it, otherwise use default db SStrToken* pName = &(pCreateTable->name); - SQuerySqlNode* pQuerySqlNode = pCreateTable->pSelect; + SSqlNode* pSqlNode = pCreateTable->pSelect; if (tscValidateName(pName) != TSDB_CODE_SUCCESS) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } - SFromInfo* pFromInfo = pInfo->pCreateTableInfo->pSelect->from; - if (pFromInfo == NULL || taosArrayGetSize(pFromInfo->tableList) == 0) { + SRelationInfo* pFromInfo = pInfo->pCreateTableInfo->pSelect->from; + if (pFromInfo == NULL || taosArrayGetSize(pFromInfo->list) == 0) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6); } - STableNamePair* p1 = taosArrayGet(pFromInfo->tableList, 0); + STableNamePair* p1 = taosArrayGet(pFromInfo->list, 0); SStrToken srcToken = {.z = p1->name.z, .n = p1->name.n, .type = TK_STRING}; if (tscValidateName(&srcToken) != TSDB_CODE_SUCCESS) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); @@ -6708,18 +6709,18 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { } bool isSTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo); - if (validateSelectNodeList(&pSql->cmd, 0, pQueryInfo, pQuerySqlNode->pSelNodeList, isSTable, false, false) != TSDB_CODE_SUCCESS) { + if (validateSelectNodeList(&pSql->cmd, 0, pQueryInfo, pSqlNode->pSelNodeList, isSTable, false, false) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_SQL; } - if (pQuerySqlNode->pWhere != NULL) { // query condition in stream computing - if (validateWhereNode(pQueryInfo, &pQuerySqlNode->pWhere, pSql) != TSDB_CODE_SUCCESS) { + if (pSqlNode->pWhere != NULL) { // query condition in stream computing + if (validateWhereNode(pQueryInfo, &pSqlNode->pWhere, pSql) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_SQL; } } // set interval value - if (validateIntervalNode(pSql, pQueryInfo, pQuerySqlNode) != TSDB_CODE_SUCCESS) { + if (validateIntervalNode(pSql, pQueryInfo, pSqlNode) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_SQL; } @@ -6737,7 +6738,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { return code; } - if (pQuerySqlNode->sqlstr.n > TSDB_MAX_SAVED_SQL_LEN) { + if (pSqlNode->sqlstr.n > TSDB_MAX_SAVED_SQL_LEN) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5); } @@ -6755,12 +6756,12 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { * check if fill operation is available, the fill operation is parsed and executed during query execution, * not here. */ - if (pQuerySqlNode->fillType != NULL) { + if (pSqlNode->fillType != NULL) { if (pQueryInfo->interval.interval == 0) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); } - tVariantListItem* pItem = taosArrayGet(pQuerySqlNode->fillType, 0); + tVariantListItem* pItem = taosArrayGet(pSqlNode->fillType, 0); if (pItem->pVar.nType == TSDB_DATA_TYPE_BINARY) { if (!((strncmp(pItem->pVar.pz, "none", 4) == 0 && pItem->pVar.nLen == 4) || (strncmp(pItem->pVar.pz, "null", 4) == 0 && pItem->pVar.nLen == 4))) { @@ -6809,7 +6810,7 @@ int32_t checkQueryRangeForFill(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { return TSDB_CODE_SUCCESS; } -static int32_t doLoadAllTableMeta(SSqlObj* pSql, int32_t index, SQuerySqlNode* pQuerySqlNode, int32_t numOfTables) { +static int32_t doLoadAllTableMeta(SSqlObj* pSql, int32_t index, SSqlNode* pSqlNode, int32_t numOfTables) { const char* msg1 = "invalid table name"; const char* msg2 = "invalid table alias name"; const char* msg3 = "alias name too long"; @@ -6824,7 +6825,7 @@ static int32_t doLoadAllTableMeta(SSqlObj* pSql, int32_t index, SQuerySqlNode* p tscAddEmptyMetaInfo(pQueryInfo); } - STableNamePair *item = taosArrayGet(pQuerySqlNode->from->tableList, i); + STableNamePair *item = taosArrayGet(pSqlNode->from->list, i); SStrToken *oriName = &item->name; if (oriName->type == TK_INTEGER || oriName->type == TK_FLOAT) { @@ -6897,8 +6898,8 @@ static STableMeta* extractTempTableMetaFromNestQuery(SQueryInfo* pUpstream) { return meta; } -int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t index) { - assert(pQuerySqlNode != NULL && (pQuerySqlNode->from == NULL || taosArrayGetSize(pQuerySqlNode->from->tableList) > 0)); +int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, int32_t index) { + assert(pSqlNode != NULL && (pSqlNode->from == NULL || taosArrayGetSize(pSqlNode->from->list) > 0)); const char* msg1 = "point interpolation query needs timestamp"; const char* msg2 = "too many tables in from clause"; @@ -6923,15 +6924,18 @@ int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t ind * select client_version(); * select server_state(); */ - if (pQuerySqlNode->from == NULL) { - assert(pQuerySqlNode->fillType == NULL && pQuerySqlNode->pGroupby == NULL && pQuerySqlNode->pWhere == NULL && - pQuerySqlNode->pSortOrder == NULL); - return doLocalQueryProcess(pCmd, pQueryInfo, pQuerySqlNode); + if (pSqlNode->from == NULL) { + assert(pSqlNode->fillType == NULL && pSqlNode->pGroupby == NULL && pSqlNode->pWhere == NULL && + pSqlNode->pSortOrder == NULL); + return doLocalQueryProcess(pCmd, pQueryInfo, pSqlNode); } - if (pQuerySqlNode->from->type == SQL_NODE_FROM_SUBQUERY) { + if (pSqlNode->from->type == SQL_NODE_FROM_SUBQUERY) { // parse the subquery in the first place - code = validateSqlNode(pSql, pQuerySqlNode->from->pNode.pClause[0], 0); + SArray* list = taosArrayGetP(pSqlNode->from->list, 0); + SSqlNode* p = taosArrayGetP(list, 0); + + code = validateSqlNode(pSql, p, 0); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { return code; } @@ -6949,24 +6953,25 @@ int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t ind current->pTableMetaInfo = calloc(1, POINTER_BYTES); current->pTableMetaInfo[0] = pTableMetaInfo1; + current->numOfTables = 1; pCmd->pQueryInfo[0] = current; pQueryInfo->pDownstream = current; - if (validateSelectNodeList(pCmd, index, current, pQuerySqlNode->pSelNodeList, false, false, false) != TSDB_CODE_SUCCESS) { + if (validateSelectNodeList(pCmd, index, current, pSqlNode->pSelNodeList, false, false, false) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_SQL; } } else { pQueryInfo->command = TSDB_SQL_SELECT; - size_t fromSize = taosArrayGetSize(pQuerySqlNode->from->tableList); + size_t fromSize = taosArrayGetSize(pSqlNode->from->list); if (fromSize > TSDB_MAX_JOIN_TABLE_NUM) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); } // set all query tables, which are maybe more than one. - code = doLoadAllTableMeta(pSql, index, pQuerySqlNode, fromSize); + code = doLoadAllTableMeta(pSql, index, pSqlNode, fromSize); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -6984,47 +6989,47 @@ int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t ind } // parse the group by clause in the first place - if (validateGroupbyNode(pQueryInfo, pQuerySqlNode->pGroupby, pCmd) != TSDB_CODE_SUCCESS) { + if (validateGroupbyNode(pQueryInfo, pSqlNode->pGroupby, pCmd) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_SQL; } // set where info STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); - if (pQuerySqlNode->pWhere != NULL) { - if (validateWhereNode(pQueryInfo, &pQuerySqlNode->pWhere, pSql) != TSDB_CODE_SUCCESS) { + if (pSqlNode->pWhere != NULL) { + if (validateWhereNode(pQueryInfo, &pSqlNode->pWhere, pSql) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_SQL; } - pQuerySqlNode->pWhere = NULL; + pSqlNode->pWhere = NULL; if (tinfo.precision == TSDB_TIME_PRECISION_MILLI) { pQueryInfo->window.skey = pQueryInfo->window.skey / 1000; pQueryInfo->window.ekey = pQueryInfo->window.ekey / 1000; } } else { // set the time rang - if (taosArrayGetSize(pQuerySqlNode->from->tableList) > 1) { + if (taosArrayGetSize(pSqlNode->from->list) > 1) { // If it is a join query, no where clause is not allowed. return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), "condition missing for join query "); } } - int32_t joinQuery = (pQuerySqlNode->from != NULL && taosArrayGetSize(pQuerySqlNode->from->tableList) > 1); + int32_t joinQuery = (pSqlNode->from != NULL && taosArrayGetSize(pSqlNode->from->list) > 1); int32_t timeWindowQuery = - (TPARSER_HAS_TOKEN(pQuerySqlNode->interval.interval) || TPARSER_HAS_TOKEN(pQuerySqlNode->sessionVal.gap)); + (TPARSER_HAS_TOKEN(pSqlNode->interval.interval) || TPARSER_HAS_TOKEN(pSqlNode->sessionVal.gap)); - if (validateSelectNodeList(pCmd, index, pQueryInfo, pQuerySqlNode->pSelNodeList, isSTable, joinQuery, timeWindowQuery) != + if (validateSelectNodeList(pCmd, index, pQueryInfo, pSqlNode->pSelNodeList, isSTable, joinQuery, timeWindowQuery) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_SQL; } // set order by info - if (validateOrderbyNode(pCmd, pQueryInfo, pQuerySqlNode, tscGetTableSchema(pTableMetaInfo->pTableMeta)) != + if (validateOrderbyNode(pCmd, pQueryInfo, pSqlNode, tscGetTableSchema(pTableMetaInfo->pTableMeta)) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_SQL; } // set interval value - if (validateIntervalNode(pSql, pQueryInfo, pQuerySqlNode) != TSDB_CODE_SUCCESS) { + if (validateIntervalNode(pSql, pQueryInfo, pSqlNode) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_SQL; } else { if (isTimeWindowQuery(pQueryInfo) && @@ -7033,7 +7038,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t ind } } - if (validateSessionNode(pCmd, pQueryInfo, pQuerySqlNode) != TSDB_CODE_SUCCESS) { + if (validateSessionNode(pCmd, pQueryInfo, pSqlNode) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_SQL; } @@ -7055,7 +7060,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t ind } } - if ((code = validateLimitNode(pCmd, pQueryInfo, index, pQuerySqlNode, pSql)) != TSDB_CODE_SUCCESS) { + if ((code = validateLimitNode(pCmd, pQueryInfo, index, pSqlNode, pSql)) != TSDB_CODE_SUCCESS) { return code; } @@ -7066,7 +7071,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t ind updateLastScanOrderIfNeeded(pQueryInfo); tscFieldInfoUpdateOffset(pQueryInfo); - if ((code = validateFillNode(pCmd, pQueryInfo, pQuerySqlNode)) != TSDB_CODE_SUCCESS) { + if ((code = validateFillNode(pCmd, pQueryInfo, pSqlNode)) != TSDB_CODE_SUCCESS) { return code; } } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 764876f3dd..6822a32590 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1383,7 +1383,7 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pMsg = (char *)pSchema; if (type == TSQL_CREATE_STREAM) { // check if it is a stream sql - SQuerySqlNode *pQuerySql = pInfo->pCreateTableInfo->pSelect; + SSqlNode *pQuerySql = pInfo->pCreateTableInfo->pSelect; strncpy(pMsg, pQuerySql->sqlstr.z, pQuerySql->sqlstr.n + 1); pCreateMsg->sqlLen = htons(pQuerySql->sqlstr.n + 1); diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 54ef04e6a5..9b0ec79cb9 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -3590,6 +3590,8 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, ST STsBufInfo bufInfo = {0}; SQueryParam param = {.pOperator = pa}; /*int32_t code = */initQInfo(&bufInfo, NULL, pSourceOperator, pQInfo, ¶m, NULL, 0, merger); + taosArrayDestroy(pa); + return pQInfo; _cleanup: diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 77d8d3e130..d9b38a19f1 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -525,18 +525,14 @@ void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo) { void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock) { assert(pRes->numOfCols > 0); -// int32_t offset = 0; - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { SInternalField* pInfo = (SInternalField*)TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i); SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i); - pRes->urow[i] = pColData->pData/* + offset * pColData->info.bytes*/; + pRes->urow[i] = pColData->pData; pRes->length[i] = pInfo->field.bytes; - //offset += pInfo->field.bytes; - // generated the user-defined column result if (pInfo->pExpr->pExpr == NULL && TSDB_COL_IS_UD_COL(pInfo->pExpr->base.colInfo.flag)) { if (pInfo->pExpr->base.param[1].nType == TSDB_DATA_TYPE_NULL) { @@ -609,7 +605,7 @@ static SColumnInfo* extractColumnInfoFromResult(STableMeta* pTableMeta, SArray* } typedef struct SDummyInputInfo { - SSDataBlock block; + SSDataBlock *block; SSqlRes *pRes; // refactor: remove it } SDummyInputInfo; @@ -619,7 +615,7 @@ SSDataBlock* doGetDataBlock(void* param, bool* newgroup) { SDummyInputInfo *pInput = pOperator->info; char* pData = pInput->pRes->data; - SSDataBlock* pBlock = &pInput->block; + SSDataBlock* pBlock = pInput->block; pBlock->info.rows = pInput->pRes->numOfRows; if (pBlock->info.rows == 0) { return NULL; @@ -638,30 +634,47 @@ SSDataBlock* doGetDataBlock(void* param, bool* newgroup) { return pBlock; } +static void destroyDummyInputOperator(void* param, int32_t numOfOutput) { + SDummyInputInfo* pInfo = (SDummyInputInfo*) param; + + // tricky + for(int32_t i = 0; i < numOfOutput; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pInfo->block->pDataBlock, i); + pColInfoData->pData = NULL; + } + + pInfo->block = destroyOutputBuf(pInfo->block); + pInfo->pRes = NULL; +} + +// todo this operator servers as the adapter for Operator tree and SqlRes result, remove it later SOperatorInfo* createDummyInputOperator(char* pResult, SSchema* pSchema, int32_t numOfCols) { assert(numOfCols > 0); - SDummyInputInfo* pInfo = calloc(1, sizeof(SDummyInputInfo)); + pInfo->pRes = (SSqlRes*) pResult; - pInfo->block.info.numOfCols = numOfCols; - pInfo->block.pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); + pInfo->block = calloc(numOfCols, sizeof(SSDataBlock)); + pInfo->block->info.numOfCols = numOfCols; + + pInfo->block->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); for(int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData colData = {0}; colData.info.bytes = pSchema[i].bytes; colData.info.type = pSchema[i].type; colData.info.colId = pSchema[i].colId; - taosArrayPush(pInfo->block.pDataBlock, &colData); + taosArrayPush(pInfo->block->pDataBlock, &colData); } SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo)); pOptr->name = "DummyInputOperator"; pOptr->operatorType = OP_DummyInput; + pOptr->numOfOutput = numOfCols; pOptr->blockingOptr = false; pOptr->info = pInfo; pOptr->exec = doGetDataBlock; - + pOptr->cleanup = destroyDummyInputOperator; return pOptr; } @@ -683,18 +696,12 @@ void prepareInputDataFromUpstream(SSqlRes* pRes, SQueryInfo* pQueryInfo) { if (pQueryInfo->pDownstream != NULL) { // handle the following query process SQueryInfo *px = pQueryInfo->pDownstream; - SColumnInfo* colInfo = extractColumnInfoFromResult(px->pTableMetaInfo[0]->pTableMeta, px->colList); + SColumnInfo* pColumnInfo = extractColumnInfoFromResult(px->pTableMetaInfo[0]->pTableMeta, px->colList); int32_t numOfOutput = tscSqlExprNumOfExprs(px); - SExprInfo *exprInfo = NULL; - SQLFunctionCtx *pCtx = calloc(numOfOutput, sizeof(SQLFunctionCtx)); - int32_t numOfCols = taosArrayGetSize(px->colList); - SQueriedTableInfo info = {.colList = colInfo, .numOfCols = numOfCols,}; - - /*int32_t code = */createQueryFunc(&info, numOfOutput, &exprInfo, px->exprList->pData, NULL, px->type, NULL); + SQueriedTableInfo info = {.colList = pColumnInfo, .numOfCols = numOfCols,}; SSchema* pSchema = tscGetTableSchema(px->pTableMetaInfo[0]->pTableMeta); - tsCreateSQLFunctionCtx(px, pCtx, pSchema); STableGroupInfo tableGroupInfo = {.numOfTables = 1, .pGroupList = taosArrayInit(1, POINTER_BYTES),}; tableGroupInfo.map = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); @@ -708,11 +715,15 @@ void prepareInputDataFromUpstream(SSqlRes* pRes, SQueryInfo* pQueryInfo) { SOperatorInfo* pSourceOptr = createDummyInputOperator((char*)pRes, pSchema, numOfOutput); + SExprInfo *exprInfo = NULL; + /*int32_t code = */createQueryFunc(&info, numOfOutput, &exprInfo, px->exprList->pData, NULL, px->type, NULL); px->pQInfo = createQueryInfoFromQueryNode(px, exprInfo, &tableGroupInfo, pSourceOptr, NULL, NULL, MASTER_SCAN); uint64_t qId = 0; qTableQuery(px->pQInfo, &qId); convertQueryResult(pRes, px); + + tfree(pColumnInfo); } } @@ -750,9 +761,28 @@ void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) { for (int32_t i = 0; i < pCmd->numOfClause; ++i) { SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, i); + + // recursive call it + if (taosArrayGetSize(pQueryInfo->pUpstream) > 0) { + SQueryInfo* pUp = taosArrayGetP(pQueryInfo->pUpstream, 0); + freeQueryInfoImpl(pUp); + clearAllTableMetaInfo(pUp, removeMeta); + if (pUp->pQInfo != NULL) { + qDestroyQueryInfo(pUp->pQInfo); + pUp->pQInfo = NULL; + } + + tfree(pUp); + } freeQueryInfoImpl(pQueryInfo); clearAllTableMetaInfo(pQueryInfo, removeMeta); + + if (pQueryInfo->pQInfo != NULL) { + qDestroyQueryInfo(pQueryInfo->pQInfo); + pQueryInfo->pQInfo = NULL; + } + tfree(pQueryInfo); } @@ -2312,7 +2342,11 @@ void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta) { for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i); - if (removeMeta) { + if (pTableMetaInfo->pTableMeta && pTableMetaInfo->pTableMeta->tableType == TSDB_TEMP_TABLE) { + tfree(pTableMetaInfo->pTableMeta); + } + + if (removeMeta) { char name[TSDB_TABLE_FNAME_LEN] = {0}; tNameExtractFullName(&pTableMetaInfo->name, name); diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 495bfa2384..d9342852c5 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -221,7 +221,7 @@ typedef struct { typedef struct { uint32_t numOfTables; - SArray * pGroupList; + SArray *pGroupList; SHashObj *map; // speedup acquire the tableQueryInfo by table uid } STableGroupInfo; diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 84a2dda4af..4d2569976e 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -503,6 +503,8 @@ SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup); SSDataBlock* doSLimit(void* param, bool* newgroup); SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows); +void* destroyOutputBuf(SSDataBlock* pBlock); + void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); int32_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput); void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); diff --git a/src/query/inc/qSqlparser.h b/src/query/inc/qSqlparser.h index d684dc31c9..235df4f552 100644 --- a/src/query/inc/qSqlparser.h +++ b/src/query/inc/qSqlparser.h @@ -40,8 +40,8 @@ enum SQL_NODE_TYPE { }; enum SQL_NODE_FROM_TYPE { - SQL_NODE_FROM_SUBQUERY = 1, - SQL_NODE_FROM_NAMELIST = 2, + SQL_NODE_FROM_SUBQUERY = 1, + SQL_NODE_FROM_TABLELIST = 2, }; extern char tTokenTypeSwitcher[13]; @@ -83,11 +83,11 @@ typedef struct SSessionWindowVal { SStrToken gap; } SSessionWindowVal; -struct SFromInfo; +struct SRelationInfo; -typedef struct SQuerySqlNode { +typedef struct SSqlNode { struct SArray *pSelNodeList; // select clause - struct SFromInfo *from; // from clause SArray + struct SRelationInfo *from; // from clause SArray struct tSqlExpr *pWhere; // where clause [optional] SArray *pGroupby; // groupby clause, only for tags[optional], SArray SArray *pSortOrder; // orderby [optional], SArray @@ -98,25 +98,22 @@ typedef struct SQuerySqlNode { SLimitVal limit; // limit offset [optional] SLimitVal slimit; // group limit offset [optional] SStrToken sqlstr; // sql string in select clause -} SQuerySqlNode; +} SSqlNode; typedef struct STableNamePair { SStrToken name; SStrToken aliasName; } STableNamePair; -typedef struct SSubclauseInfo { // "UNION" multiple select sub-clause - SQuerySqlNode **pClause; - int32_t numOfClause; -} SSubclauseInfo; +//typedef struct SSubclauseInfo { // "UNION" multiple select sub-clause +// SSqlNode **pClause; +// int32_t numOfClause; +//} SSubclauseInfo; -typedef struct SFromInfo { - int32_t type; // nested query|table name list - union { - SSubclauseInfo pNode; - SArray *tableList; // SArray - }; -} SFromInfo; +typedef struct SRelationInfo { + int32_t type; // nested query|table name list + SArray *list; // SArray|SArray +} SRelationInfo; typedef struct SCreatedTableInfo { SStrToken name; // table name token @@ -139,7 +136,7 @@ typedef struct SCreateTableSql { } colInfo; SArray *childTableInfo; // SArray - SQuerySqlNode *pSelect; + SSqlNode *pSelect; } SCreateTableSql; typedef struct SAlterTableInfo { @@ -216,7 +213,7 @@ typedef struct SMiscInfo { typedef struct SSqlInfo { int32_t type; bool valid; - SSubclauseInfo subclauseInfo; + SArray *list; // todo refactor char msg[256]; union { SCreateTableSql *pCreateTableInfo; @@ -253,9 +250,9 @@ SArray *tVariantListAppend(SArray *pList, tVariant *pVar, uint8_t sortOrder); SArray *tVariantListInsert(SArray *pList, tVariant *pVar, uint8_t sortOrder, int32_t index); SArray *tVariantListAppendToken(SArray *pList, SStrToken *pAliasToken, uint8_t sortOrder); -SFromInfo *setTableNameList(SFromInfo* pFromInfo, SStrToken *pName, SStrToken* pAlias); -SFromInfo *setSubquery(SFromInfo* pFromInfo, SSubclauseInfo* pSqlNode); -void *destroyFromInfo(SFromInfo* pFromInfo); +SRelationInfo *setTableNameList(SRelationInfo* pFromInfo, SStrToken *pName, SStrToken* pAlias); +SRelationInfo *setSubquery(SRelationInfo* pFromInfo, SArray* pSqlNode); +void *destroyFromInfo(SRelationInfo* pFromInfo); // sql expr leaf node tSqlExpr *tSqlExprCreateIdValue(SStrToken *pToken, int32_t optrType); @@ -270,23 +267,22 @@ void tSqlExprDestroy(tSqlExpr *pExpr); SArray *tSqlExprListAppend(SArray *pList, tSqlExpr *pNode, SStrToken *pDistinct, SStrToken *pToken); void tSqlExprListDestroy(SArray *pList); -SQuerySqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelNodeList, SFromInfo *pFrom, tSqlExpr *pWhere, +SSqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelNodeList, SRelationInfo *pFrom, tSqlExpr *pWhere, SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval, SSessionWindowVal *ps, SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit, SLimitVal *pgLimit); -SCreateTableSql *tSetCreateTableInfo(SArray *pCols, SArray *pTags, SQuerySqlNode *pSelect, int32_t type); +SCreateTableSql *tSetCreateTableInfo(SArray *pCols, SArray *pTags, SSqlNode *pSelect, int32_t type); SAlterTableInfo *tSetAlterTableInfo(SStrToken *pTableName, SArray *pCols, SArray *pVals, int32_t type, int16_t tableTable); SCreatedTableInfo createNewChildTableInfo(SStrToken *pTableName, SArray *pTagNames, SArray *pTagVals, SStrToken *pToken, SStrToken* igExists); -void destroyAllSelectClause(SSubclauseInfo *pSql); -void destroyQuerySqlNode(SQuerySqlNode *pSql); +void destroyAllSqlNode(SArray *pSqlNode); +void destroySqlNode(SSqlNode *pSql); void freeCreateTableInfo(void* p); -SSqlInfo *setSqlInfo(SSqlInfo *pInfo, void *pSqlExprInfo, SStrToken *pTableName, int32_t type); -SSubclauseInfo *setSubclause(SSubclauseInfo *pClause, void *pSqlExprInfo); - -SSubclauseInfo *appendSelectClause(SSubclauseInfo *pInfo, void *pSubclause); +SSqlInfo *setSqlInfo(SSqlInfo *pInfo, void *pSqlExprInfo, SStrToken *pTableName, int32_t type); +SArray *setSubclause(SArray *pList, void *pSqlNode); +SArray *appendSelectClause(SArray *pList, void *pSubclause); void setCreatedTableName(SSqlInfo *pInfo, SStrToken *pTableNameToken, SStrToken *pIfNotExists); diff --git a/src/query/inc/sql.y b/src/query/inc/sql.y index fb1eee1d13..bf48392552 100644 --- a/src/query/inc/sql.y +++ b/src/query/inc/sql.y @@ -450,16 +450,16 @@ tagitem(A) ::= PLUS(X) FLOAT(Y). { } //////////////////////// The SELECT statement ///////////////////////////////// -%type select {SQuerySqlNode*} -%destructor select {destroyQuerySqlNode($$);} +%type select {SSqlNode*} +%destructor select {destroySqlNode($$);} select(A) ::= SELECT(T) selcollist(W) from(X) where_opt(Y) interval_opt(K) session_option(H) fill_opt(F) sliding_opt(S) groupby_opt(P) orderby_opt(Z) having_opt(N) slimit_opt(G) limit_opt(L). { A = tSetQuerySqlNode(&T, W, X, Y, P, Z, &K, &H, &S, F, &L, &G); } select(A) ::= LP select(B) RP. {A = B;} -%type union {SSubclauseInfo*} -%destructor union {destroyAllSelectClause($$);} +%type union {SArray*} +%destructor union {destroyAllSqlNode($$);} union(Y) ::= select(X). { Y = setSubclause(NULL, X); } union(Y) ::= union(Z) UNION ALL select(X). { Y = appendSelectClause(Z, X); } @@ -505,11 +505,11 @@ distinct(X) ::= DISTINCT(Y). { X = Y; } distinct(X) ::= . { X.n = 0;} // A complete FROM clause. -%type from {SFromInfo*} +%type from {SRelationInfo*} from(A) ::= FROM tablelist(X). {A = X;} from(A) ::= FROM LP union(Y) RP. {A = setSubquery(NULL, Y);} -%type tablelist {SFromInfo*} +%type tablelist {SRelationInfo*} tablelist(A) ::= ids(X) cpxName(Y). { X.n += Y.n; A = setTableNameList(NULL, &X, NULL); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 5d72fc7395..063eece4ff 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -201,7 +201,7 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO return res; } -static void* destroyOutputBuf(SSDataBlock* pBlock) { +void* destroyOutputBuf(SSDataBlock* pBlock) { if (pBlock == NULL) { return NULL; } diff --git a/src/query/src/qSqlParser.c b/src/query/src/qSqlParser.c index a3a81083f0..f51e87f404 100644 --- a/src/query/src/qSqlParser.c +++ b/src/query/src/qSqlParser.c @@ -452,13 +452,13 @@ SArray *tVariantListInsert(SArray *pList, tVariant *pVar, uint8_t sortOrder, int return pList; } -SFromInfo *setTableNameList(SFromInfo* pFromInfo, SStrToken *pName, SStrToken* pAlias) { - if (pFromInfo == NULL) { - pFromInfo = calloc(1, sizeof(SFromInfo)); - pFromInfo->tableList = taosArrayInit(4, sizeof(STableNamePair)); +SRelationInfo *setTableNameList(SRelationInfo* pRelationInfo, SStrToken *pName, SStrToken* pAlias) { + if (pRelationInfo == NULL) { + pRelationInfo = calloc(1, sizeof(SRelationInfo)); + pRelationInfo->list = taosArrayInit(4, sizeof(STableNamePair)); } - pFromInfo->type = SQL_NODE_FROM_NAMELIST; + pRelationInfo->type = SQL_NODE_FROM_TABLELIST; STableNamePair p = {.name = *pName}; if (pAlias != NULL) { p.aliasName = *pAlias; @@ -466,34 +466,39 @@ SFromInfo *setTableNameList(SFromInfo* pFromInfo, SStrToken *pName, SStrToken* p TPARSER_SET_NONE_TOKEN(p.aliasName); } - taosArrayPush(pFromInfo->tableList, &p); - - return pFromInfo; + taosArrayPush(pRelationInfo->list, &p); + return pRelationInfo; } -SFromInfo *setSubquery(SFromInfo* pFromInfo, SSubclauseInfo* pSqlNode) { - if (pFromInfo == NULL) { - pFromInfo = calloc(1, sizeof(SFromInfo)); +SRelationInfo* setSubquery(SRelationInfo* pRelationInfo, SArray* pList) { + if (pRelationInfo == NULL) { + pRelationInfo = calloc(1, sizeof(SRelationInfo)); + pRelationInfo->list = taosArrayInit(4, POINTER_BYTES); } - pFromInfo->type = SQL_NODE_FROM_SUBQUERY; - pFromInfo->pNode = *pSqlNode; + pRelationInfo->type = SQL_NODE_FROM_SUBQUERY; + taosArrayPush(pRelationInfo->list, &pList); - return pFromInfo; + return pRelationInfo; } -void* destroyFromInfo(SFromInfo* pFromInfo) { - if (pFromInfo == NULL) { +void* destroyFromInfo(SRelationInfo* pRelationInfo) { + if (pRelationInfo == NULL) { return NULL; } - if (pFromInfo->type == SQL_NODE_FROM_NAMELIST) { - taosArrayDestroy(pFromInfo->tableList); + if (pRelationInfo->type == SQL_NODE_FROM_TABLELIST) { + taosArrayDestroy(pRelationInfo->list); } else { - destroyAllSelectClause(&pFromInfo->pNode); + size_t size = taosArrayGetSize(pRelationInfo->list); + for(int32_t i = 0; i < size; ++i) { + SArray* pa = taosArrayGetP(pRelationInfo->list, 0); + destroyAllSqlNode(pa); + } + taosArrayDestroy(pRelationInfo->list); } - tfree(pFromInfo); + tfree(pRelationInfo); return NULL; } @@ -637,13 +642,13 @@ void tSetColumnType(TAOS_FIELD *pField, SStrToken *type) { /* * extract the select info out of sql string */ -SQuerySqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelNodeList, SFromInfo *pFrom, tSqlExpr *pWhere, +SSqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelNodeList, SRelationInfo *pFrom, tSqlExpr *pWhere, SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval, SSessionWindowVal *pSession, SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit, SLimitVal *psLimit) { assert(pSelNodeList != NULL); - SQuerySqlNode *pSqlNode = calloc(1, sizeof(SQuerySqlNode)); + SSqlNode *pSqlNode = calloc(1, sizeof(SSqlNode)); // all later sql string are belonged to the stream sql pSqlNode->sqlstr = *pSelectToken; @@ -706,46 +711,47 @@ void freeCreateTableInfo(void* p) { tfree(pInfo->tagdata.data); } -void destroyQuerySqlNode(SQuerySqlNode *pQuerySql) { - if (pQuerySql == NULL) { +void destroySqlNode(SSqlNode *pSqlNode) { + if (pSqlNode == NULL) { return; } - tSqlExprListDestroy(pQuerySql->pSelNodeList); + tSqlExprListDestroy(pSqlNode->pSelNodeList); - pQuerySql->pSelNodeList = NULL; + pSqlNode->pSelNodeList = NULL; - tSqlExprDestroy(pQuerySql->pWhere); - pQuerySql->pWhere = NULL; + tSqlExprDestroy(pSqlNode->pWhere); + pSqlNode->pWhere = NULL; - taosArrayDestroyEx(pQuerySql->pSortOrder, freeVariant); - pQuerySql->pSortOrder = NULL; + taosArrayDestroyEx(pSqlNode->pSortOrder, freeVariant); + pSqlNode->pSortOrder = NULL; - taosArrayDestroyEx(pQuerySql->pGroupby, freeVariant); - pQuerySql->pGroupby = NULL; + taosArrayDestroyEx(pSqlNode->pGroupby, freeVariant); + pSqlNode->pGroupby = NULL; - pQuerySql->from = destroyFromInfo(pQuerySql->from); + pSqlNode->from = destroyFromInfo(pSqlNode->from); - taosArrayDestroyEx(pQuerySql->fillType, freeVariant); - pQuerySql->fillType = NULL; + taosArrayDestroyEx(pSqlNode->fillType, freeVariant); + pSqlNode->fillType = NULL; - free(pQuerySql); + free(pSqlNode); } -void destroyAllSelectClause(SSubclauseInfo *pClause) { - if (pClause == NULL || pClause->numOfClause == 0) { +void destroyAllSqlNode(SArray *pList) { + if (pList == NULL) { return; } - for(int32_t i = 0; i < pClause->numOfClause; ++i) { - SQuerySqlNode *pQuerySql = pClause->pClause[i]; - destroyQuerySqlNode(pQuerySql); + size_t size = taosArrayGetSize(pList); + for(int32_t i = 0; i < size; ++i) { + SSqlNode *pNode = taosArrayGetP(pList, 0); + destroySqlNode(pNode); } - - tfree(pClause->pClause); + + taosArrayDestroy(pList); } -SCreateTableSql *tSetCreateTableInfo(SArray *pCols, SArray *pTags, SQuerySqlNode *pSelect, int32_t type) { +SCreateTableSql *tSetCreateTableInfo(SArray *pCols, SArray *pTags, SSqlNode *pSelect, int32_t type) { SCreateTableSql *pCreate = calloc(1, sizeof(SCreateTableSql)); switch (type) { @@ -813,7 +819,7 @@ SAlterTableInfo *tSetAlterTableInfo(SStrToken *pTableName, SArray *pCols, SArray } void* destroyCreateTableSql(SCreateTableSql* pCreate) { - destroyQuerySqlNode(pCreate->pSelect); + destroySqlNode(pCreate->pSelect); taosArrayDestroy(pCreate->colInfo.pColumns); taosArrayDestroy(pCreate->colInfo.pTagColumns); @@ -828,7 +834,7 @@ void SqlInfoDestroy(SSqlInfo *pInfo) { if (pInfo == NULL) return; if (pInfo->type == TSDB_SQL_SELECT) { - destroyAllSelectClause(&pInfo->subclauseInfo); + destroyAllSqlNode(pInfo->list); } else if (pInfo->type == TSDB_SQL_CREATE_TABLE) { pInfo->pCreateTableInfo = destroyCreateTableSql(pInfo->pCreateTableInfo); } else if (pInfo->type == TSDB_SQL_ALTER_TABLE) { @@ -849,31 +855,20 @@ void SqlInfoDestroy(SSqlInfo *pInfo) { } } -SSubclauseInfo* setSubclause(SSubclauseInfo* pSubclause, void *pSqlExprInfo) { - if (pSubclause == NULL) { - pSubclause = calloc(1, sizeof(SSubclauseInfo)); - } - - int32_t newSize = pSubclause->numOfClause + 1; - char* tmp = realloc(pSubclause->pClause, newSize * POINTER_BYTES); - if (tmp == NULL) { - return pSubclause; +SArray* setSubclause(SArray* pList, void *pSqlNode) { + if (pList == NULL) { + pList = taosArrayInit(1, POINTER_BYTES); } - pSubclause->pClause = (SQuerySqlNode**) tmp; - - pSubclause->pClause[newSize - 1] = pSqlExprInfo; - pSubclause->numOfClause++; - - return pSubclause; + taosArrayPush(pList, &pSqlNode); + return pList; } SSqlInfo* setSqlInfo(SSqlInfo *pInfo, void *pSqlExprInfo, SStrToken *pTableName, int32_t type) { pInfo->type = type; if (type == TSDB_SQL_SELECT) { - pInfo->subclauseInfo = *(SSubclauseInfo*) pSqlExprInfo; - free(pSqlExprInfo); + pInfo->list = (SArray*) pSqlExprInfo; } else { pInfo->pCreateTableInfo = pSqlExprInfo; } @@ -885,16 +880,9 @@ SSqlInfo* setSqlInfo(SSqlInfo *pInfo, void *pSqlExprInfo, SStrToken *pTableName, return pInfo; } -SSubclauseInfo* appendSelectClause(SSubclauseInfo *pQueryInfo, void *pSubclause) { - char* tmp = realloc(pQueryInfo->pClause, (pQueryInfo->numOfClause + 1) * POINTER_BYTES); - if (tmp == NULL) { // out of memory - return pQueryInfo; - } - - pQueryInfo->pClause = (SQuerySqlNode**) tmp; - pQueryInfo->pClause[pQueryInfo->numOfClause++] = pSubclause; - - return pQueryInfo; +SArray* appendSelectClause(SArray *pList, void *pSubclause) { + taosArrayPush(pList, &pSubclause); + return pList; } void setCreatedTableName(SSqlInfo *pInfo, SStrToken *pTableNameToken, SStrToken *pIfNotExists) { diff --git a/src/query/src/sql.c b/src/query/src/sql.c index 77f836ad0f..707db50968 100644 --- a/src/query/src/sql.c +++ b/src/query/src/sql.c @@ -105,20 +105,19 @@ typedef union { ParseTOKENTYPE yy0; SCreateTableSql* yy14; int yy20; + SSqlNode* yy116; tSqlExpr* yy118; SArray* yy159; SIntervalVal yy184; SCreatedTableInfo yy206; + SRelationInfo* yy236; SSessionWindowVal yy249; - SQuerySqlNode* yy272; int64_t yy317; SCreateDbInfo yy322; SCreateAcctInfo yy351; - SSubclauseInfo* yy391; TAOS_FIELD yy407; SLimitVal yy440; tVariant yy488; - SFromInfo* yy514; } YYMINORTYPE; #ifndef YYSTACKDEPTH #define YYSTACKDEPTH 100 @@ -1426,7 +1425,7 @@ destroyCreateTableSql((yypminor->yy14)); break; case 234: /* select */ { -destroyQuerySqlNode((yypminor->yy272)); +destroySqlNode((yypminor->yy116)); } break; case 237: /* selcollist */ @@ -1446,7 +1445,7 @@ tSqlExprDestroy((yypminor->yy118)); break; case 249: /* union */ { -destroyAllSelectClause((yypminor->yy391)); +destroyAllSqlNode((yypminor->yy159)); } break; case 257: /* sortitem */ @@ -2540,7 +2539,7 @@ static void yy_reduce( break; case 141: /* create_table_args ::= ifnotexists ids cpxName AS select */ { - yylhsminor.yy14 = tSetCreateTableInfo(NULL, NULL, yymsp[0].minor.yy272, TSQL_CREATE_STREAM); + yylhsminor.yy14 = tSetCreateTableInfo(NULL, NULL, yymsp[0].minor.yy116, TSQL_CREATE_STREAM); setSqlInfo(pInfo, yylhsminor.yy14, NULL, TSDB_SQL_CREATE_TABLE); yymsp[-3].minor.yy0.n += yymsp[-2].minor.yy0.n; @@ -2595,29 +2594,29 @@ static void yy_reduce( break; case 156: /* select ::= SELECT selcollist from where_opt interval_opt session_option fill_opt sliding_opt groupby_opt orderby_opt having_opt slimit_opt limit_opt */ { - yylhsminor.yy272 = tSetQuerySqlNode(&yymsp[-12].minor.yy0, yymsp[-11].minor.yy159, yymsp[-10].minor.yy514, yymsp[-9].minor.yy118, yymsp[-4].minor.yy159, yymsp[-3].minor.yy159, &yymsp[-8].minor.yy184, &yymsp[-7].minor.yy249, &yymsp[-5].minor.yy0, yymsp[-6].minor.yy159, &yymsp[0].minor.yy440, &yymsp[-1].minor.yy440); + yylhsminor.yy116 = tSetQuerySqlNode(&yymsp[-12].minor.yy0, yymsp[-11].minor.yy159, yymsp[-10].minor.yy236, yymsp[-9].minor.yy118, yymsp[-4].minor.yy159, yymsp[-3].minor.yy159, &yymsp[-8].minor.yy184, &yymsp[-7].minor.yy249, &yymsp[-5].minor.yy0, yymsp[-6].minor.yy159, &yymsp[0].minor.yy440, &yymsp[-1].minor.yy440); } - yymsp[-12].minor.yy272 = yylhsminor.yy272; + yymsp[-12].minor.yy116 = yylhsminor.yy116; break; case 157: /* select ::= LP select RP */ -{yymsp[-2].minor.yy272 = yymsp[-1].minor.yy272;} +{yymsp[-2].minor.yy116 = yymsp[-1].minor.yy116;} break; case 158: /* union ::= select */ -{ yylhsminor.yy391 = setSubclause(NULL, yymsp[0].minor.yy272); } - yymsp[0].minor.yy391 = yylhsminor.yy391; +{ yylhsminor.yy159 = setSubclause(NULL, yymsp[0].minor.yy116); } + yymsp[0].minor.yy159 = yylhsminor.yy159; break; case 159: /* union ::= union UNION ALL select */ -{ yylhsminor.yy391 = appendSelectClause(yymsp[-3].minor.yy391, yymsp[0].minor.yy272); } - yymsp[-3].minor.yy391 = yylhsminor.yy391; +{ yylhsminor.yy159 = appendSelectClause(yymsp[-3].minor.yy159, yymsp[0].minor.yy116); } + yymsp[-3].minor.yy159 = yylhsminor.yy159; break; case 160: /* cmd ::= union */ -{ setSqlInfo(pInfo, yymsp[0].minor.yy391, NULL, TSDB_SQL_SELECT); } +{ setSqlInfo(pInfo, yymsp[0].minor.yy159, NULL, TSDB_SQL_SELECT); } break; case 161: /* select ::= SELECT selcollist */ { - yylhsminor.yy272 = tSetQuerySqlNode(&yymsp[-1].minor.yy0, yymsp[0].minor.yy159, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL); + yylhsminor.yy116 = tSetQuerySqlNode(&yymsp[-1].minor.yy0, yymsp[0].minor.yy159, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL); } - yymsp[-1].minor.yy272 = yylhsminor.yy272; + yymsp[-1].minor.yy116 = yylhsminor.yy116; break; case 162: /* sclp ::= selcollist COMMA */ {yylhsminor.yy159 = yymsp[-1].minor.yy159;} @@ -2655,38 +2654,38 @@ static void yy_reduce( yymsp[0].minor.yy0 = yylhsminor.yy0; break; case 171: /* from ::= FROM tablelist */ -{yymsp[-1].minor.yy514 = yymsp[0].minor.yy514;} +{yymsp[-1].minor.yy236 = yymsp[0].minor.yy236;} break; case 172: /* from ::= FROM LP union RP */ -{yymsp[-3].minor.yy514 = setSubquery(NULL, yymsp[-1].minor.yy391);} +{yymsp[-3].minor.yy236 = setSubquery(NULL, yymsp[-1].minor.yy159);} break; case 173: /* tablelist ::= ids cpxName */ { yymsp[-1].minor.yy0.n += yymsp[0].minor.yy0.n; - yylhsminor.yy514 = setTableNameList(NULL, &yymsp[-1].minor.yy0, NULL); + yylhsminor.yy236 = setTableNameList(NULL, &yymsp[-1].minor.yy0, NULL); } - yymsp[-1].minor.yy514 = yylhsminor.yy514; + yymsp[-1].minor.yy236 = yylhsminor.yy236; break; case 174: /* tablelist ::= ids cpxName ids */ { yymsp[-2].minor.yy0.n += yymsp[-1].minor.yy0.n; - yylhsminor.yy514 = setTableNameList(NULL, &yymsp[-2].minor.yy0, &yymsp[0].minor.yy0); + yylhsminor.yy236 = setTableNameList(NULL, &yymsp[-2].minor.yy0, &yymsp[0].minor.yy0); } - yymsp[-2].minor.yy514 = yylhsminor.yy514; + yymsp[-2].minor.yy236 = yylhsminor.yy236; break; case 175: /* tablelist ::= tablelist COMMA ids cpxName */ { yymsp[-1].minor.yy0.n += yymsp[0].minor.yy0.n; - yylhsminor.yy514 = setTableNameList(yymsp[-3].minor.yy514, &yymsp[-1].minor.yy0, NULL); + yylhsminor.yy236 = setTableNameList(yymsp[-3].minor.yy236, &yymsp[-1].minor.yy0, NULL); } - yymsp[-3].minor.yy514 = yylhsminor.yy514; + yymsp[-3].minor.yy236 = yylhsminor.yy236; break; case 176: /* tablelist ::= tablelist COMMA ids cpxName ids */ { yymsp[-2].minor.yy0.n += yymsp[-1].minor.yy0.n; - yylhsminor.yy514 = setTableNameList(yymsp[-4].minor.yy514, &yymsp[-2].minor.yy0, &yymsp[0].minor.yy0); + yylhsminor.yy236 = setTableNameList(yymsp[-4].minor.yy236, &yymsp[-2].minor.yy0, &yymsp[0].minor.yy0); } - yymsp[-4].minor.yy514 = yylhsminor.yy514; + yymsp[-4].minor.yy236 = yylhsminor.yy236; break; case 177: /* tmvar ::= VARIABLE */ {yylhsminor.yy0 = yymsp[0].minor.yy0;} diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 41b129939b..31131f46a5 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -3414,14 +3414,16 @@ void tsdbDestroyTableGroup(STableGroupInfo *pGroupList) { size_t numOfTables = taosArrayGetSize(p); for(int32_t j = 0; j < numOfTables; ++j) { STable* pTable = taosArrayGetP(p, j); - assert(pTable != NULL); - - tsdbUnRefTable(pTable); + if (pTable != NULL) { // in case of handling retrieve data from tsdb + tsdbUnRefTable(pTable); + } + //assert(pTable != NULL); } taosArrayDestroy(p); } + taosHashCleanup(pGroupList->map); taosArrayDestroy(pGroupList->pGroupList); pGroupList->numOfTables = 0; } -- GitLab