提交 87f12010 编写于 作者: G Ganlin Zhao

fix(query): add stmt retry mechanism in case of schema change

上级 048646ea
......@@ -151,6 +151,7 @@ void* tscDestroyUdfArrayList(SArray* pUdfList);
void* tscDestroyBlockHashTable(SSqlObj* pSql, SHashObj* pBlockHashTable, bool removeMeta);
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock);
int32_t tscRestoreTableDataBlocks(SInsertStatementParam *pInsertParam);
int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertParam, bool freeBlockMap);
int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, SName* pName, STableMeta* pTableMeta,
STableDataBlocks** dataBlocks, SArray* pBlockList);
......
......@@ -1162,20 +1162,11 @@ static int insertStmtReset(STscStmt* pStmt) {
return TSDB_CODE_SUCCESS;
}
static int insertStmtExecute(STscStmt* stmt) {
static int insertStmtExecuteImpl(STscStmt* stmt, STableMetaInfo* pTableMetaInfo, bool schemaAttached) {
SSqlCmd* pCmd = &stmt->pSql->cmd;
if (pCmd->batchSize == 0) {
tscError("no records bind");
return invalidOperationMsg(tscGetErrorMsgPayload(&stmt->pSql->cmd), "no records bind");
}
if (taosHashGetSize(pCmd->insertParam.pTableBlockHashList) == 0) {
return TSDB_CODE_SUCCESS;
}
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
stmt->pSql->cmd.insertParam.schemaAttached = schemaAttached ? 1 : 0;
if (pCmd->insertParam.pTableBlockHashList == NULL) {
pCmd->insertParam.pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
}
......@@ -1192,6 +1183,7 @@ static int insertStmtExecute(STscStmt* stmt) {
pBlk->dataLen = 0;
pBlk->uid = pTableMeta->id.uid;
pBlk->tid = pTableMeta->id.tid;
pBlk->sversion = pTableMeta->sversion;
fillTablesColumnsNull(stmt->pSql);
......@@ -1213,9 +1205,54 @@ static int insertStmtExecute(STscStmt* stmt) {
tscBuildAndSendRequest(pSql, NULL);
return TSDB_CODE_SUCCESS;
}
static int insertStmtExecute(STscStmt* stmt) {
int32_t code = TSDB_CODE_SUCCESS;
SSqlCmd* pCmd = &stmt->pSql->cmd;
SSqlObj* pSql = stmt->pSql;
if (pCmd->batchSize == 0) {
tscError("no records bind");
return invalidOperationMsg(tscGetErrorMsgPayload(&stmt->pSql->cmd), "no records bind");
}
if (taosHashGetSize(pCmd->insertParam.pTableBlockHashList) == 0) {
return code;
}
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
code = insertStmtExecuteImpl(stmt, pTableMetaInfo, false);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// wait for the callback function to post the semaphore
tsem_wait(&pSql->rspSem);
if (pSql->res.code != TSDB_CODE_SUCCESS) {
while (pSql->retry < pSql->maxRetry) {
if (pSql->res.code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
pSql->retry += 1;
code = insertStmtExecuteImpl(stmt, pTableMetaInfo, true);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// wait for the callback function to post the semaphore
tsem_wait(&pSql->rspSem);
} else {
break;
}
}
}
stmt->numOfRows += pSql->res.numOfRows;
// data block reset
......@@ -1271,7 +1308,7 @@ static int insertBatchStmtExecute(STscStmt* pStmt) {
return invalidOperationMsg(tscGetErrorMsgPayload(&pStmt->pSql->cmd), "no table name set");
}
pStmt->pSql->retry = pStmt->pSql->maxRetry + 1; //no retry
pStmt->pSql->retry = 0; // enable retry in case of reconfiguring table meta
if (taosHashGetSize(pStmt->pSql->cmd.insertParam.pTableBlockHashList) <= 0) { // merge according to vgId
tscError("0x%"PRIx64" no data block to insert", pStmt->pSql->self);
......
......@@ -3567,6 +3567,13 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
}
pParentObj->res.code = TSDB_CODE_SUCCESS;
if (TSDB_QUERY_HAS_TYPE(pParentObj->cmd.insertParam.insertType, TSDB_QUERY_TYPE_STMT_INSERT)) {
tscRestoreTableDataBlocks(&pParentObj->cmd.insertParam);
tscMergeTableDataBlocks(pParentObj, &pParentObj->cmd.insertParam, false);
tscHandleMultivnodeInsert(pParentObj);
return;
}
tscResetSqlCmd(&pParentObj->cmd, false, pParentObj->self);
// in case of insert, redo parsing the sql string and build new submit data block for two reasons:
......
......@@ -2211,6 +2211,21 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) {
return result;
}
int32_t tscRestoreTableDataBlocks(SInsertStatementParam *pInsertParam) {
STableDataBlocks** iter = taosHashIterate(pInsertParam->pTableBlockHashList, NULL);
while (iter) {
STableDataBlocks* pOneTableBlock = *iter;
SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData;
pBlocks->tid = htonl(pBlocks->tid);
pBlocks->uid = htobe64(pBlocks->uid);
pBlocks->sversion = htonl(pBlocks->sversion);
pBlocks->numOfRows = htons(pBlocks->numOfRows);
iter = taosHashIterate(pInsertParam->pTableBlockHashList, iter);
}
return TSDB_CODE_SUCCESS;
}
int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertParam, bool freeBlockMap) {
const int INSERT_HEAD_SIZE = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
int code = 0;
......@@ -2320,8 +2335,6 @@ int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertPar
// the length does not include the SSubmitBlk structure
pBlocks->dataLen = htonl(finalLen);
dataBuf->numOfTables += 1;
pBlocks->numOfRows = 0;
} else {
tscDebug("0x%"PRIx64" table %s data block is empty", pInsertParam->objectId, pOneTableBlock->tableName.tname);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册