From 580448e381aa21062a15f33df88b11c6d1ad4be8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 31 Aug 2020 16:41:16 +0800 Subject: [PATCH] [td-621] --- src/client/inc/tscUtil.h | 4 +-- src/client/src/tscAsync.c | 36 ++++++------------- src/client/src/tscLocal.c | 65 +++++++++++++++++++++++----------- src/client/src/tscLocalMerge.c | 7 ++-- src/client/src/tscServer.c | 16 +++------ src/client/src/tscSubquery.c | 50 ++++++++++++++++++++++---- src/client/src/tscUtil.c | 39 ++++++++++++-------- 7 files changed, 134 insertions(+), 83 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index f77897a74b..6bdc2c86ae 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -186,7 +186,7 @@ SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functi size_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo); SSqlExpr* tscSqlExprGet(SQueryInfo* pQueryInfo, int32_t index); -void tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy); +int32_t tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy); void tscSqlExprInfoDestroy(SArray* pExprInfo); SColumn* tscColumnClone(const SColumn* src); @@ -204,7 +204,7 @@ bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId, int32_t SCond* tsGetSTableQueryCond(STagCond* pCond, uint64_t uid); void tsSetSTableQueryCond(STagCond* pTagCond, uint64_t uid, SBufferWriter* bw); -void tscTagCondCopy(STagCond* dest, const STagCond* src); +int32_t tscTagCondCopy(STagCond* dest, const STagCond* src); void tscTagCondRelease(STagCond* pCond); void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo); diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 4643d255dc..41aa122160 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -50,7 +50,8 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const pSql->sqlstr = calloc(1, sqlLen + 1); if (pSql->sqlstr == NULL) { tscError("%p failed to malloc sql string buffer", pSql); - tscQueueAsyncError(pSql->fp, pSql->param, TSDB_CODE_TSC_OUT_OF_MEMORY); + pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY; + tscQueueAsyncRes(pSql); return; } @@ -94,7 +95,6 @@ void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *pa SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); if (pSql == NULL) { tscError("failed to malloc sqlObj"); - terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; tscQueueAsyncError(fp, param, TSDB_CODE_TSC_OUT_OF_MEMORY); return; } @@ -191,7 +191,7 @@ void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRo tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscAsyncFetchSingleRowProxy); } -void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), void *param) { +void taos_fetch_rows_a(TAOS_RES *taosa, __async_cb_func_t fp, void *param) { SSqlObj *pSql = (SSqlObj *)taosa; if (pSql == NULL || pSql->signature != pSql) { tscError("sql object is NULL"); @@ -209,6 +209,8 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi if (pRes->qhandle == 0) { tscError("qhandle is NULL"); pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE; + pSql->param = param; + tscQueueAsyncRes(pSql); return; } @@ -269,7 +271,10 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW), if (pRes->qhandle == 0) { tscError("qhandle is NULL"); - tscQueueAsyncError(fp, param, TSDB_CODE_TSC_INVALID_QHANDLE); + pSql->param = param; + pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE; + + tscQueueAsyncRes(pSql); return; } @@ -352,36 +357,17 @@ void tscProcessFetchRow(SSchedMsg *pMsg) { void tscProcessAsyncRes(SSchedMsg *pMsg) { SSqlObj *pSql = (SSqlObj *)pMsg->ahandle; -// SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - -// void *taosres = pSql; - - // pCmd may be released, so cache pCmd->command -// int cmd = pCmd->command; -// int code = pRes->code; - - // in case of async insert, restore the user specified callback function -// bool shouldFree = tscShouldBeFreed(pSql); - -// if (pCmd->command == TSDB_SQL_INSERT) { -// assert(pSql->fp != NULL); assert(pSql->fp != NULL && pSql->fetchFp != NULL); -// } -// if (pSql->fp) { pSql->fp = pSql->fetchFp; (*pSql->fp)(pSql->param, pSql, pRes->code); -// } - -// if (shouldFree) { -// tscDebug("%p sqlObj is automatically freed in async res", pSql); -// tscFreeSqlObj(pSql); -// } } +// this function will be executed by queue task threads, so the terrno is not valid static void tscProcessAsyncError(SSchedMsg *pMsg) { void (*fp)() = pMsg->ahandle; + terrno = *(int32_t*) pMsg->msg; (*fp)(pMsg->thandle, NULL, *(int32_t*)pMsg->msg); } diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index caaaa5bc18..b240d357a8 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -274,7 +274,7 @@ static int32_t tscProcessDescribeTable(SSqlObj *pSql) { return tscSetValueToResObj(pSql, rowLen); } -static void tscProcessCurrentUser(SSqlObj *pSql) { +static int32_t tscProcessCurrentUser(SSqlObj *pSql) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0); @@ -282,14 +282,20 @@ static void tscProcessCurrentUser(SSqlObj *pSql) { pExpr->resType = TSDB_DATA_TYPE_BINARY; char* vx = calloc(1, pExpr->resBytes); + if (vx == NULL) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + size_t size = sizeof(pSql->pTscObj->user); STR_WITH_MAXSIZE_TO_VARSTR(vx, pSql->pTscObj->user, size); tscSetLocalQueryResult(pSql, vx, pExpr->aliasName, pExpr->resType, pExpr->resBytes); free(vx); + + return TSDB_CODE_SUCCESS; } -static void tscProcessCurrentDB(SSqlObj *pSql) { +static int32_t tscProcessCurrentDB(SSqlObj *pSql) { char db[TSDB_DB_NAME_LEN] = {0}; extractDBName(pSql->pTscObj->db, db); @@ -302,6 +308,10 @@ static void tscProcessCurrentDB(SSqlObj *pSql) { pExpr->resBytes = TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE; char* vx = calloc(1, pExpr->resBytes); + if (vx == NULL) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + if (t == 0) { setVardataNull(vx, TSDB_DATA_TYPE_BINARY); } else { @@ -310,9 +320,11 @@ static void tscProcessCurrentDB(SSqlObj *pSql) { tscSetLocalQueryResult(pSql, vx, pExpr->aliasName, pExpr->resType, pExpr->resBytes); free(vx); + + return TSDB_CODE_SUCCESS; } -static void tscProcessServerVer(SSqlObj *pSql) { +static int32_t tscProcessServerVer(SSqlObj *pSql) { const char* v = pSql->pTscObj->sversion; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); @@ -323,13 +335,18 @@ static void tscProcessServerVer(SSqlObj *pSql) { pExpr->resBytes = (int16_t)(t + VARSTR_HEADER_SIZE); char* vx = calloc(1, pExpr->resBytes); + if (vx == NULL) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + STR_WITH_SIZE_TO_VARSTR(vx, v, (VarDataLenT)t); tscSetLocalQueryResult(pSql, vx, pExpr->aliasName, pExpr->resType, pExpr->resBytes); - taosTFree(vx); + free(vx); + return TSDB_CODE_SUCCESS; } -static void tscProcessClientVer(SSqlObj *pSql) { +static int32_t tscProcessClientVer(SSqlObj *pSql) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0); @@ -339,23 +356,28 @@ static void tscProcessClientVer(SSqlObj *pSql) { pExpr->resBytes = (int16_t)(t + VARSTR_HEADER_SIZE); char* v = calloc(1, pExpr->resBytes); + if (v == NULL) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + STR_WITH_SIZE_TO_VARSTR(v, version, (VarDataLenT)t); tscSetLocalQueryResult(pSql, v, pExpr->aliasName, pExpr->resType, pExpr->resBytes); - taosTFree(v); + free(v); + return TSDB_CODE_SUCCESS; } -static void tscProcessServStatus(SSqlObj *pSql) { +static int32_t tscProcessServStatus(SSqlObj *pSql) { STscObj* pObj = pSql->pTscObj; if (pObj->pHb != NULL) { if (pObj->pHb->res.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { pSql->res.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - return; + return pSql->res.code; } } else { if (pSql->res.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { - return; + return pSql->res.code; } } @@ -364,6 +386,7 @@ static void tscProcessServStatus(SSqlObj *pSql) { SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0); int32_t val = 1; tscSetLocalQueryResult(pSql, (char*) &val, pExpr->aliasName, TSDB_DATA_TYPE_INT, sizeof(int32_t)); + return TSDB_CODE_SUCCESS; } void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnName, int16_t type, size_t valueLength) { @@ -393,37 +416,39 @@ void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnNa int tscProcessLocalCmd(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; + SSqlRes *pRes = &pSql->res; if (pCmd->command == TSDB_SQL_CFG_LOCAL) { - pSql->res.code = (uint8_t)taosCfgDynamicOptions(pCmd->payload); + pRes->code = (uint8_t)taosCfgDynamicOptions(pCmd->payload); } else if (pCmd->command == TSDB_SQL_DESCRIBE_TABLE) { - pSql->res.code = (uint8_t)tscProcessDescribeTable(pSql); + pRes->code = (uint8_t)tscProcessDescribeTable(pSql); } else if (pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) { /* * set the qhandle to be 1 in order to pass the qhandle check, and to call partial release function to * free allocated resources and remove the SqlObj from sql query linked list */ - pSql->res.qhandle = 0x1; - pSql->res.numOfRows = 0; + pRes->qhandle = 0x1; + pRes->numOfRows = 0; } else if (pCmd->command == TSDB_SQL_RESET_CACHE) { taosCacheEmpty(tscCacheHandle); + pRes->code = TSDB_CODE_SUCCESS; } else if (pCmd->command == TSDB_SQL_SERV_VERSION) { - tscProcessServerVer(pSql); + pRes->code = tscProcessServerVer(pSql); } else if (pCmd->command == TSDB_SQL_CLI_VERSION) { - tscProcessClientVer(pSql); + pRes->code = tscProcessClientVer(pSql); } else if (pCmd->command == TSDB_SQL_CURRENT_USER) { - tscProcessCurrentUser(pSql); + pRes->code = tscProcessCurrentUser(pSql); } else if (pCmd->command == TSDB_SQL_CURRENT_DB) { - tscProcessCurrentDB(pSql); + pRes->code = tscProcessCurrentDB(pSql); } else if (pCmd->command == TSDB_SQL_SERV_STATUS) { - tscProcessServStatus(pSql); + pRes->code = tscProcessServStatus(pSql); } else { - pSql->res.code = TSDB_CODE_TSC_INVALID_SQL; + pRes->code = TSDB_CODE_TSC_INVALID_SQL; tscError("%p not support command:%d", pSql, pCmd->command); } // keep the code in local variable in order to avoid invalid read in case of async query - int32_t code = pSql->res.code; + int32_t code = pRes->code; if (code == TSDB_CODE_SUCCESS) { (*pSql->fp)(pSql->param, pSql, code); } else { diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index cabf2a6a11..759c08532a 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -67,8 +67,7 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalReducer *pReducer, tOrderDesc SQLFunctionCtx *pCtx = &pReducer->pCtx[i]; SSqlExpr * pExpr = tscSqlExprGet(pQueryInfo, i); - pCtx->aOutputBuf = - pReducer->pResultBuf->data + pExpr->offset * pReducer->resColModel->capacity; + pCtx->aOutputBuf = pReducer->pResultBuf->data + pExpr->offset * pReducer->resColModel->capacity; pCtx->order = pQueryInfo->order.order; pCtx->functionId = pExpr->functionId; @@ -160,7 +159,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd if (pMemBuffer == NULL) { tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer); - tscError("%p pMemBuffer is NULL", pMemBuffer); pRes->code = TSDB_CODE_TSC_APP_ERROR; return; @@ -168,7 +166,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd if (pDesc->pColumnModel == NULL) { tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer); - tscError("%p no local buffer or intermediate result format model", pSql); pRes->code = TSDB_CODE_TSC_APP_ERROR; return; @@ -188,7 +185,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd if (numOfFlush == 0 || numOfBuffer == 0) { tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer); tscDebug("%p retrieved no data", pSql); - return; } @@ -279,6 +275,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd taosTFree(pReducer); return; } + param->pLocalData = pReducer->pLocalDataSrc; param->pDesc = pReducer->pDesc; param->num = pReducer->pLocalDataSrc[0]->pMemBuffer->numOfElemsPerPage; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index ecb85472fc..6dcc7086b0 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -226,17 +226,13 @@ int tscSendMsgToServer(SSqlObj *pSql) { .handle = &pSql->pRpcCtx, .code = 0 }; + // NOTE: the rpc context should be acquired before sending data to server. // Otherwise, the pSql object may have been released already during the response function, which is // processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely // cause crash. - if (pObj != NULL && pObj->signature == pObj) { - rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg); - return TSDB_CODE_SUCCESS; - } else { - //pObj->signature has been reset by other thread, ignore concurrency problem - return TSDB_CODE_TSC_CONN_KILLED; - } + rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg); + return TSDB_CODE_SUCCESS; } void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { @@ -1495,8 +1491,7 @@ int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { char *tmpData = NULL; uint32_t len = pSql->cmd.payloadLen; if (len > 0) { - tmpData = calloc(1, len); - if (NULL == tmpData) { + if ((tmpData = calloc(1, len)) == NULL) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } @@ -1541,8 +1536,7 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { // copy payload content to temp buff char *tmpData = 0; if (pCmd->payloadLen > 0) { - tmpData = calloc(1, pCmd->payloadLen + 1); - if (NULL == tmpData) return -1; + if ((tmpData = calloc(1, pCmd->payloadLen + 1)) == NULL) return -1; memcpy(tmpData, pCmd->payload, pCmd->payloadLen); } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 2fb264c756..b1deeffced 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -570,8 +570,9 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar SSchema* pColSchema = tscGetTableColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId); - *s1 = taosArrayInit(p1->num, p1->tagSize); - *s2 = taosArrayInit(p2->num, p2->tagSize); + // int16_t for padding + *s1 = taosArrayInit(p1->num, p1->tagSize - sizeof(int16_t)); + *s2 = taosArrayInit(p2->num, p2->tagSize - sizeof(int16_t)); if (!(checkForDuplicateTagVal(pQueryInfo, p1, pParentSql) && checkForDuplicateTagVal(pQueryInfo, p2, pParentSql))) { return TSDB_CODE_QRY_DUP_JOIN_KEY; @@ -1039,6 +1040,10 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) { int32_t numOfExprs = (int32_t)tscSqlExprNumOfExprs(pQueryInfo); pRes->pColumnIndex = calloc(1, sizeof(SColumnIndex) * numOfExprs); + if (pRes->pColumnIndex == NULL) { + pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; + return; + } for (int32_t i = 0; i < numOfExprs; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); @@ -1153,6 +1158,7 @@ static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code); static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj); +// TODO int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter *pSupporter) { SSqlCmd * pCmd = &pSql->cmd; SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); @@ -1199,7 +1205,9 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter // this data needs to be transfer to support struct memset(&pNewQueryInfo->fieldsInfo, 0, sizeof(SFieldInfo)); - tscTagCondCopy(&pSupporter->tagCond, &pNewQueryInfo->tagCond);//pNewQueryInfo->tagCond; + if (tscTagCondCopy(&pSupporter->tagCond, &pNewQueryInfo->tagCond) != 0) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } pNew->cmd.numOfCols = 0; pNewQueryInfo->intervalTime = 0; @@ -1380,7 +1388,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { const uint32_t nBufferSize = (1u << 16); // 64KB - SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); pSql->numOfSubs = pTableMetaInfo->vgroupList->numOfVgroups; @@ -1395,9 +1403,20 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { } pSql->pSubs = calloc(pSql->numOfSubs, POINTER_BYTES); - + tscDebug("%p retrieved query data from %d vnode(s)", pSql, pSql->numOfSubs); SSubqueryState *pState = calloc(1, sizeof(SSubqueryState)); + + if (pSql->pSubs == NULL || pState == NULL) { + taosTFree(pState); + taosTFree(pSql->pSubs); + pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; + tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pSql->numOfSubs); + + tscQueueAsyncRes(pSql); + return ret; + } + pState->numOfTotal = pSql->numOfSubs; pState->numOfRemain = pSql->numOfSubs; @@ -2029,8 +2048,21 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) { numOfRes = (int32_t)(MIN(numOfRes, pSql->pSubs[i]->res.numOfRows)); } + if (numOfRes == 0) { + return; + } + int32_t totalSize = tscGetResRowLength(pQueryInfo->exprList); - pRes->pRsp = realloc(pRes->pRsp, numOfRes * totalSize); + + assert(numOfRes * totalSize > 0); + char* tmp = realloc(pRes->pRsp, numOfRes * totalSize); + if (tmp == NULL) { + pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; + return; + } else { + pRes->pRsp = tmp; + } + pRes->data = pRes->pRsp; char* data = pRes->data; @@ -2069,6 +2101,12 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) { pRes->buffer = calloc(numOfExprs, POINTER_BYTES); pRes->length = calloc(numOfExprs, sizeof(int32_t)); + if (pRes->tsrow == NULL || pRes->buffer == NULL || pRes->length == NULL) { + pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; + tscQueueAsyncRes(pSql); + return; + } + tscRestoreSQLFuncForSTableQuery(pQueryInfo); } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index b61fd7e8c9..7b09ef5902 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -254,15 +254,12 @@ int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) { pRes->numOfCols = numOfOutput; pRes->tsrow = calloc(numOfOutput, POINTER_BYTES); - pRes->length = calloc(numOfOutput, sizeof(int32_t)); // todo refactor + pRes->length = calloc(numOfOutput, sizeof(int32_t)); pRes->buffer = calloc(numOfOutput, POINTER_BYTES); // not enough memory if (pRes->tsrow == NULL || (pRes->buffer == NULL && pRes->numOfCols > 0)) { taosTFree(pRes->tsrow); - taosTFree(pRes->buffer); - taosTFree(pRes->length); - pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; return pRes->code; } @@ -281,13 +278,14 @@ void tscDestroyResPointerInfo(SSqlRes* pRes) { } taosTFree(pRes->pRsp); + taosTFree(pRes->tsrow); taosTFree(pRes->length); - + taosTFree(pRes->buffer); + taosTFree(pRes->pGroupRec); taosTFree(pRes->pColumnIndex); - taosTFree(pRes->buffer); - + if (pRes->pArithSup != NULL) { taosTFree(pRes->pArithSup->data); taosTFree(pRes->pArithSup); @@ -1052,7 +1050,7 @@ void tscSqlExprInfoDestroy(SArray* pExprInfo) { taosArrayDestroy(pExprInfo); } -void tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy) { +int32_t tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy) { assert(src != NULL && dst != NULL); size_t size = taosArrayGetSize(src); @@ -1064,7 +1062,7 @@ void tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy) if (deepcopy) { SSqlExpr* p1 = calloc(1, sizeof(SSqlExpr)); if (p1 == NULL) { - assert(0); + return -1; } *p1 = *pExpr; @@ -1078,6 +1076,8 @@ void tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy) } } } + + return 0; } SColumn* tscColumnListInsert(SArray* pColumnList, SColumnIndex* pColIndex) { @@ -1324,11 +1324,14 @@ bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId, int32_t return false; } -void tscTagCondCopy(STagCond* dest, const STagCond* src) { +int32_t tscTagCondCopy(STagCond* dest, const STagCond* src) { memset(dest, 0, sizeof(STagCond)); if (src->tbnameCond.cond != NULL) { dest->tbnameCond.cond = strdup(src->tbnameCond.cond); + if (dest->tbnameCond.cond == NULL) { + return -1; + } } dest->tbnameCond.uid = src->tbnameCond.uid; @@ -1337,7 +1340,7 @@ void tscTagCondCopy(STagCond* dest, const STagCond* src) { dest->relType = src->relType; if (src->pCond == NULL) { - return; + return 0; } size_t s = taosArrayGetSize(src->pCond); @@ -1354,7 +1357,7 @@ void tscTagCondCopy(STagCond* dest, const STagCond* src) { assert(pCond->cond != NULL); c.cond = malloc(c.len); if (c.cond == NULL) { - assert(0); + return -1; } memcpy(c.cond, pCond->cond, c.len); @@ -1362,6 +1365,8 @@ void tscTagCondCopy(STagCond* dest, const STagCond* src) { taosArrayPush(dest->pCond, &c); } + + return 0; } void tscTagCondRelease(STagCond* pTagCond) { @@ -1854,7 +1859,10 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void } } - tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond); + if (tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond) != 0) { + terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto _error; + } if (pQueryInfo->fillType != TSDB_FILL_NONE) { pNewQueryInfo->fillVal = malloc(pQueryInfo->fieldsInfo.numOfOutput * sizeof(int64_t)); @@ -1883,7 +1891,10 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void } uint64_t uid = pTableMetaInfo->pTableMeta->id.uid; - tscSqlExprCopy(pNewQueryInfo->exprList, pQueryInfo->exprList, uid, true); + if (tscSqlExprCopy(pNewQueryInfo->exprList, pQueryInfo->exprList, uid, true) != 0) { + terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto _error; + } doSetSqlExprAndResultFieldInfo(pQueryInfo, pNewQueryInfo, uid); -- GitLab