提交 a170194c 编写于 作者: B Benguang Zhao

fix: 1. initialize and destroy subState.mutex in tscAllocSqlObj and tscFreeSqlObj respectively

     2. protect destruction of pSubs etc within tscFreeSubobj with subState.mutex
上级 bc4afd54
......@@ -387,7 +387,6 @@ typedef struct SSqlObj {
SSqlRes res;
SSubqueryState subState;
pthread_mutex_t mtxSubs; // avoid double access pSubs after failure
struct SSqlObj **pSubs;
struct SSqlObj *rootObj;
......@@ -439,6 +438,8 @@ typedef struct SSqlStream {
struct SSqlStream *prev, *next;
} SSqlStream;
SSqlObj* tscAllocSqlObj();
void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable);
int tscAcquireRpc(const char *key, const char *user, const char *secret,void **pRpcObj);
......
......@@ -46,8 +46,6 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
pSql->fetchFp = fp;
pSql->rootObj = pSql;
pthread_mutex_init(&pSql->mtxSubs, NULL);
registerSqlObj(pSql);
pSql->sqlstr = calloc(1, sqlLen + 1);
......@@ -108,7 +106,7 @@ TAOS_RES * taos_query_ra(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, v
nPrintTsc("%s", sqlstr);
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
SSqlObj *pSql = tscAllocSqlObj();
if (pSql == NULL) {
tscError("failed to malloc sqlObj");
tscQueueAsyncError(fp, param, TSDB_CODE_TSC_OUT_OF_MEMORY);
......
......@@ -117,7 +117,7 @@ void writeMsgVgId(char * payload, int32_t vgId) {
SSqlObj *tscCreateSTableSubDelete(SSqlObj *pSql, SVgroupMsg* pVgroupMsg, SRetrieveSupport *trsupport) {
// Init
SSqlCmd* pCmd = &pSql->cmd;
SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj));
SSqlObj* pNew = tscAllocSqlObj();
if (pNew == NULL) {
tscError("0x%"PRIx64":CDEL new subdelete failed.", pSql->self);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
......
......@@ -614,7 +614,7 @@ static int32_t tscRebuildDDLForSubTable(SSqlObj *pSql, const char *tableName, ch
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pMeta = pTableMetaInfo->pTableMeta;
SSqlObj *pInterSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
SSqlObj *pInterSql = tscAllocSqlObj();
if (pInterSql == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
......@@ -766,7 +766,7 @@ static int32_t tscProcessShowCreateDatabase(SSqlObj *pSql) {
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SSqlObj *pInterSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
SSqlObj *pInterSql = tscAllocSqlObj();
if (pInterSql == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
......
......@@ -558,7 +558,7 @@ static int32_t getSuperTableMetaFromLocalCache(TAOS* taos, char* tableName, STab
int32_t code = 0;
STableMeta* tableMeta = NULL;
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
SSqlObj* pSql = tscAllocSqlObj();
if (pSql == NULL) {
tscError("SML:0x%" PRIx64 " failed to allocate memory, reason:%s", info->id, strerror(errno));
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -2763,7 +2763,7 @@ static int32_t convertPrecisionType(int precision, SMLTimeStampType *tsType) {
//make a dummy SSqlObj
static SSqlObj* createSmlQueryObj(TAOS* taos, int32_t affected_rows, int32_t code) {
SSqlObj *pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj));
SSqlObj *pNew = tscAllocSqlObj();
if (pNew == NULL) {
return NULL;
}
......
......@@ -1261,8 +1261,6 @@ static void insertBatchClean(STscStmt* pStmt) {
taosHashClear(pCmd->insertParam.pTableBlockHashList);
tscFreeSqlResult(pSql);
tscFreeSubobj(pSql);
tfree(pSql->pSubs);
pSql->subState.numOfSub = 0;
}
static int insertBatchStmtExecute(STscStmt* pStmt) {
......@@ -1589,7 +1587,7 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) {
}
pStmt->taos = pObj;
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
SSqlObj* pSql = tscAllocSqlObj();
if (pSql == NULL) {
free(pStmt);
......
......@@ -2806,7 +2806,7 @@ static void createHbObj(STscObj* pObj) {
return;
}
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
SSqlObj *pSql = tscAllocSqlObj();
if (NULL == pSql) return;
pSql->fp = tscProcessHeartBeatRsp;
......@@ -3075,7 +3075,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code);
static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool autocreate) {
SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
SSqlObj *pNew = tscAllocSqlObj();
if (NULL == pNew) {
tscError("0x%"PRIx64" malloc failed for new sqlobj to get table meta", pSql->self);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -3141,7 +3141,7 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn
}
int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupNameList, SArray* pUdfList, __async_cb_func_t fp, bool metaClone) {
SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
SSqlObj *pNew = tscAllocSqlObj();
if (NULL == pNew) {
tscError("0x%"PRIx64" failed to allocate sqlobj to get multiple table meta", pSql->self);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -3278,7 +3278,7 @@ int tscGetTableMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool create
}
int32_t tscGetUdfFromNode(SSqlObj *pSql, SQueryInfo* pQueryInfo) {
SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
SSqlObj *pNew = tscAllocSqlObj();
if (NULL == pNew) {
tscError("%p malloc failed for new sqlobj to get user-defined functions", pSql);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -3389,10 +3389,7 @@ int tscRenewTableMeta(SSqlObj *pSql) {
pSql->rootObj->retryReason = pSql->retryReason;
SSqlObj *rootSql = pSql->rootObj;
pthread_mutex_lock(&rootSql->mtxSubs);
tscFreeSubobj(rootSql);
pthread_mutex_unlock(&rootSql->mtxSubs);
tfree(rootSql->pSubs);
tscResetSqlCmd(&rootSql->cmd, true, rootSql->self);
code = getMultiTableMetaFromMnode(rootSql, pNameList, vgroupList, NULL, tscTableMetaCallBack, true);
......@@ -3419,7 +3416,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, SQueryInfo* pQueryInfo) {
if (allVgroupInfoRetrieved(pQueryInfo)) {
return TSDB_CODE_SUCCESS;
}
SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
SSqlObj *pNew = tscAllocSqlObj();
pNew->pTscObj = pSql->pTscObj;
pNew->signature = pNew;
......
......@@ -149,7 +149,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
pthread_mutex_init(&pObj->mutex, NULL);
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
SSqlObj *pSql = tscAllocSqlObj();
if (NULL == pSql) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscReleaseRpc(pRpcObj);
......@@ -363,7 +363,7 @@ TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen, int64_t*
nPrintTsc("%s", sqlstr);
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
SSqlObj* pSql = tscAllocSqlObj();
if (pSql == NULL) {
tscError("failed to malloc sqlObj");
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -1047,7 +1047,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
return TSDB_CODE_TSC_DISCONNECTED;
}
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
SSqlObj* pSql = tscAllocSqlObj();
pSql->pTscObj = taos;
pSql->signature = pSql;
......@@ -1165,7 +1165,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
SSqlObj* pSql = tscAllocSqlObj();
tscAllocPayload(&pSql->cmd, 1024);
pSql->pTscObj = taos;
......
......@@ -232,8 +232,6 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
tscFreeSqlResult(pStream->pSql);
tscFreeSubobj(pStream->pSql);
tfree(pStream->pSql->pSubs);
pStream->pSql->subState.numOfSub = 0;
pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList);
tscSetRetryTimer(pStream, pStream->pSql, retryDelay);
......@@ -610,8 +608,6 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
tscFreeSqlResult(pSql);
tscFreeSubobj(pSql);
tfree(pSql->pSubs);
pSql->subState.numOfSub = 0;
pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList);
tscSetNextLaunchTimer(pStream, pSql);
}
......@@ -1006,7 +1002,7 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, int32_t
return NULL;
}
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
SSqlObj *pSql = tscAllocSqlObj();
if (pSql == NULL) {
return NULL;
}
......
......@@ -117,7 +117,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
goto fail;
}
pSql = calloc(1, sizeof(SSqlObj));
pSql = tscAllocSqlObj();
if (pSql == NULL) {
line = __LINE__;
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -432,7 +432,7 @@ TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char* topic, const char
}
SSqlObj* recreateSqlObj(SSub* pSub) {
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
SSqlObj* pSql = tscAllocSqlObj();
if (pSql == NULL) {
return NULL;
}
......
......@@ -708,10 +708,6 @@ void freeJoinSubqueryObj(SSqlObj* pSql) {
pSql->pSubs[i] = NULL;
}
if (pSql->subState.states) {
pthread_mutex_destroy(&pSql->subState.mutex);
}
tfree(pSql->subState.states);
pSql->subState.numOfSub = 0;
}
......@@ -901,7 +897,6 @@ bool tscReparseSql(SSqlObj *sql, int32_t code){
}
tscFreeSubobj(sql);
tfree(sql->pSubs);
sql->res.code = TSDB_CODE_SUCCESS;
sql->retry++;
......@@ -3014,7 +3009,6 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
}
tscFreeSubobj(userSql);
tfree(userSql->pSubs);
userSql->res.code = TSDB_CODE_SUCCESS;
userSql->retry++;
......
......@@ -1678,8 +1678,9 @@ void tscFreeSqlResult(SSqlObj* pSql) {
}
void tscFreeSubobj(SSqlObj* pSql) {
pthread_mutex_lock(&pSql->subState.mutex);
if (pSql->subState.numOfSub == 0) {
return;
goto _out;
}
tscDebug("0x%"PRIx64" start to free sub SqlObj, numOfSub:%d", pSql->self, pSql->subState.numOfSub);
......@@ -1695,12 +1696,12 @@ void tscFreeSubobj(SSqlObj* pSql) {
pSql->pSubs[i] = NULL;
}
if (pSql->subState.states) {
pthread_mutex_destroy(&pSql->subState.mutex);
}
tfree(pSql->pSubs);
tfree(pSql->subState.states);
pSql->subState.numOfSub = 0;
_out:
pthread_mutex_unlock(&pSql->subState.mutex);
}
/**
......@@ -1737,6 +1738,15 @@ void tscFreeMetaSqlObj(int64_t *rid){
}
}
SSqlObj* tscAllocSqlObj() {
SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
if (!pNew) {
return NULL;
}
pthread_mutex_init(&pNew->subState.mutex, NULL);
return pNew;
}
void tscFreeSqlObj(SSqlObj* pSql) {
if (pSql == NULL || pSql->signature != pSql) {
return;
......@@ -1760,17 +1770,11 @@ void tscFreeSqlObj(SSqlObj* pSql) {
tscFreeSubobj(pSql);
if (pSql && (pSql == pSql->rootObj)) {
pthread_mutex_destroy(&pSql->mtxSubs);
}
pSql->signature = NULL;
pSql->fp = NULL;
tfree(pSql->sqlstr);
tfree(pSql->pBuf);
tfree(pSql->pSubs);
pSql->subState.numOfSub = 0;
pSql->self = 0;
tscFreeSqlResult(pSql);
......@@ -1781,6 +1785,7 @@ void tscFreeSqlObj(SSqlObj* pSql) {
tscDebug("0x%"PRIx64" addr:%p free completed", sid, pSql);
pthread_mutex_destroy(&pSql->subState.mutex);
tsem_destroy(&pSql->rspSem);
memset(pSql, 0, sizeof(*pSql));
free(pSql);
......@@ -3781,7 +3786,7 @@ void registerSqlObj(SSqlObj* pSql) {
}
SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, int32_t cmd) {
SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj));
SSqlObj* pNew = tscAllocSqlObj();
if (pNew == NULL) {
tscError("0x%"PRIx64" new subquery failed, tableIndex:%d", pSql->self, 0);
return NULL;
......@@ -3858,7 +3863,7 @@ static void doSetSqlExprAndResultFieldInfo(SQueryInfo* pNewQueryInfo, int64_t ui
SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t fp, void* param, int32_t cmd, SSqlObj* pPrevSql) {
SSqlCmd* pCmd = &pSql->cmd;
SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj));
SSqlObj* pNew = tscAllocSqlObj();
if (pNew == NULL) {
tscError("0x%"PRIx64" new subquery failed, tableIndex:%d", pSql->self, tableIndex);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -4161,10 +4166,7 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
}
tscFreeSubobj(pParentSql);
tfree(pParentSql->pSubs);
tscFreeSubobj(rootObj);
tfree(rootObj->pSubs);
rootObj->res.code = TSDB_CODE_SUCCESS;
rootObj->retry++;
......@@ -4263,7 +4265,7 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
pSql->cmd.active = pSub;
pSql->cmd.command = TSDB_SQL_SELECT;
SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj));
SSqlObj* pNew = tscAllocSqlObj();
if (pNew == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
......@@ -4625,7 +4627,6 @@ void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp) {
tfree(pSql->subState.states);
pSql->subState.numOfSub = 0;
pthread_mutex_unlock(&pSql->subState.mutex);
pthread_mutex_destroy(&pSql->subState.mutex);
pSql->fp = fp;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册