diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index c96cc8c1710cb33b298d272cde56ed65ee7ff072..3b8f78fc7c6538a68bbf2c6e24a9dd8cbb81877d 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -151,6 +151,7 @@ void* tscDestroyUdfArrayList(SArray* pUdfList); void* tscDestroyBlockHashTable(SSqlObj* pSql, SHashObj* pBlockHashTable, bool removeMeta); int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock); +int32_t tscRestoreTableDataBlocks(SInsertStatementParam *pInsertParam); int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertParam, bool freeBlockMap); int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, SName* pName, STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList); diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index 1ae546d1884ccfe1da8d45c66e6df7c95ee06108..494f82dd9ec00922de31d25f7458491caba57fe3 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -1168,20 +1168,11 @@ static int insertStmtReset(STscStmt* pStmt) { return TSDB_CODE_SUCCESS; } -static int insertStmtExecute(STscStmt* stmt) { +static int insertStmtExecuteImpl(STscStmt* stmt, STableMetaInfo* pTableMetaInfo, bool schemaAttached) { SSqlCmd* pCmd = &stmt->pSql->cmd; - if (pCmd->batchSize == 0) { - tscError("no records bind"); - return invalidOperationMsg(tscGetErrorMsgPayload(&stmt->pSql->cmd), "no records bind"); - } - - if (taosHashGetSize(pCmd->insertParam.pTableBlockHashList) == 0) { - return TSDB_CODE_SUCCESS; - } - - STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0); - STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; + stmt->pSql->cmd.insertParam.schemaAttached = schemaAttached ? 1 : 0; + if (pCmd->insertParam.pTableBlockHashList == NULL) { pCmd->insertParam.pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); } @@ -1198,6 +1189,7 @@ static int insertStmtExecute(STscStmt* stmt) { pBlk->dataLen = 0; pBlk->uid = pTableMeta->id.uid; pBlk->tid = pTableMeta->id.tid; + pBlk->sversion = pTableMeta->sversion; fillTablesColumnsNull(stmt->pSql); @@ -1219,9 +1211,54 @@ static int insertStmtExecute(STscStmt* stmt) { tscBuildAndSendRequest(pSql, NULL); + return TSDB_CODE_SUCCESS; + +} + +static int insertStmtExecute(STscStmt* stmt) { + int32_t code = TSDB_CODE_SUCCESS; + + SSqlCmd* pCmd = &stmt->pSql->cmd; + SSqlObj* pSql = stmt->pSql; + + if (pCmd->batchSize == 0) { + tscError("no records bind"); + return invalidOperationMsg(tscGetErrorMsgPayload(&stmt->pSql->cmd), "no records bind"); + } + + if (taosHashGetSize(pCmd->insertParam.pTableBlockHashList) == 0) { + return code; + } + + STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0); + + code = insertStmtExecuteImpl(stmt, pTableMetaInfo, false); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + // wait for the callback function to post the semaphore tsem_wait(&pSql->rspSem); + if (pSql->res.code != TSDB_CODE_SUCCESS) { + while (pSql->retry < pSql->maxRetry) { + if (pSql->res.code == TSDB_CODE_TDB_TABLE_RECONFIGURE) { + pSql->retry += 1; + + code = insertStmtExecuteImpl(stmt, pTableMetaInfo, true); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + // wait for the callback function to post the semaphore + tsem_wait(&pSql->rspSem); + } else { + break; + } + } + + } + stmt->numOfRows += pSql->res.numOfRows; // data block reset @@ -1271,13 +1308,13 @@ static void insertBatchClean(STscStmt* pStmt) { static int insertBatchStmtExecute(STscStmt* pStmt) { int32_t code = 0; - + if(pStmt->mtb.nameSet == false) { tscError("0x%"PRIx64" no table name set", pStmt->pSql->self); return invalidOperationMsg(tscGetErrorMsgPayload(&pStmt->pSql->cmd), "no table name set"); } - pStmt->pSql->retry = pStmt->pSql->maxRetry + 1; //no retry + pStmt->pSql->retry = 0; // enable retry in case of reconfiguring table meta if (taosHashGetSize(pStmt->pSql->cmd.insertParam.pTableBlockHashList) <= 0) { // merge according to vgId tscError("0x%"PRIx64" no data block to insert", pStmt->pSql->self); diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 4b46cad4cda5d44ce248428e4fd630f162077720..e9a356ec4f1f2ea08bcd13d20d5ecc66d022031c 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -3510,7 +3510,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) if (taos_errno(tres) != TSDB_CODE_SUCCESS) { SSqlObj* pSql = (SSqlObj*) tres; assert(pSql != NULL && pSql->res.code == numOfRows); - + pParentObj->res.code = pSql->res.code; // set the flag in the parent sqlObj @@ -3518,9 +3518,9 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) pParentObj->cmd.insertParam.schemaAttached = 1; } } - + if (!subAndCheckDone(tres, pParentObj, pSupporter->idx)) { - // concurrency problem, other thread already release pParentObj + // concurrency problem, other thread already release pParentObj //tscDebug("0x%"PRIx64" insert:%p,%d completed, total:%d", pParentObj->self, tres, suppIdx, pParentObj->subState.numOfSub); return; } @@ -3577,6 +3577,13 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) } pParentObj->res.code = TSDB_CODE_SUCCESS; + if (TSDB_QUERY_HAS_TYPE(pParentObj->cmd.insertParam.insertType, TSDB_QUERY_TYPE_STMT_INSERT)) { + tscRestoreTableDataBlocks(&pParentObj->cmd.insertParam); + tscMergeTableDataBlocks(pParentObj, &pParentObj->cmd.insertParam, false); + tscHandleMultivnodeInsert(pParentObj); + return; + } + tscResetSqlCmd(&pParentObj->cmd, false, pParentObj->self); // in case of insert, redo parsing the sql string and build new submit data block for two reasons: diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 7b890e15be0a8cee3869ed9f770f3df2c3429838..4fea4f965225466f30cb6f34b9afdf81f81b716f 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -2211,16 +2211,31 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) { return result; } +int32_t tscRestoreTableDataBlocks(SInsertStatementParam *pInsertParam) { + STableDataBlocks** iter = taosHashIterate(pInsertParam->pTableBlockHashList, NULL); + while (iter) { + STableDataBlocks* pOneTableBlock = *iter; + SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData; + pBlocks->tid = htonl(pBlocks->tid); + pBlocks->uid = htobe64(pBlocks->uid); + pBlocks->sversion = htonl(pBlocks->sversion); + pBlocks->numOfRows = htons(pBlocks->numOfRows); + iter = taosHashIterate(pInsertParam->pTableBlockHashList, iter); + } + + return TSDB_CODE_SUCCESS; +} + int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertParam, bool freeBlockMap) { const int INSERT_HEAD_SIZE = sizeof(SMsgDesc) + sizeof(SSubmitMsg); int code = 0; bool isRawPayload = IS_RAW_PAYLOAD(pInsertParam->payloadType); size_t initialSize = taosHashGetSize(pInsertParam->pTableBlockHashList); initialSize = initialSize > 128 ? 128 : initialSize; - + void* pVnodeDataBlockHashList = taosHashInit(initialSize, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES); - + // alloc table name list. size_t numOfTables = taosHashGetSize(pInsertParam->pTableBlockHashList); if (pInsertParam->pTableNameList) { @@ -2228,7 +2243,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertPar } pInsertParam->pTableNameList = calloc(numOfTables, sizeof(SName*)); pInsertParam->numOfTables = (int32_t) numOfTables; - + size_t tail = 0; SBlockKeyInfo blkKeyInfo = {0}; // share by pOneTableBlock STableDataBlocks** iter = taosHashIterate(pInsertParam->pTableBlockHashList, NULL); @@ -2236,10 +2251,10 @@ int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertPar STableDataBlocks* pOneTableBlock = *iter; SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData; iter = taosHashIterate(pInsertParam->pTableBlockHashList, iter); - + // extract table name list. pInsertParam->pTableNameList[tail++] = tNameDup(&pOneTableBlock->tableName); - + if (pBlocks->numOfRows > 0) { // the maximum expanded size in byte when a row-wise data is converted to SDataRow format int32_t expandSize = isRawPayload ? getRowExpandSize(pOneTableBlock->pTableMeta) : 0; @@ -2320,17 +2335,15 @@ int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertPar // the length does not include the SSubmitBlk structure pBlocks->dataLen = htonl(finalLen); dataBuf->numOfTables += 1; - - pBlocks->numOfRows = 0; } else { tscDebug("0x%"PRIx64" table %s data block is empty", pInsertParam->objectId, pOneTableBlock->tableName.tname); } - + if (freeBlockMap) { tscDestroyDataBlock(pSql, pOneTableBlock, false); } } - + if (freeBlockMap) { taosHashCleanup(pInsertParam->pTableBlockHashList); pInsertParam->pTableBlockHashList = NULL; @@ -2355,7 +2368,7 @@ void tscCloseTscObj(void *param) { tscReleaseRpc(pObj->pRpcObj); pthread_mutex_destroy(&pObj->mutex); tscReleaseClusterInfo(pObj->clusterId); - + destroyDispatcherManager(pObj->dispatcherManager); pObj->dispatcherManager = NULL;