diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 359530a00a71882a9558e261b7f785e77d44486e..387de3dc600f11a0924310835f0eac7359c6fdef 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -119,7 +119,7 @@ typedef struct SBlockKeyInfo { int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *len); int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks); -void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta); +void tscDestroyDataBlock(SSqlObj *pSql, STableDataBlocks* pDataBlock, bool removeMeta); void tscSortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf); int tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo); int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows); @@ -130,12 +130,12 @@ void doRetrieveSubqueryData(SSchedMsg *pMsg); SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, int16_t bytes, uint32_t offset); -void* tscDestroyBlockArrayList(SArray* pDataBlockList); +void* tscDestroyBlockArrayList(SSqlObj* pSql, SArray* pDataBlockList); void* tscDestroyUdfArrayList(SArray* pUdfList); -void* tscDestroyBlockHashTable(SHashObj* pBlockHashTable, bool removeMeta); +void* tscDestroyBlockHashTable(SSqlObj* pSql, SHashObj* pBlockHashTable, bool removeMeta); int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock); -int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBlockMap); +int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertParam, bool freeBlockMap); int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, SName* pName, STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList); diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 1bf27e6cad1d57fdfd4b786d1cdcea981bf3333b..4e2b89d64f95a284d1e50dd1847af61ab7b6b653 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1529,7 +1529,7 @@ int tsParseInsertSql(SSqlObj *pSql) { // merge according to vgId if (!TSDB_QUERY_HAS_TYPE(pInsertParam->insertType, TSDB_QUERY_TYPE_STMT_INSERT) && taosHashGetSize(pInsertParam->pTableBlockHashList) > 0) { - if ((code = tscMergeTableDataBlocks(pInsertParam, true)) != TSDB_CODE_SUCCESS) { + if ((code = tscMergeTableDataBlocks(pSql, pInsertParam, true)) != TSDB_CODE_SUCCESS) { goto _clean; } } @@ -1635,7 +1635,7 @@ static int doPackSendDataBlock(SSqlObj* pSql, SInsertStatementParam *pInsertPara return tscInvalidOperationMsg(pInsertParam->msg, "too many rows in sql, total number of rows should be less than 32767", NULL); } - if ((code = tscMergeTableDataBlocks(pInsertParam, true)) != TSDB_CODE_SUCCESS) { + if ((code = tscMergeTableDataBlocks(pSql, pInsertParam, true)) != TSDB_CODE_SUCCESS) { return code; } @@ -1696,7 +1696,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow SInsertStatementParam *pInsertParam = &pCmd->insertParam; destroyTableNameList(pInsertParam); - pInsertParam->pDataBlocks = tscDestroyBlockArrayList(pInsertParam->pDataBlocks); + pInsertParam->pDataBlocks = tscDestroyBlockArrayList(pParentSql, pInsertParam->pDataBlocks); if (pInsertParam->pTableBlockHashList == NULL) { pInsertParam->pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index bbddc4bff925de1a7d0b67fd233b6e2e88a618a3..f148dc8b6eb35f32d5df9a11b2ab6b4f5e4f2255 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -1163,7 +1163,7 @@ static int insertStmtExecute(STscStmt* stmt) { fillTablesColumnsNull(stmt->pSql); - int code = tscMergeTableDataBlocks(&stmt->pSql->cmd.insertParam, false); + int code = tscMergeTableDataBlocks(stmt->pSql, &stmt->pSql->cmd.insertParam, false); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1194,7 +1194,7 @@ static int insertStmtExecute(STscStmt* stmt) { pCmd->insertParam.numOfTables = 0; tfree(pCmd->insertParam.pTableNameList); - pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pCmd->insertParam.pDataBlocks); + pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pSql, pCmd->insertParam.pDataBlocks); return pSql->res.code; } @@ -1215,7 +1215,7 @@ static void insertBatchClean(STscStmt* pStmt) { tfree(pCmd->insertParam.pTableNameList); - pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pCmd->insertParam.pDataBlocks); + pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pSql, pCmd->insertParam.pDataBlocks); pCmd->insertParam.numOfTables = 0; taosHashClear(pCmd->insertParam.pTableBlockHashList); @@ -1242,7 +1242,7 @@ static int insertBatchStmtExecute(STscStmt* pStmt) { fillTablesColumnsNull(pStmt->pSql); - if ((code = tscMergeTableDataBlocks(&pStmt->pSql->cmd.insertParam, false)) != TSDB_CODE_SUCCESS) { + if ((code = tscMergeTableDataBlocks(pStmt->pSql, &pStmt->pSql->cmd.insertParam, false)) != TSDB_CODE_SUCCESS) { return code; } @@ -1774,8 +1774,8 @@ int taos_stmt_close(TAOS_STMT* stmt) { if (pStmt->pSql && pStmt->pSql->res.code != 0) { rmMeta = true; } - tscDestroyDataBlock(pStmt->mtb.lastBlock, rmMeta); - pStmt->mtb.pTableBlockHashList = tscDestroyBlockHashTable(pStmt->mtb.pTableBlockHashList, rmMeta); + tscDestroyDataBlock(pStmt->pSql, pStmt->mtb.lastBlock, rmMeta); + pStmt->mtb.pTableBlockHashList = tscDestroyBlockHashTable(pStmt->pSql, pStmt->mtb.pTableBlockHashList, rmMeta); if (pStmt->pSql){ taosHashCleanup(pStmt->pSql->cmd.insertParam.pTableBlockHashList); } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 1875c5faa6f67432975ae456e94ce177f1cebdee..5e608afc8e2493758a0bc82e32161fc12ab0bcea 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -3374,7 +3374,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { goto _error; } - pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pCmd->insertParam.pDataBlocks); + pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pSql, pCmd->insertParam.pDataBlocks); // use the local variable for (int32_t j = 0; j < numOfSub; ++j) { diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index a1cc09ce1c809d2da752786bda7664d6d1246d0d..a3a582f8dda575fdba0f85dbf7eaa9e6ebd0301c 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -153,9 +153,8 @@ void *tscAcquireClusterInfo(const char *clusterId) { pObj = *ppObj; } - if (pObj) { - pObj->ref += 1; - } + if (pObj) { pObj->ref += 1; } + pthread_mutex_unlock(&clusterMutex); return pObj; } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index da8e53cab3befe0bfbfa5c498a92a38f9dbf18e5..5f254bdcf53de9cc78464a3d8ff532ebf11582df 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1421,6 +1421,7 @@ void destroyTableNameList(SInsertStatementParam* pInsertParam) { } void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta, uint64_t id) { + SSqlObj *pSql = (SSqlObj*)taosAcquireRef(tscObjRef, id); pCmd->command = 0; pCmd->numOfCols = 0; pCmd->count = 0; @@ -1429,13 +1430,14 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta, uint64_t id) { pCmd->insertParam.sql = NULL; destroyTableNameList(&pCmd->insertParam); - pCmd->insertParam.pTableBlockHashList = tscDestroyBlockHashTable(pCmd->insertParam.pTableBlockHashList, clearCachedMeta); - pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pCmd->insertParam.pDataBlocks); + pCmd->insertParam.pTableBlockHashList = tscDestroyBlockHashTable(pSql, pCmd->insertParam.pTableBlockHashList, clearCachedMeta); + pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pSql, pCmd->insertParam.pDataBlocks); tfree(pCmd->insertParam.tagData.data); pCmd->insertParam.tagData.dataLen = 0; tscFreeQueryInfo(pCmd, clearCachedMeta, id); pCmd->pTableMetaMap = tscCleanupTableMetaMap(pCmd->pTableMetaMap); + taosReleaseRef(tscObjRef, id); } void* tscCleanupTableMetaMap(SHashObj* pTableMetaMap) { @@ -1566,7 +1568,7 @@ void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo) { tfree(pColInfo->colIdxInfo); } -void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) { +void tscDestroyDataBlock(SSqlObj *pSql, STableDataBlocks* pDataBlock, bool removeMeta) { if (pDataBlock == NULL) { return; } @@ -1577,7 +1579,7 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) { char name[TSDB_TABLE_FNAME_LEN] = {0}; tNameExtractFullName(&pDataBlock->tableName, name); - //taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); + taosHashRemove(UTIL_GET_TABLEMETA(pSql), name, strnlen(name, TSDB_TABLE_FNAME_LEN)); } if (!pDataBlock->cloned) { @@ -1618,7 +1620,7 @@ SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint return param; } -void* tscDestroyBlockArrayList(SArray* pDataBlockList) { +void* tscDestroyBlockArrayList(SSqlObj *pSql, SArray* pDataBlockList) { if (pDataBlockList == NULL) { return NULL; } @@ -1626,7 +1628,7 @@ void* tscDestroyBlockArrayList(SArray* pDataBlockList) { size_t size = taosArrayGetSize(pDataBlockList); for (int32_t i = 0; i < size; i++) { void* d = taosArrayGetP(pDataBlockList, i); - tscDestroyDataBlock(d, false); + tscDestroyDataBlock(pSql, d, false); } taosArrayDestroy(pDataBlockList); @@ -1674,14 +1676,14 @@ void* tscDestroyUdfArrayList(SArray* pUdfList) { -void* tscDestroyBlockHashTable(SHashObj* pBlockHashTable, bool removeMeta) { +void* tscDestroyBlockHashTable(SSqlObj *pSql, SHashObj* pBlockHashTable, bool removeMeta) { if (pBlockHashTable == NULL) { return NULL; } STableDataBlocks** p = taosHashIterate(pBlockHashTable, NULL); while(p) { - tscDestroyDataBlock(*p, removeMeta); + tscDestroyDataBlock(pSql, *p, removeMeta); p = taosHashIterate(pBlockHashTable, p); } @@ -1922,7 +1924,7 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) { return result; } -static void extractTableNameList(SInsertStatementParam *pInsertParam, bool freeBlockMap) { +static void extractTableNameList(SSqlObj *pSql, SInsertStatementParam *pInsertParam, bool freeBlockMap) { pInsertParam->numOfTables = (int32_t) taosHashGetSize(pInsertParam->pTableBlockHashList); if (pInsertParam->pTableNameList == NULL) { pInsertParam->pTableNameList = malloc(pInsertParam->numOfTables * POINTER_BYTES); @@ -1939,11 +1941,11 @@ static void extractTableNameList(SInsertStatementParam *pInsertParam, bool freeB } if (freeBlockMap) { - pInsertParam->pTableBlockHashList = tscDestroyBlockHashTable(pInsertParam->pTableBlockHashList, false); + pInsertParam->pTableBlockHashList = tscDestroyBlockHashTable(pSql, pInsertParam->pTableBlockHashList, false); } } -int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBlockMap) { +int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertParam, bool freeBlockMap) { const int INSERT_HEAD_SIZE = sizeof(SMsgDesc) + sizeof(SSubmitMsg); int code = 0; bool isRawPayload = IS_RAW_PAYLOAD(pInsertParam->payloadType); @@ -1968,7 +1970,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl if (ret != TSDB_CODE_SUCCESS) { tscError("0x%"PRIx64" failed to prepare the data block buffer for merging table data, code:%d", pInsertParam->objectId, ret); taosHashCleanup(pVnodeDataBlockHashList); - tscDestroyBlockArrayList(pVnodeDataBlockList); + tscDestroyBlockArrayList(pSql, pVnodeDataBlockList); tfree(blkKeyInfo.pKeyTuple); return ret; } @@ -1987,7 +1989,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl tscError("0x%"PRIx64" failed to allocate memory for merging submit block, size:%d", pInsertParam->objectId, dataBuf->nAllocSize); taosHashCleanup(pVnodeDataBlockHashList); - tscDestroyBlockArrayList(pVnodeDataBlockList); + tscDestroyBlockArrayList(pSql, pVnodeDataBlockList); tfree(dataBuf->pData); tfree(blkKeyInfo.pKeyTuple); @@ -2005,7 +2007,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl } else { if ((code = tscSortRemoveDataBlockDupRows(pOneTableBlock, &blkKeyInfo)) != 0) { taosHashCleanup(pVnodeDataBlockHashList); - tscDestroyBlockArrayList(pVnodeDataBlockList); + tscDestroyBlockArrayList(pSql, pVnodeDataBlockList); tfree(dataBuf->pData); tfree(blkKeyInfo.pKeyTuple); return code; @@ -2052,7 +2054,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl pOneTableBlock = *p; } - extractTableNameList(pInsertParam, freeBlockMap); + extractTableNameList(pSql, pInsertParam, freeBlockMap); // free the table data blocks; pInsertParam->pDataBlocks = pVnodeDataBlockList;