提交 56cd398d 编写于 作者: B Benguang Zhao

fix: 1. initialize and destroy subState.mutex in tscAllocSqlObj and tscFreeSqlObj respectively

     2. protect destruction of pSubs etc within tscFreeSubobj with subState.mutex
上级 289585ee
......@@ -386,7 +386,6 @@ typedef struct SSqlObj {
SSqlRes res;
SSubqueryState subState;
pthread_mutex_t mtxSubs; // avoid double access pSubs after failure
struct SSqlObj **pSubs;
struct SSqlObj *rootObj;
......@@ -434,6 +433,8 @@ typedef struct SSqlStream {
struct SSqlStream *prev, *next;
} SSqlStream;
SSqlObj* tscAllocSqlObj();
void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable);
int tscAcquireRpc(const char *key, const char *user, const char *secret,void **pRpcObj);
......
......@@ -46,8 +46,6 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
pSql->fetchFp = fp;
pSql->rootObj = pSql;
pthread_mutex_init(&pSql->mtxSubs, NULL);
registerSqlObj(pSql);
pSql->sqlstr = calloc(1, sqlLen + 1);
......@@ -108,7 +106,7 @@ TAOS_RES * taos_query_ra(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, v
nPrintTsc("%s", sqlstr);
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
SSqlObj *pSql = tscAllocSqlObj();
if (pSql == NULL) {
tscError("failed to malloc sqlObj");
tscQueueAsyncError(fp, param, TSDB_CODE_TSC_OUT_OF_MEMORY);
......
......@@ -611,7 +611,7 @@ static int32_t tscRebuildDDLForSubTable(SSqlObj *pSql, const char *tableName, ch
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pMeta = pTableMetaInfo->pTableMeta;
SSqlObj *pInterSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
SSqlObj *pInterSql = tscAllocSqlObj();
if (pInterSql == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
......@@ -763,7 +763,7 @@ static int32_t tscProcessShowCreateDatabase(SSqlObj *pSql) {
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SSqlObj *pInterSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
SSqlObj *pInterSql = tscAllocSqlObj();
if (pInterSql == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
......
......@@ -558,7 +558,7 @@ static int32_t getSuperTableMetaFromLocalCache(TAOS* taos, char* tableName, STab
int32_t code = 0;
STableMeta* tableMeta = NULL;
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
SSqlObj* pSql = tscAllocSqlObj();
if (pSql == NULL) {
tscError("SML:0x%" PRIx64 " failed to allocate memory, reason:%s", info->id, strerror(errno));
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -2763,7 +2763,7 @@ static int32_t convertPrecisionType(int precision, SMLTimeStampType *tsType) {
//make a dummy SSqlObj
static SSqlObj* createSmlQueryObj(TAOS* taos, int32_t affected_rows, int32_t code) {
SSqlObj *pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj));
SSqlObj *pNew = tscAllocSqlObj();
if (pNew == NULL) {
return NULL;
}
......
......@@ -1261,8 +1261,6 @@ static void insertBatchClean(STscStmt* pStmt) {
taosHashClear(pCmd->insertParam.pTableBlockHashList);
tscFreeSqlResult(pSql);
tscFreeSubobj(pSql);
tfree(pSql->pSubs);
pSql->subState.numOfSub = 0;
}
static int insertBatchStmtExecute(STscStmt* pStmt) {
......@@ -1589,7 +1587,7 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) {
}
pStmt->taos = pObj;
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
SSqlObj* pSql = tscAllocSqlObj();
if (pSql == NULL) {
free(pStmt);
......
......@@ -2708,7 +2708,7 @@ static void createHbObj(STscObj* pObj) {
return;
}
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
SSqlObj *pSql = tscAllocSqlObj();
if (NULL == pSql) return;
pSql->fp = tscProcessHeartBeatRsp;
......@@ -2977,7 +2977,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code);
static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool autocreate) {
SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
SSqlObj *pNew = tscAllocSqlObj();
if (NULL == pNew) {
tscError("0x%"PRIx64" malloc failed for new sqlobj to get table meta", pSql->self);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -3043,7 +3043,7 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn
}
int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupNameList, SArray* pUdfList, __async_cb_func_t fp, bool metaClone) {
SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
SSqlObj *pNew = tscAllocSqlObj();
if (NULL == pNew) {
tscError("0x%"PRIx64" failed to allocate sqlobj to get multiple table meta", pSql->self);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -3180,7 +3180,7 @@ int tscGetTableMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool create
}
int32_t tscGetUdfFromNode(SSqlObj *pSql, SQueryInfo* pQueryInfo) {
SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
SSqlObj *pNew = tscAllocSqlObj();
if (NULL == pNew) {
tscError("%p malloc failed for new sqlobj to get user-defined functions", pSql);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -3291,10 +3291,7 @@ int tscRenewTableMeta(SSqlObj *pSql) {
pSql->rootObj->retryReason = pSql->retryReason;
SSqlObj *rootSql = pSql->rootObj;
pthread_mutex_lock(&rootSql->mtxSubs);
tscFreeSubobj(rootSql);
pthread_mutex_unlock(&rootSql->mtxSubs);
tfree(rootSql->pSubs);
tscResetSqlCmd(&rootSql->cmd, true, rootSql->self);
code = getMultiTableMetaFromMnode(rootSql, pNameList, vgroupList, NULL, tscTableMetaCallBack, true);
......@@ -3321,7 +3318,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, SQueryInfo* pQueryInfo) {
if (allVgroupInfoRetrieved(pQueryInfo)) {
return TSDB_CODE_SUCCESS;
}
SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
SSqlObj *pNew = tscAllocSqlObj();
pNew->pTscObj = pSql->pTscObj;
pNew->signature = pNew;
......
......@@ -143,7 +143,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
pthread_mutex_init(&pObj->mutex, NULL);
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
SSqlObj *pSql = tscAllocSqlObj();
if (NULL == pSql) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscReleaseRpc(pRpcObj);
......@@ -346,7 +346,7 @@ TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen, int64_t*
nPrintTsc("%s", sqlstr);
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
SSqlObj* pSql = tscAllocSqlObj();
if (pSql == NULL) {
tscError("failed to malloc sqlObj");
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -923,7 +923,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
return TSDB_CODE_TSC_DISCONNECTED;
}
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
SSqlObj* pSql = tscAllocSqlObj();
pSql->pTscObj = taos;
pSql->signature = pSql;
......@@ -1041,7 +1041,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
SSqlObj* pSql = tscAllocSqlObj();
tscAllocPayload(&pSql->cmd, 1024);
pSql->pTscObj = taos;
......
......@@ -208,8 +208,6 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
tscFreeSqlResult(pSql);
tscFreeSubobj(pSql);
tfree(pSql->pSubs);
pSql->subState.numOfSub = 0;
pSql->parseRetry = 0;
int32_t code = tsParseSql(pSql, true);
......@@ -332,8 +330,6 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
tscFreeSqlResult(pSql);
tscFreeSubobj(pSql);
tfree(pSql->pSubs);
pSql->subState.numOfSub = 0;
pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList);
tscSetNextLaunchTimer(pStream, pSql);
}
......@@ -677,7 +673,7 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c
return NULL;
}
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
SSqlObj *pSql = tscAllocSqlObj();
if (pSql == NULL) {
return NULL;
}
......
......@@ -117,7 +117,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
goto fail;
}
pSql = calloc(1, sizeof(SSqlObj));
pSql = tscAllocSqlObj();
if (pSql == NULL) {
line = __LINE__;
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -432,7 +432,7 @@ TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char* topic, const char
}
SSqlObj* recreateSqlObj(SSub* pSub) {
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
SSqlObj* pSql = tscAllocSqlObj();
if (pSql == NULL) {
return NULL;
}
......
......@@ -708,10 +708,6 @@ void freeJoinSubqueryObj(SSqlObj* pSql) {
pSql->pSubs[i] = NULL;
}
if (pSql->subState.states) {
pthread_mutex_destroy(&pSql->subState.mutex);
}
tfree(pSql->subState.states);
pSql->subState.numOfSub = 0;
}
......@@ -901,7 +897,6 @@ bool tscReparseSql(SSqlObj *sql, int32_t code){
}
tscFreeSubobj(sql);
tfree(sql->pSubs);
sql->res.code = TSDB_CODE_SUCCESS;
sql->retry++;
......@@ -3016,7 +3011,6 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
}
tscFreeSubobj(userSql);
tfree(userSql->pSubs);
userSql->res.code = TSDB_CODE_SUCCESS;
userSql->retry++;
......
......@@ -1656,8 +1656,9 @@ void tscFreeSqlResult(SSqlObj* pSql) {
}
void tscFreeSubobj(SSqlObj* pSql) {
pthread_mutex_lock(&pSql->subState.mutex);
if (pSql->subState.numOfSub == 0) {
return;
goto _out;
}
tscDebug("0x%"PRIx64" start to free sub SqlObj, numOfSub:%d", pSql->self, pSql->subState.numOfSub);
......@@ -1673,12 +1674,12 @@ void tscFreeSubobj(SSqlObj* pSql) {
pSql->pSubs[i] = NULL;
}
if (pSql->subState.states) {
pthread_mutex_destroy(&pSql->subState.mutex);
}
tfree(pSql->pSubs);
tfree(pSql->subState.states);
pSql->subState.numOfSub = 0;
_out:
pthread_mutex_unlock(&pSql->subState.mutex);
}
/**
......@@ -1715,6 +1716,15 @@ void tscFreeMetaSqlObj(int64_t *rid){
}
}
SSqlObj* tscAllocSqlObj() {
SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
if (!pNew) {
return NULL;
}
pthread_mutex_init(&pNew->subState.mutex, NULL);
return pNew;
}
void tscFreeSqlObj(SSqlObj* pSql) {
if (pSql == NULL || pSql->signature != pSql) {
return;
......@@ -1738,17 +1748,11 @@ void tscFreeSqlObj(SSqlObj* pSql) {
tscFreeSubobj(pSql);
if (pSql && (pSql == pSql->rootObj)) {
pthread_mutex_destroy(&pSql->mtxSubs);
}
pSql->signature = NULL;
pSql->fp = NULL;
tfree(pSql->sqlstr);
tfree(pSql->pBuf);
tfree(pSql->pSubs);
pSql->subState.numOfSub = 0;
pSql->self = 0;
tscFreeSqlResult(pSql);
......@@ -1759,6 +1763,7 @@ void tscFreeSqlObj(SSqlObj* pSql) {
tscDebug("0x%"PRIx64" addr:%p free completed", sid, pSql);
pthread_mutex_destroy(&pSql->subState.mutex);
tsem_destroy(&pSql->rspSem);
memset(pSql, 0, sizeof(*pSql));
free(pSql);
......@@ -3744,7 +3749,7 @@ void registerSqlObj(SSqlObj* pSql) {
}
SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, int32_t cmd) {
SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj));
SSqlObj* pNew = tscAllocSqlObj();
if (pNew == NULL) {
tscError("0x%"PRIx64" new subquery failed, tableIndex:%d", pSql->self, 0);
return NULL;
......@@ -3821,7 +3826,7 @@ static void doSetSqlExprAndResultFieldInfo(SQueryInfo* pNewQueryInfo, int64_t ui
SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t fp, void* param, int32_t cmd, SSqlObj* pPrevSql) {
SSqlCmd* pCmd = &pSql->cmd;
SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj));
SSqlObj* pNew = tscAllocSqlObj();
if (pNew == NULL) {
tscError("0x%"PRIx64" new subquery failed, tableIndex:%d", pSql->self, tableIndex);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -4124,10 +4129,7 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
}
tscFreeSubobj(pParentSql);
tfree(pParentSql->pSubs);
tscFreeSubobj(rootObj);
tfree(rootObj->pSubs);
rootObj->res.code = TSDB_CODE_SUCCESS;
rootObj->retry++;
......@@ -4220,7 +4222,7 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
pSql->cmd.active = pSub;
pSql->cmd.command = TSDB_SQL_SELECT;
SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj));
SSqlObj* pNew = tscAllocSqlObj();
if (pNew == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
......@@ -4573,7 +4575,6 @@ void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp) {
tfree(pSql->subState.states);
pSql->subState.numOfSub = 0;
pthread_mutex_unlock(&pSql->subState.mutex);
pthread_mutex_destroy(&pSql->subState.mutex);
pSql->fp = fp;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册