提交 8a11875e 编写于 作者: dengyihao's avatar dengyihao

[TD-6127]<feature> one taos driver connect two cluster

上级 c803ebd8
...@@ -119,7 +119,7 @@ typedef struct SBlockKeyInfo { ...@@ -119,7 +119,7 @@ typedef struct SBlockKeyInfo {
int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *len); int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *len);
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks); int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks);
void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta); void tscDestroyDataBlock(SSqlObj *pSql, STableDataBlocks* pDataBlock, bool removeMeta);
void tscSortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf); void tscSortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf);
int tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo); int tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo);
int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows); int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows);
...@@ -130,12 +130,12 @@ void doRetrieveSubqueryData(SSchedMsg *pMsg); ...@@ -130,12 +130,12 @@ void doRetrieveSubqueryData(SSchedMsg *pMsg);
SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, int16_t bytes, SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, int16_t bytes,
uint32_t offset); uint32_t offset);
void* tscDestroyBlockArrayList(SArray* pDataBlockList); void* tscDestroyBlockArrayList(SSqlObj* pSql, SArray* pDataBlockList);
void* tscDestroyUdfArrayList(SArray* pUdfList); void* tscDestroyUdfArrayList(SArray* pUdfList);
void* tscDestroyBlockHashTable(SHashObj* pBlockHashTable, bool removeMeta); void* tscDestroyBlockHashTable(SSqlObj* pSql, SHashObj* pBlockHashTable, bool removeMeta);
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock); int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock);
int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBlockMap); int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertParam, bool freeBlockMap);
int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, SName* pName, STableMeta* pTableMeta, int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, SName* pName, STableMeta* pTableMeta,
STableDataBlocks** dataBlocks, SArray* pBlockList); STableDataBlocks** dataBlocks, SArray* pBlockList);
......
...@@ -1529,7 +1529,7 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1529,7 +1529,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
// merge according to vgId // merge according to vgId
if (!TSDB_QUERY_HAS_TYPE(pInsertParam->insertType, TSDB_QUERY_TYPE_STMT_INSERT) && taosHashGetSize(pInsertParam->pTableBlockHashList) > 0) { if (!TSDB_QUERY_HAS_TYPE(pInsertParam->insertType, TSDB_QUERY_TYPE_STMT_INSERT) && taosHashGetSize(pInsertParam->pTableBlockHashList) > 0) {
if ((code = tscMergeTableDataBlocks(pInsertParam, true)) != TSDB_CODE_SUCCESS) { if ((code = tscMergeTableDataBlocks(pSql, pInsertParam, true)) != TSDB_CODE_SUCCESS) {
goto _clean; goto _clean;
} }
} }
...@@ -1635,7 +1635,7 @@ static int doPackSendDataBlock(SSqlObj* pSql, SInsertStatementParam *pInsertPara ...@@ -1635,7 +1635,7 @@ static int doPackSendDataBlock(SSqlObj* pSql, SInsertStatementParam *pInsertPara
return tscInvalidOperationMsg(pInsertParam->msg, "too many rows in sql, total number of rows should be less than 32767", NULL); return tscInvalidOperationMsg(pInsertParam->msg, "too many rows in sql, total number of rows should be less than 32767", NULL);
} }
if ((code = tscMergeTableDataBlocks(pInsertParam, true)) != TSDB_CODE_SUCCESS) { if ((code = tscMergeTableDataBlocks(pSql, pInsertParam, true)) != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -1696,7 +1696,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow ...@@ -1696,7 +1696,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
SInsertStatementParam *pInsertParam = &pCmd->insertParam; SInsertStatementParam *pInsertParam = &pCmd->insertParam;
destroyTableNameList(pInsertParam); destroyTableNameList(pInsertParam);
pInsertParam->pDataBlocks = tscDestroyBlockArrayList(pInsertParam->pDataBlocks); pInsertParam->pDataBlocks = tscDestroyBlockArrayList(pParentSql, pInsertParam->pDataBlocks);
if (pInsertParam->pTableBlockHashList == NULL) { if (pInsertParam->pTableBlockHashList == NULL) {
pInsertParam->pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); pInsertParam->pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
......
...@@ -1163,7 +1163,7 @@ static int insertStmtExecute(STscStmt* stmt) { ...@@ -1163,7 +1163,7 @@ static int insertStmtExecute(STscStmt* stmt) {
fillTablesColumnsNull(stmt->pSql); fillTablesColumnsNull(stmt->pSql);
int code = tscMergeTableDataBlocks(&stmt->pSql->cmd.insertParam, false); int code = tscMergeTableDataBlocks(stmt->pSql, &stmt->pSql->cmd.insertParam, false);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -1194,7 +1194,7 @@ static int insertStmtExecute(STscStmt* stmt) { ...@@ -1194,7 +1194,7 @@ static int insertStmtExecute(STscStmt* stmt) {
pCmd->insertParam.numOfTables = 0; pCmd->insertParam.numOfTables = 0;
tfree(pCmd->insertParam.pTableNameList); tfree(pCmd->insertParam.pTableNameList);
pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pCmd->insertParam.pDataBlocks); pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pSql, pCmd->insertParam.pDataBlocks);
return pSql->res.code; return pSql->res.code;
} }
...@@ -1215,7 +1215,7 @@ static void insertBatchClean(STscStmt* pStmt) { ...@@ -1215,7 +1215,7 @@ static void insertBatchClean(STscStmt* pStmt) {
tfree(pCmd->insertParam.pTableNameList); tfree(pCmd->insertParam.pTableNameList);
pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pCmd->insertParam.pDataBlocks); pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pSql, pCmd->insertParam.pDataBlocks);
pCmd->insertParam.numOfTables = 0; pCmd->insertParam.numOfTables = 0;
taosHashClear(pCmd->insertParam.pTableBlockHashList); taosHashClear(pCmd->insertParam.pTableBlockHashList);
...@@ -1242,7 +1242,7 @@ static int insertBatchStmtExecute(STscStmt* pStmt) { ...@@ -1242,7 +1242,7 @@ static int insertBatchStmtExecute(STscStmt* pStmt) {
fillTablesColumnsNull(pStmt->pSql); fillTablesColumnsNull(pStmt->pSql);
if ((code = tscMergeTableDataBlocks(&pStmt->pSql->cmd.insertParam, false)) != TSDB_CODE_SUCCESS) { if ((code = tscMergeTableDataBlocks(pStmt->pSql, &pStmt->pSql->cmd.insertParam, false)) != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -1774,8 +1774,8 @@ int taos_stmt_close(TAOS_STMT* stmt) { ...@@ -1774,8 +1774,8 @@ int taos_stmt_close(TAOS_STMT* stmt) {
if (pStmt->pSql && pStmt->pSql->res.code != 0) { if (pStmt->pSql && pStmt->pSql->res.code != 0) {
rmMeta = true; rmMeta = true;
} }
tscDestroyDataBlock(pStmt->mtb.lastBlock, rmMeta); tscDestroyDataBlock(pStmt->pSql, pStmt->mtb.lastBlock, rmMeta);
pStmt->mtb.pTableBlockHashList = tscDestroyBlockHashTable(pStmt->mtb.pTableBlockHashList, rmMeta); pStmt->mtb.pTableBlockHashList = tscDestroyBlockHashTable(pStmt->pSql, pStmt->mtb.pTableBlockHashList, rmMeta);
if (pStmt->pSql){ if (pStmt->pSql){
taosHashCleanup(pStmt->pSql->cmd.insertParam.pTableBlockHashList); taosHashCleanup(pStmt->pSql->cmd.insertParam.pTableBlockHashList);
} }
......
...@@ -3374,7 +3374,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { ...@@ -3374,7 +3374,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
goto _error; goto _error;
} }
pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pCmd->insertParam.pDataBlocks); pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pSql, pCmd->insertParam.pDataBlocks);
// use the local variable // use the local variable
for (int32_t j = 0; j < numOfSub; ++j) { for (int32_t j = 0; j < numOfSub; ++j) {
......
...@@ -153,9 +153,8 @@ void *tscAcquireClusterInfo(const char *clusterId) { ...@@ -153,9 +153,8 @@ void *tscAcquireClusterInfo(const char *clusterId) {
pObj = *ppObj; pObj = *ppObj;
} }
if (pObj) { if (pObj) { pObj->ref += 1; }
pObj->ref += 1;
}
pthread_mutex_unlock(&clusterMutex); pthread_mutex_unlock(&clusterMutex);
return pObj; return pObj;
} }
......
...@@ -1421,6 +1421,7 @@ void destroyTableNameList(SInsertStatementParam* pInsertParam) { ...@@ -1421,6 +1421,7 @@ void destroyTableNameList(SInsertStatementParam* pInsertParam) {
} }
void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta, uint64_t id) { void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta, uint64_t id) {
SSqlObj *pSql = (SSqlObj*)taosAcquireRef(tscObjRef, id);
pCmd->command = 0; pCmd->command = 0;
pCmd->numOfCols = 0; pCmd->numOfCols = 0;
pCmd->count = 0; pCmd->count = 0;
...@@ -1429,13 +1430,14 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta, uint64_t id) { ...@@ -1429,13 +1430,14 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta, uint64_t id) {
pCmd->insertParam.sql = NULL; pCmd->insertParam.sql = NULL;
destroyTableNameList(&pCmd->insertParam); destroyTableNameList(&pCmd->insertParam);
pCmd->insertParam.pTableBlockHashList = tscDestroyBlockHashTable(pCmd->insertParam.pTableBlockHashList, clearCachedMeta); pCmd->insertParam.pTableBlockHashList = tscDestroyBlockHashTable(pSql, pCmd->insertParam.pTableBlockHashList, clearCachedMeta);
pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pCmd->insertParam.pDataBlocks); pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pSql, pCmd->insertParam.pDataBlocks);
tfree(pCmd->insertParam.tagData.data); tfree(pCmd->insertParam.tagData.data);
pCmd->insertParam.tagData.dataLen = 0; pCmd->insertParam.tagData.dataLen = 0;
tscFreeQueryInfo(pCmd, clearCachedMeta, id); tscFreeQueryInfo(pCmd, clearCachedMeta, id);
pCmd->pTableMetaMap = tscCleanupTableMetaMap(pCmd->pTableMetaMap); pCmd->pTableMetaMap = tscCleanupTableMetaMap(pCmd->pTableMetaMap);
taosReleaseRef(tscObjRef, id);
} }
void* tscCleanupTableMetaMap(SHashObj* pTableMetaMap) { void* tscCleanupTableMetaMap(SHashObj* pTableMetaMap) {
...@@ -1566,7 +1568,7 @@ void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo) { ...@@ -1566,7 +1568,7 @@ void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo) {
tfree(pColInfo->colIdxInfo); tfree(pColInfo->colIdxInfo);
} }
void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) { void tscDestroyDataBlock(SSqlObj *pSql, STableDataBlocks* pDataBlock, bool removeMeta) {
if (pDataBlock == NULL) { if (pDataBlock == NULL) {
return; return;
} }
...@@ -1577,7 +1579,7 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) { ...@@ -1577,7 +1579,7 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) {
char name[TSDB_TABLE_FNAME_LEN] = {0}; char name[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(&pDataBlock->tableName, name); tNameExtractFullName(&pDataBlock->tableName, name);
//taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); taosHashRemove(UTIL_GET_TABLEMETA(pSql), name, strnlen(name, TSDB_TABLE_FNAME_LEN));
} }
if (!pDataBlock->cloned) { if (!pDataBlock->cloned) {
...@@ -1618,7 +1620,7 @@ SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint ...@@ -1618,7 +1620,7 @@ SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint
return param; return param;
} }
void* tscDestroyBlockArrayList(SArray* pDataBlockList) { void* tscDestroyBlockArrayList(SSqlObj *pSql, SArray* pDataBlockList) {
if (pDataBlockList == NULL) { if (pDataBlockList == NULL) {
return NULL; return NULL;
} }
...@@ -1626,7 +1628,7 @@ void* tscDestroyBlockArrayList(SArray* pDataBlockList) { ...@@ -1626,7 +1628,7 @@ void* tscDestroyBlockArrayList(SArray* pDataBlockList) {
size_t size = taosArrayGetSize(pDataBlockList); size_t size = taosArrayGetSize(pDataBlockList);
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
void* d = taosArrayGetP(pDataBlockList, i); void* d = taosArrayGetP(pDataBlockList, i);
tscDestroyDataBlock(d, false); tscDestroyDataBlock(pSql, d, false);
} }
taosArrayDestroy(pDataBlockList); taosArrayDestroy(pDataBlockList);
...@@ -1674,14 +1676,14 @@ void* tscDestroyUdfArrayList(SArray* pUdfList) { ...@@ -1674,14 +1676,14 @@ void* tscDestroyUdfArrayList(SArray* pUdfList) {
void* tscDestroyBlockHashTable(SHashObj* pBlockHashTable, bool removeMeta) { void* tscDestroyBlockHashTable(SSqlObj *pSql, SHashObj* pBlockHashTable, bool removeMeta) {
if (pBlockHashTable == NULL) { if (pBlockHashTable == NULL) {
return NULL; return NULL;
} }
STableDataBlocks** p = taosHashIterate(pBlockHashTable, NULL); STableDataBlocks** p = taosHashIterate(pBlockHashTable, NULL);
while(p) { while(p) {
tscDestroyDataBlock(*p, removeMeta); tscDestroyDataBlock(pSql, *p, removeMeta);
p = taosHashIterate(pBlockHashTable, p); p = taosHashIterate(pBlockHashTable, p);
} }
...@@ -1922,7 +1924,7 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) { ...@@ -1922,7 +1924,7 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) {
return result; return result;
} }
static void extractTableNameList(SInsertStatementParam *pInsertParam, bool freeBlockMap) { static void extractTableNameList(SSqlObj *pSql, SInsertStatementParam *pInsertParam, bool freeBlockMap) {
pInsertParam->numOfTables = (int32_t) taosHashGetSize(pInsertParam->pTableBlockHashList); pInsertParam->numOfTables = (int32_t) taosHashGetSize(pInsertParam->pTableBlockHashList);
if (pInsertParam->pTableNameList == NULL) { if (pInsertParam->pTableNameList == NULL) {
pInsertParam->pTableNameList = malloc(pInsertParam->numOfTables * POINTER_BYTES); pInsertParam->pTableNameList = malloc(pInsertParam->numOfTables * POINTER_BYTES);
...@@ -1939,11 +1941,11 @@ static void extractTableNameList(SInsertStatementParam *pInsertParam, bool freeB ...@@ -1939,11 +1941,11 @@ static void extractTableNameList(SInsertStatementParam *pInsertParam, bool freeB
} }
if (freeBlockMap) { if (freeBlockMap) {
pInsertParam->pTableBlockHashList = tscDestroyBlockHashTable(pInsertParam->pTableBlockHashList, false); pInsertParam->pTableBlockHashList = tscDestroyBlockHashTable(pSql, pInsertParam->pTableBlockHashList, false);
} }
} }
int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBlockMap) { int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertParam, bool freeBlockMap) {
const int INSERT_HEAD_SIZE = sizeof(SMsgDesc) + sizeof(SSubmitMsg); const int INSERT_HEAD_SIZE = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
int code = 0; int code = 0;
bool isRawPayload = IS_RAW_PAYLOAD(pInsertParam->payloadType); bool isRawPayload = IS_RAW_PAYLOAD(pInsertParam->payloadType);
...@@ -1968,7 +1970,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl ...@@ -1968,7 +1970,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
tscError("0x%"PRIx64" failed to prepare the data block buffer for merging table data, code:%d", pInsertParam->objectId, ret); tscError("0x%"PRIx64" failed to prepare the data block buffer for merging table data, code:%d", pInsertParam->objectId, ret);
taosHashCleanup(pVnodeDataBlockHashList); taosHashCleanup(pVnodeDataBlockHashList);
tscDestroyBlockArrayList(pVnodeDataBlockList); tscDestroyBlockArrayList(pSql, pVnodeDataBlockList);
tfree(blkKeyInfo.pKeyTuple); tfree(blkKeyInfo.pKeyTuple);
return ret; return ret;
} }
...@@ -1987,7 +1989,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl ...@@ -1987,7 +1989,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl
tscError("0x%"PRIx64" failed to allocate memory for merging submit block, size:%d", pInsertParam->objectId, dataBuf->nAllocSize); tscError("0x%"PRIx64" failed to allocate memory for merging submit block, size:%d", pInsertParam->objectId, dataBuf->nAllocSize);
taosHashCleanup(pVnodeDataBlockHashList); taosHashCleanup(pVnodeDataBlockHashList);
tscDestroyBlockArrayList(pVnodeDataBlockList); tscDestroyBlockArrayList(pSql, pVnodeDataBlockList);
tfree(dataBuf->pData); tfree(dataBuf->pData);
tfree(blkKeyInfo.pKeyTuple); tfree(blkKeyInfo.pKeyTuple);
...@@ -2005,7 +2007,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl ...@@ -2005,7 +2007,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl
} else { } else {
if ((code = tscSortRemoveDataBlockDupRows(pOneTableBlock, &blkKeyInfo)) != 0) { if ((code = tscSortRemoveDataBlockDupRows(pOneTableBlock, &blkKeyInfo)) != 0) {
taosHashCleanup(pVnodeDataBlockHashList); taosHashCleanup(pVnodeDataBlockHashList);
tscDestroyBlockArrayList(pVnodeDataBlockList); tscDestroyBlockArrayList(pSql, pVnodeDataBlockList);
tfree(dataBuf->pData); tfree(dataBuf->pData);
tfree(blkKeyInfo.pKeyTuple); tfree(blkKeyInfo.pKeyTuple);
return code; return code;
...@@ -2052,7 +2054,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl ...@@ -2052,7 +2054,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl
pOneTableBlock = *p; pOneTableBlock = *p;
} }
extractTableNameList(pInsertParam, freeBlockMap); extractTableNameList(pSql, pInsertParam, freeBlockMap);
// free the table data blocks; // free the table data blocks;
pInsertParam->pDataBlocks = pVnodeDataBlockList; pInsertParam->pDataBlocks = pVnodeDataBlockList;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册