未验证 提交 b538d580 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #6923 from taosdata/feature/TD-5331

[TD-5331]optimize bind insert performance
...@@ -110,6 +110,7 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff ...@@ -110,6 +110,7 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff
void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta); void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta);
void tscSortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf); void tscSortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf);
int tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo); int tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo);
int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows);
void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo); void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo);
void doRetrieveSubqueryData(SSchedMsg *pMsg); void doRetrieveSubqueryData(SSchedMsg *pMsg);
......
...@@ -138,6 +138,7 @@ typedef struct STableDataBlocks { ...@@ -138,6 +138,7 @@ typedef struct STableDataBlocks {
uint32_t size; uint32_t size;
STableMeta *pTableMeta; // the tableMeta of current table, the table meta will be used during submit, keep a ref to avoid to be removed from cache STableMeta *pTableMeta; // the tableMeta of current table, the table meta will be used during submit, keep a ref to avoid to be removed from cache
char *pData; char *pData;
bool cloned;
SParsedDataColInfo boundColumnInfo; SParsedDataColInfo boundColumnInfo;
......
...@@ -1056,7 +1056,7 @@ int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int3 ...@@ -1056,7 +1056,7 @@ int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int3
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) { int32_t FORCE_INLINE tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) {
pBlocks->tid = pTableMeta->id.tid; pBlocks->tid = pTableMeta->id.tid;
pBlocks->uid = pTableMeta->id.uid; pBlocks->uid = pTableMeta->id.uid;
pBlocks->sversion = pTableMeta->sversion; pBlocks->sversion = pTableMeta->sversion;
......
...@@ -47,6 +47,7 @@ typedef struct SNormalStmt { ...@@ -47,6 +47,7 @@ typedef struct SNormalStmt {
typedef struct SMultiTbStmt { typedef struct SMultiTbStmt {
bool nameSet; bool nameSet;
bool tagSet; bool tagSet;
bool subSet;
uint64_t currentUid; uint64_t currentUid;
char *sqlstr; char *sqlstr;
uint32_t tbNum; uint32_t tbNum;
...@@ -54,6 +55,7 @@ typedef struct SMultiTbStmt { ...@@ -54,6 +55,7 @@ typedef struct SMultiTbStmt {
SStrToken stbname; SStrToken stbname;
SStrToken values; SStrToken values;
SArray *tags; SArray *tags;
STableDataBlocks *lastBlock;
SHashObj *pTableHash; SHashObj *pTableHash;
SHashObj *pTableBlockHashList; // data block for each table SHashObj *pTableBlockHashList; // data block for each table
} SMultiTbStmt; } SMultiTbStmt;
...@@ -347,7 +349,7 @@ int32_t fillTablesColumnsNull(SSqlObj* pSql) { ...@@ -347,7 +349,7 @@ int32_t fillTablesColumnsNull(SSqlObj* pSql) {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// functions for insertion statement preparation // functions for insertion statement preparation
static int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param, TAOS_BIND* bind, int32_t colNum) { static FORCE_INLINE int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param, TAOS_BIND* bind, int32_t colNum) {
if (bind->is_null != NULL && *(bind->is_null)) { if (bind->is_null != NULL && *(bind->is_null)) {
setNull(data + param->offset, param->type, param->bytes); setNull(data + param->offset, param->type, param->bytes);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -746,25 +748,25 @@ static int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param, ...@@ -746,25 +748,25 @@ static int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param,
case TSDB_DATA_TYPE_BOOL: case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_UTINYINT: case TSDB_DATA_TYPE_UTINYINT:
size = 1; *(uint8_t *)(data + param->offset) = *(uint8_t *)bind->buffer;
break; break;
case TSDB_DATA_TYPE_SMALLINT: case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_USMALLINT: case TSDB_DATA_TYPE_USMALLINT:
size = 2; *(uint16_t *)(data + param->offset) = *(uint16_t *)bind->buffer;
break; break;
case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UINT: case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_FLOAT: case TSDB_DATA_TYPE_FLOAT:
size = 4; *(uint32_t *)(data + param->offset) = *(uint32_t *)bind->buffer;
break; break;
case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT: case TSDB_DATA_TYPE_UBIGINT:
case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_DOUBLE:
case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_TIMESTAMP:
size = 8; *(uint64_t *)(data + param->offset) = *(uint64_t *)bind->buffer;
break; break;
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
...@@ -790,7 +792,6 @@ static int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param, ...@@ -790,7 +792,6 @@ static int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param,
return TSDB_CODE_TSC_INVALID_VALUE; return TSDB_CODE_TSC_INVALID_VALUE;
} }
memcpy(data + param->offset, bind->buffer, size);
if (param->offset == 0) { if (param->offset == 0) {
if (tsCheckTimestamp(pBlock, data + param->offset) != TSDB_CODE_SUCCESS) { if (tsCheckTimestamp(pBlock, data + param->offset) != TSDB_CODE_SUCCESS) {
tscError("invalid timestamp"); tscError("invalid timestamp");
...@@ -801,6 +802,58 @@ static int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param, ...@@ -801,6 +802,58 @@ static int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t insertStmtGenLastBlock(STableDataBlocks** lastBlock, STableDataBlocks* pBlock) {
*lastBlock = (STableDataBlocks*)malloc(sizeof(STableDataBlocks));
memcpy(*lastBlock, pBlock, sizeof(STableDataBlocks));
(*lastBlock)->cloned = true;
(*lastBlock)->pData = NULL;
(*lastBlock)->ordered = true;
(*lastBlock)->prevTS = INT64_MIN;
(*lastBlock)->size = sizeof(SSubmitBlk);
(*lastBlock)->tsSource = -1;
return TSDB_CODE_SUCCESS;
}
static int32_t insertStmtGenBlock(STscStmt* pStmt, STableDataBlocks** pBlock, STableMeta* pTableMeta, SName* name) {
int32_t code = 0;
if (pStmt->mtb.lastBlock == NULL) {
tscError("no previous data block");
return TSDB_CODE_TSC_APP_ERROR;
}
int32_t msize = tscGetTableMetaSize(pTableMeta);
int32_t tsize = sizeof(STableDataBlocks) + msize;
void *t = malloc(tsize);
*pBlock = t;
memcpy(*pBlock, pStmt->mtb.lastBlock, sizeof(STableDataBlocks));
t = (char *)t + sizeof(STableDataBlocks);
(*pBlock)->pTableMeta = t;
memcpy((*pBlock)->pTableMeta, pTableMeta, msize);
(*pBlock)->pData = malloc((*pBlock)->nAllocSize);
(*pBlock)->vgId = (*pBlock)->pTableMeta->vgId;
tNameAssign(&(*pBlock)->tableName, name);
SSubmitBlk* blk = (SSubmitBlk*)(*pBlock)->pData;
memset(blk, 0, sizeof(*blk));
code = tsSetBlockInfo(blk, pTableMeta, 0);
if (code != TSDB_CODE_SUCCESS) {
STMT_RET(code);
}
return TSDB_CODE_SUCCESS;
}
static int doBindBatchParam(STableDataBlocks* pBlock, SParamInfo* param, TAOS_MULTI_BIND* bind, int32_t rowNum) { static int doBindBatchParam(STableDataBlocks* pBlock, SParamInfo* param, TAOS_MULTI_BIND* bind, int32_t rowNum) {
if (bind->buffer_type != param->type || !isValidDataType(param->type)) { if (bind->buffer_type != param->type || !isValidDataType(param->type)) {
...@@ -1227,11 +1280,11 @@ int stmtParseInsertTbTags(SSqlObj* pSql, STscStmt* pStmt) { ...@@ -1227,11 +1280,11 @@ int stmtParseInsertTbTags(SSqlObj* pSql, STscStmt* pStmt) {
pStmt->mtb.tbname = sToken; pStmt->mtb.tbname = sToken;
pStmt->mtb.nameSet = false; pStmt->mtb.nameSet = false;
if (pStmt->mtb.pTableHash == NULL) { if (pStmt->mtb.pTableHash == NULL) {
pStmt->mtb.pTableHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); pStmt->mtb.pTableHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
} }
if (pStmt->mtb.pTableBlockHashList == NULL) { if (pStmt->mtb.pTableBlockHashList == NULL) {
pStmt->mtb.pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); pStmt->mtb.pTableBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
} }
pStmt->mtb.tagSet = true; pStmt->mtb.tagSet = true;
...@@ -1522,6 +1575,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) { ...@@ -1522,6 +1575,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags) { int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags) {
STscStmt* pStmt = (STscStmt*)stmt; STscStmt* pStmt = (STscStmt*)stmt;
int32_t code = 0;
if (stmt == NULL || pStmt->pSql == NULL || pStmt->taos == NULL) { if (stmt == NULL || pStmt->pSql == NULL || pStmt->taos == NULL) {
STMT_RET(TSDB_CODE_TSC_DISCONNECTED); STMT_RET(TSDB_CODE_TSC_DISCONNECTED);
...@@ -1559,6 +1613,9 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags ...@@ -1559,6 +1613,9 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags
SSubmitBlk* pBlk = (SSubmitBlk*) (*t1)->pData; SSubmitBlk* pBlk = (SSubmitBlk*) (*t1)->pData;
pCmd->batchSize = pBlk->numOfRows; pCmd->batchSize = pBlk->numOfRows;
if (pBlk->numOfRows == 0) {
(*t1)->prevTS = INT64_MIN;
}
taosHashPut(pCmd->insertParam.pTableBlockHashList, (void *)&pStmt->mtb.currentUid, sizeof(pStmt->mtb.currentUid), (void*)t1, POINTER_BYTES); taosHashPut(pCmd->insertParam.pTableBlockHashList, (void *)&pStmt->mtb.currentUid, sizeof(pStmt->mtb.currentUid), (void*)t1, POINTER_BYTES);
...@@ -1566,6 +1623,51 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags ...@@ -1566,6 +1623,51 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags
STMT_RET(TSDB_CODE_SUCCESS); STMT_RET(TSDB_CODE_SUCCESS);
} }
if (pStmt->mtb.subSet && taosHashGetSize(pStmt->mtb.pTableHash) > 0) {
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
char sTableName[TSDB_TABLE_FNAME_LEN];
strncpy(sTableName, pTableMeta->sTableName, sizeof(sTableName));
SStrToken tname = {0};
tname.type = TK_STRING;
tname.z = (char *)name;
tname.n = (uint32_t)strlen(name);
SName fullname = {0};
tscSetTableFullName(&fullname, &tname, pSql);
memcpy(&pTableMetaInfo->name, &fullname, sizeof(fullname));
code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code != TSDB_CODE_SUCCESS) {
STMT_RET(code);
}
pTableMeta = pTableMetaInfo->pTableMeta;
if (strcmp(sTableName, pTableMeta->sTableName)) {
tscError("0x%"PRIx64" only tables belongs to one stable is allowed", pSql->self);
STMT_RET(TSDB_CODE_TSC_APP_ERROR);
}
STableDataBlocks* pBlock = NULL;
insertStmtGenBlock(pStmt, &pBlock, pTableMeta, &pTableMetaInfo->name);
pCmd->batchSize = 0;
pStmt->mtb.currentUid = pTableMeta->id.uid;
pStmt->mtb.tbNum++;
taosHashPut(pCmd->insertParam.pTableBlockHashList, (void *)&pStmt->mtb.currentUid, sizeof(pStmt->mtb.currentUid), (void*)&pBlock, POINTER_BYTES);
taosHashPut(pStmt->mtb.pTableBlockHashList, (void *)&pStmt->mtb.currentUid, sizeof(pStmt->mtb.currentUid), (void*)&pBlock, POINTER_BYTES);
taosHashPut(pStmt->mtb.pTableHash, name, strlen(name), (char*) &pTableMeta->id.uid, sizeof(pTableMeta->id.uid));
tscDebug("0x%"PRIx64" table:%s is prepared, uid:%" PRIx64, pSql->self, name, pStmt->mtb.currentUid);
STMT_RET(TSDB_CODE_SUCCESS);
}
if (pStmt->mtb.tagSet) { if (pStmt->mtb.tagSet) {
pStmt->mtb.tbname = tscReplaceStrToken(&pSql->sqlstr, &pStmt->mtb.tbname, name); pStmt->mtb.tbname = tscReplaceStrToken(&pSql->sqlstr, &pStmt->mtb.tbname, name);
} else { } else {
...@@ -1594,7 +1696,7 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags ...@@ -1594,7 +1696,7 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags
pCmd->insertParam.pTableBlockHashList = hashList; pCmd->insertParam.pTableBlockHashList = hashList;
} }
int32_t code = tsParseSql(pStmt->pSql, true); code = tsParseSql(pStmt->pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
// wait for the callback function to post the semaphore // wait for the callback function to post the semaphore
tsem_wait(&pStmt->pSql->rspSem); tsem_wait(&pStmt->pSql->rspSem);
...@@ -1622,6 +1724,10 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags ...@@ -1622,6 +1724,10 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags
taosHashPut(pStmt->mtb.pTableBlockHashList, (void *)&pStmt->mtb.currentUid, sizeof(pStmt->mtb.currentUid), (void*)&pBlock, POINTER_BYTES); taosHashPut(pStmt->mtb.pTableBlockHashList, (void *)&pStmt->mtb.currentUid, sizeof(pStmt->mtb.currentUid), (void*)&pBlock, POINTER_BYTES);
taosHashPut(pStmt->mtb.pTableHash, name, strlen(name), (char*) &pTableMeta->id.uid, sizeof(pTableMeta->id.uid)); taosHashPut(pStmt->mtb.pTableHash, name, strlen(name), (char*) &pTableMeta->id.uid, sizeof(pTableMeta->id.uid));
if (pStmt->mtb.lastBlock == NULL) {
insertStmtGenLastBlock(&pStmt->mtb.lastBlock, pBlock);
}
tscDebug("0x%"PRIx64" table:%s is prepared, uid:%" PRIx64, pSql->self, name, pStmt->mtb.currentUid); tscDebug("0x%"PRIx64" table:%s is prepared, uid:%" PRIx64, pSql->self, name, pStmt->mtb.currentUid);
} }
...@@ -1629,7 +1735,17 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags ...@@ -1629,7 +1735,17 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags
} }
int taos_stmt_set_sub_tbname(TAOS_STMT* stmt, const char* name) {
STscStmt* pStmt = (STscStmt*)stmt;
pStmt->mtb.subSet = true;
return taos_stmt_set_tbname_tags(stmt, name, NULL);
}
int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) { int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) {
STscStmt* pStmt = (STscStmt*)stmt;
pStmt->mtb.subSet = false;
return taos_stmt_set_tbname_tags(stmt, name, NULL); return taos_stmt_set_tbname_tags(stmt, name, NULL);
} }
...@@ -1653,6 +1769,7 @@ int taos_stmt_close(TAOS_STMT* stmt) { ...@@ -1653,6 +1769,7 @@ int taos_stmt_close(TAOS_STMT* stmt) {
if (pStmt->pSql && pStmt->pSql->res.code != 0) { if (pStmt->pSql && pStmt->pSql->res.code != 0) {
rmMeta = true; rmMeta = true;
} }
tscDestroyDataBlock(pStmt->mtb.lastBlock, rmMeta);
pStmt->mtb.pTableBlockHashList = tscDestroyBlockHashTable(pStmt->mtb.pTableBlockHashList, rmMeta); pStmt->mtb.pTableBlockHashList = tscDestroyBlockHashTable(pStmt->mtb.pTableBlockHashList, rmMeta);
taosHashCleanup(pStmt->pSql->cmd.insertParam.pTableBlockHashList); taosHashCleanup(pStmt->pSql->cmd.insertParam.pTableBlockHashList);
pStmt->pSql->cmd.insertParam.pTableBlockHashList = NULL; pStmt->pSql->cmd.insertParam.pTableBlockHashList = NULL;
...@@ -1687,6 +1804,8 @@ int taos_stmt_bind_param(TAOS_STMT* stmt, TAOS_BIND* bind) { ...@@ -1687,6 +1804,8 @@ int taos_stmt_bind_param(TAOS_STMT* stmt, TAOS_BIND* bind) {
pStmt->last = STMT_BIND; pStmt->last = STMT_BIND;
tscDebug("tableId:%" PRIu64 ", try to bind one row", pStmt->mtb.currentUid);
STMT_RET(insertStmtBindParam(pStmt, bind)); STMT_RET(insertStmtBindParam(pStmt, bind));
} else { } else {
STMT_RET(normalStmtBindParam(pStmt, bind)); STMT_RET(normalStmtBindParam(pStmt, bind));
......
...@@ -1517,12 +1517,6 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) { ...@@ -1517,12 +1517,6 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) {
} }
tfree(pDataBlock->pData); tfree(pDataBlock->pData);
tfree(pDataBlock->params);
// free the refcount for metermeta
if (pDataBlock->pTableMeta != NULL) {
tfree(pDataBlock->pTableMeta);
}
if (removeMeta) { if (removeMeta) {
char name[TSDB_TABLE_FNAME_LEN] = {0}; char name[TSDB_TABLE_FNAME_LEN] = {0};
...@@ -1531,7 +1525,17 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) { ...@@ -1531,7 +1525,17 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) {
taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
} }
if (!pDataBlock->cloned) {
tfree(pDataBlock->params);
// free the refcount for metermeta
if (pDataBlock->pTableMeta != NULL) {
tfree(pDataBlock->pTableMeta);
}
tscDestroyBoundColumnInfo(&pDataBlock->boundColumnInfo); tscDestroyBoundColumnInfo(&pDataBlock->boundColumnInfo);
}
tfree(pDataBlock); tfree(pDataBlock);
} }
...@@ -1710,12 +1714,14 @@ int32_t tscCreateDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOff ...@@ -1710,12 +1714,14 @@ int32_t tscCreateDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOff
dataBuf->nAllocSize = dataBuf->headerSize * 2; dataBuf->nAllocSize = dataBuf->headerSize * 2;
} }
dataBuf->pData = calloc(1, dataBuf->nAllocSize); //dataBuf->pData = calloc(1, dataBuf->nAllocSize);
dataBuf->pData = malloc(dataBuf->nAllocSize);
if (dataBuf->pData == NULL) { if (dataBuf->pData == NULL) {
tscError("failed to allocated memory, reason:%s", strerror(errno)); tscError("failed to allocated memory, reason:%s", strerror(errno));
tfree(dataBuf); tfree(dataBuf);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
memset(dataBuf->pData, 0, sizeof(SSubmitBlk));
//Here we keep the tableMeta to avoid it to be remove by other threads. //Here we keep the tableMeta to avoid it to be remove by other threads.
dataBuf->pTableMeta = tscTableMetaDup(pTableMeta); dataBuf->pTableMeta = tscTableMetaDup(pTableMeta);
...@@ -1956,16 +1962,14 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) { ...@@ -1956,16 +1962,14 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) {
static void extractTableNameList(SInsertStatementParam *pInsertParam, bool freeBlockMap) { static void extractTableNameList(SInsertStatementParam *pInsertParam, bool freeBlockMap) {
pInsertParam->numOfTables = (int32_t) taosHashGetSize(pInsertParam->pTableBlockHashList); pInsertParam->numOfTables = (int32_t) taosHashGetSize(pInsertParam->pTableBlockHashList);
if (pInsertParam->pTableNameList == NULL) { if (pInsertParam->pTableNameList == NULL) {
pInsertParam->pTableNameList = calloc(pInsertParam->numOfTables, POINTER_BYTES); pInsertParam->pTableNameList = malloc(pInsertParam->numOfTables * POINTER_BYTES);
} else {
memset(pInsertParam->pTableNameList, 0, pInsertParam->numOfTables * POINTER_BYTES);
} }
STableDataBlocks **p1 = taosHashIterate(pInsertParam->pTableBlockHashList, NULL); STableDataBlocks **p1 = taosHashIterate(pInsertParam->pTableBlockHashList, NULL);
int32_t i = 0; int32_t i = 0;
while(p1) { while(p1) {
STableDataBlocks* pBlocks = *p1; STableDataBlocks* pBlocks = *p1;
tfree(pInsertParam->pTableNameList[i]); //tfree(pInsertParam->pTableNameList[i]);
pInsertParam->pTableNameList[i++] = tNameDup(&pBlocks->tableName); pInsertParam->pTableNameList[i++] = tNameDup(&pBlocks->tableName);
p1 = taosHashIterate(pInsertParam->pTableBlockHashList, p1); p1 = taosHashIterate(pInsertParam->pTableBlockHashList, p1);
...@@ -2009,14 +2013,12 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl ...@@ -2009,14 +2013,12 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl
int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize + sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta); int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize + sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta);
if (dataBuf->nAllocSize < destSize) { if (dataBuf->nAllocSize < destSize) {
while (dataBuf->nAllocSize < destSize) { dataBuf->nAllocSize = (uint32_t)(destSize * 1.5);
dataBuf->nAllocSize = (uint32_t)(dataBuf->nAllocSize * 1.5);
}
char* tmp = realloc(dataBuf->pData, dataBuf->nAllocSize); char* tmp = realloc(dataBuf->pData, dataBuf->nAllocSize);
if (tmp != NULL) { if (tmp != NULL) {
dataBuf->pData = tmp; dataBuf->pData = tmp;
memset(dataBuf->pData + dataBuf->size, 0, dataBuf->nAllocSize - dataBuf->size); //memset(dataBuf->pData + dataBuf->size, 0, dataBuf->nAllocSize - dataBuf->size);
} else { // failed to allocate memory, free already allocated memory and return error code } else { // failed to allocate memory, free already allocated memory and return error code
tscError("0x%"PRIx64" failed to allocate memory for merging submit block, size:%d", pInsertParam->objectId, dataBuf->nAllocSize); tscError("0x%"PRIx64" failed to allocate memory for merging submit block, size:%d", pInsertParam->objectId, dataBuf->nAllocSize);
...@@ -4384,7 +4386,7 @@ STableMeta* tscTableMetaDup(STableMeta* pTableMeta) { ...@@ -4384,7 +4386,7 @@ STableMeta* tscTableMetaDup(STableMeta* pTableMeta) {
assert(pTableMeta != NULL); assert(pTableMeta != NULL);
size_t size = tscGetTableMetaSize(pTableMeta); size_t size = tscGetTableMetaSize(pTableMeta);
STableMeta* p = calloc(1, size); STableMeta* p = malloc(size);
memcpy(p, pTableMeta, size); memcpy(p, pTableMeta, size);
return p; return p;
} }
......
...@@ -306,7 +306,7 @@ bool tIsValidName(const SName* name) { ...@@ -306,7 +306,7 @@ bool tIsValidName(const SName* name) {
SName* tNameDup(const SName* name) { SName* tNameDup(const SName* name) {
assert(name != NULL); assert(name != NULL);
SName* p = calloc(1, sizeof(SName)); SName* p = malloc(sizeof(SName));
memcpy(p, name, sizeof(SName)); memcpy(p, name, sizeof(SName));
return p; return p;
} }
......
...@@ -111,10 +111,12 @@ typedef struct TAOS_MULTI_BIND { ...@@ -111,10 +111,12 @@ typedef struct TAOS_MULTI_BIND {
} TAOS_MULTI_BIND; } TAOS_MULTI_BIND;
DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos); DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos);
DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length); DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags); DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags);
DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name); DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name);
DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT* stmt, const char* name);
DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert); DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert);
DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums); DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums);
int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes); int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册