diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index a4df124fa2463150bac4695cb71a7f1276d49f8f..d0aecfe1a2d6c3a817f5d838b0a5a932a267fda9 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -354,6 +354,8 @@ char* strdup_throw(const char* str); bool vgroupInfoIdentical(SNewVgroupInfo *pExisted, SVgroupMsg* src); SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg); +void tscRemoveTableMetaBuf(STableMetaInfo* pTableMetaInfo, uint64_t id); + #ifdef __cplusplus } #endif diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index d857d00e159d06df266187693913a6b6e404a2b3..910a80d6af8488c4eb72d7f9f5ceea8a1d429c7e 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -325,61 +325,6 @@ void tscAsyncResultOnError(SSqlObj* pSql) { int tscSendMsgToServer(SSqlObj *pSql); -static int32_t updateMetaBeforeRetryQuery(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SQueryInfo* pQueryInfo) { - // handle the invalid table error code for super table. - // update the pExpr info, colList info, number of table columns - // TODO Re-parse this sql and issue the corresponding subquery as an alternative for this case. - if (pSql->retryReason == TSDB_CODE_TDB_INVALID_TABLE_ID) { - int32_t numOfExprs = (int32_t) tscNumOfExprs(pQueryInfo); - int32_t numOfCols = tscGetNumOfColumns(pTableMetaInfo->pTableMeta); - int32_t numOfTags = tscGetNumOfTags(pTableMetaInfo->pTableMeta); - - SSchema *pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta); - SSchema *pTagSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta); - - for (int32_t i = 0; i < numOfExprs; ++i) { - SSqlExpr *pExpr = &(tscExprGet(pQueryInfo, i)->base); - - // update the table uid - pExpr->uid = pTableMetaInfo->pTableMeta->id.uid; - - if (pExpr->colInfo.colIndex >= 0) { - int32_t index = pExpr->colInfo.colIndex; - - if ((TSDB_COL_IS_NORMAL_COL(pExpr->colInfo.flag) && index >= numOfCols) || - (TSDB_COL_IS_TAG(pExpr->colInfo.flag) && (index < 0 || index >= numOfTags))) { - return pSql->retryReason; - } - - if (TSDB_COL_IS_TAG(pExpr->colInfo.flag)) { - if ((pTagSchema[pExpr->colInfo.colIndex].colId != pExpr->colInfo.colId) && - strcasecmp(pExpr->colInfo.name, pTagSchema[pExpr->colInfo.colIndex].name) != 0) { - return pSql->retryReason; - } - } else if (TSDB_COL_IS_NORMAL_COL(pExpr->colInfo.flag)) { - if ((pSchema[pExpr->colInfo.colIndex].colId != pExpr->colInfo.colId) && - strcasecmp(pExpr->colInfo.name, pSchema[pExpr->colInfo.colIndex].name) != 0) { - return pSql->retryReason; - } - } else { // do nothing for udc - } - } - } - - // validate the table columns information - for (int32_t i = 0; i < taosArrayGetSize(pQueryInfo->colList); ++i) { - SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i); - if (pCol->columnIndex >= numOfCols) { - return pSql->retryReason; - } - } - } else { - // do nothing - } - - return TSDB_CODE_SUCCESS; -} - void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)param); if (pSql == NULL) return; @@ -391,7 +336,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { pRes->code = code; SSqlObj *sub = (SSqlObj*) res; - const char* msg = (sub->cmd.command == TSDB_SQL_STABLEVGROUP)? "vgroup-list":"table-meta"; + const char* msg = (sub->cmd.command == TSDB_SQL_STABLEVGROUP)? "vgroup-list":"[multi-]tableMeta"; if (code != TSDB_CODE_SUCCESS) { tscError("0x%"PRIx64" get %s failed, code:%s", pSql->self, msg, tstrerror(code)); goto _error; @@ -401,31 +346,11 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { if (pSql->pStream == NULL) { SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd); - // check if it is a sub-query of super table query first, if true, enter another routine - if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY | - TSDB_QUERY_TYPE_TAG_FILTER_QUERY))) { - tscDebug("0x%" PRIx64 " update cached table-meta, continue to process sql and send the corresponding query", pSql->self); - STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - - code = tscGetTableMeta(pSql, pTableMetaInfo); - assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS || code == TSDB_CODE_SUCCESS); - - if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { - taosReleaseRef(tscObjRef, pSql->self); - return; - } - - assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0)); - code = updateMetaBeforeRetryQuery(pSql, pTableMetaInfo, pQueryInfo); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } +// assert(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY | TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) == 0); - // tscBuildAndSendRequest can add error into async res - tscBuildAndSendRequest(pSql, NULL); - taosReleaseRef(tscObjRef, pSql->self); - return; - } else { // continue to process normal async query + // super table subquery failure will be ignored +// if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY | +// TSDB_QUERY_TYPE_TAG_FILTER_QUERY))) { if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT)) { tscDebug("0x%" PRIx64 " continue parse sql after get table-meta", pSql->self); @@ -437,7 +362,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { goto _error; } - if (TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_STMT_INSERT)) { + if (TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_STMT_INSERT)) { // stmt insert STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); code = tscGetTableMeta(pSql, pTableMetaInfo); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { @@ -448,17 +373,14 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { } (*pSql->fp)(pSql->param, pSql, code); - } else { - if (TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_FILE_INSERT)) { - tscImportDataFromFile(pSql); - } else { - tscHandleMultivnodeInsert(pSql); - } + } else if (TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_FILE_INSERT)) { // file insert + tscImportDataFromFile(pSql); + } else { // sql string insert + tscHandleMultivnodeInsert(pSql); } } else { if (pSql->retryReason != TSDB_CODE_SUCCESS) { - tscDebug("0x%" PRIx64 " update cached table-meta, re-validate sql statement and send query again", - pSql->self); + tscDebug("0x%" PRIx64 " update cached table-meta, re-validate sql statement and send query again", pSql->self); tscResetSqlCmd(pCmd, false); pSql->retryReason = TSDB_CODE_SUCCESS; } else { @@ -479,7 +401,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { taosReleaseRef(tscObjRef, pSql->self); return; - } +// } } else { // stream computing tscDebug("0x%"PRIx64" stream:%p meta is updated, start new query, command:%d", pSql->self, pSql->pStream, pCmd->command); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index b60a28b9fd8d78310b231d8faf5135a1c3c57dee..dd17e25f58cee6bd2a0ed723a0b1133b22014177 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -390,33 +390,40 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { pSql->cmd.insertParam.schemaAttached = 1; } + // single table query error need to be handled here. if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) && - (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || // change the retry procedure - /*(*/rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID || + (((rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || // change the retry procedure + rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID)) || rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || // change the retry procedure rpcMsg->code == TSDB_CODE_APP_NOT_READY)) { - - pSql->retry++; - tscWarn("0x%"PRIx64" it shall renew table meta, code:%s, retry:%d", pSql->self, tstrerror(rpcMsg->code), pSql->retry); - pSql->res.code = rpcMsg->code; // keep the previous error code - if (pSql->retry > pSql->maxRetry) { - tscError("0x%"PRIx64" max retry %d reached, give up", pSql->self, pSql->maxRetry); - } else { - // wait for a little bit moment and then retry - // todo do not sleep in rpc callback thread, add this process into queue to process - if (rpcMsg->code == TSDB_CODE_APP_NOT_READY || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { - int32_t duration = getWaitingTimeInterval(pSql->retry); - taosMsleep(duration); - } - - pSql->retryReason = rpcMsg->code; - rpcMsg->code = tscRenewTableMeta(pSql, 0); - // if there is an error occurring, proceed to the following error handling procedure. - if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { - taosReleaseRef(tscObjRef, handle); - rpcFreeCont(rpcMsg->pCont); - return; + if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY | + TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) && + !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY)) { + // do nothing in case of super table subquery + } else { + pSql->retry += 1; + tscWarn("0x%" PRIx64 " it shall renew table meta, code:%s, retry:%d", pSql->self, tstrerror(rpcMsg->code), pSql->retry); + + pSql->res.code = rpcMsg->code; // keep the previous error code + if (pSql->retry > pSql->maxRetry) { + tscError("0x%" PRIx64 " max retry %d reached, give up", pSql->self, pSql->maxRetry); + } else { + // wait for a little bit moment and then retry + // todo do not sleep in rpc callback thread, add this process into queue to process + if (rpcMsg->code == TSDB_CODE_APP_NOT_READY || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { + int32_t duration = getWaitingTimeInterval(pSql->retry); + taosMsleep(duration); + } + + pSql->retryReason = rpcMsg->code; + rpcMsg->code = tscRenewTableMeta(pSql, 0); + // if there is an error occurring, proceed to the following error handling procedure. + if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { + taosReleaseRef(tscObjRef, handle); + rpcFreeCont(rpcMsg->pCont); + return; + } } } } @@ -2521,14 +2528,7 @@ int tscProcessDropDbRsp(SSqlObj *pSql) { int tscProcessDropTableRsp(SSqlObj *pSql) { STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0); - - //The cached tableMeta is expired in this case, so clean it in hash table - char name[TSDB_TABLE_FNAME_LEN] = {0}; - tNameExtractFullName(&pTableMetaInfo->name, name); - - taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); - tscDebug("0x%"PRIx64" remove table meta after drop table:%s, numOfRemain:%d", pSql->self, name, (int32_t) taosHashGetSize(tscTableMetaMap)); - + tscRemoveTableMetaBuf(pTableMetaInfo, pSql->self); tfree(pTableMetaInfo->pTableMeta); return 0; } @@ -2915,9 +2915,7 @@ static void freeElem(void* p) { * @return status code */ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) { - SSqlCmd *pCmd = &pSql->cmd; - - SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd); + SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex); char name[TSDB_TABLE_FNAME_LEN] = {0}; @@ -2934,15 +2932,7 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) { } // remove stored tableMeta info in hash table - size_t len = strlen(name); - taosHashRemove(tscTableMetaMap, name, len); - - if (pTableMeta->tableType == TSDB_SUPER_TABLE) { - void* pv = taosCacheAcquireByKey(tscVgroupListBuf, name, len); - if (pv != NULL) { - taosCacheRelease(tscVgroupListBuf, &pv, true); - } - } + tscRemoveTableMetaBuf(pTableMetaInfo, pSql->self); SArray* pNameList = taosArrayInit(1, POINTER_BYTES); SArray* vgroupList = taosArrayInit(1, POINTER_BYTES); diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index b98ffd7638606d2b7e21e0782156de931bc65d66..7c10114cfd02d9010b999e665a20bd6b26addbd1 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2704,8 +2704,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO tstrerror(pParentSql->res.code)); // release allocated resource - tscDestroyGlobalMergerEnv(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, - pState->numOfSub); + tscDestroyGlobalMergerEnv(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, pState->numOfSub); tscFreeRetrieveSup(pSql); @@ -2713,7 +2712,35 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO SQueryInfo *pQueryInfo = tscGetQueryInfo(&pParentSql->cmd); if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) { - (*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.code); + + int32_t code = pParentSql->res.code; + if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && pParentSql->retry < pParentSql->maxRetry) { + // remove the cached tableMeta and vgroup id list, and then parse the sql again + STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentSql->cmd, 0); + tscRemoveTableMetaBuf(pTableMetaInfo, pParentSql->self); + + tscResetSqlCmd(&pParentSql->cmd, true); + pParentSql->res.code = TSDB_CODE_SUCCESS; + pParentSql->retry++; + + tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", pParentSql->self, + tstrerror(code), pParentSql->retry); + + code = tsParseSql(pParentSql, true); + if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { + return; + } + + if (code != TSDB_CODE_SUCCESS) { + pParentSql->res.code = code; + tscAsyncResultOnError(pParentSql); + return; + } + + executeQuery(pParentSql, pQueryInfo); + } else { + (*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.code); + } } else { // regular super table query if (pParentSql->res.code != TSDB_CODE_SUCCESS) { tscAsyncResultOnError(pParentSql); @@ -2996,7 +3023,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { assert(code == taos_errno(pSql)); - if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) { + if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && (code != TSDB_CODE_TDB_INVALID_TABLE_ID)) { tscError("0x%"PRIx64" sub:0x%"PRIx64" failed code:%s, retry:%d", pParentSql->self, pSql->self, tstrerror(code), trsupport->numOfRetry); int32_t sent = 0; @@ -3005,7 +3032,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { return; } } else { - tscError("0x%"PRIx64" sub:0x%"PRIx64" reach the max retry times, set global code:%s", pParentSql->self, pSql->self, tstrerror(code)); + tscError("0x%"PRIx64" sub:0x%"PRIx64" reach the max retry times or no need to retry, set global code:%s", pParentSql->self, pSql->self, tstrerror(code)); atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, code); // set global code and abort } @@ -3129,8 +3156,6 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) } pParentObj->res.code = TSDB_CODE_SUCCESS; -// pParentObj->cmd.parseFinished = false; - tscResetSqlCmd(&pParentObj->cmd, false); // in case of insert, redo parsing the sql string and build new submit data block for two reasons: diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 4454844ea052d8e954ebfc578d9bd29c8c62d02c..ae5a7a69a8ad31895e0f34c28c26a30506c63333 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -4857,3 +4857,19 @@ SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg) { return info; } + +void tscRemoveTableMetaBuf(STableMetaInfo* pTableMetaInfo, uint64_t id) { + char fname[TSDB_TABLE_FNAME_LEN] = {0}; + tNameExtractFullName(&pTableMetaInfo->name, fname); + + int32_t len = (int32_t) strnlen(fname, TSDB_TABLE_FNAME_LEN); + if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { + void* pv = taosCacheAcquireByKey(tscVgroupListBuf, fname, len); + if (pv != NULL) { + taosCacheRelease(tscVgroupListBuf, &pv, true); + } + } + + taosHashRemove(tscTableMetaMap, fname, len); + tscDebug("0x%"PRIx64" remove table meta %s, numOfRemain:%d", id, fname, (int32_t) taosHashGetSize(tscTableMetaMap)); +} \ No newline at end of file