提交 733d2216 编写于 作者: D dapan1121

fix stmt bug

上级 a70a942d
...@@ -198,6 +198,8 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t ...@@ -198,6 +198,8 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define NEED_CLIENT_HANDLE_ERROR(_code) \ #define NEED_CLIENT_HANDLE_ERROR(_code) \
(NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \ (NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \
NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code)) NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code))
#define NEED_CLIENT_RM_TBLMETA_REQ(_type) ((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_VND_CREATE_STB \
|| (_type) == TDMT_VND_DROP_TABLE || (_type) == TDMT_VND_DROP_STB)
#define NEED_SCHEDULER_RETRY_ERROR(_code) \ #define NEED_SCHEDULER_RETRY_ERROR(_code) \
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL) ((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
......
...@@ -345,6 +345,10 @@ int32_t validateSversion(SRequestObj* pRequest, void* res) { ...@@ -345,6 +345,10 @@ int32_t validateSversion(SRequestObj* pRequest, void* res) {
for (int32_t i = 0; i < pRsp->nBlocks; ++i) { for (int32_t i = 0; i < pRsp->nBlocks; ++i) {
SSubmitBlkRsp* blk = pRsp->pBlocks + i; SSubmitBlkRsp* blk = pRsp->pBlocks + i;
if (NULL == blk->tblFName || 0 == blk->tblFName[0]) {
continue;
}
STbSVersion tbSver = {.tbFName = blk->tblFName, .sver = blk->sver}; STbSVersion tbSver = {.tbFName = blk->tblFName, .sver = blk->sver};
taosArrayPush(pArray, &tbSver); taosArrayPush(pArray, &tbSver);
} }
...@@ -499,6 +503,23 @@ int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) { ...@@ -499,6 +503,23 @@ int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
return code; return code;
} }
int32_t removeMeta(STscObj* pTscObj, SArray* tbList) {
SCatalog* pCatalog = NULL;
int32_t tbNum = taosArrayGetSize(tbList);
int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
for (int32_t i = 0; i < tbNum; ++i) {
SName* pTbName = taosArrayGet(tbList, i);
catalogRemoveTableMeta(pCatalog, pTbName);
}
return TSDB_CODE_SUCCESS;
}
SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) { SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
SRequestObj* pRequest = NULL; SRequestObj* pRequest = NULL;
int32_t retryNum = 0; int32_t retryNum = 0;
...@@ -518,6 +539,10 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) { ...@@ -518,6 +539,10 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
} }
} while (retryNum++ < REQUEST_MAX_TRY_TIMES); } while (retryNum++ < REQUEST_MAX_TRY_TIMES);
if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) {
removeMeta(pTscObj, pRequest->tableList);
}
return pRequest; return pRequest;
} }
......
...@@ -47,8 +47,14 @@ int32_t stmtSwitchStatus(STscStmt* pStmt, STMT_STATUS newStatus) { ...@@ -47,8 +47,14 @@ int32_t stmtSwitchStatus(STscStmt* pStmt, STMT_STATUS newStatus) {
} }
break; break;
case STMT_EXECUTE: case STMT_EXECUTE:
if (STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS)) { if (STMT_TYPE_QUERY == pStmt->sql.type) {
code = TSDB_CODE_TSC_STMT_API_ERROR; if (STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS) && STMT_STATUS_NE(BIND) && STMT_STATUS_NE(BIND_COL)) {
code = TSDB_CODE_TSC_STMT_API_ERROR;
}
} else {
if (STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS)) {
code = TSDB_CODE_TSC_STMT_API_ERROR;
}
} }
break; break;
default: default:
......
...@@ -2800,6 +2800,7 @@ int32_t catalogRemoveTableMeta(SCatalog* pCtg, const SName* pTableName) { ...@@ -2800,6 +2800,7 @@ int32_t catalogRemoveTableMeta(SCatalog* pCtg, const SName* pTableName) {
CTG_ERR_JRET(ctgPushRmTblMsgInQueue(pCtg, dbFName, dbId, pTableName->tname, true)); CTG_ERR_JRET(ctgPushRmTblMsgInQueue(pCtg, dbFName, dbId, pTableName->tname, true));
} }
ctgDebug("table meta %s.%s removed", dbFName, pTableName->tname);
_return: _return:
...@@ -2974,6 +2975,10 @@ int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void *pTrans, const SEpSet* pMgm ...@@ -2974,6 +2975,10 @@ int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void *pTrans, const SEpSet* pMgm
int32_t tbNum = taosArrayGetSize(pTables); int32_t tbNum = taosArrayGetSize(pTables);
for (int32_t i = 0; i < tbNum; ++i) { for (int32_t i = 0; i < tbNum; ++i) {
STbSVersion* pTb = (STbSVersion*)taosArrayGet(pTables, i); STbSVersion* pTb = (STbSVersion*)taosArrayGet(pTables, i);
if (NULL == pTb->tbFName || 0 == pTb->tbFName[0]) {
continue;
}
tNameFromString(&name, pTb->tbFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); tNameFromString(&name, pTb->tbFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
if (CTG_IS_SYS_DBNAME(name.dbname)) { if (CTG_IS_SYS_DBNAME(name.dbname)) {
......
...@@ -2726,6 +2726,7 @@ static int32_t buildCreateStbReq(STranslateContext* pCxt, SCreateTableStmt* pStm ...@@ -2726,6 +2726,7 @@ static int32_t buildCreateStbReq(STranslateContext* pCxt, SCreateTableStmt* pStm
SName tableName; SName tableName;
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &tableName), pReq->name); tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &tableName), pReq->name);
collectUseTable(&tableName, pCxt->pTables);
return buildRollupAst(pCxt, pStmt, pReq); return buildRollupAst(pCxt, pStmt, pReq);
} }
......
...@@ -725,7 +725,11 @@ void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) { ...@@ -725,7 +725,11 @@ void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) {
qGetQueriedTableSchemaVersion(pTaskInfo, dbFName, tbName, &ctx->tbInfo.sversion, &ctx->tbInfo.tversion); qGetQueriedTableSchemaVersion(pTaskInfo, dbFName, tbName, &ctx->tbInfo.sversion, &ctx->tbInfo.tversion);
sprintf(ctx->tbInfo.tbFName, "%s.%s", dbFName, tbName); if (dbFName[0] && tbName[0]) {
sprintf(ctx->tbInfo.tbFName, "%s.%s", dbFName, tbName);
} else {
ctx->tbInfo.tbFName[0] = 0;
}
} }
int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) { int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
......
...@@ -1825,10 +1825,12 @@ int queryColumnTest(TAOS_STMT *stmt, TAOS *taos) { ...@@ -1825,10 +1825,12 @@ int queryColumnTest(TAOS_STMT *stmt, TAOS *taos) {
if (bpBindParam(stmt, data.pBind + n * gCurCase->bindColNum)) { if (bpBindParam(stmt, data.pBind + n * gCurCase->bindColNum)) {
exit(1); exit(1);
} }
if (taos_stmt_add_batch(stmt)) { if (rand() % 2 == 0) {
printf("!!!taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt)); if (taos_stmt_add_batch(stmt)) {
exit(1); printf("!!!taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt));
exit(1);
}
} }
if (taos_stmt_execute(stmt) != 0) { if (taos_stmt_execute(stmt) != 0) {
...@@ -1871,10 +1873,12 @@ int queryMiscTest(TAOS_STMT *stmt, TAOS *taos) { ...@@ -1871,10 +1873,12 @@ int queryMiscTest(TAOS_STMT *stmt, TAOS *taos) {
if (bpBindParam(stmt, data.pBind + n * gCurCase->bindColNum)) { if (bpBindParam(stmt, data.pBind + n * gCurCase->bindColNum)) {
exit(1); exit(1);
} }
if (taos_stmt_add_batch(stmt)) { if (rand() % 2 == 0) {
printf("!!!taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt)); if (taos_stmt_add_batch(stmt)) {
exit(1); printf("!!!taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt));
exit(1);
}
} }
if (taos_stmt_execute(stmt) != 0) { if (taos_stmt_execute(stmt) != 0) {
......
...@@ -14,7 +14,7 @@ class TDTestCase: ...@@ -14,7 +14,7 @@ class TDTestCase:
def init(self, conn, logSql): def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}") tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor()) tdSql.init(conn.cursor(), logSql)
def getBuildPath(self): def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__)) selfPath = os.path.dirname(os.path.realpath(__file__))
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册