未验证 提交 867a0914 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #12800 from taosdata/feature/qnode

feat: drop table meta cache after table dropped
......@@ -1987,19 +1987,16 @@ static FORCE_INLINE void tFreeClientHbReq(void* pReq) {
if (req->info) {
tFreeReqKvHash(req->info);
taosHashCleanup(req->info);
req->info = NULL;
}
}
int32_t tSerializeSClientHbBatchReq(void* buf, int32_t bufLen, const SClientHbBatchReq* pReq);
int32_t tDeserializeSClientHbBatchReq(void* buf, int32_t bufLen, SClientHbBatchReq* pReq);
static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq, bool deep) {
static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq) {
SClientHbBatchReq* req = (SClientHbBatchReq*)pReq;
if (deep) {
taosArrayDestroyEx(req->reqs, tFreeClientHbReq);
} else {
taosArrayDestroy(req->reqs);
}
taosArrayDestroyEx(req->reqs, tFreeClientHbReq);
taosMemoryFree(pReq);
}
......@@ -2023,6 +2020,7 @@ static FORCE_INLINE void tFreeClientHbBatchRsp(void* pRsp) {
int32_t tSerializeSClientHbBatchRsp(void* buf, int32_t bufLen, const SClientHbBatchRsp* pBatchRsp);
int32_t tDeserializeSClientHbBatchRsp(void* buf, int32_t bufLen, SClientHbBatchRsp* pBatchRsp);
void tFreeSClientHbBatchRsp(SClientHbBatchRsp *pBatchRsp);
static FORCE_INLINE int32_t tEncodeSKv(SEncoder* pEncoder, const SKv* pKv) {
if (tEncodeI32(pEncoder, pKv->key) < 0) return -1;
......
......@@ -198,6 +198,8 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define NEED_CLIENT_HANDLE_ERROR(_code) \
(NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \
NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code))
#define NEED_CLIENT_RM_TBLMETA_REQ(_type) ((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_VND_CREATE_STB \
|| (_type) == TDMT_VND_DROP_TABLE || (_type) == TDMT_VND_DROP_STB)
#define NEED_SCHEDULER_RETRY_ERROR(_code) \
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
......
......@@ -394,6 +394,10 @@ int32_t hbGetExpiredUserInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, S
tscDebug("hb got %d expired users, valueLen:%d", userNum, kv.valueLen);
if (NULL == req->info) {
req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
}
taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
return TSDB_CODE_SUCCESS;
......@@ -429,6 +433,10 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl
tscDebug("hb got %d expired db, valueLen:%d", dbNum, kv.valueLen);
if (NULL == req->info) {
req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
}
taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
return TSDB_CODE_SUCCESS;
......@@ -463,6 +471,10 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC
tscDebug("hb got %d expired stb, valueLen:%d", stbNum, kv.valueLen);
if (NULL == req->info) {
req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
}
taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
return TSDB_CODE_SUCCESS;
......@@ -511,16 +523,6 @@ static FORCE_INLINE void hbMgrInitHandle() {
hbMgrInitMqHbHandle();
}
void hbFreeReq(void *req) {
SClientHbReq *pReq = (SClientHbReq *)req;
tFreeReqKvHash(pReq->info);
}
void hbClearClientHbReq(SClientHbReq *pReq) {
pReq->query = NULL;
pReq->info = NULL;
}
SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
SClientHbBatchReq *pBatchReq = taosMemoryCalloc(1, sizeof(SClientHbBatchReq));
if (pBatchReq == NULL) {
......@@ -535,6 +537,8 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
while (pIter != NULL) {
SClientHbReq *pOneReq = pIter;
pOneReq = taosArrayPush(pBatchReq->reqs, pOneReq);
SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pOneReq->connKey, sizeof(SClientHbKey));
if (info) {
code = (*clientHbMgr.reqHandle[pOneReq->connKey.connType])(&pOneReq->connKey, info->param, pOneReq);
......@@ -544,7 +548,6 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
}
}
taosArrayPush(pBatchReq->reqs, pOneReq);
//hbClearClientHbReq(pOneReq);
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
......@@ -601,8 +604,8 @@ static void *hbThreadFunc(void *param) {
void *buf = taosMemoryMalloc(tlen);
if (buf == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
tFreeClientHbBatchReq(pReq, false);
hbClearReqInfo(pAppHbMgr);
tFreeClientHbBatchReq(pReq);
//hbClearReqInfo(pAppHbMgr);
break;
}
......@@ -611,8 +614,8 @@ static void *hbThreadFunc(void *param) {
if (pInfo == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
tFreeClientHbBatchReq(pReq, false);
hbClearReqInfo(pAppHbMgr);
tFreeClientHbBatchReq(pReq);
//hbClearReqInfo(pAppHbMgr);
taosMemoryFree(buf);
break;
}
......@@ -628,8 +631,8 @@ static void *hbThreadFunc(void *param) {
int64_t transporterId = 0;
SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp);
asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo);
tFreeClientHbBatchReq(pReq, false);
hbClearReqInfo(pAppHbMgr);
tFreeClientHbBatchReq(pReq);
//hbClearReqInfo(pAppHbMgr);
atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
}
......@@ -721,8 +724,7 @@ void appHbMgrCleanup(void) {
void *pIter = taosHashIterate(pTarget->activeInfo, NULL);
while (pIter != NULL) {
SClientHbReq *pOneReq = pIter;
hbFreeReq(pOneReq);
taosHashCleanup(pOneReq->info);
tFreeClientHbReq(pOneReq);
pIter = taosHashIterate(pTarget->activeInfo, pIter);
}
taosHashCleanup(pTarget->activeInfo);
......@@ -782,7 +784,7 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo *
}
SClientHbReq hbReq = {0};
hbReq.connKey = connKey;
hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
//hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq));
......@@ -823,8 +825,7 @@ int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, in
void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) {
SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
if (pReq) {
hbFreeReq(pReq);
taosHashCleanup(pReq->info);
tFreeClientHbReq(pReq);
taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
}
......
......@@ -345,6 +345,10 @@ int32_t validateSversion(SRequestObj* pRequest, void* res) {
for (int32_t i = 0; i < pRsp->nBlocks; ++i) {
SSubmitBlkRsp* blk = pRsp->pBlocks + i;
if (NULL == blk->tblFName || 0 == blk->tblFName[0]) {
continue;
}
STbSVersion tbSver = {.tbFName = blk->tblFName, .sver = blk->sver};
taosArrayPush(pArray, &tbSver);
}
......@@ -383,7 +387,7 @@ _return:
}
void freeRequestRes(SRequestObj* pRequest, void* res) {
if (NULL == res) {
if (NULL == pRequest || NULL == res) {
return;
}
......@@ -431,12 +435,13 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code
if (NULL != pRequest && TSDB_CODE_SUCCESS != code) {
pRequest->code = terrno;
freeRequestRes(pRequest, pRes);
pRes = NULL;
}
if (res) {
*res = pRes;
} else {
freeRequestRes(pRequest, pRes);
pRes = NULL;
}
return pRequest;
......@@ -499,6 +504,23 @@ int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
return code;
}
int32_t removeMeta(STscObj* pTscObj, SArray* tbList) {
SCatalog* pCatalog = NULL;
int32_t tbNum = taosArrayGetSize(tbList);
int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
for (int32_t i = 0; i < tbNum; ++i) {
SName* pTbName = taosArrayGet(tbList, i);
catalogRemoveTableMeta(pCatalog, pTbName);
}
return TSDB_CODE_SUCCESS;
}
SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
SRequestObj* pRequest = NULL;
int32_t retryNum = 0;
......@@ -518,6 +540,10 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
}
} while (retryNum++ < REQUEST_MAX_TRY_TIMES);
if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) {
removeMeta(pTscObj, pRequest->tableList);
}
return pRequest;
}
......
......@@ -47,8 +47,14 @@ int32_t stmtSwitchStatus(STscStmt* pStmt, STMT_STATUS newStatus) {
}
break;
case STMT_EXECUTE:
if (STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS)) {
code = TSDB_CODE_TSC_STMT_API_ERROR;
if (STMT_TYPE_QUERY == pStmt->sql.type) {
if (STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS) && STMT_STATUS_NE(BIND) && STMT_STATUS_NE(BIND_COL)) {
code = TSDB_CODE_TSC_STMT_API_ERROR;
}
} else {
if (STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS)) {
code = TSDB_CODE_TSC_STMT_API_ERROR;
}
}
break;
default:
......@@ -794,6 +800,7 @@ int stmtExec(TAOS_STMT* stmt) {
if (code) {
pStmt->exec.pRequest->code = code;
} else {
tFreeSSubmitRsp(pRsp);
STMT_ERR_RET(stmtResetStmt(pStmt));
STMT_ERR_RET(TSDB_CODE_NEED_RETRY);
}
......@@ -811,11 +818,13 @@ _return:
if (TSDB_CODE_SUCCESS == code && autoCreateTbl) {
if (NULL == pRsp) {
tscError("no submit resp got for auto create table");
STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
code = TSDB_CODE_TSC_APP_ERROR;
} else {
code = stmtUpdateTableUid(pStmt, pRsp);
}
STMT_ERR_RET(stmtUpdateTableUid(pStmt, pRsp));
}
tFreeSSubmitRsp(pRsp);
++pStmt->sql.runTimes;
......
......@@ -128,7 +128,8 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType
}
static void mndFreeConn(SConnObj *pConn) {
taosMemoryFreeClear(pConn->pQueries);
taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc);
mTrace("conn:%u, is destroyed, data:%p", pConn->id, pConn);
}
......@@ -396,6 +397,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
if (NULL == hbRsp.info) {
mError("taosArrayInit %d rsp kv failed", kvNum);
terrno = TSDB_CODE_OUT_OF_MEMORY;
tFreeClientHbRsp(&hbRsp);
return -1;
}
......@@ -453,6 +455,7 @@ static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq) {
SClientHbBatchReq batchReq = {0};
if (tDeserializeSClientHbBatchReq(pReq->pCont, pReq->contLen, &batchReq) != 0) {
taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
......@@ -479,18 +482,7 @@ static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq) {
void *buf = rpcMallocCont(tlen);
tSerializeSClientHbBatchRsp(buf, tlen, &batchRsp);
int32_t rspNum = (int32_t)taosArrayGetSize(batchRsp.rsps);
for (int32_t i = 0; i < rspNum; ++i) {
SClientHbRsp *rsp = taosArrayGet(batchRsp.rsps, i);
int32_t kvNum = (rsp->info) ? taosArrayGetSize(rsp->info) : 0;
for (int32_t n = 0; n < kvNum; ++n) {
SKv *kv = taosArrayGet(rsp->info, n);
taosMemoryFreeClear(kv->value);
}
taosArrayDestroy(rsp->info);
}
taosArrayDestroy(batchRsp.rsps);
tFreeClientHbBatchRsp(&batchRsp);
pReq->info.rspLen = tlen;
pReq->info.rsp = buf;
......
......@@ -1642,6 +1642,8 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *pStbVersions, int
if (pStbVersion->sversion != metaRsp.sversion) {
taosArrayPush(batchMetaRsp.pArray, &metaRsp);
} else {
tFreeSTableMetaRsp(&metaRsp);
}
}
......@@ -1660,6 +1662,7 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *pStbVersions, int
}
tSerializeSTableMetaBatchRsp(pRsp, rspLen, &batchMetaRsp);
tFreeSTableMetaBatchRsp(&batchMetaRsp);
*ppRsp = pRsp;
*pRspLen = rspLen;
return 0;
......
......@@ -2784,6 +2784,8 @@ int32_t catalogRemoveTableMeta(SCatalog *pCtg, const SName *pTableName) {
CTG_ERR_JRET(ctgPushRmTblMsgInQueue(pCtg, dbFName, dbId, pTableName->tname, true));
}
ctgDebug("table meta %s.%s removed", dbFName, pTableName->tname);
_return:
taosMemoryFreeClear(tblMeta);
......@@ -2958,7 +2960,11 @@ int32_t catalogChkTbMetaVersion(SCatalog *pCtg, void *pTrans, const SEpSet *pMgm
int32_t sver = 0;
int32_t tbNum = taosArrayGetSize(pTables);
for (int32_t i = 0; i < tbNum; ++i) {
STbSVersion *pTb = (STbSVersion *)taosArrayGet(pTables, i);
STbSVersion* pTb = (STbSVersion*)taosArrayGet(pTables, i);
if (NULL == pTb->tbFName || 0 == pTb->tbFName[0]) {
continue;
}
tNameFromString(&name, pTb->tbFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
if (CTG_IS_SYS_DBNAME(name.dbname)) {
......
......@@ -1085,6 +1085,10 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
// no data in the sql string anymore.
if (sToken.n == 0) {
if (sToken.type && pCxt->pSql[0]) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid charactor in SQL", sToken.z);
}
if (0 == pCxt->totalNum && (!TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT))) {
return buildInvalidOperationMsg(&pCxt->msg, "no data in sql");
}
......
......@@ -704,6 +704,7 @@ SToken tStrGetToken(const char* str, int32_t* i, bool isPrevOptr) {
if (t0.type == TK_NK_SEMI) {
t0.n = 0;
t0.type = 0;
return t0;
}
......
......@@ -725,7 +725,11 @@ void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) {
qGetQueriedTableSchemaVersion(pTaskInfo, dbFName, tbName, &ctx->tbInfo.sversion, &ctx->tbInfo.tversion);
sprintf(ctx->tbInfo.tbFName, "%s.%s", dbFName, tbName);
if (dbFName[0] && tbName[0]) {
sprintf(ctx->tbInfo.tbFName, "%s.%s", dbFName, tbName);
} else {
ctx->tbInfo.tbFName[0] = 0;
}
}
int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
......
......@@ -1825,10 +1825,12 @@ int queryColumnTest(TAOS_STMT *stmt, TAOS *taos) {
if (bpBindParam(stmt, data.pBind + n * gCurCase->bindColNum)) {
exit(1);
}
if (taos_stmt_add_batch(stmt)) {
printf("!!!taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt));
exit(1);
if (rand() % 2 == 0) {
if (taos_stmt_add_batch(stmt)) {
printf("!!!taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt));
exit(1);
}
}
if (taos_stmt_execute(stmt) != 0) {
......@@ -1871,10 +1873,12 @@ int queryMiscTest(TAOS_STMT *stmt, TAOS *taos) {
if (bpBindParam(stmt, data.pBind + n * gCurCase->bindColNum)) {
exit(1);
}
if (taos_stmt_add_batch(stmt)) {
printf("!!!taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt));
exit(1);
if (rand() % 2 == 0) {
if (taos_stmt_add_batch(stmt)) {
printf("!!!taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt));
exit(1);
}
}
if (taos_stmt_execute(stmt) != 0) {
......
......@@ -28,15 +28,17 @@ if $rows != 1 then
return -1
endi
print =============== step2
sql drop table $mt
sql show stables
if $rows != 0 then
return -1
endi
#TODO OPEN THIS WHEN STABLE DELETE WORKS
#print =============== step2
#sql drop table $mt
#sql show stables
#if $rows != 0 then
# return -1
#endi
print =============== step3
sql create table $mt (ts timestamp, speed int) TAGS(sp int)
#print =============== step3
#sql create table $mt (ts timestamp, speed int) TAGS(sp int)
#TODO OPEN THIS WHEN STABLE DELETE WORKS
sql show stables
if $rows != 1 then
......@@ -134,4 +136,4 @@ if $rows != 2 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -14,7 +14,7 @@ class TDTestCase:
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
tdSql.init(conn.cursor(), logSql)
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册