diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index dbb31f4634732fc52bc77c634697fd2f2d177625..1fb00bc0d78d25e20ab5b7900e47f779b26a31a0 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -99,6 +99,7 @@ int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *le int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks); void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta); void tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf); +int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows); void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo); void doRetrieveSubqueryData(SSchedMsg *pMsg); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index c0a1afda77fbab6dba58fe2583294e79ea081acb..5f9da6e3bab351c22c7d2f8a1bf8cc8220b3c1c2 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -108,7 +108,8 @@ typedef struct STableDataBlocks { uint32_t size; STableMeta *pTableMeta; // the tableMeta of current table, the table meta will be used during submit, keep a ref to avoid to be removed from cache char *pData; - + bool cloned; + SParsedDataColInfo boundColumnInfo; // for parameter ('?') binding diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 26d9cf0e49caa0dfaa50fa7ff29b74f0793e73a1..8d064ed5834dbaf7338a4657b4d87041056f8f74 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -643,7 +643,7 @@ int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int3 return TSDB_CODE_SUCCESS; } -static int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) { +int32_t FORCE_INLINE tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) { pBlocks->tid = pTableMeta->id.tid; pBlocks->uid = pTableMeta->id.uid; pBlocks->sversion = pTableMeta->sversion; diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index a695b493048849d76fb6df5a0d53ac525f54d94a..830560438244e604cf4aa2a6fe87d79a8f2da83e 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -55,6 +55,7 @@ typedef struct SMultiTbStmt { SStrToken stbname; SStrToken values; SArray *tags; + STableDataBlocks *lastBlock; SHashObj *pTableHash; SHashObj *pTableBlockHashList; // data block for each table } SMultiTbStmt; @@ -348,7 +349,7 @@ int32_t fillTablesColumnsNull(SSqlObj* pSql) { //////////////////////////////////////////////////////////////////////////////// // functions for insertion statement preparation -static int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param, TAOS_BIND* bind, int32_t colNum) { +static FORCE_INLINE int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param, TAOS_BIND* bind, int32_t colNum) { if (bind->is_null != NULL && *(bind->is_null)) { setNull(data + param->offset, param->type, param->bytes); return TSDB_CODE_SUCCESS; @@ -747,25 +748,25 @@ static int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param, case TSDB_DATA_TYPE_BOOL: case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_UTINYINT: - size = 1; + *(uint8_t *)(data + param->offset) = *(uint8_t *)bind->buffer; break; case TSDB_DATA_TYPE_SMALLINT: case TSDB_DATA_TYPE_USMALLINT: - size = 2; + *(uint16_t *)(data + param->offset) = *(uint16_t *)bind->buffer; break; case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_UINT: case TSDB_DATA_TYPE_FLOAT: - size = 4; + *(uint32_t *)(data + param->offset) = *(uint32_t *)bind->buffer; break; case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_UBIGINT: case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_TIMESTAMP: - size = 8; + *(uint64_t *)(data + param->offset) = *(uint64_t *)bind->buffer; break; case TSDB_DATA_TYPE_BINARY: @@ -791,7 +792,6 @@ static int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param, return TSDB_CODE_TSC_INVALID_VALUE; } - memcpy(data + param->offset, bind->buffer, size); if (param->offset == 0) { if (tsCheckTimestamp(pBlock, data + param->offset) != TSDB_CODE_SUCCESS) { tscError("invalid timestamp"); @@ -802,6 +802,101 @@ static int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param, return TSDB_CODE_SUCCESS; } +static int32_t insertStmtGenLastBlock(STableDataBlocks** lastBlock, STableDataBlocks* pBlock) { + *lastBlock = (STableDataBlocks*)malloc(sizeof(STableDataBlocks)); + memcpy(*lastBlock, pBlock, sizeof(STableDataBlocks)); + (*lastBlock)->cloned = true; + +#if 0 + void* tmp = malloc((*pBlock)->numOfAllocedParams * sizeof(SParamInfo)); + memcpy(tmp, (*pBlock)->params, (*pBlock)->numOfAllocedParams * sizeof(SParamInfo)); + (*pBlock)->params = tmp; +#endif + + (*lastBlock)->pData = NULL; + (*lastBlock)->ordered = true; + (*lastBlock)->prevTS = INT64_MIN; + (*lastBlock)->size = sizeof(SSubmitBlk); + (*lastBlock)->tsSource = -1; + +#if 0 + if ((*pBlock)->boundColumnInfo.boundedColumns) { + tmp = malloc((*pBlock)->boundColumnInfo.numOfCols * sizeof(int32_t)); + memcpy(tmp, (*pBlock)->boundColumnInfo.boundedColumns, (*pBlock)->boundColumnInfo.numOfCols * sizeof(int32_t)); + (*pBlock)->boundColumnInfo.boundedColumns = tmp; + } + + if ((*pBlock)->boundColumnInfo.cols) { + tmp = malloc((*pBlock)->boundColumnInfo.numOfCols * sizeof(SBoundColumn)); + memcpy(tmp, (*pBlock)->boundColumnInfo.cols, (*pBlock)->boundColumnInfo.numOfCols * sizeof(SBoundColumn)); + (*pBlock)->boundColumnInfo.cols = tmp; + } +#endif + + return TSDB_CODE_SUCCESS; +} + + +static int32_t insertStmtGenBlock(STscStmt* pStmt, STableDataBlocks** pBlock, STableMeta* pTableMeta, SName* name) { + int32_t code = 0; + + if (pStmt->mtb.lastBlock == NULL) { + tscError("no previous data block"); + return TSDB_CODE_TSC_APP_ERROR; + } + + int32_t msize = tscGetTableMetaSize(pTableMeta); + int32_t tsize = sizeof(STableDataBlocks) + msize; + + void *t = malloc(tsize); + *pBlock = t; + + //*pBlock = (STableDataBlocks*)malloc(sizeof(STableDataBlocks)); + memcpy(*pBlock, pStmt->mtb.lastBlock, sizeof(STableDataBlocks)); + +#if 0 + void* tmp = malloc((*pBlock)->numOfAllocedParams * sizeof(SParamInfo)); + memcpy(tmp, (*pBlock)->params, (*pBlock)->numOfAllocedParams * sizeof(SParamInfo)); + (*pBlock)->params = tmp; +#endif + + t = (char *)t + sizeof(STableDataBlocks); + (*pBlock)->pTableMeta = t; + memcpy((*pBlock)->pTableMeta, pTableMeta, msize); + + (*pBlock)->pData = malloc((*pBlock)->nAllocSize); + //(*pBlock)->pTableMeta = tscTableMetaDup(pTableMeta); + //(*pBlock)->pTableMeta = pTableMeta; + + (*pBlock)->vgId = (*pBlock)->pTableMeta->vgId; + +#if 0 + if ((*pBlock)->boundColumnInfo.boundedColumns) { + tmp = malloc((*pBlock)->boundColumnInfo.numOfCols * sizeof(int32_t)); + memcpy(tmp, (*pBlock)->boundColumnInfo.boundedColumns, (*pBlock)->boundColumnInfo.numOfCols * sizeof(int32_t)); + (*pBlock)->boundColumnInfo.boundedColumns = tmp; + } + + if ((*pBlock)->boundColumnInfo.cols) { + tmp = malloc((*pBlock)->boundColumnInfo.numOfCols * sizeof(SBoundColumn)); + memcpy(tmp, (*pBlock)->boundColumnInfo.cols, (*pBlock)->boundColumnInfo.numOfCols * sizeof(SBoundColumn)); + (*pBlock)->boundColumnInfo.cols = tmp; + } +#endif + + tNameAssign(&(*pBlock)->tableName, name); + + SSubmitBlk* blk = (SSubmitBlk*)(*pBlock)->pData; + memset(blk, 0, sizeof(*blk)); + + code = tsSetBlockInfo(blk, pTableMeta, 0); + if (code != TSDB_CODE_SUCCESS) { + STMT_RET(code); + } + + return TSDB_CODE_SUCCESS; +} + static int doBindBatchParam(STableDataBlocks* pBlock, SParamInfo* param, TAOS_MULTI_BIND* bind, int32_t rowNum) { if (bind->buffer_type != param->type || !isValidDataType(param->type)) { @@ -1173,6 +1268,8 @@ static void insertBatchClean(STscStmt* pStmt) { static int insertBatchStmtExecute(STscStmt* pStmt) { int32_t code = 0; + + int64_t st1 = taosGetTimestampUs(); if(pStmt->mtb.nameSet == false) { tscError("0x%"PRIx64" no table name set", pStmt->pSql->self); @@ -1188,23 +1285,35 @@ static int insertBatchStmtExecute(STscStmt* pStmt) { fillTablesColumnsNull(pStmt->pSql); + int64_t st2 = taosGetTimestampUs(); + if ((code = tscMergeTableDataBlocks(&pStmt->pSql->cmd.insertParam, false)) != TSDB_CODE_SUCCESS) { return code; } + int64_t st3 = taosGetTimestampUs(); + code = tscHandleMultivnodeInsert(pStmt->pSql); if (code != TSDB_CODE_SUCCESS) { return code; } + int64_t st4 = taosGetTimestampUs(); + // wait for the callback function to post the semaphore tsem_wait(&pStmt->pSql->rspSem); + int64_t st5 = taosGetTimestampUs(); + code = pStmt->pSql->res.code; insertBatchClean(pStmt); + int64_t st6 = taosGetTimestampUs(); + + tscDebug("use time:%"PRId64 ",%"PRId64 ",%"PRId64 ",%"PRId64 ",%"PRId64, st2-st1, st3-st2, st4-st3, st5-st4, st6-st5); + return code; } @@ -1228,11 +1337,11 @@ int stmtParseInsertTbTags(SSqlObj* pSql, STscStmt* pStmt) { pStmt->mtb.tbname = sToken; pStmt->mtb.nameSet = false; if (pStmt->mtb.pTableHash == NULL) { - pStmt->mtb.pTableHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); + pStmt->mtb.pTableHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); } if (pStmt->mtb.pTableBlockHashList == NULL) { - pStmt->mtb.pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); + pStmt->mtb.pTableBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); } pStmt->mtb.tagSet = true; @@ -1561,6 +1670,9 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags SSubmitBlk* pBlk = (SSubmitBlk*) (*t1)->pData; pCmd->batchSize = pBlk->numOfRows; + if (pBlk->numOfRows == 0) { + (*t1)->prevTS = INT64_MIN; + } taosHashPut(pCmd->insertParam.pTableBlockHashList, (void *)&pStmt->mtb.currentUid, sizeof(pStmt->mtb.currentUid), (void*)t1, POINTER_BYTES); @@ -1580,6 +1692,9 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags tname.n = strlen(name); SName fullname = {0}; tscSetTableFullName(&fullname, &tname, pSql); + + memcpy(&pTableMetaInfo->name, &fullname, sizeof(fullname)); + code = tscGetTableMeta(pSql, pTableMetaInfo); if (code != TSDB_CODE_SUCCESS) { STMT_RET(code); @@ -1592,25 +1707,21 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags STMT_RET(TSDB_CODE_TSC_APP_ERROR); } - memcpy(&pTableMetaInfo->name, &fullname, sizeof(fullname)); - STableDataBlocks* pBlock = NULL; - code = tscGetDataBlockFromList(pCmd->insertParam.pTableBlockHashList, pTableMeta->id.uid, TSDB_PAYLOAD_SIZE, sizeof(SSubmitBlk), - pTableMeta->tableInfo.rowSize, &pTableMetaInfo->name, pTableMeta, &pBlock, NULL); - if (code != TSDB_CODE_SUCCESS) { - STMT_RET(code); - } - SSubmitBlk* blk = (SSubmitBlk*)pBlock->pData; - blk->numOfRows = 0; + insertStmtGenBlock(pStmt, &pBlock, pTableMeta, &pTableMetaInfo->name); + pCmd->batchSize = 0; + pStmt->mtb.currentUid = pTableMeta->id.uid; pStmt->mtb.tbNum++; - + + taosHashPut(pCmd->insertParam.pTableBlockHashList, (void *)&pStmt->mtb.currentUid, sizeof(pStmt->mtb.currentUid), (void*)&pBlock, POINTER_BYTES); taosHashPut(pStmt->mtb.pTableBlockHashList, (void *)&pStmt->mtb.currentUid, sizeof(pStmt->mtb.currentUid), (void*)&pBlock, POINTER_BYTES); taosHashPut(pStmt->mtb.pTableHash, name, strlen(name), (char*) &pTableMeta->id.uid, sizeof(pTableMeta->id.uid)); tscDebug("0x%"PRIx64" table:%s is prepared, uid:%" PRIx64, pSql->self, name, pStmt->mtb.currentUid); + STMT_RET(TSDB_CODE_SUCCESS); } @@ -1670,6 +1781,10 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags taosHashPut(pStmt->mtb.pTableBlockHashList, (void *)&pStmt->mtb.currentUid, sizeof(pStmt->mtb.currentUid), (void*)&pBlock, POINTER_BYTES); taosHashPut(pStmt->mtb.pTableHash, name, strlen(name), (char*) &pTableMeta->id.uid, sizeof(pTableMeta->id.uid)); + if (pStmt->mtb.lastBlock == NULL) { + insertStmtGenLastBlock(&pStmt->mtb.lastBlock, pBlock); + } + tscDebug("0x%"PRIx64" table:%s is prepared, uid:%" PRIx64, pSql->self, name, pStmt->mtb.currentUid); } @@ -1711,6 +1826,7 @@ int taos_stmt_close(TAOS_STMT* stmt) { if (pStmt->pSql && pStmt->pSql->res.code != 0) { rmMeta = true; } + tscDestroyDataBlock(pStmt->mtb.lastBlock, rmMeta); pStmt->mtb.pTableBlockHashList = tscDestroyBlockHashTable(pStmt->mtb.pTableBlockHashList, rmMeta); taosHashCleanup(pStmt->pSql->cmd.insertParam.pTableBlockHashList); pStmt->pSql->cmd.insertParam.pTableBlockHashList = NULL; @@ -1745,6 +1861,8 @@ int taos_stmt_bind_param(TAOS_STMT* stmt, TAOS_BIND* bind) { pStmt->last = STMT_BIND; + tscDebug("tableId:%" PRIu64 ", try to bind one row", pStmt->mtb.currentUid); + STMT_RET(insertStmtBindParam(pStmt, bind)); } else { STMT_RET(normalStmtBindParam(pStmt, bind)); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 9b05922f49e66341c9591314c9c780b5a61086fd..020b71d9b8a0e306fe08cbc87707ad50e82306c2 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1478,12 +1478,6 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) { } tfree(pDataBlock->pData); - tfree(pDataBlock->params); - - // free the refcount for metermeta - if (pDataBlock->pTableMeta != NULL) { - tfree(pDataBlock->pTableMeta); - } if (removeMeta) { char name[TSDB_TABLE_FNAME_LEN] = {0}; @@ -1492,7 +1486,17 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) { taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); } - tscDestroyBoundColumnInfo(&pDataBlock->boundColumnInfo); + if (!pDataBlock->cloned) { + tfree(pDataBlock->params); + + // free the refcount for metermeta + if (pDataBlock->pTableMeta != NULL) { + tfree(pDataBlock->pTableMeta); + } + + tscDestroyBoundColumnInfo(&pDataBlock->boundColumnInfo); + } + tfree(pDataBlock); } @@ -1630,12 +1634,14 @@ int32_t tscCreateDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOff dataBuf->nAllocSize = dataBuf->headerSize * 2; } - dataBuf->pData = calloc(1, dataBuf->nAllocSize); + //dataBuf->pData = calloc(1, dataBuf->nAllocSize); + dataBuf->pData = malloc(dataBuf->nAllocSize); if (dataBuf->pData == NULL) { tscError("failed to allocated memory, reason:%s", strerror(errno)); tfree(dataBuf); return TSDB_CODE_TSC_OUT_OF_MEMORY; } + memset(dataBuf->pData, 0, sizeof(SSubmitBlk)); //Here we keep the tableMeta to avoid it to be remove by other threads. dataBuf->pTableMeta = tscTableMetaDup(pTableMeta); @@ -1761,16 +1767,14 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) { static void extractTableNameList(SInsertStatementParam *pInsertParam, bool freeBlockMap) { pInsertParam->numOfTables = (int32_t) taosHashGetSize(pInsertParam->pTableBlockHashList); if (pInsertParam->pTableNameList == NULL) { - pInsertParam->pTableNameList = calloc(pInsertParam->numOfTables, POINTER_BYTES); - } else { - memset(pInsertParam->pTableNameList, 0, pInsertParam->numOfTables * POINTER_BYTES); + pInsertParam->pTableNameList = malloc(pInsertParam->numOfTables * POINTER_BYTES); } STableDataBlocks **p1 = taosHashIterate(pInsertParam->pTableBlockHashList, NULL); int32_t i = 0; while(p1) { STableDataBlocks* pBlocks = *p1; - tfree(pInsertParam->pTableNameList[i]); + //tfree(pInsertParam->pTableNameList[i]); pInsertParam->pTableNameList[i++] = tNameDup(&pBlocks->tableName); p1 = taosHashIterate(pInsertParam->pTableBlockHashList, p1); @@ -1809,14 +1813,12 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize + sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta); if (dataBuf->nAllocSize < destSize) { - while (dataBuf->nAllocSize < destSize) { - dataBuf->nAllocSize = (uint32_t)(dataBuf->nAllocSize * 1.5); - } + dataBuf->nAllocSize = (uint32_t)(destSize * 1.5); char* tmp = realloc(dataBuf->pData, dataBuf->nAllocSize); if (tmp != NULL) { dataBuf->pData = tmp; - memset(dataBuf->pData + dataBuf->size, 0, dataBuf->nAllocSize - dataBuf->size); + //memset(dataBuf->pData + dataBuf->size, 0, dataBuf->nAllocSize - dataBuf->size); } else { // failed to allocate memory, free already allocated memory and return error code tscError("0x%"PRIx64" failed to allocate memory for merging submit block, size:%d", pInsertParam->objectId, dataBuf->nAllocSize); @@ -4144,7 +4146,7 @@ STableMeta* tscTableMetaDup(STableMeta* pTableMeta) { assert(pTableMeta != NULL); size_t size = tscGetTableMetaSize(pTableMeta); - STableMeta* p = calloc(1, size); + STableMeta* p = malloc(size); memcpy(p, pTableMeta, size); return p; } diff --git a/src/common/src/tname.c b/src/common/src/tname.c index 72e2d42ff9bb8141d6bfc11dcc13ec470f9b09e1..26502c5d9cd032afd20d89ba8ea2da72b82a62c1 100644 --- a/src/common/src/tname.c +++ b/src/common/src/tname.c @@ -306,7 +306,7 @@ bool tIsValidName(const SName* name) { SName* tNameDup(const SName* name) { assert(name != NULL); - SName* p = calloc(1, sizeof(SName)); + SName* p = malloc(sizeof(SName)); memcpy(p, name, sizeof(SName)); return p; }