提交 46ecd3b2 编写于 作者: H hjxilinx

refactor codes and fix some memory leaks.

上级 14c377e3
......@@ -409,7 +409,7 @@ void tscRestoreSQLFunctionForMetricQuery(SQueryInfo *pQueryInfo);
void tscClearSqlMetaInfoForce(SSqlCmd *pCmd);
int32_t tscCreateResPointerInfo(SQueryInfo *pQueryInfo, SSqlRes *pRes);
int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
void tscDestroyResPointerInfo(SSqlRes *pRes);
void tscFreeSqlCmdData(SSqlCmd *pCmd);
......
......@@ -53,7 +53,7 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSuppor
*st = INT64_MAX;
*et = INT64_MIN;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
SLimitVal* pLimit = &pQueryInfo->limit;
int32_t order = pQueryInfo->order.order;
......@@ -109,18 +109,19 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSuppor
numOfInput2++;
} else {
if (*st > elem1.ts) {
*st = elem1.ts;
}
if (*et < elem1.ts) {
*et = elem1.ts;
}
// in case of stable query, limit/offset is not applied here
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
/*
* in case of stable query, limit/offset is not applied here. the limit/offset is applied to the
* final results which is acquired after the secondry merge of in the client.
*/
if (pLimit->offset == 0 || pQueryInfo->nAggTimeInterval > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) {
if (*st > elem1.ts) {
*st = elem1.ts;
}
if (*et < elem1.ts) {
*et = elem1.ts;
}
tsBufAppend(output1, elem1.vnode, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts));
tsBufAppend(output2, elem2.vnode, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts));
} else {
......@@ -157,8 +158,9 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSuppor
tsBufDestory(pSupporter1->pTSBuf);
tsBufDestory(pSupporter2->pTSBuf);
tscTrace("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " for secondary query after ts blocks intersecting", pSql,
numOfInput1, numOfInput2, output1->numOfTotal);
tscTrace("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " for secondary query after ts blocks "
"intersecting, skey:%" PRId64 ", ekey:%" PRId64, pSql,
numOfInput1, numOfInput2, output1->numOfTotal, *st, *et);
return output1->numOfTotal;
}
......@@ -312,8 +314,11 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) {
tscSqlExprCopy(&pQueryInfo->exprsInfo, &pSupporter->exprsInfo, pSupporter->uid);
tscFieldInfoCopyAll(&pQueryInfo->fieldsInfo, &pSupporter->fieldsInfo);
// add the ts function for interval query if it is missing
if (pSupporter->exprsInfo.pExprs[0].functionId != TSDB_FUNC_TS && pQueryInfo->nAggTimeInterval > 0) {
/*
* if the first column of the secondary query is not ts function, add this function.
* Because this column is required to filter with timestamp after intersecting.
*/
if (pSupporter->exprsInfo.pExprs[0].functionId != TSDB_FUNC_TS) {
tscAddTimestampColumn(pQueryInfo, TSDB_FUNC_TS, 0);
}
......@@ -395,9 +400,7 @@ static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSubquerySupporter* pSupporter
}
// update the query time range according to the join results on timestamp
static void updateQueryTimeRange(SSqlObj* pSql, int64_t st, int64_t et) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
static void updateQueryTimeRange(SQueryInfo* pQueryInfo, int64_t st, int64_t et) {
assert(pQueryInfo->stime <= st && pQueryInfo->etime >= et);
pQueryInfo->stime = st;
......@@ -495,7 +498,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
tscTrace("%p free all sub SqlObj and quit", pParentSql);
doQuitSubquery(pParentSql);
} else {
updateQueryTimeRange(pParentSql, st, et);
updateQueryTimeRange(pParentQueryInfo, st, et);
tscLaunchSecondSubquery(pParentSql);
}
}
......
......@@ -1324,7 +1324,7 @@ int32_t addProjectionExprAndResultField(SQueryInfo* pQueryInfo, tSQLExprItem* pI
if (pItem->pNode->nSQLOptr == TK_ALL) { // project on all fields
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
if (getTableIndexByName(&pItem->pNode->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_INVALID_SQL;
return invalidSqlErrMsg(pQueryInfo->msg, msg0);
}
// all meters columns are required
......
......@@ -301,7 +301,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
pReducer->pTempBuffer->numOfElems = 0;
pReducer->pResInfo = calloc((size_t)pQueryInfo->fieldsInfo.numOfOutputCols, sizeof(SResultInfo));
tscCreateResPointerInfo(pQueryInfo, pRes);
tscCreateResPointerInfo(pRes, pQueryInfo);
tscInitSqlContext(pCmd, pRes, pReducer, pDesc);
// we change the maxCapacity of schema to denote that there is only one row in temp buffer
......
......@@ -2495,7 +2495,7 @@ int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
if (tscCreateResPointerInfo(pQueryInfo, pRes) != TSDB_CODE_SUCCESS) {
if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
return pRes->code;
}
......
......@@ -411,13 +411,11 @@ static void **doSetResultRowData(SSqlObj *pSql) {
} else if (pField->type == TSDB_DATA_TYPE_NCHAR) {
// convert unicode to native code in a temporary buffer extra one byte for terminated symbol
if (pRes->buffer[num] == NULL) {
pRes->buffer[num] = malloc(pField->bytes + 1);
} else {
pRes->buffer[num] = realloc(pRes->buffer[num], pField->bytes + 1);
pRes->buffer[num] = malloc(pField->bytes + TSDB_NCHAR_SIZE);
}
/* string terminated */
memset(pRes->buffer[num], 0, pField->bytes + 1);
/* string terminated char for binary data*/
memset(pRes->buffer[num], 0, pField->bytes + TSDB_NCHAR_SIZE);
if (taosUcs4ToMbs(pRes->tsrow[i], pField->bytes, pRes->buffer[num])) {
pRes->tsrow[i] = pRes->buffer[num];
......@@ -471,6 +469,10 @@ static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) {
hasData = !allSubqueryExhausted;
} else { // otherwise, in case inner join, if any subquery exhausted, query completed.
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
if (pSql->pSubs[i] == 0) {
continue;
}
SSqlRes * pRes1 = &pSql->pSubs[i]->res;
SQueryInfo *pQueryInfo1 = tscGetQueryInfoDetail(&pSql->pSubs[i]->cmd, 0);
......@@ -518,8 +520,6 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) {
}
success = (doSetResultRowData(pSub) != NULL);
// success = (pRes1->row++ < pRes1->numOfRows);
}
if (success) { // current row of final output has been built, return to app
......
......@@ -318,46 +318,59 @@ void tscClearSqlMetaInfoForce(SSqlCmd* pCmd) {
// taosRemoveDataFromCache(tscCacheHandle, (void**)&(pCmd->pMetricMeta), true);
}
int32_t tscCreateResPointerInfo(SQueryInfo* pQueryInfo, SSqlRes* pRes) {
int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
if (pRes->tsrow == NULL) {
pRes->numOfnchar = 0;
int32_t numOfOutputCols = pQueryInfo->fieldsInfo.numOfOutputCols;
for (int32_t i = 0; i < numOfOutputCols; ++i) {
TAOS_FIELD* pField = tscFieldInfoGetField(pQueryInfo, i);
if (pField->type == TSDB_DATA_TYPE_NCHAR) {
pRes->numOfnchar++;
}
}
pRes->tsrow = calloc(1, (POINTER_BYTES + sizeof(short)) * numOfOutputCols + POINTER_BYTES * pRes->numOfnchar);
if (pRes->tsrow == NULL) {
pRes->bytes = calloc(numOfOutputCols, sizeof(short));
if (pRes->numOfnchar > 0) {
pRes->buffer = calloc(POINTER_BYTES, pRes->numOfnchar);
}
// not enough memory
if (pRes->tsrow == NULL || pRes->bytes == NULL || (pRes->buffer == NULL && pRes->numOfnchar > 0)) {
tfree(pRes->tsrow);
tfree(pRes->bytes);
tfree(pRes->buffer);
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
return pRes->code;
}
pRes->bytes = (short*)((char*)pRes->tsrow + POINTER_BYTES * numOfOutputCols);
if (pRes->numOfnchar > 0) {
pRes->buffer = (char**)((char*)pRes->bytes + sizeof(short) * numOfOutputCols);
}
}
return TSDB_CODE_SUCCESS;
}
void tscDestroyResPointerInfo(SSqlRes* pRes) {
// free all buffers containing the multibyte string
for (int i = 0; i < pRes->numOfnchar; i++) {
if (pRes->buffer[i] != NULL) {
if (pRes->buffer != NULL) {
assert(pRes->numOfnchar > 0);
// free all buffers containing the multibyte string
for (int i = 0; i < pRes->numOfnchar; i++) {
tfree(pRes->buffer[i]);
}
pRes->numOfnchar = 0;
}
tfree(pRes->pRsp);
tfree(pRes->tsrow);
pRes->numOfnchar = 0;
pRes->buffer = NULL;
pRes->bytes = NULL;
tfree(pRes->pGroupRec);
tfree(pRes->pColumnIndex);
tfree(pRes->buffer);
tfree(pRes->bytes);
pRes->data = NULL; // pRes->data points to the buffer of pRsp, no need to free
}
void tscFreeSqlCmdData(SSqlCmd* pCmd) {
......@@ -371,7 +384,6 @@ void tscFreeSqlCmdData(SSqlCmd* pCmd) {
void tscFreeResData(SSqlObj* pSql) {
SSqlRes* pRes = &pSql->res;
tfree(pRes->pRsp);
pRes->row = 0;
pRes->rspType = 0;
......@@ -384,19 +396,14 @@ void tscFreeResData(SSqlObj* pSql) {
pRes->numOfGroups = 0;
pRes->precision = 0;
pRes->numOfnchar = 0;
pRes->qhandle = 0;
pRes->offset = 0;
pRes->useconds = 0;
pRes->data = NULL;
tfree(pRes->pGroupRec);
tscDestroyLocalReducer(pSql);
tscDestroyResPointerInfo(pRes);
tfree(pRes->pColumnIndex);
}
void tscFreeSqlObjPartial(SSqlObj* pSql) {
......@@ -436,24 +443,27 @@ void tscFreeSqlObj(SSqlObj* pSql) {
pSql->signature = NULL;
pSql->fp = NULL;
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
memset(pCmd->payload, 0, (size_t)pCmd->allocSize);
tfree(pCmd->payload);
pCmd->allocSize = 0;
if (pSql->res.buffer != NULL) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; i++) {
if (pSql->res.buffer[i] != NULL) {
tfree(pSql->res.buffer[i]);
}
}
tfree(pSql->res.buffer);
}
// if (pRes->buffer != NULL) {
// SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
//
// for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; i++) {
// if (pRes->buffer[i] != NULL) {
// printf("===========free:%p\n", pRes->buffer[i]);
// tfree(pRes->buffer[i]);
// }
// }
//
// tfree(pRes->buffer);
// }
if (pSql->fp == NULL) {
tsem_destroy(&pSql->rspSem);
......@@ -1984,7 +1994,7 @@ void tscDoQuery(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd;
void* fp = pSql->fp;
assert(pSql->res.code == TSDB_CODE_SUCCESS);
pSql->res.code = TSDB_CODE_SUCCESS;
if (pCmd->command > TSDB_SQL_LOCAL) {
tscProcessLocalCmd(pSql);
......
......@@ -618,7 +618,7 @@ void *vnodeQueryOnSingleTable(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE
bool isProjQuery = vnodeIsProjectionQuery(pSqlExprs, pQueryMsg->numOfOutputCols);
// todo pass the correct error code
if (isProjQuery) {
if (isProjQuery && pQueryMsg->tsLen == 0) {
pQInfo = vnodeAllocateQInfo(pQueryMsg, pMeterObj, pSqlExprs);
} else {
pQInfo = vnodeAllocateQInfoEx(pQueryMsg, pGroupbyExpr, pSqlExprs, pMetersObj[0]);
......@@ -647,7 +647,9 @@ void *vnodeQueryOnSingleTable(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE
SSchedMsg schedMsg = {0};
if (!isProjQuery) {
if (isProjQuery && pQueryMsg->tsLen == 0) {
schedMsg.fp = vnodeQueryData;
} else {
if (vnodeParametersSafetyCheck(pQuery) == false) {
*code = TSDB_CODE_APP_ERROR;
goto _error;
......@@ -685,8 +687,6 @@ void *vnodeQueryOnSingleTable(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE
}
schedMsg.fp = vnodeSingleMeterQuery;
} else {
schedMsg.fp = vnodeQueryData;
}
/*
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册