From 46ecd3b2b84deeac1ce011d83e8c011ed417b0de Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Fri, 10 Jan 2020 13:02:34 +0800 Subject: [PATCH] refactor codes and fix some memory leaks. --- src/client/inc/tsclient.h | 2 +- src/client/src/tscJoinProcess.c | 43 ++++++++-------- src/client/src/tscSQLParser.c | 2 +- src/client/src/tscSecondaryMerge.c | 2 +- src/client/src/tscServer.c | 2 +- src/client/src/tscSql.c | 14 +++--- src/client/src/tscUtil.c | 80 +++++++++++++++++------------- src/system/detail/src/vnodeRead.c | 8 +-- 8 files changed, 83 insertions(+), 70 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 7e27031273..047a364859 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -409,7 +409,7 @@ void tscRestoreSQLFunctionForMetricQuery(SQueryInfo *pQueryInfo); void tscClearSqlMetaInfoForce(SSqlCmd *pCmd); -int32_t tscCreateResPointerInfo(SQueryInfo *pQueryInfo, SSqlRes *pRes); +int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo); void tscDestroyResPointerInfo(SSqlRes *pRes); void tscFreeSqlCmdData(SSqlCmd *pCmd); diff --git a/src/client/src/tscJoinProcess.c b/src/client/src/tscJoinProcess.c index a5a491cb42..2daa640c13 100644 --- a/src/client/src/tscJoinProcess.c +++ b/src/client/src/tscJoinProcess.c @@ -53,7 +53,7 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSuppor *st = INT64_MAX; *et = INT64_MIN; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); SLimitVal* pLimit = &pQueryInfo->limit; int32_t order = pQueryInfo->order.order; @@ -109,18 +109,19 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSuppor numOfInput2++; } else { - if (*st > elem1.ts) { - *st = elem1.ts; - } - - if (*et < elem1.ts) { - *et = elem1.ts; - } - - // in case of stable query, limit/offset is not applied here - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); - + /* + * in case of stable query, limit/offset is not applied here. the limit/offset is applied to the + * final results which is acquired after the secondry merge of in the client. + */ if (pLimit->offset == 0 || pQueryInfo->nAggTimeInterval > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) { + if (*st > elem1.ts) { + *st = elem1.ts; + } + + if (*et < elem1.ts) { + *et = elem1.ts; + } + tsBufAppend(output1, elem1.vnode, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts)); tsBufAppend(output2, elem2.vnode, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts)); } else { @@ -157,8 +158,9 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSuppor tsBufDestory(pSupporter1->pTSBuf); tsBufDestory(pSupporter2->pTSBuf); - tscTrace("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " for secondary query after ts blocks intersecting", pSql, - numOfInput1, numOfInput2, output1->numOfTotal); + tscTrace("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " for secondary query after ts blocks " + "intersecting, skey:%" PRId64 ", ekey:%" PRId64, pSql, + numOfInput1, numOfInput2, output1->numOfTotal, *st, *et); return output1->numOfTotal; } @@ -312,8 +314,11 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { tscSqlExprCopy(&pQueryInfo->exprsInfo, &pSupporter->exprsInfo, pSupporter->uid); tscFieldInfoCopyAll(&pQueryInfo->fieldsInfo, &pSupporter->fieldsInfo); - // add the ts function for interval query if it is missing - if (pSupporter->exprsInfo.pExprs[0].functionId != TSDB_FUNC_TS && pQueryInfo->nAggTimeInterval > 0) { + /* + * if the first column of the secondary query is not ts function, add this function. + * Because this column is required to filter with timestamp after intersecting. + */ + if (pSupporter->exprsInfo.pExprs[0].functionId != TSDB_FUNC_TS) { tscAddTimestampColumn(pQueryInfo, TSDB_FUNC_TS, 0); } @@ -395,9 +400,7 @@ static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSubquerySupporter* pSupporter } // update the query time range according to the join results on timestamp -static void updateQueryTimeRange(SSqlObj* pSql, int64_t st, int64_t et) { - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); - +static void updateQueryTimeRange(SQueryInfo* pQueryInfo, int64_t st, int64_t et) { assert(pQueryInfo->stime <= st && pQueryInfo->etime >= et); pQueryInfo->stime = st; @@ -495,7 +498,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { tscTrace("%p free all sub SqlObj and quit", pParentSql); doQuitSubquery(pParentSql); } else { - updateQueryTimeRange(pParentSql, st, et); + updateQueryTimeRange(pParentQueryInfo, st, et); tscLaunchSecondSubquery(pParentSql); } } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index b6ae3917bb..1c9ebc243d 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1324,7 +1324,7 @@ int32_t addProjectionExprAndResultField(SQueryInfo* pQueryInfo, tSQLExprItem* pI if (pItem->pNode->nSQLOptr == TK_ALL) { // project on all fields SColumnIndex index = COLUMN_INDEX_INITIALIZER; if (getTableIndexByName(&pItem->pNode->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_INVALID_SQL; + return invalidSqlErrMsg(pQueryInfo->msg, msg0); } // all meters columns are required diff --git a/src/client/src/tscSecondaryMerge.c b/src/client/src/tscSecondaryMerge.c index 963c439d62..0038f058dd 100644 --- a/src/client/src/tscSecondaryMerge.c +++ b/src/client/src/tscSecondaryMerge.c @@ -301,7 +301,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd pReducer->pTempBuffer->numOfElems = 0; pReducer->pResInfo = calloc((size_t)pQueryInfo->fieldsInfo.numOfOutputCols, sizeof(SResultInfo)); - tscCreateResPointerInfo(pQueryInfo, pRes); + tscCreateResPointerInfo(pRes, pQueryInfo); tscInitSqlContext(pCmd, pRes, pReducer, pDesc); // we change the maxCapacity of schema to denote that there is only one row in temp buffer diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 9d192eb7c7..b940eb6235 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -2495,7 +2495,7 @@ int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) { - if (tscCreateResPointerInfo(pQueryInfo, pRes) != TSDB_CODE_SUCCESS) { + if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) { return pRes->code; } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index da96f47075..a597cc7c01 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -411,13 +411,11 @@ static void **doSetResultRowData(SSqlObj *pSql) { } else if (pField->type == TSDB_DATA_TYPE_NCHAR) { // convert unicode to native code in a temporary buffer extra one byte for terminated symbol if (pRes->buffer[num] == NULL) { - pRes->buffer[num] = malloc(pField->bytes + 1); - } else { - pRes->buffer[num] = realloc(pRes->buffer[num], pField->bytes + 1); + pRes->buffer[num] = malloc(pField->bytes + TSDB_NCHAR_SIZE); } - /* string terminated */ - memset(pRes->buffer[num], 0, pField->bytes + 1); + /* string terminated char for binary data*/ + memset(pRes->buffer[num], 0, pField->bytes + TSDB_NCHAR_SIZE); if (taosUcs4ToMbs(pRes->tsrow[i], pField->bytes, pRes->buffer[num])) { pRes->tsrow[i] = pRes->buffer[num]; @@ -471,6 +469,10 @@ static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) { hasData = !allSubqueryExhausted; } else { // otherwise, in case inner join, if any subquery exhausted, query completed. for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + if (pSql->pSubs[i] == 0) { + continue; + } + SSqlRes * pRes1 = &pSql->pSubs[i]->res; SQueryInfo *pQueryInfo1 = tscGetQueryInfoDetail(&pSql->pSubs[i]->cmd, 0); @@ -518,8 +520,6 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) { } success = (doSetResultRowData(pSub) != NULL); - -// success = (pRes1->row++ < pRes1->numOfRows); } if (success) { // current row of final output has been built, return to app diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index fc175a399a..07cc3169e2 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -318,46 +318,59 @@ void tscClearSqlMetaInfoForce(SSqlCmd* pCmd) { // taosRemoveDataFromCache(tscCacheHandle, (void**)&(pCmd->pMetricMeta), true); } -int32_t tscCreateResPointerInfo(SQueryInfo* pQueryInfo, SSqlRes* pRes) { +int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) { if (pRes->tsrow == NULL) { pRes->numOfnchar = 0; + int32_t numOfOutputCols = pQueryInfo->fieldsInfo.numOfOutputCols; - for (int32_t i = 0; i < numOfOutputCols; ++i) { TAOS_FIELD* pField = tscFieldInfoGetField(pQueryInfo, i); if (pField->type == TSDB_DATA_TYPE_NCHAR) { pRes->numOfnchar++; } } - + pRes->tsrow = calloc(1, (POINTER_BYTES + sizeof(short)) * numOfOutputCols + POINTER_BYTES * pRes->numOfnchar); - if (pRes->tsrow == NULL) { + pRes->bytes = calloc(numOfOutputCols, sizeof(short)); + + if (pRes->numOfnchar > 0) { + pRes->buffer = calloc(POINTER_BYTES, pRes->numOfnchar); + } + + // not enough memory + if (pRes->tsrow == NULL || pRes->bytes == NULL || (pRes->buffer == NULL && pRes->numOfnchar > 0)) { + tfree(pRes->tsrow); + tfree(pRes->bytes); + tfree(pRes->buffer); + pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; return pRes->code; } - - pRes->bytes = (short*)((char*)pRes->tsrow + POINTER_BYTES * numOfOutputCols); - if (pRes->numOfnchar > 0) { - pRes->buffer = (char**)((char*)pRes->bytes + sizeof(short) * numOfOutputCols); - } } return TSDB_CODE_SUCCESS; } void tscDestroyResPointerInfo(SSqlRes* pRes) { - // free all buffers containing the multibyte string - for (int i = 0; i < pRes->numOfnchar; i++) { - if (pRes->buffer[i] != NULL) { + if (pRes->buffer != NULL) { + assert(pRes->numOfnchar > 0); + // free all buffers containing the multibyte string + for (int i = 0; i < pRes->numOfnchar; i++) { tfree(pRes->buffer[i]); } + + pRes->numOfnchar = 0; } - + + tfree(pRes->pRsp); tfree(pRes->tsrow); - - pRes->numOfnchar = 0; - pRes->buffer = NULL; - pRes->bytes = NULL; + + tfree(pRes->pGroupRec); + tfree(pRes->pColumnIndex); + tfree(pRes->buffer); + tfree(pRes->bytes); + + pRes->data = NULL; // pRes->data points to the buffer of pRsp, no need to free } void tscFreeSqlCmdData(SSqlCmd* pCmd) { @@ -371,7 +384,6 @@ void tscFreeSqlCmdData(SSqlCmd* pCmd) { void tscFreeResData(SSqlObj* pSql) { SSqlRes* pRes = &pSql->res; - tfree(pRes->pRsp); pRes->row = 0; pRes->rspType = 0; @@ -384,19 +396,14 @@ void tscFreeResData(SSqlObj* pSql) { pRes->numOfGroups = 0; pRes->precision = 0; - pRes->numOfnchar = 0; pRes->qhandle = 0; pRes->offset = 0; pRes->useconds = 0; - pRes->data = NULL; - tfree(pRes->pGroupRec); - tscDestroyLocalReducer(pSql); tscDestroyResPointerInfo(pRes); - tfree(pRes->pColumnIndex); } void tscFreeSqlObjPartial(SSqlObj* pSql) { @@ -436,24 +443,27 @@ void tscFreeSqlObj(SSqlObj* pSql) { pSql->signature = NULL; pSql->fp = NULL; + SSqlCmd* pCmd = &pSql->cmd; + SSqlRes* pRes = &pSql->res; memset(pCmd->payload, 0, (size_t)pCmd->allocSize); tfree(pCmd->payload); pCmd->allocSize = 0; - if (pSql->res.buffer != NULL) { - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); - - for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; i++) { - if (pSql->res.buffer[i] != NULL) { - tfree(pSql->res.buffer[i]); - } - } - - tfree(pSql->res.buffer); - } +// if (pRes->buffer != NULL) { +// SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); +// +// for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; i++) { +// if (pRes->buffer[i] != NULL) { +// printf("===========free:%p\n", pRes->buffer[i]); +// tfree(pRes->buffer[i]); +// } +// } +// +// tfree(pRes->buffer); +// } if (pSql->fp == NULL) { tsem_destroy(&pSql->rspSem); @@ -1984,7 +1994,7 @@ void tscDoQuery(SSqlObj* pSql) { SSqlCmd* pCmd = &pSql->cmd; void* fp = pSql->fp; - assert(pSql->res.code == TSDB_CODE_SUCCESS); + pSql->res.code = TSDB_CODE_SUCCESS; if (pCmd->command > TSDB_SQL_LOCAL) { tscProcessLocalCmd(pSql); diff --git a/src/system/detail/src/vnodeRead.c b/src/system/detail/src/vnodeRead.c index 919b88cd7c..cc9f78af67 100644 --- a/src/system/detail/src/vnodeRead.c +++ b/src/system/detail/src/vnodeRead.c @@ -618,7 +618,7 @@ void *vnodeQueryOnSingleTable(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE bool isProjQuery = vnodeIsProjectionQuery(pSqlExprs, pQueryMsg->numOfOutputCols); // todo pass the correct error code - if (isProjQuery) { + if (isProjQuery && pQueryMsg->tsLen == 0) { pQInfo = vnodeAllocateQInfo(pQueryMsg, pMeterObj, pSqlExprs); } else { pQInfo = vnodeAllocateQInfoEx(pQueryMsg, pGroupbyExpr, pSqlExprs, pMetersObj[0]); @@ -647,7 +647,9 @@ void *vnodeQueryOnSingleTable(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE SSchedMsg schedMsg = {0}; - if (!isProjQuery) { + if (isProjQuery && pQueryMsg->tsLen == 0) { + schedMsg.fp = vnodeQueryData; + } else { if (vnodeParametersSafetyCheck(pQuery) == false) { *code = TSDB_CODE_APP_ERROR; goto _error; @@ -685,8 +687,6 @@ void *vnodeQueryOnSingleTable(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE } schedMsg.fp = vnodeSingleMeterQuery; - } else { - schedMsg.fp = vnodeQueryData; } /* -- GitLab