From 40016c5eb8a682d30b91a4cbe4d74e65a4580fb6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 8 Dec 2020 22:33:11 +0800 Subject: [PATCH] [TD-2361]: optimize the failure retry in insert processing. --- src/client/inc/tscSubquery.h | 2 +- src/client/inc/tsclient.h | 83 +++++++++--------------------- src/client/src/tscAsync.c | 57 +++++---------------- src/client/src/tscLocalMerge.c | 2 +- src/client/src/tscParseInsert.c | 78 ++++++++++++---------------- src/client/src/tscPrepare.c | 4 +- src/client/src/tscServer.c | 15 +++--- src/client/src/tscSubquery.c | 91 ++++++++++++++++++++++++++++----- src/client/src/tscUtil.c | 80 +++++++++++++++++++++-------- 9 files changed, 220 insertions(+), 192 deletions(-) diff --git a/src/client/inc/tscSubquery.h b/src/client/inc/tscSubquery.h index f7832c9818..d3996ccf7f 100644 --- a/src/client/inc/tscSubquery.h +++ b/src/client/inc/tscSubquery.h @@ -36,7 +36,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql); int32_t tscHandleMultivnodeInsert(SSqlObj *pSql); -int32_t tscHandleInsertRetry(SSqlObj* pSql); +int32_t tscHandleInsertRetry(SSqlObj* parent, SSqlObj* child); void tscBuildResFromSubqueries(SSqlObj *pSql); TAOS_ROW doSetResultRowData(SSqlObj *pSql); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 4f070dfdc0..97cdb21238 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -37,40 +37,6 @@ extern "C" { #include "qTsbuf.h" #include "tcmdtype.h" -#if 0 -static UNUSED_FUNC void *u_malloc (size_t __size) { - uint32_t v = rand(); - - if (v % 5000 <= 0) { - return NULL; - } else { - return malloc(__size); - } -} - -static UNUSED_FUNC void* u_calloc(size_t num, size_t __size) { - uint32_t v = rand(); - if (v % 5000 <= 0) { - return NULL; - } else { - return calloc(num, __size); - } -} - -static UNUSED_FUNC void* u_realloc(void* p, size_t __size) { - uint32_t v = rand(); - if (v % 5000 <= 0) { - return NULL; - } else { - return realloc(p, __size); - } -} - -#define calloc u_calloc -#define malloc u_malloc -#define realloc u_realloc -#endif - // forward declaration struct SSqlInfo; struct SLocalReducer; @@ -78,7 +44,7 @@ struct SLocalReducer; // data source from sql string or from file enum { DATA_FROM_SQL_STRING = 1, - DATA_FROM_DATA_FILE = 2, + DATA_FROM_DATA_FILE = 2, }; typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int32_t numOfRows); @@ -118,10 +84,10 @@ typedef struct STableMetaInfo { * 1. keep the vgroup index during the multi-vnode super table projection query * 2. keep the vgroup index for multi-vnode insertion */ - int32_t vgroupIndex; - char name[TSDB_TABLE_FNAME_LEN]; // (super) table name - char aliasName[TSDB_TABLE_NAME_LEN]; // alias name of table specified in query sql - SArray* tagColList; // SArray, involved tag columns + int32_t vgroupIndex; + char name[TSDB_TABLE_FNAME_LEN]; // (super) table name + char aliasName[TSDB_TABLE_NAME_LEN]; // alias name of table specified in query sql + SArray *tagColList; // SArray, involved tag columns } STableMetaInfo; /* the structure for sql function in select clause */ @@ -204,22 +170,17 @@ typedef struct SParamInfo { } SParamInfo; typedef struct STableDataBlocks { - char tableId[TSDB_TABLE_FNAME_LEN]; - int8_t tsSource; // where does the UNIX timestamp come from, server or client - bool ordered; // if current rows are ordered or not - int64_t vgId; // virtual group id - int64_t prevTS; // previous timestamp, recorded to decide if the records array is ts ascending - int32_t numOfTables; // number of tables in current submit block - int32_t rowSize; // row size for current table - uint32_t nAllocSize; - uint32_t headerSize; // header for table info (uid, tid, submit metadata) - uint32_t size; - - /* - * the table meta of table, the table meta will be used during submit, keep a ref - * to avoid it to be removed from cache - */ - STableMeta *pTableMeta; + char tableId[TSDB_TABLE_FNAME_LEN]; + int8_t tsSource; // where does the UNIX timestamp come from, server or client + bool ordered; // if current rows are ordered or not + int64_t vgId; // virtual group id + int64_t prevTS; // previous timestamp, recorded to decide if the records array is ts ascending + int32_t numOfTables; // number of tables in current submit block + int32_t rowSize; // row size for current table + uint32_t nAllocSize; + uint32_t headerSize; // header for table info (uid, tid, submit metadata) + uint32_t size; + STableMeta *pTableMeta; // the tableMeta of current table, the table meta will be used during submit, keep a ref to avoid to be removed from cache char *pData; // for parameter ('?') binding @@ -284,10 +245,14 @@ typedef struct { int32_t numOfParams; int8_t dataSourceType; // load data from file or not - int8_t submitSchema; // submit block is built with table schema - STagData *pTagData; // NOTE: pTagData->data is used as a variant length array - SHashObj *pTableList; // referred table involved in sql - SArray *pDataBlocks; // SArray submit data blocks after parsing sql + int8_t submitSchema; // submit block is built with table schema + STagData *pTagData; // NOTE: pTagData->data is used as a variant length array + + STableMeta **pTableMetaList; // all involved tableMeta list of current insert sql statement. + int32_t numOfTables; + + SHashObj *pTableList; // data block for each table + SArray *pDataBlocks; // SArray. Merged submit block for each vgroup } SSqlCmd; typedef struct SResRec { diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 3ff8a68d8f..42776441c2 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -410,52 +410,26 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { if (code != TSDB_CODE_SUCCESS) { tscError("%p get %s failed, code:%s", pSql, msg, tstrerror(code)); goto _error; - } else { - tscDebug("%p get %s successfully", pSql, msg); } + tscDebug("%p get %s successfully", pSql, msg); if (pSql->pStream == NULL) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); // check if it is a sub-query of super table query first, if true, enter another routine - if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY)) { - tscDebug("%p update table meta in local cache, continue to process sql and send corresponding subquery", pSql); + if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY|TSDB_QUERY_TYPE_TAG_FILTER_QUERY))) { + tscDebug("%p update table meta in local cache, continue to process sql and send the corresponding query", pSql); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); code = tscGetTableMeta(pSql, pTableMetaInfo); - if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { - return; - } else { - assert(code == TSDB_CODE_SUCCESS); - } - - // param already freed by other routine and pSql in tscCache when ctrl + c - if (atomic_load_ptr(&pSql->param) == NULL) { - return; - } - assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0)); + assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS || code == TSDB_CODE_SUCCESS); - SRetrieveSupport *trs = (SRetrieveSupport *)pSql->param; - SSqlObj * pParObj = trs->pParentSql; - - // NOTE: the vgroupInfo for the queried super table must be existed here. - assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->vgroupIndex && - pTableMetaInfo->vgroupIndex >= 0 && pTableMetaInfo->vgroupList != NULL); - - // tscProcessSql can add error into async res - tscProcessSql(pSql); - return; - } else if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) { - tscDebug("%p update table meta in local cache, continue to process sql and send corresponding tid_tag query", pSql); - STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - code = tscGetTableMeta(pSql, pTableMetaInfo); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { return; - } else { - assert(code == TSDB_CODE_SUCCESS); } assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0)); + // tscProcessSql can add error into async res tscProcessSql(pSql); return; @@ -465,16 +439,18 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); code = tscGetTableMeta(pSql, pTableMetaInfo); + + assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS || code == TSDB_CODE_SUCCESS); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { return; - } else { - assert(code == TSDB_CODE_SUCCESS); } + assert(pCmd->command != TSDB_SQL_INSERT); + // in case of insert, redo parsing the sql string and build new submit data block for two reasons: // 1. the table Id(tid & uid) may have been update, the submit block needs to be updated accordingly. // 2. vnode may need the schema information along with submit block to update its local table schema. - if (pCmd->command == TSDB_SQL_INSERT || pCmd->command == TSDB_SQL_SELECT) { + if (pCmd->command == TSDB_SQL_SELECT) { tscDebug("%p redo parse sql string and proceed", pSql); pCmd->parseFinished = false; tscResetSqlCmdObj(pCmd, false); @@ -486,16 +462,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { goto _error; } - if (pCmd->command == TSDB_SQL_INSERT) { - /* - * Discard previous built submit blocks, and then parse the sql string again and build up all submit blocks, - * and send the required submit block according to index value in supporter to server. - */ - pSql->fp = pSql->fetchFp; // restore the fp - tscHandleInsertRetry(pSql); - } else if (pCmd->command == TSDB_SQL_SELECT) { // in case of other query type, continue - tscProcessSql(pSql); - } + tscProcessSql(pSql); + }else { // in all other cases, simple retry tscProcessSql(pSql); } @@ -551,6 +519,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { if (!pSql->cmd.parseFinished) { tsParseSql(pSql, false); } + (*pSql->fp)(pSql->param, pSql, code); return; diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index 98bf67b7bb..9d4c7c8377 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -728,7 +728,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr SSchema p1 = {0}; if (pExpr->colInfo.colIndex == TSDB_TBNAME_COLUMN_INDEX) { p1 = tGetTableNameColumnSchema(); - } else if (pExpr->colInfo.colIndex == TSDB_UD_COLUMN_INDEX) { + } else if (TSDB_COL_IS_UD_COL(pExpr->colInfo.flag)) { p1.bytes = pExpr->resBytes; p1.type = pExpr->resType; tstrncpy(p1.name, pExpr->aliasName, tListLen(p1.name)); diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 18e5b6f074..4cb7cda27e 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -686,17 +686,14 @@ void tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf) { } } -static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableList, char **str, SParsedDataColInfo *spd, - int32_t *totalNum) { - SSqlCmd * pCmd = &pSql->cmd; +static int32_t doParseInsertStatement(SSqlCmd* pCmd, char **str, SParsedDataColInfo *spd, int32_t *totalNum) { STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); - STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; + STableMeta *pTableMeta = pTableMetaInfo->pTableMeta; STableComInfo tinfo = tscGetTableInfo(pTableMeta); STableDataBlocks *dataBuf = NULL; - int32_t ret = tscGetDataBlockFromList(pTableList, pCmd->pDataBlocks, pTableMeta->id.uid, TSDB_DEFAULT_PAYLOAD_SIZE, - sizeof(SSubmitBlk), tinfo.rowSize, pTableMetaInfo->name, - pTableMeta, &dataBuf); + int32_t ret = tscGetDataBlockFromList(pCmd->pTableList, pTableMeta->id.uid, TSDB_DEFAULT_PAYLOAD_SIZE, + sizeof(SSubmitBlk), tinfo.rowSize, pTableMetaInfo->name, pTableMeta, &dataBuf, NULL); if (ret != TSDB_CODE_SUCCESS) { return ret; } @@ -1060,16 +1057,15 @@ int tsParseInsertSql(SSqlObj *pSql) { if (NULL == pCmd->pTableList) { pCmd->pTableList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); - pCmd->pDataBlocks = taosArrayInit(4, POINTER_BYTES); - if (NULL == pCmd->pTableList || NULL == pSql->cmd.pDataBlocks) { + if (NULL == pCmd->pTableList) { code = TSDB_CODE_TSC_OUT_OF_MEMORY; - goto _error; + goto _clean; } } else { str = pCmd->curSql; } - tscDebug("%p create data block list for submit data:%p, pTableList:%p", pSql, pCmd->pDataBlocks, pCmd->pTableList); + tscDebug("%p create data block list hashList:%p", pSql, pCmd->pTableList); while (1) { int32_t index = 0; @@ -1091,7 +1087,7 @@ int tsParseInsertSql(SSqlObj *pSql) { */ if (totalNum == 0) { code = TSDB_CODE_TSC_INVALID_SQL; - goto _error; + goto _clean; } else { break; } @@ -1104,11 +1100,11 @@ int tsParseInsertSql(SSqlObj *pSql) { // Check if the table name available or not if (validateTableName(sToken.z, sToken.n, &sTblToken) != TSDB_CODE_SUCCESS) { code = tscInvalidSQLErrMsg(pCmd->payload, "table name invalid", sToken.z); - goto _error; + goto _clean; } if ((code = tscSetTableFullName(pTableMetaInfo, &sTblToken, pSql)) != TSDB_CODE_SUCCESS) { - goto _error; + goto _clean; } if ((code = tscCheckIfCreateTable(&str, pSql)) != TSDB_CODE_SUCCESS) { @@ -1122,12 +1118,12 @@ int tsParseInsertSql(SSqlObj *pSql) { tscError("%p async insert parse error, code:%s", pSql, tstrerror(code)); pCmd->curSql = NULL; - goto _error; + goto _clean; } if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { code = tscInvalidSQLErrMsg(pCmd->payload, "insert data into super table is not supported", NULL); - goto _error; + goto _clean; } index = 0; @@ -1136,7 +1132,7 @@ int tsParseInsertSql(SSqlObj *pSql) { if (sToken.n == 0) { code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE required", sToken.z); - goto _error; + goto _clean; } STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); @@ -1148,32 +1144,32 @@ int tsParseInsertSql(SSqlObj *pSql) { tscSetAssignedColumnInfo(&spd, pSchema, tinfo.numOfColumns); if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) { - goto _error; + goto _clean; } /* * app here insert data in different vnodes, so we need to set the following * data in another submit procedure using async insert routines */ - code = doParseInsertStatement(pSql, pCmd->pTableList, &str, &spd, &totalNum); + code = doParseInsertStatement(pCmd, &str, &spd, &totalNum); if (code != TSDB_CODE_SUCCESS) { - goto _error; + goto _clean; } } else if (sToken.type == TK_FILE) { if (validateDataSource(pCmd, DATA_FROM_DATA_FILE, sToken.z) != TSDB_CODE_SUCCESS) { - goto _error; + goto _clean; } index = 0; sToken = tStrGetToken(str, &index, false, 0, NULL); if (sToken.type != TK_STRING && sToken.type != TK_ID) { code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z); - goto _error; + goto _clean; } str += index; if (sToken.n == 0) { code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z); - goto _error; + goto _clean; } strncpy(pCmd->payload, sToken.z, sToken.n); @@ -1183,7 +1179,7 @@ int tsParseInsertSql(SSqlObj *pSql) { wordexp_t full_path; if (wordexp(pCmd->payload, &full_path, 0) != 0) { code = tscInvalidSQLErrMsg(pCmd->payload, "invalid filename", sToken.z); - goto _error; + goto _clean; } tstrncpy(pCmd->payload, full_path.we_wordv[0], pCmd->allocSize); @@ -1195,7 +1191,7 @@ int tsParseInsertSql(SSqlObj *pSql) { SSchema * pSchema = tscGetTableSchema(pTableMeta); if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) { - goto _error; + goto _clean; } SParsedDataColInfo spd = {0}; @@ -1230,7 +1226,7 @@ int tsParseInsertSql(SSqlObj *pSql) { if (spd.hasVal[t] == true) { code = tscInvalidSQLErrMsg(pCmd->payload, "duplicated column name", sToken.z); - goto _error; + goto _clean; } spd.hasVal[t] = true; @@ -1241,13 +1237,13 @@ int tsParseInsertSql(SSqlObj *pSql) { if (!findColumnIndex) { code = tscInvalidSQLErrMsg(pCmd->payload, "invalid column name", sToken.z); - goto _error; + goto _clean; } } if (spd.numOfAssignedCols == 0 || spd.numOfAssignedCols > tinfo.numOfColumns) { code = tscInvalidSQLErrMsg(pCmd->payload, "column name expected", sToken.z); - goto _error; + goto _clean; } index = 0; @@ -1256,16 +1252,16 @@ int tsParseInsertSql(SSqlObj *pSql) { if (sToken.type != TK_VALUES) { code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES is expected", sToken.z); - goto _error; + goto _clean; } - code = doParseInsertStatement(pSql, pCmd->pTableList, &str, &spd, &totalNum); + code = doParseInsertStatement(pCmd, &str, &spd, &totalNum); if (code != TSDB_CODE_SUCCESS) { - goto _error; + goto _clean; } } else { code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE are required", sToken.z); - goto _error; + goto _clean; } } @@ -1274,25 +1270,18 @@ int tsParseInsertSql(SSqlObj *pSql) { goto _clean; } - if (taosArrayGetSize(pCmd->pDataBlocks) > 0) { // merge according to vgId - if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) { - goto _error; + if (taosHashGetSize(pCmd->pTableList) > 0) { // merge according to vgId + if ((code = tscMergeTableDataBlocks(pSql)) != TSDB_CODE_SUCCESS) { + goto _clean; } } code = TSDB_CODE_SUCCESS; goto _clean; -_error: - pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); - _clean: - taosHashCleanup(pCmd->pTableList); - pCmd->pTableList = NULL; - - pCmd->curSql = NULL; + pCmd->curSql = NULL; pCmd->parseFinished = 1; - return code; } @@ -1373,6 +1362,7 @@ int tsParseSql(SSqlObj *pSql, bool initial) { pSql->parseRetry++; ret = tscToSQLCmd(pSql, &SQLInfo); } + SQLInfoDestroy(&SQLInfo); } @@ -1399,7 +1389,7 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock return tscInvalidSQLErrMsg(pCmd->payload, "too many rows in sql, total number of rows should be less than 32767", NULL); } - if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) { + if ((code = tscMergeTableDataBlocks(pSql)) != TSDB_CODE_SUCCESS) { return code; } diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index 30e0729427..fb5aed48bd 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -800,9 +800,9 @@ static int insertStmtExecute(STscStmt* stmt) { STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); assert(pCmd->numOfClause == 1); - if (taosArrayGetSize(pCmd->pDataBlocks) > 0) { + if (taosHashGetSize(pCmd->pTableList) > 0) { // merge according to vgid - int code = tscMergeTableDataBlocks(stmt->pSql, pCmd->pDataBlocks); + int code = tscMergeTableDataBlocks(stmt->pSql); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 8bc65f0c65..ed761a92f1 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -280,19 +280,18 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { } int32_t cmd = pCmd->command; - if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_FETCH || cmd == TSDB_SQL_INSERT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) && + // set the flag to denote that sql string needs to be re-parsed and build submit block with table schema + if (cmd == TSDB_SQL_INSERT && rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) { + pSql->cmd.submitSchema = 1; + } + + if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_FETCH || cmd == TSDB_SQL_UPDATE_TAGS_VAL) && (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID || rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || - rpcMsg->code == TSDB_CODE_APP_NOT_READY || - rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE)) { + rpcMsg->code == TSDB_CODE_APP_NOT_READY)) { tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry); - // set the flag to denote that sql string needs to be re-parsed and build submit block with table schema - if (rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) { - pSql->cmd.submitSchema = 1; - } - pSql->res.code = rpcMsg->code; // keep the previous error code if (pSql->retry > pSql->maxRetry) { tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry); diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 819a323db5..59879d86d1 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2163,23 +2163,76 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) assert(pSql != NULL && pSql->res.code == numOfRows); pParentObj->res.code = pSql->res.code; - } - tfree(pSupporter); + // set the flag in the parent sqlObj + if (pSql->cmd.submitSchema) { + pParentObj->cmd.submitSchema = 1; + } + } if (atomic_sub_fetch_32(&pParentObj->subState.numOfRemain, 1) > 0) { return; } - - tscDebug("%p Async insertion completed, total inserted:%d", pParentObj, pParentObj->res.numOfRows); // restore user defined fp pParentObj->fp = pParentObj->fetchFp; + int32_t numOfSub = pParentObj->subState.numOfSub; + + if (pParentObj->res.code == TSDB_CODE_SUCCESS) { + tscDebug("%p Async insertion completed, total inserted:%d", pParentObj, pParentObj->res.numOfRows); + for(int32_t i = 0; i < numOfSub; ++i) { + SSqlObj* pSql = pParentObj->pSubs[i]; + tfree(pSql->param); + } + + // todo remove this parameter in async callback function definition. + // all data has been sent to vnode, call user function + int32_t v = (pParentObj->res.code != TSDB_CODE_SUCCESS) ? pParentObj->res.code : (int32_t)pParentObj->res.numOfRows; + (*pParentObj->fp)(pParentObj->param, pParentObj, v); + } else { + int32_t numOfFailed = 0; + + for(int32_t i = 0; i < numOfSub; ++i) { + SSqlObj* pSql = pParentObj->pSubs[i]; + if (pSql->res.code != TSDB_CODE_SUCCESS) { + numOfFailed += 1; + + // clean up tableMeta in cache + tscFreeQueryInfo(&pSql->cmd, true); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetailSafely(&pSql->cmd, 0); + STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentObj->cmd, pSql->cmd.clauseIndex, 0); + tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL); + + tscDebug("%p, failed sub:%d, %p", pParentObj, i, pSql); + } + } + + tscError("%p Async insertion completed, total inserted:%d rows, numOfFailed:%d, numOfTotal:%d", pParentObj, + pParentObj->res.numOfRows, numOfFailed, numOfSub); + + tscDebug("%p cleanup %d tableMeta in cache", pParentObj, pParentObj->cmd.numOfTables); + for(int32_t i = 0; i < pParentObj->cmd.numOfTables; ++i) { + taosCacheRelease(tscMetaCache, (void**)&(pParentObj->cmd.pTableMetaList[i]), true); + } + + pParentObj->cmd.parseFinished = false; + pParentObj->subState.numOfRemain = numOfFailed; + pParentObj->subState.numOfSub = numOfFailed; - // todo remove this parameter in async callback function definition. - // all data has been sent to vnode, call user function - int32_t v = (pParentObj->res.code != TSDB_CODE_SUCCESS) ? pParentObj->res.code : (int32_t)pParentObj->res.numOfRows; - (*pParentObj->fp)(pParentObj->param, pParentObj, v); + tscResetSqlCmdObj(&pParentObj->cmd, false); + + tscDebug("%p re-parse sql to generate data", pParentObj); + int32_t code = tsParseSql(pParentObj, true); + if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return; + + if (code != TSDB_CODE_SUCCESS) { + pParentObj->res.code = code; + tscQueueAsyncRes(pParentObj); + return; + } + + tscDoQuery(pParentObj); + } } /** @@ -2187,19 +2240,19 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) * @param pSql * @return */ -int32_t tscHandleInsertRetry(SSqlObj* pSql) { +int32_t tscHandleInsertRetry(SSqlObj* pParent, SSqlObj* pSql) { assert(pSql != NULL && pSql->param != NULL); - SSqlCmd* pCmd = &pSql->cmd; +// SSqlCmd* pCmd = &pSql->cmd; SSqlRes* pRes = &pSql->res; SInsertSupporter* pSupporter = (SInsertSupporter*) pSql->param; assert(pSupporter->index < pSupporter->pSql->subState.numOfSub); - STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, pSupporter->index); + STableDataBlocks* pTableDataBlock = taosArrayGetP(pParent->cmd.pDataBlocks, pSupporter->index); int32_t code = tscCopyDataBlockToPayload(pSql, pTableDataBlock); // free the data block created from insert sql string - pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); +// pCmd->pDataBlocks = tscDestroyBlockArrayList(pParent->cmd.pDataBlocks); if ((pRes->code = code)!= TSDB_CODE_SUCCESS) { tscQueueAsyncRes(pSql); @@ -2213,6 +2266,20 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; + // it is the failure retry insert + if (pSql->pSubs != NULL) { + for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { + SSqlObj* pSub = pSql->pSubs[i]; + + tscDebug("%p sub:%p launch sub insert, orderOfSub:%d", pSql, pSub, i); + if (pSub->res.code != TSDB_CODE_SUCCESS) { + tscHandleInsertRetry(pSql, pSub); + } + } + + return TSDB_CODE_SUCCESS; + } + pSql->subState.numOfSub = (uint16_t)taosArrayGetSize(pCmd->pDataBlocks); assert(pSql->subState.numOfSub > 0); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index fd03aa5099..dd20c1f84e 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -333,13 +333,15 @@ void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo) { if (isNull(p, TSDB_DATA_TYPE_NCHAR)) { memcpy(dst, p, varDataTLen(p)); - } else { + } else if (varDataLen(p) > 0) { int32_t length = taosUcs4ToMbs(varDataVal(p), varDataLen(p), varDataVal(dst)); varDataSetLen(dst, length); if (length == 0) { tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, (char*)p); } + } else { + varDataSetLen(dst, 0); } p += pInfo->field.bytes; @@ -377,7 +379,7 @@ static void tscDestroyResPointerInfo(SSqlRes* pRes) { pRes->data = NULL; // pRes->data points to the buffer of pRsp, no need to free } -static void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeFromCache) { +void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeFromCache) { if (pCmd == NULL || pCmd->numOfClause == 0) { return; } @@ -403,12 +405,12 @@ void tscResetSqlCmdObj(SSqlCmd* pCmd, bool removeFromCache) { pCmd->msgType = 0; pCmd->parseFinished = 0; pCmd->autoCreated = 0; - - taosHashCleanup(pCmd->pTableList); - pCmd->pTableList = NULL; - - pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); + pCmd->numOfTables = 0; + + tfree(pCmd->pTableMetaList); + pCmd->pTableList = tscDestroyBlockHashTable(pCmd->pTableList); + pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); tscFreeQueryInfo(pCmd, removeFromCache); } @@ -575,6 +577,21 @@ void* tscDestroyBlockArrayList(SArray* pDataBlockList) { return NULL; } +void* tscDestroyBlockHashTable(SHashObj* pBlockHashTable) { + if (pBlockHashTable == NULL) { + return NULL; + } + + STableDataBlocks** p = taosHashIterate(pBlockHashTable, NULL); + while(p) { + tscDestroyDataBlock(*p); + p = taosHashIterate(pBlockHashTable, p); + } + + taosHashCleanup(pBlockHashTable); + return NULL; +} + int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { SSqlCmd* pCmd = &pSql->cmd; assert(pDataBlock->pTableMeta != NULL); @@ -671,9 +688,8 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff return TSDB_CODE_SUCCESS; } -int32_t tscGetDataBlockFromList(void* pHashList, SArray* pDataBlockList, int64_t id, int32_t size, - int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta, - STableDataBlocks** dataBlocks) { +int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta, + STableDataBlocks** dataBlocks, SArray* pBlockList) { *dataBlocks = NULL; STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)&id, sizeof(id)); @@ -688,7 +704,9 @@ int32_t tscGetDataBlockFromList(void* pHashList, SArray* pDataBlockList, int64_t } taosHashPut(pHashList, (const char*)&id, sizeof(int64_t), (char*)dataBlocks, POINTER_BYTES); - taosArrayPush(pDataBlockList, dataBlocks); + if (pBlockList) { + taosArrayPush(pBlockList, dataBlocks); + } } return TSDB_CODE_SUCCESS; @@ -769,22 +787,37 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) { return result; } -int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) { +static void extractTableMeta(SSqlCmd* pCmd) { + pCmd->numOfTables = taosHashGetSize(pCmd->pTableList); + pCmd->pTableMetaList = calloc(pCmd->numOfTables, POINTER_BYTES); + + STableDataBlocks **p1 = taosHashIterate(pCmd->pTableList, NULL); + int32_t i = 0; + while(p1) { + STableDataBlocks* pBlocks = *p1; + pCmd->pTableMetaList[i++] = taosCacheTransfer(tscMetaCache, (void**) &pBlocks->pTableMeta); + p1 = taosHashIterate(pCmd->pTableList, p1); + } + + pCmd->pTableList = tscDestroyBlockHashTable(pCmd->pTableList); +} + +int32_t tscMergeTableDataBlocks(SSqlObj* pSql) { SSqlCmd* pCmd = &pSql->cmd; void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES); - size_t total = taosArrayGetSize(pTableDataBlockList); - for (int32_t i = 0; i < total; ++i) { + STableDataBlocks** p = taosHashIterate(pCmd->pTableList, NULL); + + STableDataBlocks* pOneTableBlock = *p; + while(pOneTableBlock) { // the maximum expanded size in byte when a row-wise data is converted to SDataRow format - STableDataBlocks* pOneTableBlock = taosArrayGetP(pTableDataBlockList, i); int32_t expandSize = getRowExpandSize(pOneTableBlock->pTableMeta); STableDataBlocks* dataBuf = NULL; - int32_t ret = - tscGetDataBlockFromList(pVnodeDataBlockHashList, pVnodeDataBlockList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE, - tsInsertHeadSize, 0, pOneTableBlock->tableId, pOneTableBlock->pTableMeta, &dataBuf); + int32_t ret = tscGetDataBlockFromList(pVnodeDataBlockHashList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE, + tsInsertHeadSize, 0, pOneTableBlock->tableId, pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList); if (ret != TSDB_CODE_SUCCESS) { tscError("%p failed to prepare the data block buffer for merging table data, code:%d", pSql, ret); taosHashCleanup(pVnodeDataBlockHashList); @@ -839,14 +872,19 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) { // the length does not include the SSubmitBlk structure pBlocks->dataLen = htonl(finalLen); dataBuf->numOfTables += 1; + + p = taosHashIterate(pCmd->pTableList, p); + if (p == NULL) { + break; + } + + pOneTableBlock = *p; } - tscDestroyBlockArrayList(pTableDataBlockList); + extractTableMeta(pCmd); // free the table data blocks; pCmd->pDataBlocks = pVnodeDataBlockList; - -// tscFreeUnusedDataBlocks(pCmd->pDataBlocks); taosHashCleanup(pVnodeDataBlockHashList); return TSDB_CODE_SUCCESS; -- GitLab