提交 1b196df8 编写于 作者: D dapan1121

update epset

上级 001061fc
......@@ -159,7 +159,7 @@ int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* vers
* @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller)
* @return error code
*/
int32_t catalogGetDBVgInfo(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const char* pDBName, SArray** pVgroupList);
int32_t catalogGetDBVgInfo(SCatalog* pCatalog, SRequestConnInfo* pConn, const char* pDBName, SArray** pVgroupList);
int32_t catalogUpdateDBVgInfo(SCatalog* pCatalog, const char* dbName, uint64_t dbId, SDBVgInfo* dbInfo);
......@@ -178,7 +178,7 @@ int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId,
* @param pTableMeta(output, table meta data, NEED to free it by calller)
* @return error code
*/
int32_t catalogGetTableMeta(SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta);
int32_t catalogGetTableMeta(SCatalog* pCatalog, SRequestConnInfo* pConn, const SName* pTableName, STableMeta** pTableMeta);
/**
* Get a super table's meta data.
......@@ -189,7 +189,7 @@ int32_t catalogGetTableMeta(SCatalog* pCatalog, void * pTransporter, const SEpSe
* @param pTableMeta(output, table meta data, NEED to free it by calller)
* @return error code
*/
int32_t catalogGetSTableMeta(SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta);
int32_t catalogGetSTableMeta(SCatalog* pCatalog, SRequestConnInfo* pConn, const SName* pTableName, STableMeta** pTableMeta);
int32_t catalogUpdateTableMeta(SCatalog* pCatalog, STableMetaRsp *rspMsg);
......@@ -202,9 +202,9 @@ int32_t catalogUpdateTableMeta(SCatalog* pCatalog, STableMetaRsp *rspMsg);
* @param dbFName (input, db full name)
* @return error code
*/
int32_t catalogRefreshDBVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* dbFName);
int32_t catalogRefreshDBVgInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName);
int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, SArray* pTables);
int32_t catalogChkTbMetaVersion(SCatalog* pCtg, SRequestConnInfo* pConn, SArray* pTables);
/**
* Force refresh a table's local cached meta data.
......@@ -215,7 +215,7 @@ int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void *pTrans, const SEpSet* pMgm
* @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure)
* @return error code
*/
int32_t catalogRefreshTableMeta(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable);
int32_t catalogRefreshTableMeta(SCatalog* pCatalog, SRequestConnInfo* pConn, const SName* pTableName, int32_t isSTable);
/**
* Force refresh a table's local cached meta data and get the new one.
......@@ -227,7 +227,7 @@ int32_t catalogRefreshTableMeta(SCatalog* pCatalog, void *pTransporter, const SE
* @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure)
* @return error code
*/
int32_t catalogRefreshGetTableMeta(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable);
int32_t catalogRefreshGetTableMeta(SCatalog* pCatalog, SRequestConnInfo* pConn, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable);
......@@ -240,7 +240,7 @@ int32_t catalogRefreshGetTableMeta(SCatalog* pCatalog, void *pTransporter, const
* @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller)
* @return error code
*/
int32_t catalogGetTableDistVgInfo(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgroupList);
int32_t catalogGetTableDistVgInfo(SCatalog* pCatalog, SRequestConnInfo* pConn, const SName* pTableName, SArray** pVgroupList);
/**
* Get a table's vgroup from its name's hash value.
......@@ -251,7 +251,7 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCatalog, void *pTransporter, const
* @param vgInfo (output, vgroup info)
* @return error code
*/
int32_t catalogGetTableHashVgroup(SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pName, SVgroupInfo* vgInfo);
int32_t catalogGetTableHashVgroup(SCatalog* pCatalog, SRequestConnInfo* pConn, const SName* pName, SVgroupInfo* vgInfo);
/**
......@@ -263,11 +263,11 @@ int32_t catalogGetTableHashVgroup(SCatalog* pCatalog, void * pTransporter, const
* @param pRsp (output, response data)
* @return error code
*/
int32_t catalogGetAllMeta(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp);
int32_t catalogGetAllMeta(SCatalog* pCatalog, SRequestConnInfo* pConn, const SCatalogReq* pReq, SMetaData* pRsp);
int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param, int64_t* jobId);
int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, SRequestConnInfo* pConn, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param, int64_t* jobId);
int32_t catalogGetQnodeList(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, SArray* pQnodeList);
int32_t catalogGetQnodeList(SCatalog* pCatalog, SRequestConnInfo* pConn, SArray* pQnodeList);
int32_t catalogGetExpiredSTables(SCatalog* pCatalog, SSTableMetaVersion **stables, uint32_t *num);
......@@ -275,21 +275,21 @@ int32_t catalogGetExpiredDBs(SCatalog* pCatalog, SDbVgVersion **dbs, uint32_t *n
int32_t catalogGetExpiredUsers(SCatalog* pCtg, SUserAuthVersion **users, uint32_t *num);
int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SDbCfgInfo* pDbCfg);
int32_t catalogGetDBCfg(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName, SDbCfgInfo* pDbCfg);
int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo);
int32_t catalogGetIndexMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const char* indexName, SIndexInfo* pInfo);
int32_t catalogGetTableIndex(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pRes);
int32_t catalogGetTableIndex(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SArray** pRes);
int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo* pInfo);
int32_t catalogGetUdfInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const char* funcName, SFuncInfo* pInfo);
int32_t catalogChkAuth(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* user, const char* dbFName, AUTH_TYPE type, bool *pass);
int32_t catalogChkAuth(SCatalog* pCtg, SRequestConnInfo* pConn, const char* user, const char* dbFName, AUTH_TYPE type, bool *pass);
int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth);
int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId, SEpSet *epSet);
int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId, bool forceUpdate);
int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, SRequestConnInfo* pConn, uint64_t reqId, bool forceUpdate);
/**
......
......@@ -35,6 +35,7 @@ typedef struct SStmtCallback {
typedef struct SParseContext {
uint64_t requestId;
int64_t requestRid;
int32_t acctId;
const char* db;
bool topicQuery;
......
......@@ -139,13 +139,20 @@ typedef struct SDataBuf {
typedef struct STargetInfo {
ETargetType type;
char dbFName[TSDB_DB_FNAME_LEN]; // used to update db's vgroup epset
char* dbFName; // used to update db's vgroup epset
int32_t vgId;
} STargetInfo;
typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code);
typedef int32_t (*__async_exec_fn_t)(void* param);
typedef struct SRequestConnInfo {
void* pTrans;
uint64_t requestId;
int64_t requestObjRefId;
SEpSet mgmtEps;
} SRequestConnInfo;
typedef struct SMsgSendInfo {
__async_send_cb_fn_t fp; // async callback function
STargetInfo target; // for update epset
......
......@@ -72,6 +72,16 @@ typedef struct SSchdFetchParam {
typedef void (*schedulerExecCallback)(SQueryResult* pResult, void* param, int32_t code);
typedef void (*schedulerFetchCallback)(void* pResult, void* param, int32_t code);
typedef struct SSchedulerReq {
SRequestConnInfo *pConn;
SArray *pNodeList;
SQueryPlan *pDag;
const char *sql;
int64_t startTs;
schedulerExecCallback fp;
void* cbParam;
} SSchedulerReq;
int32_t schedulerInit(SSchedulerCfg *cfg);
......@@ -81,7 +91,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg);
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return
*/
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, int64_t startTs, SQueryResult *pRes);
int32_t schedulerExecJob(SSchedulerReq *pReq, int64_t *pJob, SQueryResult *pRes);
/**
* Process the query job, generated according to the query physical plan.
......@@ -89,8 +99,7 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, in
* @param pNodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return
*/
int32_t schedulerAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
int64_t startTs, schedulerExecCallback fp, void* param);
int32_t schedulerAsyncExecJob(SSchedulerReq *pReq, int64_t *pJob);
/**
* Fetch query result from the remote query executor
......
......@@ -142,6 +142,13 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
tscDebug("tscObj rid %" PRIx64 " not exist", pRsp->connKey.tscRid);
} else {
if (pRsp->query->totalDnodes > 1 && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &pRsp->query->epSet)) {
SEpSet* pOrig = &pTscObj->pAppInfo->mgmtEp.epSet;
SEp* pOrigEp = &pOrig->eps[pOrig->inUse];
SEp* pNewEp = &pRsp->query->epSet.eps[pRsp->query->epSet.inUse];
tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in hb",
pOrig->inUse, pOrig->numOfEps, pOrigEp->fqdn, pOrigEp->port,
pRsp->query->epSet.inUse, pRsp->query->epSet.numOfEps, pNewEp->fqdn, pNewEp->port);
updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pRsp->query->epSet);
}
pTscObj->connId = pRsp->query->connId;
......
......@@ -166,6 +166,7 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC
STscObj* pTscObj = pRequest->pTscObj;
SParseContext cxt = {.requestId = pRequest->requestId,
.requestRid = pRequest->self,
.acctId = pTscObj->acctId,
.db = pRequest->pDb,
.topicQuery = topicQuery,
......@@ -318,12 +319,15 @@ int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) {
taosThreadMutexUnlock(&pInfo->qnodeMutex);
if (NULL == *pNodeList) {
SEpSet mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
SCatalog* pCatalog = NULL;
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (TSDB_CODE_SUCCESS == code) {
*pNodeList = taosArrayInit(5, sizeof(SQueryNodeLoad));
code = catalogGetQnodeList(pCatalog, pRequest->pTscObj->pAppInfo->pTransporter, &mgmtEpSet, *pNodeList);
SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId,
.requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)};
code = catalogGetQnodeList(pCatalog, &conn, *pNodeList);
}
if (TSDB_CODE_SUCCESS == code && *pNodeList) {
......@@ -395,13 +399,23 @@ void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
}
int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
tsem_init(&schdRspSem, 0, 0);
SQueryResult res = {.code = 0, .numOfRows = 0};
int32_t code = schedulerAsyncExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr,
pRequest->metric.start, schdExecCallback, &res);
SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId,
.requestObjRefId = pRequest->self
};
SSchedulerReq req = {.pConn = &conn,
.pNodeList = pNodeList,
.pDag = pDag,
.sql = pRequest->sqlstr,
.startTs = pRequest->metric.start,
.fp = schdExecCallback,
.cbParam = &res
};
int32_t code = schedulerAsyncExecJob(&req, &pRequest->body.queryJob);
pRequest->body.resInfo.execRes = res.res;
......@@ -442,9 +456,20 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
SQueryResult res = {0};
int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr,
pRequest->metric.start, &res);
SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId,
.requestObjRefId = pRequest->self
};
SSchedulerReq req = {.pConn = &conn,
.pNodeList = pNodeList,
.pDag = pDag,
.sql = pRequest->sqlstr,
.startTs = pRequest->metric.start,
.fp = NULL,
.cbParam = NULL
};
int32_t code = schedulerExecJob(&req,&pRequest->body.queryJob, &res);
pRequest->body.resInfo.execRes = res.res;
if (code != TSDB_CODE_SUCCESS) {
......@@ -494,7 +519,12 @@ int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog
taosArrayPush(pArray, &tbSver);
}
code = catalogChkTbMetaVersion(pCatalog, pRequest->pTscObj->pAppInfo->pTransporter, epset, pArray);
SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId,
.requestObjRefId = pRequest->self,
.mgmtEps = *epset};
code = catalogChkTbMetaVersion(pCatalog, &conn, pArray);
_return:
......@@ -523,7 +553,12 @@ int32_t handleQueryExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog,
taosArrayPush(pArray, &tbSver);
}
code = catalogChkTbMetaVersion(pCatalog, pRequest->pTscObj->pAppInfo->pTransporter, epset, pArray);
SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId,
.requestObjRefId = pRequest->self,
.mgmtEps = *epset};
code = catalogChkTbMetaVersion(pCatalog, &conn, pArray);
_return:
......@@ -695,8 +730,19 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) {
}
if (TSDB_CODE_SUCCESS == code) {
schedulerAsyncExecJob(pAppInfo->pTransporter, pNodeList, pRequest->body.pDag, &pRequest->body.queryJob,
pRequest->sqlstr, pRequest->metric.start, schedulerExecCb, pRequest);
SRequestConnInfo conn = {.pTrans = pAppInfo->pTransporter,
.requestId = pRequest->requestId,
.requestObjRefId = pRequest->self
};
SSchedulerReq req = {.pConn = &conn,
.pNodeList = pNodeList,
.pDag = pRequest->body.pDag,
.sql = pRequest->sqlstr,
.startTs = pRequest->metric.start,
.fp = schedulerExecCb,
.cbParam = pRequest
};
code = schedulerAsyncExecJob(&req, &pRequest->body.queryJob);
} else {
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
}
......@@ -737,12 +783,15 @@ int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
return code;
}
SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId,
.requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
for (int32_t i = 0; i < dbNum; ++i) {
char* dbFName = taosArrayGet(pRequest->dbList, i);
code = catalogRefreshDBVgInfo(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, dbFName);
code = catalogRefreshDBVgInfo(pCatalog, &conn, dbFName);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -751,7 +800,7 @@ int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
for (int32_t i = 0; i < tblNum; ++i) {
SName* tableName = taosArrayGet(pRequest->tableList, i);
code = catalogRefreshTableMeta(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, tableName, -1);
code = catalogRefreshTableMeta(pCatalog, &conn, tableName, -1);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -924,6 +973,7 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
assert(pMsgBody != NULL);
taosMemoryFreeClear(pMsgBody->target.dbFName);
taosMemoryFreeClear(pMsgBody->msgInfo.pData);
taosMemoryFreeClear(pMsgBody);
}
......@@ -936,15 +986,23 @@ void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg,
switch (pSendInfo->target.type) {
case TARGET_TYPE_MNODE:
if (NULL == pTscObj) {
tscError("mnode epset changed but not able to update it, reqObjRefId:%" PRIx64, pSendInfo->requestObjRefId);
tscError("mnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
return;
}
SEpSet* pOrig = &pTscObj->pAppInfo->mgmtEp.epSet;
SEp* pOrigEp = &pOrig->eps[pOrig->inUse];
SEp* pNewEp = &pEpSet->eps[pEpSet->inUse];
tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in client",
pOrig->inUse, pOrig->numOfEps, pOrigEp->fqdn, pOrigEp->port,
pEpSet->inUse, pEpSet->numOfEps, pNewEp->fqdn, pNewEp->port);
updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
break;
case TARGET_TYPE_VNODE: {
if (NULL == pTscObj) {
tscError("vnode epset changed but not able to update it, reqObjRefId:%" PRIx64, pSendInfo->requestObjRefId);
tscError("vnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
return;
}
......@@ -957,6 +1015,7 @@ void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg,
}
catalogUpdateVgEpSet(pCatalog, pSendInfo->target.dbFName, pSendInfo->target.vgId, pEpSet);
taosMemoryFreeClear(pSendInfo->target.dbFName);
break;
}
default:
......@@ -970,26 +1029,30 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
assert(pMsg->info.ahandle != NULL);
STscObj* pTscObj = NULL;
tscDebug("processMsgFromServer message: %s, code: %s", TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code));
if (pSendInfo->requestObjRefId != 0) {
SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
assert(pRequest->self == pSendInfo->requestObjRefId);
pRequest->metric.rsp = taosGetTimestampUs();
pTscObj = pRequest->pTscObj;
/*
* There is not response callback function for submit response.
* The actual inserted number of points is the first number.
*/
int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start;
if (pMsg->code == TSDB_CODE_SUCCESS) {
tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%" PRIx64, pRequest->self,
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed / 1000, pRequest->requestId);
} else {
tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%" PRIx64, pRequest->self,
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed / 1000, pRequest->requestId);
}
if (pRequest) {
assert(pRequest->self == pSendInfo->requestObjRefId);
pRequest->metric.rsp = taosGetTimestampUs();
pTscObj = pRequest->pTscObj;
/*
* There is not response callback function for submit response.
* The actual inserted number of points is the first number.
*/
int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start;
if (pMsg->code == TSDB_CODE_SUCCESS) {
tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%" PRIx64, pRequest->self,
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed / 1000, pRequest->requestId);
} else {
tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%" PRIx64, pRequest->self,
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed / 1000, pRequest->requestId);
}
taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
}
}
updateTargetEpSet(pSendInfo, pTscObj, pMsg, pEpSet);
......
......@@ -687,19 +687,20 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext** pCxt) {
}
**pCxt = (SParseContext){.requestId = pRequest->requestId,
.acctId = pTscObj->acctId,
.db = pRequest->pDb,
.topicQuery = false,
.pSql = pRequest->sqlstr,
.sqlLen = pRequest->sqlLen,
.pMsg = pRequest->msgBuf,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
.pTransporter = pTscObj->pAppInfo->pTransporter,
.pStmtCb = NULL,
.pUser = pTscObj->user,
.schemalessType = pTscObj->schemalessType,
.isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
.async = true,};
.requestRid = pRequest->self,
.acctId = pTscObj->acctId,
.db = pRequest->pDb,
.topicQuery = false,
.pSql = pRequest->sqlstr,
.sqlLen = pRequest->sqlLen,
.pMsg = pRequest->msgBuf,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
.pTransporter = pTscObj->pAppInfo->pTransporter,
.pStmtCb = NULL,
.pUser = pTscObj->user,
.schemalessType = pTscObj->schemalessType,
.isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
.async = true,};
return TSDB_CODE_SUCCESS;
}
......@@ -743,7 +744,12 @@ void doAsyncQuery(SRequestObj* pRequest, bool updateMetaForce) {
pWrapper->pRequest = pRequest;
pWrapper->catalogReq = catalogReq;
code = catalogAsyncGetAllMeta(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, pRequest->requestId,
SRequestConnInfo conn = {.pTrans = pCxt->pTransporter,
.requestId = pCxt->requestId,
.requestObjRefId = pCxt->requestRid,
.mgmtEps = pCxt->mgmtEpSet};
code = catalogAsyncGetAllMeta(pCxt->pCatalog, &conn, pRequest->requestId,
&catalogReq, retrieveMetaCallback, pWrapper, &pRequest->body.queryJob);
if (code == TSDB_CODE_SUCCESS) {
return;
......
......@@ -66,6 +66,12 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
rpcSetDefaultAddr(pTscObj->pAppInfo->pTransporter, srcEpSet.eps[srcEpSet.inUse].fqdn,
dstEpSet.eps[dstEpSet.inUse].fqdn);
} else if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) {
SEpSet* pOrig = &pTscObj->pAppInfo->mgmtEp.epSet;
SEp* pOrigEp = &pOrig->eps[pOrig->inUse];
SEp* pNewEp = &connectRsp.epSet.eps[connectRsp.epSet.inUse];
tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in connRsp",
pOrig->inUse, pOrig->numOfEps, pOrigEp->fqdn, pOrigEp->port,
connectRsp.epSet.inUse, connectRsp.epSet.numOfEps, pNewEp->fqdn, pNewEp->port);
updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &connectRsp.epSet);
}
......@@ -102,6 +108,7 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pRequest) {
pMsgSendInfo->requestId = pRequest->requestId;
pMsgSendInfo->param = pRequest;
pMsgSendInfo->msgType = pRequest->type;
pMsgSendInfo->target.type = TARGET_TYPE_MNODE;
assert(pRequest != NULL);
pMsgSendInfo->msgInfo = pRequest->body.requestMsg;
......
......@@ -467,15 +467,18 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) {
SSmlSTableMeta* sTableData = *tableMetaSml;
STableMeta *pTableMeta = NULL;
SEpSet ep = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
size_t superTableLen = 0;
void *superTable = taosHashGetKey(tableMetaSml, &superTableLen);
SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
strcpy(pName.dbname, info->pRequest->pDb);
memcpy(pName.tname, superTable, superTableLen);
code = catalogGetSTableMeta(info->pCatalog, info->taos->pAppInfo->pTransporter, &ep, &pName, &pTableMeta);
SRequestConnInfo conn = {.pTrans = info->taos->pAppInfo->pTransporter,
.requestId = info->pRequest->requestId,
.requestObjRefId = info->pRequest->self,
.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp)};
code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_INVALID_STB) {
SSchemaAction schemaAction;
......@@ -515,7 +518,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) {
return code;
}
code = catalogRefreshTableMeta(info->pCatalog, info->taos->pAppInfo->pTransporter, &ep, &pName, -1);
code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -525,7 +528,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) {
}
if(pTableMeta) taosMemoryFree(pTableMeta);
code = catalogGetSTableMeta(info->pCatalog, info->taos->pAppInfo->pTransporter, &ep, &pName, &pTableMeta);
code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%"PRIx64" catalogGetSTableMeta failed. super table name %s", info->id, (char*)superTable);
return code;
......@@ -2210,9 +2213,12 @@ static int32_t smlInsertData(SSmlHandle* info) {
SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
strcpy(pName.dbname, info->pRequest->pDb);
memcpy(pName.tname, tableData->childTableName, strlen(tableData->childTableName));
SEpSet ep = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
SRequestConnInfo conn = {.pTrans = info->taos->pAppInfo->pTransporter,
.requestId = info->pRequest->requestId,
.requestObjRefId = info->pRequest->self,
.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp)};
SVgroupInfo vg;
code = catalogGetTableHashVgroup(info->pCatalog, info->taos->pAppInfo->pTransporter, &ep, &pName, &vg);
code = catalogGetTableHashVgroup(info->pCatalog, &conn, &pName, &vg);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%"PRIx64" catalogGetTableHashVgroup failed. table name: %s", info->id, tableData->childTableName);
return code;
......@@ -2324,9 +2330,12 @@ static int32_t isSchemalessDb(SSmlHandle* info){
char dbFname[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(&name, dbFname);
SDbCfgInfo pInfo = {0};
SEpSet ep = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
SRequestConnInfo conn = {.pTrans = info->taos->pAppInfo->pTransporter,
.requestId = info->pRequest->requestId,
.requestObjRefId = info->pRequest->self,
.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp)};
int32_t code = catalogGetDBCfg(info->pCatalog, info->taos->pAppInfo->pTransporter, &ep, dbFname, &pInfo);
int32_t code = catalogGetDBCfg(info->pCatalog, &conn, dbFname, &pInfo);
if (code != TSDB_CODE_SUCCESS) {
info->pRequest->code = code;
smlBuildInvalidDataMsg(&info->msgBuf, "catalogGetDBCfg error, code:", tstrerror(code));
......
......@@ -326,9 +326,12 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) {
int32_t stmtRebuildDataBlock(STscStmt* pStmt, STableDataBlocks* pDataBlock, STableDataBlocks** newBlock, uint64_t uid) {
SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
SVgroupInfo vgInfo = {0};
SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
.requestId = pStmt->exec.pRequest->requestId,
.requestObjRefId = pStmt->exec.pRequest->self,
.mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
STMT_ERR_RET(catalogGetTableHashVgroup(pStmt->pCatalog, pStmt->taos->pAppInfo->pTransporter, &ep, &pStmt->bInfo.sname,
&vgInfo));
STMT_ERR_RET(catalogGetTableHashVgroup(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &vgInfo));
STMT_ERR_RET(
taosHashPut(pStmt->exec.pVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo)));
......@@ -389,9 +392,12 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
}
STableMeta* pTableMeta = NULL;
SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
.requestId = pStmt->exec.pRequest->requestId,
.requestObjRefId = pStmt->exec.pRequest->self,
.mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
int32_t code =
catalogGetTableMeta(pStmt->pCatalog, pStmt->taos->pAppInfo->pTransporter, &ep, &pStmt->bInfo.sname, &pTableMeta);
catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);
if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
STMT_ERR_RET(stmtCleanBindInfo(pStmt));
......
......@@ -218,13 +218,10 @@ int vnodeValidateTableHash(SVnode *pVnode, char *tableFName) {
break;
}
// TODO OPEN THIS !!!!!!!
#if 0
if (hashValue < pVnodeOptions->hashBegin || hashValue > pVnodeOptions->hashEnd) {
if (hashValue < pVnode->config.hashBegin || hashValue > pVnode->config.hashEnd) {
terrno = TSDB_CODE_VND_HASH_MISMATCH;
return TSDB_CODE_VND_HASH_MISMATCH;
}
#endif
return 0;
}
......@@ -112,6 +112,10 @@ _exit:
rpcMsg.contLen = rspLen;
rpcMsg.code = code;
if (code) {
qError("get table %s meta failed cause of %s", infoReq.tbName, tstrerror(code));
}
tmsgSendRsp(&rpcMsg);
taosMemoryFree(metaRsp.pSchemas);
......
......@@ -90,6 +90,7 @@ typedef struct SCtgTbCacheInfo {
typedef struct SCtgTbMetaCtx {
SCtgTbCacheInfo tbInfo;
int32_t vgId;
SName* pName;
int32_t flag;
} SCtgTbMetaCtx;
......@@ -174,27 +175,26 @@ typedef struct SCatalog {
} SCatalog;
typedef struct SCtgJob {
int64_t refId;
SArray* pTasks;
int32_t taskDone;
SMetaData jobRes;
uint64_t queryId;
SCatalog* pCtg;
void* pTrans;
SEpSet pMgmtEps;
void* userParam;
catalogCallback userFp;
int32_t tbMetaNum;
int32_t tbHashNum;
int32_t dbVgNum;
int32_t udfNum;
int32_t qnodeNum;
int32_t dbCfgNum;
int32_t indexNum;
int32_t userNum;
int32_t dbInfoNum;
int32_t tbIndexNum;
int64_t refId;
SArray* pTasks;
int32_t taskDone;
SMetaData jobRes;
uint64_t queryId;
SCatalog* pCtg;
SRequestConnInfo conn;
void* userParam;
catalogCallback userFp;
int32_t tbMetaNum;
int32_t tbHashNum;
int32_t dbVgNum;
int32_t udfNum;
int32_t qnodeNum;
int32_t dbCfgNum;
int32_t indexNum;
int32_t userNum;
int32_t dbInfoNum;
int32_t tbIndexNum;
} SCtgJob;
typedef struct SCtgMsgCtx {
......@@ -448,15 +448,12 @@ typedef struct SCtgOperation {
#define CTG_API_LEAVE(c) do { int32_t __code = c; CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock); CTG_API_DEBUG("CTG API leave %s", __FUNCTION__); CTG_RET(__code); } while (0)
#define CTG_API_ENTER() do { CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); CTG_LOCK(CTG_READ, &gCtgMgmt.lock); if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) { CTG_API_LEAVE(TSDB_CODE_CTG_OUT_OF_SERVICE); } } while (0)
#define CTG_PARAMS SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps
#define CTG_PARAMS_LIST() pCtg, pTrans, pMgmtEps
void ctgdShowTableMeta(SCatalog* pCtg, const char *tbName, STableMeta* p);
void ctgdShowClusterCache(SCatalog* pCtg);
int32_t ctgdShowCacheInfo(void);
int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq);
int32_t ctgGetTbMetaFromCache(CTG_PARAMS, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta);
int32_t ctgGetTbMetaFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta);
int32_t ctgOpUpdateVgroup(SCtgCacheOperation *action);
int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *action);
......@@ -492,18 +489,18 @@ int32_t ctgRelaunchGetTbMetaTask(SCtgTask *pTask);
int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, int32_t rspCode, char* target);
int32_t ctgGetDBVgInfoFromMnode(CTG_PARAMS, SBuildUseDBInput *input, SUseDbOutput *out, SCtgTask* pTask);
int32_t ctgGetQnodeListFromMnode(CTG_PARAMS, SArray *out, SCtgTask* pTask);
int32_t ctgGetDBCfgFromMnode(CTG_PARAMS, const char *dbFName, SDbCfgInfo *out, SCtgTask* pTask);
int32_t ctgGetIndexInfoFromMnode(CTG_PARAMS, const char *indexName, SIndexInfo *out, SCtgTask* pTask);
int32_t ctgGetTbIndexFromMnode(CTG_PARAMS, SName* name, SArray** out, SCtgTask* pTask);
int32_t ctgGetUdfInfoFromMnode(CTG_PARAMS, const char *funcName, SFuncInfo *out, SCtgTask* pTask);
int32_t ctgGetUserDbAuthFromMnode(CTG_PARAMS, const char *user, SGetUserAuthRsp *out, SCtgTask* pTask);
int32_t ctgGetTbMetaFromMnodeImpl(CTG_PARAMS, char *dbFName, char* tbName, STableMetaOutput* out, SCtgTask* pTask);
int32_t ctgGetTbMetaFromMnode(CTG_PARAMS, const SName* pTableName, STableMetaOutput* out, SCtgTask* pTask);
int32_t ctgGetTbMetaFromVnode(CTG_PARAMS, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* out, SCtgTask* pTask);
int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param, int32_t* taskNum);
int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildUseDBInput *input, SUseDbOutput *out, SCtgTask* pTask);
int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray *out, SCtgTask* pTask);
int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *dbFName, SDbCfgInfo *out, SCtgTask* pTask);
int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *indexName, SIndexInfo *out, SCtgTask* pTask);
int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName* name, SArray** out, SCtgTask* pTask);
int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *funcName, SFuncInfo *out, SCtgTask* pTask);
int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *user, SGetUserAuthRsp *out, SCtgTask* pTask);
int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char *dbFName, char* tbName, STableMetaOutput* out, SCtgTask* pTask);
int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableMetaOutput* out, SCtgTask* pTask);
int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* out, SCtgTask* pTask);
int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param, int32_t* taskNum);
int32_t ctgLaunchJob(SCtgJob *pJob);
int32_t ctgMakeAsyncRes(SCtgJob *pJob);
......@@ -523,6 +520,7 @@ int32_t ctgDbVgVersionSearchCompare(const void* key1, const void* key2);
void ctgFreeSTableMetaOutput(STableMetaOutput* pOutput);
int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* target);
char *ctgTaskTypeStr(CTG_TASK_TYPE type);
int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, SCtgTask* pTask);
extern SCatalogMgmt gCtgMgmt;
......
此差异已折叠。
......@@ -158,7 +158,7 @@ int32_t ctgInitGetQnodeTask(SCtgJob *pJob, int32_t taskIdx) {
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:%" PRIx64 " the %d task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type));
qDebug("QID:0x%" PRIx64 " the %d task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type));
return TSDB_CODE_SUCCESS;
}
......@@ -181,7 +181,7 @@ int32_t ctgInitGetIndexTask(SCtgJob *pJob, int32_t taskIdx, char *name) {
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:%" PRIx64 " the %d task type %s initialized, indexFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name);
qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, indexFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name);
return TSDB_CODE_SUCCESS;
}
......@@ -204,7 +204,7 @@ int32_t ctgInitGetUdfTask(SCtgJob *pJob, int32_t taskIdx, char *name) {
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:%" PRIx64 " the %d task type %s initialized, udfName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name);
qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, udfName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name);
return TSDB_CODE_SUCCESS;
}
......@@ -227,7 +227,7 @@ int32_t ctgInitGetUserTask(SCtgJob *pJob, int32_t taskIdx, SUserAuthInfo *user)
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:%" PRIx64 " the %d task type %s initialized, user:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), user->user);
qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, user:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), user->user);
return TSDB_CODE_SUCCESS;
}
......@@ -255,7 +255,7 @@ int32_t ctgInitGetTbIndexTask(SCtgJob *pJob, int32_t taskIdx, SName *name) {
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:%" PRIx64 " the %d task type %s initialized, tbName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname);
qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, tbName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname);
return TSDB_CODE_SUCCESS;
}
......@@ -347,7 +347,7 @@ int32_t ctgHandleForceUpdate(SCatalog* pCtg, SCtgJob *pJob, const SCatalogReq* p
return TSDB_CODE_SUCCESS;
}
int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param, int32_t* taskNum) {
int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param, int32_t* taskNum) {
int32_t code = 0;
int32_t tbMetaNum = (int32_t)taosArrayGetSize(pReq->pTableMeta);
int32_t dbVgNum = (int32_t)taosArrayGetSize(pReq->pDbVgroup);
......@@ -377,8 +377,7 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
pJob->queryId = reqId;
pJob->userFp = fp;
pJob->pCtg = pCtg;
pJob->pTrans = pTrans;
pJob->pMgmtEps = *pMgmtEps;
pJob->conn = *pConn;
pJob->userParam = param;
pJob->tbMetaNum = tbMetaNum;
......@@ -638,7 +637,7 @@ int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) {
_return:
qDebug("QID:%" PRIx64 " user callback with rsp %s", pJob->queryId, tstrerror(code));
qDebug("QID:0x%" PRIx64 " user callback with rsp %s", pJob->queryId, tstrerror(code));
(*pJob->userFp)(&pJob->jobRes, pJob->userParam, code);
......@@ -650,12 +649,11 @@ _return:
int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0;
SCtgDBCache *dbCache = NULL;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
SCatalog* pCtg = pTask->pJob->pCtg;
void *pTrans = pTask->pJob->pTrans;
const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps;
SRequestConnInfo* pConn = &pTask->pJob->conn;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
switch (reqType) {
case TDMT_MND_USE_DB: {
......@@ -666,7 +664,8 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(ctx->pName), ctx->flag);
CTG_ERR_JRET(ctgGetTbMetaFromVnode(CTG_PARAMS_LIST(), ctx->pName, &vgInfo, NULL, pTask));
ctx->vgId = vgInfo.vgId;
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, ctx->pName, &vgInfo, NULL, pTask));
return TSDB_CODE_SUCCESS;
}
......@@ -684,8 +683,9 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgInfo, ctx->pName, &vgInfo));
ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(ctx->pName), ctx->flag);
CTG_ERR_JRET(ctgGetTbMetaFromVnode(CTG_PARAMS_LIST(), ctx->pName, &vgInfo, NULL, pTask));
ctx->vgId = vgInfo.vgId;
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, ctx->pName, &vgInfo, NULL, pTask));
ctgReleaseVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
......@@ -695,7 +695,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
tstrncpy(input.db, dbFName, tListLen(input.db));
input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
CTG_ERR_JRET(ctgGetDBVgInfoFromMnode(pCtg, pTrans, pMgmtEps, &input, NULL, pTask));
CTG_ERR_JRET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &input, NULL, pTask));
}
return TSDB_CODE_SUCCESS;
......@@ -733,7 +733,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
taosMemoryFreeClear(pOut->tbMeta);
CTG_RET(ctgGetTbMetaFromMnode(CTG_PARAMS_LIST(), ctx->pName, NULL, pTask));
CTG_RET(ctgGetTbMetaFromMnode(pCtg, pConn, ctx->pName, NULL, pTask));
} else if (CTG_IS_META_BOTH(pOut->metaType)) {
int32_t exist = 0;
if (!CTG_FLAG_IS_FORCE_UPDATE(ctx->flag)) {
......@@ -742,7 +742,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
if (0 == exist) {
TSWAP(pTask->msgCtx.lastOut, pTask->msgCtx.out);
CTG_RET(ctgGetTbMetaFromMnodeImpl(CTG_PARAMS_LIST(), pOut->dbFName, pOut->tbName, NULL, pTask));
CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, pOut->dbFName, pOut->tbName, NULL, pTask));
} else {
taosMemoryFreeClear(pOut->tbMeta);
......@@ -797,12 +797,10 @@ _return:
int32_t ctgHandleGetDbVgRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
SCtgDbVgCtx* ctx = (SCtgDbVgCtx*)pTask->taskCtx;
SCatalog* pCtg = pTask->pJob->pCtg;
void *pTrans = pTask->pJob->pTrans;
const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
switch (reqType) {
case TDMT_MND_USE_DB: {
......@@ -831,12 +829,10 @@ _return:
int32_t ctgHandleGetTbHashRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
SCtgTbHashCtx* ctx = (SCtgTbHashCtx*)pTask->taskCtx;
SCatalog* pCtg = pTask->pJob->pCtg;
void *pTrans = pTask->pJob->pTrans;
const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
switch (reqType) {
case TDMT_MND_USE_DB: {
......@@ -942,15 +938,13 @@ _return:
int32_t ctgHandleGetUserRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0;
SCtgDBCache *dbCache = NULL;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
SCtgUserCtx* ctx = (SCtgUserCtx*)pTask->taskCtx;
SCatalog* pCtg = pTask->pJob->pCtg;
void *pTrans = pTask->pJob->pTrans;
const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps;
bool pass = false;
SGetUserAuthRsp* pOut = (SGetUserAuthRsp*)pTask->msgCtx.out;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
if (pOut->superAuth) {
pass = true;
goto _return;
......@@ -988,22 +982,21 @@ _return:
int32_t ctgAsyncRefreshTbMeta(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg;
void *pTrans = pTask->pJob->pTrans;
const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps;
SRequestConnInfo* pConn = &pTask->pJob->conn;
int32_t code = 0;
SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
if (CTG_FLAG_IS_SYS_DB(ctx->flag)) {
ctgDebug("will refresh sys db tbmeta, tbName:%s", tNameGetTableName(ctx->pName));
CTG_RET(ctgGetTbMetaFromMnodeImpl(CTG_PARAMS_LIST(), (char *)ctx->pName->dbname, (char *)ctx->pName->tname, NULL, pTask));
CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, (char *)ctx->pName->dbname, (char *)ctx->pName->tname, NULL, pTask));
}
if (CTG_FLAG_IS_STB(ctx->flag)) {
ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s", tNameGetTableName(ctx->pName));
// if get from mnode failed, will not try vnode
CTG_RET(ctgGetTbMetaFromMnode(CTG_PARAMS_LIST(), ctx->pName, NULL, pTask));
CTG_RET(ctgGetTbMetaFromMnode(pCtg, pConn, ctx->pName, NULL, pTask));
}
SCtgDBCache *dbCache = NULL;
......@@ -1017,14 +1010,15 @@ int32_t ctgAsyncRefreshTbMeta(SCtgTask *pTask) {
ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(ctx->pName), ctx->flag);
CTG_ERR_JRET(ctgGetTbMetaFromVnode(CTG_PARAMS_LIST(), ctx->pName, &vgInfo, NULL, pTask));
ctx->vgId = vgInfo.vgId;
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, ctx->pName, &vgInfo, NULL, pTask));
} else {
SBuildUseDBInput input = {0};
tstrncpy(input.db, dbFName, tListLen(input.db));
input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
CTG_ERR_JRET(ctgGetDBVgInfoFromMnode(pCtg, pTrans, pMgmtEps, &input, NULL, pTask));
CTG_ERR_JRET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &input, NULL, pTask));
}
_return:
......@@ -1039,10 +1033,9 @@ _return:
int32_t ctgLaunchGetTbMetaTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg;
void *pTrans = pTask->pJob->pTrans;
const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps;
SRequestConnInfo* pConn = &pTask->pJob->conn;
CTG_ERR_RET(ctgGetTbMetaFromCache(CTG_PARAMS_LIST(), (SCtgTbMetaCtx*)pTask->taskCtx, (STableMeta**)&pTask->res));
CTG_ERR_RET(ctgGetTbMetaFromCache(pCtg, pConn, (SCtgTbMetaCtx*)pTask->taskCtx, (STableMeta**)&pTask->res));
if (pTask->res) {
CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0));
return TSDB_CODE_SUCCESS;
......@@ -1056,8 +1049,7 @@ int32_t ctgLaunchGetTbMetaTask(SCtgTask *pTask) {
int32_t ctgLaunchGetDbVgTask(SCtgTask *pTask) {
int32_t code = 0;
SCatalog* pCtg = pTask->pJob->pCtg;
void *pTrans = pTask->pJob->pTrans;
const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps;
SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgDBCache *dbCache = NULL;
SCtgDbVgCtx* pCtx = (SCtgDbVgCtx*)pTask->taskCtx;
......@@ -1072,7 +1064,7 @@ int32_t ctgLaunchGetDbVgTask(SCtgTask *pTask) {
tstrncpy(input.db, pCtx->dbFName, tListLen(input.db));
input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
CTG_ERR_RET(ctgGetDBVgInfoFromMnode(pCtg, pTrans, pMgmtEps, &input, NULL, pTask));
CTG_ERR_RET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &input, NULL, pTask));
}
_return:
......@@ -1088,8 +1080,7 @@ _return:
int32_t ctgLaunchGetTbHashTask(SCtgTask *pTask) {
int32_t code = 0;
SCatalog* pCtg = pTask->pJob->pCtg;
void *pTrans = pTask->pJob->pTrans;
const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps;
SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgDBCache *dbCache = NULL;
SCtgTbHashCtx* pCtx = (SCtgTbHashCtx*)pTask->taskCtx;
......@@ -1108,7 +1099,7 @@ int32_t ctgLaunchGetTbHashTask(SCtgTask *pTask) {
tstrncpy(input.db, pCtx->dbFName, tListLen(input.db));
input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
CTG_ERR_RET(ctgGetDBVgInfoFromMnode(pCtg, pTrans, pMgmtEps, &input, NULL, pTask));
CTG_ERR_RET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &input, NULL, pTask));
}
_return:
......@@ -1124,31 +1115,28 @@ _return:
int32_t ctgLaunchGetTbIndexTask(SCtgTask *pTask) {
int32_t code = 0;
SCatalog* pCtg = pTask->pJob->pCtg;
void *pTrans = pTask->pJob->pTrans;
const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps;
SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgTbIndexCtx* pCtx = (SCtgTbIndexCtx*)pTask->taskCtx;
CTG_ERR_RET(ctgGetTbIndexFromMnode(CTG_PARAMS_LIST(), pCtx->pName, NULL, pTask));
CTG_ERR_RET(ctgGetTbIndexFromMnode(pCtg, pConn, pCtx->pName, NULL, pTask));
return TSDB_CODE_SUCCESS;
}
int32_t ctgLaunchGetQnodeTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg;
void *pTrans = pTask->pJob->pTrans;
const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps;
SRequestConnInfo* pConn = &pTask->pJob->conn;
CTG_ERR_RET(ctgGetQnodeListFromMnode(CTG_PARAMS_LIST(), NULL, pTask));
CTG_ERR_RET(ctgGetQnodeListFromMnode(pCtg, pConn, NULL, pTask));
return TSDB_CODE_SUCCESS;
}
int32_t ctgLaunchGetDbCfgTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg;
void *pTrans = pTask->pJob->pTrans;
const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps;
SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgDbCfgCtx* pCtx = (SCtgDbCfgCtx*)pTask->taskCtx;
CTG_ERR_RET(ctgGetDBCfgFromMnode(CTG_PARAMS_LIST(), pCtx->dbFName, NULL, pTask));
CTG_ERR_RET(ctgGetDBCfgFromMnode(pCtg, pConn, pCtx->dbFName, NULL, pTask));
return TSDB_CODE_SUCCESS;
}
......@@ -1156,8 +1144,6 @@ int32_t ctgLaunchGetDbCfgTask(SCtgTask *pTask) {
int32_t ctgLaunchGetDbInfoTask(SCtgTask *pTask) {
int32_t code = 0;
SCatalog* pCtg = pTask->pJob->pCtg;
void *pTrans = pTask->pJob->pTrans;
const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps;
SCtgDBCache *dbCache = NULL;
SCtgDbInfoCtx* pCtx = (SCtgDbInfoCtx*)pTask->taskCtx;
......@@ -1190,30 +1176,27 @@ _return:
int32_t ctgLaunchGetIndexTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg;
void *pTrans = pTask->pJob->pTrans;
const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps;
SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgIndexCtx* pCtx = (SCtgIndexCtx*)pTask->taskCtx;
CTG_ERR_RET(ctgGetIndexInfoFromMnode(CTG_PARAMS_LIST(), pCtx->indexFName, NULL, pTask));
CTG_ERR_RET(ctgGetIndexInfoFromMnode(pCtg, pConn, pCtx->indexFName, NULL, pTask));
return TSDB_CODE_SUCCESS;
}
int32_t ctgLaunchGetUdfTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg;
void *pTrans = pTask->pJob->pTrans;
const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps;
SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgUdfCtx* pCtx = (SCtgUdfCtx*)pTask->taskCtx;
CTG_ERR_RET(ctgGetUdfInfoFromMnode(CTG_PARAMS_LIST(), pCtx->udfName, NULL, pTask));
CTG_ERR_RET(ctgGetUdfInfoFromMnode(pCtg, pConn, pCtx->udfName, NULL, pTask));
return TSDB_CODE_SUCCESS;
}
int32_t ctgLaunchGetUserTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg;
void *pTrans = pTask->pJob->pTrans;
const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps;
SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgUserCtx* pCtx = (SCtgUserCtx*)pTask->taskCtx;
bool inCache = false;
bool pass = false;
......@@ -1230,7 +1213,7 @@ int32_t ctgLaunchGetUserTask(SCtgTask *pTask) {
return TSDB_CODE_SUCCESS;
}
CTG_ERR_RET(ctgGetUserDbAuthFromMnode(CTG_PARAMS_LIST(), pCtx->user.user, NULL, pTask));
CTG_ERR_RET(ctgGetUserDbAuthFromMnode(pCtg, pConn, pCtx->user.user, NULL, pTask));
return TSDB_CODE_SUCCESS;
}
......
......@@ -1591,9 +1591,13 @@ int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation) {
goto _return;
}
pInfo->epSet = msg->epSet;
SEp* pOrigEp = &pInfo->epSet.eps[pInfo->epSet.inUse];
SEp* pNewEp = &msg->epSet.eps[msg->epSet.inUse];
ctgDebug("vgroup %d epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d, dbFName:%s in ctg",
pInfo->vgId, pInfo->epSet.inUse, pInfo->epSet.numOfEps, pOrigEp->fqdn, pOrigEp->port,
msg->epSet.inUse, msg->epSet.numOfEps, pNewEp->fqdn, pNewEp->port, msg->dbFName);
ctgDebug("epset in vgroup %d updated, dbFName:%s", pInfo->vgId, msg->dbFName);
pInfo->epSet = msg->epSet;
ctgWReleaseVgInfo(dbCache);
......
......@@ -19,7 +19,7 @@
#include "catalogInt.h"
extern SCatalogMgmt gCtgMgmt;
SCtgDebug gCTGDebug = {0};
SCtgDebug gCTGDebug = {.apiEnable = true};
void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) {
ASSERT(*(int32_t*)param == 1);
......@@ -147,7 +147,7 @@ grant write on db2.* to user1;
create function udf1 as '/tmp/libudf1.so' outputtype int;
create aggregate function udf2 as '/tmp/libudf2.so' outputtype int;
*/
int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId, bool forceUpdate) {
int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, SRequestConnInfo* pConn, uint64_t reqId, bool forceUpdate) {
int32_t code = 0;
SCatalogReq req = {0};
req.pTableMeta = taosArrayInit(2, sizeof(SName));
......@@ -209,7 +209,8 @@ int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps
*param = 1;
int64_t jobId = 0;
CTG_ERR_JRET(catalogAsyncGetAllMeta(pCtg, pTrans, pMgmtEps, reqId, &req, ctgdUserCallback, param, &jobId));
CTG_ERR_JRET(catalogAsyncGetAllMeta(pCtg, pConn, reqId, &req, ctgdUserCallback, param, &jobId));
_return:
......
......@@ -242,18 +242,22 @@ _return:
CTG_RET(code);
}
int32_t ctgAsyncSendMsg(CTG_PARAMS, SCtgTask* pTask, int32_t msgType, void *msg, uint32_t msgSize) {
int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTask* pTask, int32_t msgType, void *msg, uint32_t msgSize) {
int32_t code = 0;
SMsgSendInfo *pMsgSendInfo = NULL;
CTG_ERR_JRET(ctgMakeMsgSendInfo(pTask, msgType, &pMsgSendInfo));
ctgUpdateSendTargetInfo(pMsgSendInfo, msgType, pTask);
pMsgSendInfo->requestId = pConn->requestId;
pMsgSendInfo->requestObjRefId = pConn->requestObjRefId;
pMsgSendInfo->msgInfo.pData = msg;
pMsgSendInfo->msgInfo.len = msgSize;
pMsgSendInfo->msgInfo.handle = NULL;
pMsgSendInfo->msgType = msgType;
int64_t transporterId = 0;
code = asyncSendMsgToServer(pTrans, (SEpSet*)pMgmtEps, &transporterId, pMsgSendInfo);
code = asyncSendMsgToServer(pConn->pTrans, &pConn->mgmtEps, &transporterId, pMsgSendInfo);
if (code) {
ctgError("asyncSendMsgToSever failed, error: %s", tstrerror(code));
CTG_ERR_JRET(code);
......@@ -275,13 +279,13 @@ _return:
int32_t ctgGetQnodeListFromMnode(CTG_PARAMS, SArray *out, SCtgTask* pTask) {
int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray *out, SCtgTask* pTask) {
char *msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_QNODE_LIST;
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get qnode list from mnode, mgmtEpInUse:%d", pMgmtEps->inUse);
ctgDebug("try to get qnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](NULL, &msg, 0, &msgLen, mallocFp);
if (code) {
......@@ -295,7 +299,7 @@ int32_t ctgGetQnodeListFromMnode(CTG_PARAMS, SArray *out, SCtgTask* pTask) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, NULL));
CTG_RET(ctgAsyncSendMsg(CTG_PARAMS_LIST(), pTask, reqType, msg, msgLen));
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen));
}
SRpcMsg rpcMsg = {
......@@ -305,7 +309,7 @@ int32_t ctgGetQnodeListFromMnode(CTG_PARAMS, SArray *out, SCtgTask* pTask) {
};
SRpcMsg rpcRsp = {0};
rpcSendRecv(pTrans, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, NULL));
......@@ -313,7 +317,7 @@ int32_t ctgGetQnodeListFromMnode(CTG_PARAMS, SArray *out, SCtgTask* pTask) {
}
int32_t ctgGetDBVgInfoFromMnode(CTG_PARAMS, SBuildUseDBInput *input, SUseDbOutput *out, SCtgTask* pTask) {
int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildUseDBInput *input, SUseDbOutput *out, SCtgTask* pTask) {
char *msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_USE_DB;
......@@ -334,7 +338,7 @@ int32_t ctgGetDBVgInfoFromMnode(CTG_PARAMS, SBuildUseDBInput *input, SUseDbOutpu
}
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, input->db));
CTG_RET(ctgAsyncSendMsg(CTG_PARAMS_LIST(), pTask, reqType, msg, msgLen));
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen));
}
SRpcMsg rpcMsg = {
......@@ -344,14 +348,14 @@ int32_t ctgGetDBVgInfoFromMnode(CTG_PARAMS, SBuildUseDBInput *input, SUseDbOutpu
};
SRpcMsg rpcRsp = {0};
rpcSendRecv(pTrans, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, input->db));
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetDBCfgFromMnode(CTG_PARAMS, const char *dbFName, SDbCfgInfo *out, SCtgTask* pTask) {
int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *dbFName, SDbCfgInfo *out, SCtgTask* pTask) {
char *msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_DB_CFG;
......@@ -372,7 +376,7 @@ int32_t ctgGetDBCfgFromMnode(CTG_PARAMS, const char *dbFName, SDbCfgInfo *out, S
}
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)dbFName));
CTG_RET(ctgAsyncSendMsg(CTG_PARAMS_LIST(), pTask, reqType, msg, msgLen));
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen));
}
SRpcMsg rpcMsg = {
......@@ -382,14 +386,14 @@ int32_t ctgGetDBCfgFromMnode(CTG_PARAMS, const char *dbFName, SDbCfgInfo *out, S
};
SRpcMsg rpcRsp = {0};
rpcSendRecv(pTrans, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)dbFName));
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetIndexInfoFromMnode(CTG_PARAMS, const char *indexName, SIndexInfo *out, SCtgTask* pTask) {
int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *indexName, SIndexInfo *out, SCtgTask* pTask) {
char *msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_INDEX;
......@@ -410,7 +414,7 @@ int32_t ctgGetIndexInfoFromMnode(CTG_PARAMS, const char *indexName, SIndexInfo *
}
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)indexName));
CTG_RET(ctgAsyncSendMsg(CTG_PARAMS_LIST(), pTask, reqType, msg, msgLen));
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen));
}
SRpcMsg rpcMsg = {
......@@ -420,14 +424,14 @@ int32_t ctgGetIndexInfoFromMnode(CTG_PARAMS, const char *indexName, SIndexInfo *
};
SRpcMsg rpcRsp = {0};
rpcSendRecv(pTrans, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)indexName));
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetTbIndexFromMnode(CTG_PARAMS, SName *name, SArray** out, SCtgTask* pTask) {
int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *name, SArray** out, SCtgTask* pTask) {
char *msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_TABLE_INDEX;
......@@ -450,7 +454,7 @@ int32_t ctgGetTbIndexFromMnode(CTG_PARAMS, SName *name, SArray** out, SCtgTask*
}
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)tbFName));
CTG_RET(ctgAsyncSendMsg(CTG_PARAMS_LIST(), pTask, reqType, msg, msgLen));
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen));
}
SRpcMsg rpcMsg = {
......@@ -460,14 +464,14 @@ int32_t ctgGetTbIndexFromMnode(CTG_PARAMS, SName *name, SArray** out, SCtgTask*
};
SRpcMsg rpcRsp = {0};
rpcSendRecv(pTrans, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName));
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetUdfInfoFromMnode(CTG_PARAMS, const char *funcName, SFuncInfo *out, SCtgTask* pTask) {
int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *funcName, SFuncInfo *out, SCtgTask* pTask) {
char *msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_RETRIEVE_FUNC;
......@@ -488,7 +492,7 @@ int32_t ctgGetUdfInfoFromMnode(CTG_PARAMS, const char *funcName, SFuncInfo *out,
}
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)funcName));
CTG_RET(ctgAsyncSendMsg(CTG_PARAMS_LIST(), pTask, reqType, msg, msgLen));
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen));
}
SRpcMsg rpcMsg = {
......@@ -498,14 +502,14 @@ int32_t ctgGetUdfInfoFromMnode(CTG_PARAMS, const char *funcName, SFuncInfo *out,
};
SRpcMsg rpcRsp = {0};
rpcSendRecv(pTrans, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)funcName));
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetUserDbAuthFromMnode(CTG_PARAMS, const char *user, SGetUserAuthRsp *out, SCtgTask* pTask) {
int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *user, SGetUserAuthRsp *out, SCtgTask* pTask) {
char *msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_USER_AUTH;
......@@ -526,7 +530,7 @@ int32_t ctgGetUserDbAuthFromMnode(CTG_PARAMS, const char *user, SGetUserAuthRsp
}
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)user));
CTG_RET(ctgAsyncSendMsg(CTG_PARAMS_LIST(), pTask, reqType, msg, msgLen));
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen));
}
SRpcMsg rpcMsg = {
......@@ -536,7 +540,7 @@ int32_t ctgGetUserDbAuthFromMnode(CTG_PARAMS, const char *user, SGetUserAuthRsp
};
SRpcMsg rpcRsp = {0};
rpcSendRecv(pTrans, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)user));
......@@ -544,7 +548,7 @@ int32_t ctgGetUserDbAuthFromMnode(CTG_PARAMS, const char *user, SGetUserAuthRsp
}
int32_t ctgGetTbMetaFromMnodeImpl(CTG_PARAMS, char *dbFName, char* tbName, STableMetaOutput* out, SCtgTask* pTask) {
int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char *dbFName, char* tbName, STableMetaOutput* out, SCtgTask* pTask) {
SBuildTableMetaInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName};
char *msg = NULL;
SEpSet *pVnodeEpSet = NULL;
......@@ -569,7 +573,7 @@ int32_t ctgGetTbMetaFromMnodeImpl(CTG_PARAMS, char *dbFName, char* tbName, STabl
}
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, tbFName));
CTG_RET(ctgAsyncSendMsg(CTG_PARAMS_LIST(), pTask, reqType, msg, msgLen));
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen));
}
SRpcMsg rpcMsg = {
......@@ -579,21 +583,21 @@ int32_t ctgGetTbMetaFromMnodeImpl(CTG_PARAMS, char *dbFName, char* tbName, STabl
};
SRpcMsg rpcRsp = {0};
rpcSendRecv(pTrans, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, tbFName));
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetTbMetaFromMnode(CTG_PARAMS, const SName* pTableName, STableMetaOutput* out, SCtgTask* pTask) {
int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableMetaOutput* out, SCtgTask* pTask) {
char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pTableName, dbFName);
return ctgGetTbMetaFromMnodeImpl(CTG_PARAMS_LIST(), dbFName, (char *)pTableName->tname, out, pTask);
return ctgGetTbMetaFromMnodeImpl(pCtg, pConn, dbFName, (char *)pTableName->tname, out, pTask);
}
int32_t ctgGetTbMetaFromVnode(CTG_PARAMS, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* out, SCtgTask* pTask) {
int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* out, SCtgTask* pTask) {
char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pTableName, dbFName);
int32_t reqType = TDMT_VND_TABLE_META;
......@@ -619,8 +623,12 @@ int32_t ctgGetTbMetaFromVnode(CTG_PARAMS, const SName* pTableName, SVgroupInfo *
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, tbFName));
CTG_RET(ctgAsyncSendMsg(pCtg, pTrans, &vgroupInfo->epSet, pTask, reqType, msg, msgLen));
SRequestConnInfo vConn = {.pTrans = pConn->pTrans,
.requestId = pConn->requestId,
.requestObjRefId = pConn->requestObjRefId,
.mgmtEps = vgroupInfo->epSet};
CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask, reqType, msg, msgLen));
}
SRpcMsg rpcMsg = {
......@@ -630,7 +638,7 @@ int32_t ctgGetTbMetaFromVnode(CTG_PARAMS, const SName* pTableName, SVgroupInfo *
};
SRpcMsg rpcRsp = {0};
rpcSendRecv(pTrans, &vgroupInfo->epSet, &rpcMsg, &rpcRsp);
rpcSendRecv(pConn->pTrans, &vgroupInfo->epSet, &rpcMsg, &rpcRsp);
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, tbFName));
......
......@@ -640,5 +640,20 @@ int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput)
return TSDB_CODE_SUCCESS;
}
int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, SCtgTask* pTask) {
if (msgType == TDMT_VND_TABLE_META) {
SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(ctx->pName, dbFName);
pMsgSendInfo->target.type = TARGET_TYPE_VNODE;
pMsgSendInfo->target.vgId = ctx->vgId;
pMsgSendInfo->target.dbFName = strdup(dbFName);
} else {
pMsgSendInfo->target.type = TARGET_TYPE_MNODE;
}
return TSDB_CODE_SUCCESS;
}
......@@ -4454,10 +4454,14 @@ static SArray* extractColumnInfo(SNodeList* pNodeList);
static SArray* createSortInfo(SNodeList* pNodeList);
static SArray* extractPartitionColInfo(SNodeList* pNodeList);
void extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo* pTaskInfo) {
int32_t extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo* pTaskInfo) {
SMetaReader mr = {0};
metaReaderInit(&mr, pHandle->meta, 0);
metaGetTableEntryByUid(&mr, uid);
int32_t code = metaGetTableEntryByUid(&mr, uid);
if (code) {
metaReaderClear(&mr);
return code;
}
pTaskInfo->schemaVer.tablename = strdup(mr.me.name);
......@@ -4474,6 +4478,8 @@ void extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo
}
metaReaderClear(&mr);
return TSDB_CODE_SUCCESS;
}
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
......@@ -4490,7 +4496,12 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL;
}
SArray* groupKyes = extractPartitionColInfo(pTableScanNode->pPartitionKeys);
extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
int32_t code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
if (code) {
tsdbCleanupReadHandle(pDataReader);
return NULL;
}
SOperatorInfo* pOperator =
createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, groupKyes, pTaskInfo);
......
......@@ -39,8 +39,12 @@ static int32_t checkAuth(SAuthCxt* pCxt, const char* pDbName, AUTH_TYPE type) {
if (NULL != pCxt->pMetaCache) {
code = getUserAuthFromCache(pCxt->pMetaCache, pParseCxt->pUser, dbFname, type, &pass);
} else {
code = catalogChkAuth(pParseCxt->pCatalog, pParseCxt->pTransporter, &pParseCxt->mgmtEpSet, pParseCxt->pUser,
dbFname, type, &pass);
SRequestConnInfo conn = {.pTrans = pParseCxt->pTransporter,
.requestId = pParseCxt->requestId,
.requestObjRefId = pParseCxt->requestRid,
.mgmtEps = pParseCxt->mgmtEpSet};
code = catalogChkAuth(pParseCxt->pCatalog, &conn, pParseCxt->pUser, dbFname, type, &pass);
}
return TSDB_CODE_SUCCESS == code ? (pass ? TSDB_CODE_SUCCESS : TSDB_CODE_PAR_PERMISSION_DENIED) : code;
}
......
......@@ -257,8 +257,12 @@ static int32_t checkAuth(SInsertParseContext* pCxt, char* pDbFname, bool* pPass)
if (pBasicCtx->async) {
return getUserAuthFromCache(pCxt->pMetaCache, pBasicCtx->pUser, pDbFname, AUTH_TYPE_WRITE, pPass);
}
return catalogChkAuth(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pBasicCtx->pUser, pDbFname,
AUTH_TYPE_WRITE, pPass);
SRequestConnInfo conn = {.pTrans = pBasicCtx->pTransporter,
.requestId = pBasicCtx->requestId,
.requestObjRefId = pBasicCtx->requestRid,
.mgmtEps = pBasicCtx->mgmtEpSet};
return catalogChkAuth(pBasicCtx->pCatalog, &conn, pBasicCtx->pUser, pDbFname, AUTH_TYPE_WRITE, pPass);
}
static int32_t getTableSchema(SInsertParseContext* pCxt, SName* pTbName, bool isStb, STableMeta** pTableMeta) {
......@@ -266,11 +270,15 @@ static int32_t getTableSchema(SInsertParseContext* pCxt, SName* pTbName, bool is
if (pBasicCtx->async) {
return getTableMetaFromCache(pCxt->pMetaCache, pTbName, pTableMeta);
}
SRequestConnInfo conn = {.pTrans = pBasicCtx->pTransporter,
.requestId = pBasicCtx->requestId,
.requestObjRefId = pBasicCtx->requestRid,
.mgmtEps = pBasicCtx->mgmtEpSet};
if (isStb) {
return catalogGetSTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pTbName,
pTableMeta);
return catalogGetSTableMeta(pBasicCtx->pCatalog, &conn, pTbName, pTableMeta);
}
return catalogGetTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pTbName, pTableMeta);
return catalogGetTableMeta(pBasicCtx->pCatalog, &conn, pTbName, pTableMeta);
}
static int32_t getTableVgroup(SInsertParseContext* pCxt, SName* pTbName, SVgroupInfo* pVg) {
......@@ -278,7 +286,11 @@ static int32_t getTableVgroup(SInsertParseContext* pCxt, SName* pTbName, SVgroup
if (pBasicCtx->async) {
return getTableVgroupFromCache(pCxt->pMetaCache, pTbName, pVg);
}
return catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pTbName, pVg);
SRequestConnInfo conn = {.pTrans = pBasicCtx->pTransporter,
.requestId = pBasicCtx->requestId,
.requestObjRefId = pBasicCtx->requestRid,
.mgmtEps = pBasicCtx->mgmtEpSet};
return catalogGetTableHashVgroup(pBasicCtx->pCatalog, &conn, pTbName, pVg);
}
static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SName* name, char* dbFname, bool isStb) {
......@@ -310,7 +322,11 @@ static int32_t getDBCfg(SInsertParseContext* pCxt, const char* pDbFName, SDbCfgI
if (pBasicCtx->async) {
CHECK_CODE(getDbCfgFromCache(pCxt->pMetaCache, pDbFName, pInfo));
} else {
CHECK_CODE(catalogGetDBCfg(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pDbFName, pInfo));
SRequestConnInfo conn = {.pTrans = pBasicCtx->pTransporter,
.requestId = pBasicCtx->requestId,
.requestObjRefId = pBasicCtx->requestRid,
.mgmtEps = pBasicCtx->mgmtEpSet};
CHECK_CODE(catalogGetDBCfg(pBasicCtx->pCatalog, &conn, pDbFName, pInfo));
}
return TSDB_CODE_SUCCESS;
}
......
......@@ -111,7 +111,11 @@ static int32_t getTableMetaImpl(STranslateContext* pCxt, const SName* pName, STa
if (pParCxt->async) {
code = getTableMetaFromCache(pCxt->pMetaCache, pName, pMeta);
} else {
code = catalogGetTableMeta(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, pName, pMeta);
SRequestConnInfo conn = {.pTrans = pParCxt->pTransporter,
.requestId = pParCxt->requestId,
.requestObjRefId = pParCxt->requestRid,
.mgmtEps = pParCxt->mgmtEpSet};
code = catalogGetTableMeta(pParCxt->pCatalog, &conn, pName, pMeta);
}
}
if (TSDB_CODE_SUCCESS != code) {
......@@ -135,8 +139,13 @@ static int32_t refreshGetTableMeta(STranslateContext* pCxt, const char* pDbName,
if (pParCxt->async) {
code = getTableMetaFromCache(pCxt->pMetaCache, &name, pMeta);
} else {
SRequestConnInfo conn = {.pTrans = pParCxt->pTransporter,
.requestId = pParCxt->requestId,
.requestObjRefId = pParCxt->requestRid,
.mgmtEps = pParCxt->mgmtEpSet};
code =
catalogRefreshGetTableMeta(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, &name, pMeta, false);
catalogRefreshGetTableMeta(pParCxt->pCatalog, &conn, &name, pMeta, false);
}
if (TSDB_CODE_SUCCESS != code) {
parserError("catalogRefreshGetTableMeta error, code:%s, dbName:%s, tbName:%s", tstrerror(code), pDbName,
......@@ -154,7 +163,11 @@ static int32_t getDBVgInfoImpl(STranslateContext* pCxt, const SName* pName, SArr
if (pParCxt->async) {
code = getDbVgInfoFromCache(pCxt->pMetaCache, fullDbName, pVgInfo);
} else {
code = catalogGetDBVgInfo(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, fullDbName, pVgInfo);
SRequestConnInfo conn = {.pTrans = pParCxt->pTransporter,
.requestId = pParCxt->requestId,
.requestObjRefId = pParCxt->requestRid,
.mgmtEps = pParCxt->mgmtEpSet};
code = catalogGetDBVgInfo(pParCxt->pCatalog, &conn, fullDbName, pVgInfo);
}
}
if (TSDB_CODE_SUCCESS != code) {
......@@ -181,7 +194,11 @@ static int32_t getTableHashVgroupImpl(STranslateContext* pCxt, const SName* pNam
if (pParCxt->async) {
code = getTableVgroupFromCache(pCxt->pMetaCache, pName, pInfo);
} else {
code = catalogGetTableHashVgroup(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, pName, pInfo);
SRequestConnInfo conn = {.pTrans = pParCxt->pTransporter,
.requestId = pParCxt->requestId,
.requestObjRefId = pParCxt->requestRid,
.mgmtEps = pParCxt->mgmtEpSet};
code = catalogGetTableHashVgroup(pParCxt->pCatalog, &conn, pName, pInfo);
}
}
if (TSDB_CODE_SUCCESS != code) {
......@@ -225,7 +242,12 @@ static int32_t getDBCfg(STranslateContext* pCxt, const char* pDbName, SDbCfgInfo
if (pParCxt->async) {
code = getDbCfgFromCache(pCxt->pMetaCache, dbFname, pInfo);
} else {
code = catalogGetDBCfg(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, dbFname, pInfo);
SRequestConnInfo conn = {.pTrans = pParCxt->pTransporter,
.requestId = pParCxt->requestId,
.requestObjRefId = pParCxt->requestRid,
.mgmtEps = pParCxt->mgmtEpSet};
code = catalogGetDBCfg(pParCxt->pCatalog, &conn, dbFname, pInfo);
}
}
if (TSDB_CODE_SUCCESS != code) {
......@@ -241,7 +263,12 @@ static int32_t getUdfInfo(STranslateContext* pCxt, SFunctionNode* pFunc) {
if (pParCxt->async) {
code = getUdfInfoFromCache(pCxt->pMetaCache, pFunc->functionName, &funcInfo);
} else {
code = catalogGetUdfInfo(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, pFunc->functionName,
SRequestConnInfo conn = {.pTrans = pParCxt->pTransporter,
.requestId = pParCxt->requestId,
.requestObjRefId = pParCxt->requestRid,
.mgmtEps = pParCxt->mgmtEpSet};
code = catalogGetUdfInfo(pParCxt->pCatalog, &conn, pFunc->functionName,
&funcInfo);
}
if (TSDB_CODE_SUCCESS == code) {
......
......@@ -167,17 +167,17 @@ void generateFunctions(MockCatalogService* mcs) {
int32_t __catalogGetHandle(const char* clusterId, struct SCatalog** catalogHandle) { return 0; }
int32_t __catalogGetTableMeta(struct SCatalog* pCatalog, void* pRpc, const SEpSet* pMgmtEps, const SName* pTableName,
int32_t __catalogGetTableMeta(struct SCatalog* pCatalog, SRequestConnInfo *pConn, const SName* pTableName,
STableMeta** pTableMeta) {
return g_mockCatalogService->catalogGetTableMeta(pTableName, pTableMeta);
}
int32_t __catalogGetTableHashVgroup(struct SCatalog* pCatalog, void* pRpc, const SEpSet* pMgmtEps,
int32_t __catalogGetTableHashVgroup(struct SCatalog* pCatalog, SRequestConnInfo *pConn,
const SName* pTableName, SVgroupInfo* vgInfo) {
return g_mockCatalogService->catalogGetTableHashVgroup(pTableName, vgInfo);
}
int32_t __catalogGetTableDistVgInfo(SCatalog* pCtg, void* pRpc, const SEpSet* pMgmtEps, const SName* pTableName,
int32_t __catalogGetTableDistVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName,
SArray** pVgList) {
return g_mockCatalogService->catalogGetTableDistVgInfo(pTableName, pVgList);
}
......@@ -187,27 +187,27 @@ int32_t __catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* ve
return 0;
}
int32_t __catalogGetDBVgInfo(SCatalog* pCtg, void* pRpc, const SEpSet* pMgmtEps, const char* dbFName,
int32_t __catalogGetDBVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, const char* dbFName,
SArray** pVgList) {
return g_mockCatalogService->catalogGetDBVgInfo(dbFName, pVgList);
}
int32_t __catalogGetDBCfg(SCatalog* pCtg, void* pRpc, const SEpSet* pMgmtEps, const char* dbFName, SDbCfgInfo* pDbCfg) {
int32_t __catalogGetDBCfg(SCatalog* pCtg, SRequestConnInfo *pConn, const char* dbFName, SDbCfgInfo* pDbCfg) {
return 0;
}
int32_t __catalogChkAuth(SCatalog* pCtg, void* pRpc, const SEpSet* pMgmtEps, const char* user, const char* dbFName,
int32_t __catalogChkAuth(SCatalog* pCtg, SRequestConnInfo *pConn, const char* user, const char* dbFName,
AUTH_TYPE type, bool* pass) {
*pass = true;
return 0;
}
int32_t __catalogGetUdfInfo(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const char* funcName,
int32_t __catalogGetUdfInfo(SCatalog* pCtg, SRequestConnInfo *pConn, const char* funcName,
SFuncInfo* pInfo) {
return g_mockCatalogService->catalogGetUdfInfo(funcName, pInfo);
}
int32_t __catalogRefreshGetTableMeta(SCatalog* pCatalog, void* pTransporter, const SEpSet* pMgmtEps,
int32_t __catalogRefreshGetTableMeta(SCatalog* pCatalog, SRequestConnInfo *pConn,
const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable) {
return g_mockCatalogService->catalogGetTableMeta(pTableName, pTableMeta);
}
......
......@@ -551,7 +551,7 @@ void qwClearExpiredSch(SQWorker *mgmt, SArray* pExpiredSch) {
if (taosHashGetSize(pSch->tasksHash) <= 0) {
qwDestroySchStatus(pSch);
taosHashRemove(mgmt->schHash, sId, sizeof(*sId));
qError("sch %" PRIx64 "destroyed", *sId);
qDebug("sch %" PRIx64 " destroyed", *sId);
}
qwReleaseScheduler(QW_WRITE, mgmt);
......
......@@ -199,7 +199,7 @@ typedef struct SSchJob {
SSchJobAttr attr;
int32_t levelNum;
int32_t taskNum;
void *pTrans;
SRequestConnInfo conn;
SArray *nodeList; // qnode/vnode list, SArray<SQueryNodeAddr>
SArray *levels; // starting from 0. SArray<SSchLevel>
SNodeList *subPlans; // subplan pointer copied from DAG, no need to free it in scheduler
......@@ -339,7 +339,6 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray* taskAction);
int32_t schCloneSMsgSendInfo(void *src, void **dst);
int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob);
void schFreeJobImpl(void *job);
int32_t schMakeHbCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam);
int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx);
int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask);
int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans);
......@@ -355,21 +354,20 @@ int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask);
void schFreeRpcCtxVal(const void *arg);
int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb);
int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execIdx);
int32_t schExecStaticExplainJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
SSchResInfo *pRes, bool sync);
int32_t schExecJobImpl(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
SSchResInfo *pRes, int64_t startTs, bool sync);
int32_t schExecStaticExplainJob(SSchedulerReq *pReq, int64_t *job, bool sync);
int32_t schExecJobImpl(SSchedulerReq *pReq, int64_t *job, SQueryResult* pRes, bool sync);
int32_t schChkUpdateJobStatus(SSchJob *pJob, int8_t newStatus);
int32_t schCancelJob(SSchJob *pJob);
int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode);
uint64_t schGenTaskId(void);
void schCloseJobRef(void);
int32_t schExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, int64_t startTs, SSchResInfo *pRes);
int32_t schAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, int64_t startTs, SSchResInfo *pRes);
int32_t schExecJob(SSchedulerReq *pReq, int64_t *pJob, SQueryResult *pRes);
int32_t schAsyncExecJob(SSchedulerReq *pReq, int64_t *pJob);
int32_t schFetchRows(SSchJob *pJob);
int32_t schAsyncFetchRows(SSchJob *pJob);
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execIdx);
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList);
void schFreeSMsgSendInfo(SMsgSendInfo *msgSendInfo);
#ifdef __cplusplus
......
......@@ -42,59 +42,58 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *
return TSDB_CODE_SUCCESS;
}
int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *pTrans, SArray *pNodeList, const char *sql,
SSchResInfo *pRes, int64_t startTs, bool syncSchedule) {
int32_t schInitJob(SSchedulerReq *pReq, SSchJob **pSchJob, SQueryResult* pRes, bool syncSchedule) {
int32_t code = 0;
int64_t refId = -1;
SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
if (NULL == pJob) {
qError("QID:%" PRIx64 " calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob));
qError("QID:%" PRIx64 " calloc %d failed", pReq->pDag->queryId, (int32_t)sizeof(SSchJob));
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pJob->attr.explainMode = pDag->explainInfo.mode;
pJob->attr.explainMode = pReq->pDag->explainInfo.mode;
pJob->attr.syncSchedule = syncSchedule;
pJob->pTrans = pTrans;
pJob->sql = sql;
if (pRes) {
pJob->userRes = *pRes;
}
pJob->conn = *pReq->pConn;
pJob->sql = pReq->sql;
pJob->userRes.queryRes = pRes;
pJob->userRes.execFp = pReq->fp;
pJob->userRes.userParam = pReq->cbParam;
if (pNodeList != NULL) {
pJob->nodeList = taosArrayDup(pNodeList);
if (pReq->pNodeList != NULL) {
pJob->nodeList = taosArrayDup(pReq->pNodeList);
}
pJob->taskList =
taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == pJob->taskList) {
SCH_JOB_ELOG("taosHashInit %d taskList failed", pDag->numOfSubplans);
SCH_JOB_ELOG("taosHashInit %d taskList failed", pReq->pDag->numOfSubplans);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_ERR_JRET(schValidateAndBuildJob(pDag, pJob));
SCH_ERR_JRET(schValidateAndBuildJob(pReq->pDag, pJob));
if (SCH_IS_EXPLAIN_JOB(pJob)) {
SCH_ERR_JRET(qExecExplainBegin(pDag, &pJob->explainCtx, startTs));
SCH_ERR_JRET(qExecExplainBegin(pReq->pDag, &pJob->explainCtx, pReq->startTs));
}
pJob->execTasks =
taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == pJob->execTasks) {
SCH_JOB_ELOG("taosHashInit %d execTasks failed", pDag->numOfSubplans);
SCH_JOB_ELOG("taosHashInit %d execTasks failed", pReq->pDag->numOfSubplans);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pJob->succTasks =
taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == pJob->succTasks) {
SCH_JOB_ELOG("taosHashInit %d succTasks failed", pDag->numOfSubplans);
SCH_JOB_ELOG("taosHashInit %d succTasks failed", pReq->pDag->numOfSubplans);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pJob->failTasks =
taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == pJob->failTasks) {
SCH_JOB_ELOG("taosHashInit %d failTasks failed", pDag->numOfSubplans);
SCH_JOB_ELOG("taosHashInit %d failTasks failed", pReq->pDag->numOfSubplans);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
......@@ -1477,17 +1476,16 @@ void schFreeJobImpl(void *job) {
}
}
int32_t schExecJobImpl(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
SSchResInfo *pRes, int64_t startTs, bool sync) {
if (pNodeList == NULL || taosArrayGetSize(pNodeList) <= 0) {
qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pDag->queryId);
int32_t schExecJobImpl(SSchedulerReq *pReq, int64_t *job, SQueryResult* pRes, bool sync) {
if (pReq->pNodeList == NULL || taosArrayGetSize(pReq->pNodeList) <= 0) {
qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pReq->pDag->queryId);
}
int32_t code = 0;
SSchJob *pJob = NULL;
SCH_ERR_RET(schInitJob(&pJob, pDag, pTrans, pNodeList, sql, pRes, startTs, sync));
SCH_ERR_RET(schInitJob(pReq, &pJob, pRes, sync));
qDebug("QID:0x%" PRIx64 " job refId 0x%"PRIx64 " started", pDag->queryId, pJob->refId);
qDebug("QID:0x%" PRIx64 " job refId 0x%"PRIx64 " started", pReq->pDag->queryId, pJob->refId);
*job = pJob->refId;
SCH_ERR_JRET(schLaunchJob(pJob));
......@@ -1508,66 +1506,62 @@ _return:
SCH_RET(code);
}
int32_t schExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
int64_t startTs, SSchResInfo *pRes) {
int32_t schExecJob(SSchedulerReq *pReq, int64_t *pJob, SQueryResult *pRes) {
int32_t code = 0;
*pJob = 0;
if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
SCH_ERR_JRET(schExecStaticExplainJob(pTrans, pNodeList, pDag, pJob, sql, NULL, true));
if (EXPLAIN_MODE_STATIC == pReq->pDag->explainInfo.mode) {
SCH_ERR_JRET(schExecStaticExplainJob(pReq, pJob, true));
} else {
SCH_ERR_JRET(schExecJobImpl(pTrans, pNodeList, pDag, pJob, sql, NULL, startTs, true));
SCH_ERR_JRET(schExecJobImpl(pReq, pJob, pRes, true));
}
_return:
if (*pJob) {
SSchJob *job = schAcquireJob(*pJob);
schSetJobQueryRes(job, pRes->queryRes);
schSetJobQueryRes(job, pRes);
schReleaseJob(*pJob);
}
return code;
}
int32_t schAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
int64_t startTs, SSchResInfo *pRes) {
int32_t schAsyncExecJob(SSchedulerReq *pReq, int64_t *pJob) {
int32_t code = 0;
*pJob = 0;
if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
SCH_ERR_RET(schExecStaticExplainJob(pTrans, pNodeList, pDag, pJob, sql, pRes, false));
if (EXPLAIN_MODE_STATIC == pReq->pDag->explainInfo.mode) {
SCH_ERR_RET(schExecStaticExplainJob(pReq, pJob, false));
} else {
SCH_ERR_RET(schExecJobImpl(pTrans, pNodeList, pDag, pJob, sql, pRes, startTs, false));
SCH_ERR_RET(schExecJobImpl(pReq, pJob, NULL, false));
}
return code;
}
int32_t schExecStaticExplainJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
SSchResInfo *pRes, bool sync) {
qDebug("QID:0x%" PRIx64 " job started", pDag->queryId);
int32_t schExecStaticExplainJob(SSchedulerReq *pReq, int64_t *job, bool sync) {
qDebug("QID:0x%" PRIx64 " job started", pReq->pDag->queryId);
int32_t code = 0;
SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
if (NULL == pJob) {
qError("QID:%" PRIx64 " calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob));
qError("QID:%" PRIx64 " calloc %d failed", pReq->pDag->queryId, (int32_t)sizeof(SSchJob));
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pJob->sql = sql;
pJob->sql = pReq->sql;
pJob->attr.queryJob = true;
pJob->attr.syncSchedule = sync;
pJob->attr.explainMode = pDag->explainInfo.mode;
pJob->queryId = pDag->queryId;
pJob->subPlans = pDag->pSubplans;
if (pRes) {
pJob->userRes = *pRes;
}
pJob->attr.explainMode = pReq->pDag->explainInfo.mode;
pJob->queryId = pReq->pDag->queryId;
pJob->subPlans = pReq->pDag->pSubplans;
pJob->userRes.execFp = pReq->fp;
pJob->userRes.userParam = pReq->cbParam;
SCH_ERR_JRET(qExecStaticExplain(pDag, (SRetrieveTableRsp **)&pJob->resData));
SCH_ERR_JRET(qExecStaticExplain(pReq->pDag, (SRetrieveTableRsp **)&pJob->resData));
int64_t refId = taosAddRef(schMgmt.jobRef, pJob);
if (refId < 0) {
......
......@@ -469,31 +469,80 @@ int32_t schHandleLinkBrokenCallback(void *param, const SDataBuf *pMsg, int32_t c
return TSDB_CODE_SUCCESS;
}
int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, int32_t msgType, SMsgSendInfo **pMsgSendInfo) {
int32_t code = 0;
SMsgSendInfo *msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (NULL == msgSendInfo) {
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
int32_t schMakeCallbackParam(SSchJob *pJob, SSchTask *pTask, int32_t msgType, bool isHb, SSchTrans *trans, void **pParam) {
if (!isHb) {
SSchTaskCallbackParam *param = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
if (NULL == param) {
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchTaskCallbackParam));
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
param->queryId = pJob->queryId;
param->refId = pJob->refId;
param->taskId = SCH_TASK_ID(pTask);
param->pTrans = pJob->conn.pTrans;
param->execIdx = pTask->execIdx;
*pParam = param;
return TSDB_CODE_SUCCESS;
}
if (TDMT_SCH_LINK_BROKEN == msgType) {
SSchHbCallbackParam *param = taosMemoryCalloc(1, sizeof(SSchHbCallbackParam));
if (NULL == param) {
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchHbCallbackParam));
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
param->head.isHbParam = true;
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
param->nodeEpId.nodeId = addr->nodeId;
SEp* pEp = SCH_GET_CUR_EP(addr);
strcpy(param->nodeEpId.ep.fqdn, pEp->fqdn);
param->nodeEpId.ep.port = pEp->port;
param->pTrans = trans->pTrans;
*pParam = param;
return TSDB_CODE_SUCCESS;
}
// hb msg
SSchTaskCallbackParam *param = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
if (NULL == param) {
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchTaskCallbackParam));
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
qError("calloc SSchTaskCallbackParam failed");
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
param->pTrans = trans->pTrans;
*pParam = param;
__async_send_cb_fn_t fp = NULL;
SCH_ERR_JRET(schGetCallbackFp(msgType, &fp));
return TSDB_CODE_SUCCESS;
}
param->queryId = pJob->queryId;
param->refId = pJob->refId;
param->taskId = SCH_TASK_ID(pTask);
param->pTrans = pJob->pTrans;
param->execIdx = pTask->execIdx;
int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, void* msg, uint32_t msgSize, int32_t msgType, SSchTrans *trans, bool isHb, SMsgSendInfo **pMsgSendInfo) {
int32_t code = 0;
SMsgSendInfo *msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (NULL == msgSendInfo) {
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_ERR_JRET(schMakeCallbackParam(pJob, pTask, msgType, isHb, trans, &msgSendInfo->param));
SCH_ERR_JRET(schGetCallbackFp(msgType, &msgSendInfo->fp));
msgSendInfo->param = param;
msgSendInfo->fp = fp;
if (pJob) {
msgSendInfo->requestId = pJob->conn.requestId;
msgSendInfo->requestObjRefId = pJob->conn.requestObjRefId;
}
if (TDMT_SCH_LINK_BROKEN != msgType) {
msgSendInfo->msgInfo.pData = msg;
msgSendInfo->msgInfo.len = msgSize;
msgSendInfo->msgInfo.handle = trans->pHandle;
msgSendInfo->msgType = msgType;
}
*pMsgSendInfo = msgSendInfo;
......@@ -501,8 +550,7 @@ int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, int32_t msgType,
_return:
taosMemoryFree(param);
taosMemoryFree(msgSendInfo);
schFreeSMsgSendInfo(msgSendInfo);
SCH_RET(code);
}
......@@ -551,7 +599,7 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
return TSDB_CODE_SUCCESS;
}
/*
int32_t schMakeHbCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) {
SSchHbCallbackParam *param = taosMemoryCalloc(1, sizeof(SSchHbCallbackParam));
if (NULL == param) {
......@@ -573,6 +621,7 @@ int32_t schMakeHbCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) {
return TSDB_CODE_SUCCESS;
}
*/
int32_t schCloneHbRpcCtx(SRpcCtx *pSrc, SRpcCtx *pDst) {
int32_t code = 0;
......@@ -650,7 +699,7 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
SCH_ERR_JRET(schGetCallbackFp(TDMT_VND_QUERY_HEARTBEAT, &fp));
param->nodeEpId = epId;
param->pTrans = pJob->pTrans;
param->pTrans = pJob->conn.pTrans;
pMsgSendInfo->param = param;
pMsgSendInfo->fp = fp;
......@@ -675,100 +724,6 @@ _return:
SCH_RET(code);
}
int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray* taskAction) {
SSchedulerHbReq req = {0};
int32_t code = 0;
SRpcCtx rpcCtx = {0};
SSchTrans trans = {0};
int32_t msgType = TDMT_VND_QUERY_HEARTBEAT;
req.header.vgId = nodeEpId->nodeId;
req.sId = schMgmt.sId;
memcpy(&req.epId, nodeEpId, sizeof(SQueryNodeEpId));
SCH_LOCK(SCH_READ, &schMgmt.hbLock);
SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, nodeEpId, sizeof(SQueryNodeEpId));
if (NULL == hb) {
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
qError("hb connection no longer exist, nodeId:%d, fqdn:%s, port:%d", nodeEpId->nodeId, nodeEpId->ep.fqdn,
nodeEpId->ep.port);
return TSDB_CODE_SUCCESS;
}
SCH_LOCK(SCH_WRITE, &hb->lock);
code = schCloneHbRpcCtx(&hb->rpcCtx, &rpcCtx);
memcpy(&trans, &hb->trans, sizeof(trans));
SCH_UNLOCK(SCH_WRITE, &hb->lock);
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
SCH_ERR_RET(code);
int32_t msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req);
if (msgSize < 0) {
qError("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
void *msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) {
qError("calloc hb req %d failed", msgSize);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) {
qError("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SMsgSendInfo *pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
qError("calloc SMsgSendInfo failed");
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SSchTaskCallbackParam *param = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
if (NULL == param) {
qError("calloc SSchTaskCallbackParam failed");
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
__async_send_cb_fn_t fp = NULL;
SCH_ERR_JRET(schGetCallbackFp(msgType, &fp));
param->pTrans = trans.pTrans;
pMsgSendInfo->param = param;
pMsgSendInfo->msgInfo.pData = msg;
pMsgSendInfo->msgInfo.len = msgSize;
pMsgSendInfo->msgInfo.handle = trans.pHandle;
pMsgSendInfo->msgType = msgType;
pMsgSendInfo->fp = fp;
int64_t transporterId = 0;
SEpSet epSet = {.inUse = 0, .numOfEps = 1};
memcpy(&epSet.eps[0], &nodeEpId->ep, sizeof(nodeEpId->ep));
qDebug("start to send hb msg, pTrans:%p, pHandle:%p, fqdn:%s, port:%d", trans.pTrans, trans.pHandle,
nodeEpId->ep.fqdn, nodeEpId->ep.port);
code = asyncSendMsgToServerExt(trans.pTrans, &epSet, &transporterId, pMsgSendInfo, true, &rpcCtx);
if (code) {
qError("fail to send hb msg, pTrans:%p, pHandle:%p, fqdn:%s, port:%d, error:%x - %s", trans.pTrans,
trans.pHandle, nodeEpId->ep.fqdn, nodeEpId->ep.port, code, tstrerror(code));
SCH_ERR_JRET(code);
}
qDebug("hb msg sent");
return TSDB_CODE_SUCCESS;
_return:
taosMemoryFreeClear(msg);
taosMemoryFreeClear(param);
taosMemoryFreeClear(pMsgSendInfo);
schFreeRpcCtx(&rpcCtx);
SCH_RET(code);
}
int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) {
SSchedulerHbRsp rsp = {0};
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
......@@ -799,45 +754,12 @@ _return:
SCH_RET(code);
}
int32_t schMakeCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) {
SSchTaskCallbackParam *param = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
if (NULL == param) {
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchTaskCallbackParam));
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
param->queryId = pJob->queryId;
param->refId = pJob->refId;
param->taskId = SCH_TASK_ID(pTask);
param->pTrans = pJob->pTrans;
param->taskId = pTask->taskId;
*pParam = param;
return TSDB_CODE_SUCCESS;
}
int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb) {
int32_t code = 0;
int32_t code = 0;
int32_t msgType = TDMT_SCH_LINK_BROKEN;
SSchTrans trans = {.pTrans = pJob->conn.pTrans};
SMsgSendInfo *pMsgSendInfo = NULL;
pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (isHb) {
SCH_ERR_JRET(schMakeHbCallbackParam(pJob, pTask, &pMsgSendInfo->param));
} else {
SCH_ERR_JRET(schMakeCallbackParam(pJob, pTask, &pMsgSendInfo->param));
}
int32_t msgType = TDMT_SCH_LINK_BROKEN;
__async_send_cb_fn_t fp = NULL;
SCH_ERR_JRET(schGetCallbackFp(msgType, &fp));
pMsgSendInfo->fp = fp;
SCH_ERR_JRET(schGenerateCallBackInfo(pJob, pTask, NULL, 0, msgType, &trans, isHb, &pMsgSendInfo));
brokenVal->msgType = msgType;
brokenVal->val = pMsgSendInfo;
......@@ -863,7 +785,8 @@ int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_ERR_JRET(schGenerateCallBackInfo(pJob, pTask, TDMT_VND_EXPLAIN, &pExplainMsgSendInfo));
SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
SCH_ERR_JRET(schGenerateCallBackInfo(pJob, pTask, NULL, 0, TDMT_VND_EXPLAIN, &trans, false, &pExplainMsgSendInfo));
int32_t msgType = TDMT_VND_EXPLAIN_RSP;
SRpcCtxVal ctxVal = {.val = pExplainMsgSendInfo, .clone = schCloneSMsgSendInfo};
......@@ -939,36 +862,58 @@ _return:
SCH_RET(code);
}
int32_t schUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, SQueryNodeAddr *addr, SSchTask *pTask) {
if (NULL == pTask || addr->nodeId < MNODE_HANDLE) {
return TSDB_CODE_SUCCESS;
}
if (addr->nodeId == MNODE_HANDLE) {
pMsgSendInfo->target.type = TARGET_TYPE_MNODE;
} else {
pMsgSendInfo->target.type = TARGET_TYPE_VNODE;
pMsgSendInfo->target.vgId = addr->nodeId;
pMsgSendInfo->target.dbFName = strdup(pTask->plan->dbFName);
}
return TSDB_CODE_SUCCESS;
}
int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet *epSet, int32_t msgType, void *msg,
int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQueryNodeAddr *addr, int32_t msgType, void *msg,
uint32_t msgSize, bool persistHandle, SRpcCtx *ctx) {
int32_t code = 0;
SSchTrans *trans = (SSchTrans *)transport;
SEpSet *epSet = &addr->epSet;
SMsgSendInfo *pMsgSendInfo = NULL;
SCH_ERR_JRET(schGenerateCallBackInfo(pJob, pTask, msgType, &pMsgSendInfo));
pMsgSendInfo->msgInfo.pData = msg;
pMsgSendInfo->msgInfo.len = msgSize;
pMsgSendInfo->msgInfo.handle = trans->pHandle;
pMsgSendInfo->msgType = msgType;
bool isHb = (TDMT_VND_QUERY_HEARTBEAT == msgType);
SCH_ERR_JRET(schGenerateCallBackInfo(pJob, pTask, msg, msgSize, msgType, trans, isHb, &pMsgSendInfo));
SCH_ERR_JRET(schUpdateSendTargetInfo(pMsgSendInfo, addr, pTask));
qDebug("start to send %s msg to node[%d,%s,%d], refId:%" PRIx64 "pTrans:%p, pHandle:%p", TMSG_INFO(msgType),
ntohl(((SMsgHead *)msg)->vgId), epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, pJob->refId,
qDebug("start to send %s msg to node[%d,%s,%d], pTrans:%p, pHandle:%p", TMSG_INFO(msgType),
addr->nodeId, epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port,
trans->pTrans, trans->pHandle);
int64_t transporterId = 0;
code = asyncSendMsgToServerExt(trans->pTrans, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx);
if (code) {
SCH_ERR_JRET(code);
}
SCH_TASK_DLOG("req msg sent, refId:%" PRIx64 ", type:%d, %s", pJob->refId, msgType, TMSG_INFO(msgType));
if (pJob) {
SCH_TASK_DLOG("req msg sent, type:%d, %s", msgType, TMSG_INFO(msgType));
} else {
qDebug("req msg sent, type:%d, %s", msgType, TMSG_INFO(msgType));
}
return TSDB_CODE_SUCCESS;
_return:
if (pJob) {
SCH_TASK_ELOG("fail to send msg, type:%d, %s, error:%s", msgType, TMSG_INFO(msgType), tstrerror(code));
} else {
qError("fail to send msg, type:%d, %s, error:%s", msgType, TMSG_INFO(msgType), tstrerror(code));
}
if (pMsgSendInfo) {
taosMemoryFreeClear(pMsgSendInfo->param);
taosMemoryFreeClear(pMsgSendInfo);
......@@ -977,6 +922,68 @@ _return:
SCH_RET(code);
}
int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray* taskAction) {
SSchedulerHbReq req = {0};
int32_t code = 0;
SRpcCtx rpcCtx = {0};
SSchTrans trans = {0};
int32_t msgType = TDMT_VND_QUERY_HEARTBEAT;
req.header.vgId = nodeEpId->nodeId;
req.sId = schMgmt.sId;
memcpy(&req.epId, nodeEpId, sizeof(SQueryNodeEpId));
SCH_LOCK(SCH_READ, &schMgmt.hbLock);
SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, nodeEpId, sizeof(SQueryNodeEpId));
if (NULL == hb) {
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
qError("hb connection no longer exist, nodeId:%d, fqdn:%s, port:%d", nodeEpId->nodeId, nodeEpId->ep.fqdn,
nodeEpId->ep.port);
return TSDB_CODE_SUCCESS;
}
SCH_LOCK(SCH_WRITE, &hb->lock);
code = schCloneHbRpcCtx(&hb->rpcCtx, &rpcCtx);
memcpy(&trans, &hb->trans, sizeof(trans));
SCH_UNLOCK(SCH_WRITE, &hb->lock);
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
SCH_ERR_RET(code);
int32_t msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req);
if (msgSize < 0) {
qError("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
void *msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) {
qError("calloc hb req %d failed", msgSize);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) {
qError("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
int64_t transporterId = 0;
SQueryNodeAddr addr = {.nodeId = nodeEpId->nodeId};
addr.epSet.inUse = 0;
addr.epSet.numOfEps = 1;
memcpy(&addr.epSet.eps[0], &nodeEpId->ep, sizeof(nodeEpId->ep));
SCH_ERR_JRET(schAsyncSendMsg(NULL, NULL, &trans, &addr, msgType, msg, msgSize, true, &rpcCtx));
return TSDB_CODE_SUCCESS;
_return:
taosMemoryFreeClear(msg);
schFreeRpcCtx(&rpcCtx);
SCH_RET(code);
}
int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t msgType) {
uint32_t msgSize = 0;
void *msg = NULL;
......@@ -991,8 +998,6 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
SCH_TASK_DLOG("target candidateIdx %d", pTask->candidateIdx);
}
SEpSet epSet = addr->epSet;
switch (msgType) {
case TDMT_VND_CREATE_TABLE:
case TDMT_VND_DROP_TABLE:
......@@ -1128,8 +1133,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
SCH_SET_TASK_LASTMSG_TYPE(pTask, msgType);
SSchTrans trans = {.pTrans = pJob->pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
SCH_ERR_JRET(schAsyncSendMsg(pJob, pTask, &trans, &epSet, msgType, msg, msgSize, persistHandle,
SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
SCH_ERR_JRET(schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle,
(rpcCtx.args ? &rpcCtx : NULL)));
if (msgType == TDMT_VND_QUERY) {
......
......@@ -67,7 +67,7 @@ int32_t schAddHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId,
int32_t code = 0;
SSchHbTrans hb = {0};
hb.trans.pTrans = pJob->pTrans;
hb.trans.pTrans = pJob->conn.pTrans;
hb.taskNum = 1;
SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &hb.rpcCtx));
......@@ -262,3 +262,12 @@ void schFreeRpcCtx(SRpcCtx *pCtx) {
}
}
void schFreeSMsgSendInfo(SMsgSendInfo *msgSendInfo) {
if (NULL == msgSendInfo) {
return;
}
taosMemoryFree(msgSendInfo->param);
taosMemoryFree(msgSendInfo);
}
......@@ -67,28 +67,24 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
return TSDB_CODE_SUCCESS;
}
int32_t schedulerExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
int64_t startTs, SQueryResult *pRes) {
if (NULL == pTrans || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
int32_t schedulerExecJob(SSchedulerReq *pReq, int64_t *pJob, SQueryResult *pRes) {
if (NULL == pReq || NULL == pJob || NULL == pRes) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SSchResInfo resInfo = {.queryRes = pRes};
SCH_RET(schExecJob(pTrans, pNodeList, pDag, pJob, sql, startTs, &resInfo));
SCH_RET(schExecJob(pReq, pJob, pRes));
}
int32_t schedulerAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
int64_t startTs, schedulerExecCallback fp, void* param) {
int32_t schedulerAsyncExecJob(SSchedulerReq *pReq, int64_t *pJob) {
int32_t code = 0;
if (NULL == pTrans || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == fp) {
if (NULL == pReq || NULL == pJob) {
code = TSDB_CODE_QRY_INVALID_INPUT;
} else {
SSchResInfo resInfo = {.execFp = fp, .userParam = param};
code = schAsyncExecJob(pTrans, pNodeList, pDag, pJob, sql, startTs, &resInfo);
code = schAsyncExecJob(pReq, pJob);
}
if (code != TSDB_CODE_SUCCESS) {
fp(NULL, param, code);
pReq->fp(NULL, pReq->cbParam, code);
}
return code;
......
......@@ -503,7 +503,19 @@ void* schtRunJobThread(void *aa) {
taosArrayPush(qnodeList, &qnodeAddr);
queryDone = 0;
code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, &queryJobRefId, "select * from tb", 0, schtQueryCb, &queryDone);
SRequestConnInfo conn = {.pTrans = mockPointer,
.requestId = 0,
.requestObjRefId = 0
};
SSchedulerReq req = {.pConn = &conn,
.pNodeList = qnodeList,
.pDag = &dag,
.sql = "select * from tb",
.startTs = 0,
.fp = schtQueryCb,
.cbParam = &queryDone
};
code = schedulerAsyncExecJob(&req, &queryJobRefId);
assert(code == 0);
pJob = schAcquireJob(queryJobRefId);
......@@ -644,7 +656,19 @@ TEST(queryTest, normalCase) {
schtSetAsyncSendMsgToServer();
int32_t queryDone = 0;
code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, &job, "select * from tb", 0, schtQueryCb, &queryDone);
SRequestConnInfo conn = {.pTrans = mockPointer,
.requestId = 0,
.requestObjRefId = 0
};
SSchedulerReq req = {.pConn = &conn,
.pNodeList = qnodeList,
.pDag = &dag,
.sql = "select * from tb",
.startTs = 0,
.fp = schtQueryCb,
.cbParam = &queryDone
};
code = schedulerAsyncExecJob(&req, &job);
ASSERT_EQ(code, 0);
......@@ -737,7 +761,19 @@ TEST(queryTest, readyFirstCase) {
schtSetAsyncSendMsgToServer();
int32_t queryDone = 0;
code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, &job, "select * from tb", 0, schtQueryCb, &queryDone);
SRequestConnInfo conn = {.pTrans = mockPointer,
.requestId = 0,
.requestObjRefId = 0
};
SSchedulerReq req = {.pConn = &conn,
.pNodeList = qnodeList,
.pDag = &dag,
.sql = "select * from tb",
.startTs = 0,
.fp = schtQueryCb,
.cbParam = &queryDone
};
code = schedulerAsyncExecJob(&req, &job);
ASSERT_EQ(code, 0);
......@@ -835,7 +871,19 @@ TEST(queryTest, flowCtrlCase) {
schtSetAsyncSendMsgToServer();
int32_t queryDone = 0;
code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, &job, "select * from tb", 0, schtQueryCb, &queryDone);
SRequestConnInfo conn = {.pTrans = mockPointer,
.requestId = 0,
.requestObjRefId = 0
};
SSchedulerReq req = {.pConn = &conn,
.pNodeList = qnodeList,
.pDag = &dag,
.sql = "select * from tb",
.startTs = 0,
.fp = schtQueryCb,
.cbParam = &queryDone
};
code = schedulerAsyncExecJob(&req, &job);
ASSERT_EQ(code, 0);
......@@ -937,7 +985,20 @@ TEST(insertTest, normalCase) {
taosThreadCreate(&(thread1), &thattr, schtSendRsp, &insertJobRefId);
SQueryResult res = {0};
code = schedulerExecJob(mockPointer, qnodeList, &dag, &insertJobRefId, "insert into tb values(now,1)", 0, &res);
SRequestConnInfo conn = {.pTrans = mockPointer,
.requestId = 0,
.requestObjRefId = 0
};
SSchedulerReq req = {.pConn = &conn,
.pNodeList = qnodeList,
.pDag = &dag,
.sql = "insert into tb values(now,1)",
.startTs = 0,
.fp = NULL,
.cbParam = NULL
};
code = schedulerExecJob(&req, &insertJobRefId, &res);
ASSERT_EQ(code, 0);
ASSERT_EQ(res.numOfRows, 20);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册