From a33e30622c9e7c9ab9302cae69e3798c9c8e5c3c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 16 Nov 2020 18:29:27 +0800 Subject: [PATCH] [TD-225] --- src/client/inc/tscLocalMerge.h | 4 +- src/client/src/tscLocalMerge.c | 75 ++++++++++++++++------------------ src/client/src/tscSQLParser.c | 10 +++-- src/client/src/tscSubquery.c | 39 ++---------------- src/client/src/tscUtil.c | 8 ++++ src/query/src/qExecutor.c | 25 +++++++----- 6 files changed, 69 insertions(+), 92 deletions(-) diff --git a/src/client/inc/tscLocalMerge.h b/src/client/inc/tscLocalMerge.h index 8a09057722..ce67344b03 100644 --- a/src/client/inc/tscLocalMerge.h +++ b/src/client/inc/tscLocalMerge.h @@ -84,7 +84,7 @@ typedef struct SRetrieveSupport { } SRetrieveSupport; int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOrderDescriptor **pDesc, - SColumnModel **pFinalModel, SColumnModel** pFFModel, uint32_t nBufferSize); + SColumnModel **pFinalModel, uint32_t nBufferSize); void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, SColumnModel *pFinalModel, int32_t numOfVnodes); @@ -98,7 +98,7 @@ int32_t tscFlushTmpBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tF * create local reducer to launch the second-stage reduce process at client site */ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc, - SColumnModel *finalModel, SSqlObj* pSql); + SColumnModel *finalModel, SColumnModel *pFFModel, SSqlObj* pSql); void tscDestroyLocalReducer(SSqlObj *pSql); diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index 0440ef77ad..66827b964d 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -30,7 +30,7 @@ typedef struct SCompareParam { int32_t groupOrderType; } SCompareParam; -static void doArithmeticCalculate(SQueryInfo* pQueryInfo, char* pBuf, char* src, int32_t numOfRows); +static void doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t rowSize, int32_t finalRowSize); int32_t treeComparator(const void *pLeft, const void *pRight, void *param) { int32_t pLeftIdx = *(int32_t *)pLeft; @@ -169,7 +169,7 @@ static SFillColInfo* createFillColInfo(SQueryInfo* pQueryInfo) { } void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc, - SColumnModel *finalmodel, SSqlObj* pSql) { + SColumnModel *finalmodel, SColumnModel *pFFModel, SSqlObj* pSql) { SSqlCmd* pCmd = &pSql->cmd; SSqlRes* pRes = &pSql->res; @@ -505,7 +505,8 @@ void tscDestroyLocalReducer(SSqlObj *pSql) { pLocalReducer->pFillInfo = taosDestroyFillInfo(pLocalReducer->pFillInfo); if (pLocalReducer->pCtx != NULL) { - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { + int32_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo); + for (int32_t i = 0; i < numOfExprs; ++i) { SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[i]; tVariantDestroy(&pCtx->tag); @@ -569,7 +570,8 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm if (numOfGroupByCols > 0) { if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) { - int32_t startCols = pQueryInfo->fieldsInfo.numOfOutput - pQueryInfo->groupbyExpr.numOfGroupCols; + int32_t numOfInternalOutput = tscSqlExprNumOfExprs(pQueryInfo); + int32_t startCols = numOfInternalOutput - pQueryInfo->groupbyExpr.numOfGroupCols; // the last "pQueryInfo->groupbyExpr.numOfGroupCols" columns are order-by columns for (int32_t i = 0; i < pQueryInfo->groupbyExpr.numOfGroupCols; ++i) { @@ -655,7 +657,7 @@ bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage } int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOrderDescriptor **pOrderDesc, - SColumnModel **pFinalModel, SColumnModel **pFFModel, uint32_t nBufferSizes) { + SColumnModel **pFinalModel, uint32_t nBufferSizes) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; @@ -688,6 +690,8 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr pSchema[i].bytes = pExpr->resBytes; pSchema[i].type = (int8_t)pExpr->resType; + tstrncpy(pSchema[i].name, pExpr->aliasName, tListLen(pSchema[i].name)); + rlen += pExpr->resBytes; } @@ -751,18 +755,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr *pFinalModel = createColumnModel(pSchema, (int32_t)size, capacity); - memset(pSchema, 0, sizeof(SSchema) * pQueryInfo->fieldsInfo.numOfOutput); - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { - SInternalField* pIField = taosArrayGet(pQueryInfo->fieldsInfo.internalField, i); - - TAOS_FIELD* pField = &pIField->field; - - pSchema[i].type = pField->type; - pSchema[i].bytes = pField->bytes; - strncpy(pSchema[i].name, pField->name, tListLen(pField->name)); - } - - *pFFModel = createColumnModel(pSchema, pQueryInfo->fieldsInfo.numOfOutput, capacity); + tfree(pSchema); return TSDB_CODE_SUCCESS; } @@ -1249,11 +1242,7 @@ bool genFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool noMoreCur tColModelCompact(pModel, pResBuf, pModel->capacity); if (tscIsSecondStageQuery(pQueryInfo)) { - char* pbuf = calloc(1,pResBuf->num * pModel->rowSize); - - doArithmeticCalculate(pQueryInfo, pbuf, pResBuf->data, (int32_t)pResBuf->num); - memcpy(pResBuf->data, pbuf, pResBuf->num * pModel->rowSize); - free(pbuf); + doArithmeticCalculate(pQueryInfo, pResBuf, pModel->rowSize, pLocalReducer->finalRowSize); } #ifdef _DEBUG_VIEW @@ -1623,11 +1612,23 @@ void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen) pRes->data = pRes->pLocalReducer->pResultBuf->data; } -void doArithmeticCalculate(SQueryInfo* pQueryInfo, char* outputBuf, char* src, int32_t numOfRows) { +void doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t rowSize, int32_t finalRowSize) { + char* pbuf = calloc(1, pOutput->num * rowSize); + size_t size = tscNumOfFields(pQueryInfo); - SArithmeticSupport* pArithSup = (SArithmeticSupport*)calloc(1, sizeof(SArithmeticSupport)); + SArithmeticSupport arithSup = {0}; + + // todo refactor + arithSup.offset = 0; + arithSup.numOfCols = (int32_t) tscSqlExprNumOfExprs(pQueryInfo); + arithSup.exprList = pQueryInfo->exprList; + arithSup.data = calloc(arithSup.numOfCols, POINTER_BYTES); + + for(int32_t k = 0; k < arithSup.numOfCols; ++k) { + SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, k); + arithSup.data[k] = (pOutput->data + pOutput->num* pExpr->offset); + } - int32_t rowIndex = 0; int32_t offset = 0; for (int i = 0; i < size; ++i) { @@ -1635,25 +1636,19 @@ void doArithmeticCalculate(SQueryInfo* pQueryInfo, char* outputBuf, char* src, i // calculate the result from several other columns if (pSup->pArithExprInfo != NULL) { - // todo refactor - pArithSup->offset = 0; - pArithSup->pArithExpr = pSup->pArithExprInfo; - pArithSup->numOfCols = (int32_t)tscSqlExprNumOfExprs(pQueryInfo); - pArithSup->exprList = pQueryInfo->exprList; - pArithSup->data = calloc(pArithSup->numOfCols, POINTER_BYTES); - - for(int32_t k = 0; k < pArithSup->numOfCols; ++k) { - SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, k); - pArithSup->data[k] = (src + numOfRows* pExpr->offset) + rowIndex*pExpr->resBytes; - } - - tExprTreeCalcTraverse(pArithSup->pArithExpr->pExpr, numOfRows, outputBuf + numOfRows*offset, pArithSup, - TSDB_ORDER_ASC, getArithemicInputSrc); + arithSup.pArithExpr = pSup->pArithExprInfo; + tExprTreeCalcTraverse(arithSup.pArithExpr->pExpr, pOutput->num, pbuf + pOutput->num*offset, &arithSup, TSDB_ORDER_ASC, getArithemicInputSrc); } else { SSqlExpr* pExpr = pSup->pSqlExpr; - memcpy(outputBuf + numOfRows * offset, pExpr->offset * numOfRows + src, pExpr->resBytes * numOfRows); + memcpy(pbuf + pOutput->num * offset, pExpr->offset * pOutput->num + pOutput->data, pExpr->resBytes * pOutput->num); } offset += pSup->field.bytes; } + + assert(finalRowSize <= rowSize); + memcpy(pOutput->data, pbuf, pOutput->num * finalRowSize); + + tfree(pbuf); + tfree(arithSup.data); } \ No newline at end of file diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index b56603e557..b55326bbd3 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1410,10 +1410,12 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t // TODO: other error handling } END_TRY - pInfo->pArithExprInfo->base.arg[0].argBytes = (int16_t) tbufTell(&bw); - pInfo->pArithExprInfo->base.arg[0].argValue.pz = tbufGetData(&bw, true); - pInfo->pArithExprInfo->base.arg[0].argType = TSDB_DATA_TYPE_BINARY; - tbufCloseWriter(&bw); + SSqlFuncMsg* pFuncMsg = &pInfo->pArithExprInfo->base; + pFuncMsg->arg[0].argBytes = (int16_t) tbufTell(&bw); + pFuncMsg->arg[0].argValue.pz = tbufGetData(&bw, true); + pFuncMsg->arg[0].argType = TSDB_DATA_TYPE_BINARY; + +// tbufCloseWriter(&bw); // TODO there is a memory leak } return TSDB_CODE_SUCCESS; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 2e21f02a66..bc522d4007 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -1644,7 +1644,6 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { tExtMemBuffer ** pMemoryBuf = NULL; tOrderDescriptor *pDesc = NULL; SColumnModel *pModel = NULL; - SColumnModel *pFFModel = NULL; pRes->qhandle = 0x1; // hack the qhandle check @@ -1663,7 +1662,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { assert(pState->numOfSub > 0); - int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, &pFFModel, nBufferSize); + int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize); if (ret != 0) { pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; tscQueueAsyncRes(pSql); @@ -1708,8 +1707,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { trs->subqueryIndex = i; trs->pParentSql = pSql; trs->pFinalColModel = pModel; - trs->pFFColModel = pFFModel; - + SSqlObj *pNew = tscCreateSTableSubquery(pSql, trs, NULL); if (pNew == NULL) { tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno)); @@ -1764,10 +1762,6 @@ static void tscFreeRetrieveSup(SSqlObj *pSql) { } tscDebug("%p start to free subquery supp obj:%p", pSql, trsupport); -// int32_t index = trsupport->subqueryIndex; -// SSqlObj *pParentSql = trsupport->pParentSql; - -// assert(pSql == pParentSql->pSubs[index]); tfree(trsupport->localBuffer); tfree(trsupport); } @@ -1958,7 +1952,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p SQueryInfo *pPQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, 0); tscClearInterpInfo(pPQueryInfo); - tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfSub, pDesc, trsupport->pFinalColModel, pParentSql); + tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfSub, pDesc, trsupport->pFinalColModel, trsupport->pFFColModel, pParentSql); tscDebug("%p build loser tree completed", pParentSql); pParentSql->res.precision = pSql->res.precision; @@ -2467,33 +2461,6 @@ TAOS_ROW doSetResultRowData(SSqlObj *pSql, bool finalResult) { if (pRes->tsrow[i] != NULL && pField->type == TSDB_DATA_TYPE_NCHAR) { transferNcharData(pSql, i, pField); } - - // calculate the result from several other columns -// if (pSup->pArithExprInfo != NULL) { -// if (pRes->pArithSup == NULL) { -// pRes->pArithSup = (SArithmeticSupport*)calloc(1, sizeof(SArithmeticSupport)); -// } -// -// pRes->pArithSup->offset = 0; -// pRes->pArithSup->pArithExpr = pSup->pArithExprInfo; -// pRes->pArithSup->numOfCols = (int32_t)tscSqlExprNumOfExprs(pQueryInfo); -// pRes->pArithSup->exprList = pQueryInfo->exprList; -// pRes->pArithSup->data = calloc(pRes->pArithSup->numOfCols, POINTER_BYTES); -// -// if (pRes->buffer[i] == NULL) { -// TAOS_FIELD* field = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); -// pRes->buffer[i] = malloc(field->bytes); -// } -// -// for(int32_t k = 0; k < pRes->pArithSup->numOfCols; ++k) { -// SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, k); -// pRes->pArithSup->data[k] = (pRes->data + pRes->numOfRows* pExpr->offset) + pRes->row*pExpr->resBytes; -// } -// -// tExprTreeCalcTraverse(pRes->pArithSup->pArithExpr->pExpr, 1, pRes->buffer[i], pRes->pArithSup, -// TSDB_ORDER_ASC, getArithemicInputSrc); -// pRes->tsrow[i] = (unsigned char*)pRes->buffer[i]; -// } } pRes->row++; // index increase one-step diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index d93fb45dab..27824fc1ff 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -945,6 +945,14 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) { if (pInfo->pArithExprInfo != NULL) { tExprTreeDestroy(&pInfo->pArithExprInfo->pExpr, NULL); + + SSqlFuncMsg* pFuncMsg = &pInfo->pArithExprInfo->base; + for(int32_t j = 0; j < pFuncMsg->numOfParams; ++j) { + if (pFuncMsg->arg[j].argType == TSDB_DATA_TYPE_BINARY) { + tfree(pFuncMsg->arg[j].argValue.pz); + } + } + tfree(pInfo->pArithExprInfo); } } diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 7cc8a1c591..c4ea298005 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -5336,7 +5336,7 @@ static char *getArithemicInputSrc(void *param, const char *name, int32_t colId) } static void doSecondaryArithmeticProcess(SQuery* pQuery) { - SArithmeticSupport *pArithSup = (SArithmeticSupport *)calloc(1, sizeof(SArithmeticSupport)); + SArithmeticSupport arithSup = {0}; tFilePage **data = calloc(pQuery->numOfExpr2, POINTER_BYTES); for (int32_t i = 0; i < pQuery->numOfExpr2; ++i) { @@ -5344,13 +5344,13 @@ static void doSecondaryArithmeticProcess(SQuery* pQuery) { data[i] = (tFilePage *)malloc(bytes * pQuery->rec.rows + sizeof(tFilePage)); } - pArithSup->offset = 0; - pArithSup->numOfCols = (int32_t)pQuery->numOfOutput; - pArithSup->exprList = pQuery->pExpr1; - pArithSup->data = calloc(pArithSup->numOfCols, POINTER_BYTES); + arithSup.offset = 0; + arithSup.numOfCols = (int32_t)pQuery->numOfOutput; + arithSup.exprList = pQuery->pExpr1; + arithSup.data = calloc(arithSup.numOfCols, POINTER_BYTES); - for (int32_t k = 0; k < pArithSup->numOfCols; ++k) { - pArithSup->data[k] = pQuery->sdata[k]->data; + for (int32_t k = 0; k < arithSup.numOfCols; ++k) { + arithSup.data[k] = pQuery->sdata[k]->data; } for (int i = 0; i < pQuery->numOfExpr2; ++i) { @@ -5368,8 +5368,8 @@ static void doSecondaryArithmeticProcess(SQuery* pQuery) { } } } else { - pArithSup->pArithExpr = pExpr; - tExprTreeCalcTraverse(pArithSup->pArithExpr->pExpr, (int32_t)pQuery->rec.rows, data[i]->data, pArithSup, TSDB_ORDER_ASC, + arithSup.pArithExpr = pExpr; + tExprTreeCalcTraverse(arithSup.pArithExpr->pExpr, (int32_t)pQuery->rec.rows, data[i]->data, &arithSup, TSDB_ORDER_ASC, getArithemicInputSrc); } } @@ -5378,7 +5378,12 @@ static void doSecondaryArithmeticProcess(SQuery* pQuery) { memcpy(pQuery->sdata[i]->data, data[i]->data, pQuery->pExpr2[i].bytes * pQuery->rec.rows); } - tfree(pArithSup); + for (int32_t i = 0; i < pQuery->numOfExpr2; ++i) { + tfree(data[i]); + } + + tfree(data); + tfree(arithSup.data); } /* -- GitLab