提交 c85232e1 编写于 作者: D dapan1121

update table meta based on sversion

上级 5827a7e8
...@@ -59,6 +59,11 @@ typedef struct SMetaData { ...@@ -59,6 +59,11 @@ typedef struct SMetaData {
SArray *pQnodeList; // qnode list, SArray<SQueryNodeAddr> SArray *pQnodeList; // qnode list, SArray<SQueryNodeAddr>
} SMetaData; } SMetaData;
typedef struct STbSVersion {
char* tbFName;
int32_t sver;
} STbSVersion;
typedef struct SCatalogCfg { typedef struct SCatalogCfg {
uint32_t maxTblCacheNum; uint32_t maxTblCacheNum;
uint32_t maxDBCacheNum; uint32_t maxDBCacheNum;
......
...@@ -83,6 +83,15 @@ void closeTransporter(STscObj *pTscObj) { ...@@ -83,6 +83,15 @@ void closeTransporter(STscObj *pTscObj) {
rpcClose(pTscObj->pAppInfo->pTransporter); rpcClose(pTscObj->pAppInfo->pTransporter);
} }
static bool clientRpcRfp(int32_t code) {
if (code == TSDB_CODE_RPC_REDIRECT) {
return true;
} else {
return false;
}
}
// TODO refactor // TODO refactor
void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
SRpcInit rpcInit; SRpcInit rpcInit;
...@@ -91,6 +100,7 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { ...@@ -91,6 +100,7 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
rpcInit.label = "TSC"; rpcInit.label = "TSC";
rpcInit.numOfThreads = numOfThread; rpcInit.numOfThreads = numOfThread;
rpcInit.cfp = processMsgFromServer; rpcInit.cfp = processMsgFromServer;
rpcInit.rfp = clientRpcRfp;
rpcInit.sessions = 1024; rpcInit.sessions = 1024;
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.user = (char *)user; rpcInit.user = (char *)user;
......
...@@ -310,9 +310,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList ...@@ -310,9 +310,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
} }
} }
if (pRes) { *pRes = res.res;
*pRes = res.res;
}
pRequest->code = res.code; pRequest->code = res.code;
terrno = res.code; terrno = res.code;
...@@ -324,7 +322,49 @@ int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList) ...@@ -324,7 +322,49 @@ int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList)
return getPlan(pRequest, pQuery, &pRequest->body.pDag, *pNodeList); return getPlan(pRequest, pQuery, &pRequest->body.pDag, *pNodeList);
} }
int32_t validateSversion(SRequestObj* pRequest, void* res) {
SArray* pArray = NULL;
int32_t code = 0;
if (TDMT_VND_SUBMIT == pRequest->type) {
SSubmitRsp* pRsp = (SSubmitRsp*)res;
if (pRsp->nBlocks <= 0) {
return TSDB_CODE_SUCCESS;
}
pArray = taosArrayInit(pRsp->nBlocks, sizeof(STbSVersion));
if (NULL == pArray) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < pRsp->nBlocks; ++i) {
SSubmitBlkRsp *blk = pRsp->pBlocks + i;
STbSVersion tbSver = {.tbFName = blk->tblFName, .sver = blk->sver};
taosArrayPush(pArray, &tbSver);
}
} else if (TDMT_VND_QUERY == pRequest->type) {
}
SCatalog* pCatalog = NULL;
CHECK_CODE_GOTO(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog), _return);
SEpSet epset = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
code = catalogChkTbMetaVersion(pCatalog, pRequest->pTscObj->pAppInfo->pTransporter, &epset, pArray);
_return:
taosArrayDestroy(pArray);
return code;
}
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery, void** res) { SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery, void** res) {
void* pRes = NULL;
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
switch (pQuery->execMode) { switch (pQuery->execMode) {
case QUERY_EXEC_MODE_LOCAL: case QUERY_EXEC_MODE_LOCAL:
...@@ -337,7 +377,10 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code ...@@ -337,7 +377,10 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code
SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr)); SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
code = getPlan(pRequest, pQuery, &pRequest->body.pDag, pNodeList); code = getPlan(pRequest, pQuery, &pRequest->body.pDag, pNodeList);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = scheduleQuery(pRequest, pRequest->body.pDag, pNodeList, res); code = scheduleQuery(pRequest, pRequest->body.pDag, pNodeList, &pRes);
if (NULL != pRes) {
code = validateSversion(pRequest, pRes);
}
} }
taosArrayDestroy(pNodeList); taosArrayDestroy(pNodeList);
break; break;
...@@ -356,6 +399,12 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code ...@@ -356,6 +399,12 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code
if (NULL != pRequest && TSDB_CODE_SUCCESS != code) { if (NULL != pRequest && TSDB_CODE_SUCCESS != code) {
pRequest->code = terrno; pRequest->code = terrno;
freeRequestRes(pRequest, pRes);
pRes = NULL;
}
if (res) {
*res = pRes;
} }
return pRequest; return pRequest;
......
...@@ -2883,6 +2883,11 @@ _return: ...@@ -2883,6 +2883,11 @@ _return:
CTG_API_LEAVE(code); CTG_API_LEAVE(code);
} }
int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, SArray* pTables) {
}
int32_t catalogRefreshDBVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* dbFName) { int32_t catalogRefreshDBVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* dbFName) {
CTG_API_ENTER(); CTG_API_ENTER();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册