提交 00704f9d 编写于 作者: S Shengliang Guan

refactor: remove rpc client in executor and scanoperator

上级 5f33d88c
......@@ -150,7 +150,8 @@ int32_t cleanupTaskQueue();
*/
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code);
int32_t asyncSendMsgToServerExt(SMsgCb *pMsgCb, void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo, bool persistHandle, void *ctx);
int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo,
bool persistHandle, void* ctx);
/**
* Asynchronously send message to server, after the response received, the callback will be incured.
......@@ -161,7 +162,7 @@ int32_t asyncSendMsgToServerExt(SMsgCb *pMsgCb, void* pTransporter, SEpSet* epSe
* @param pInfo
* @return
*/
int32_t asyncSendMsgToServer(SMsgCb *pMsgCb, void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo);
int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo);
int32_t queryBuildUseDbOutput(SUseDbOutput* pOut, SUseDbRsp* usedbRsp);
......
......@@ -621,7 +621,7 @@ static void *hbThreadFunc(void *param) {
SAppInstInfo *pAppInstInfo = pAppHbMgr->pAppInstInfo;
int64_t transporterId = 0;
SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp);
asyncSendMsgToServer(NULL, pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo);
asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo);
tFreeClientHbBatchReq(pReq, false);
hbClearReqInfo(pAppHbMgr);
......
......@@ -218,7 +218,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
int64_t transporterId = 0;
asyncSendMsgToServer(NULL, pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg);
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg);
tsem_wait(&pRequest->body.rspSem);
return TSDB_CODE_SUCCESS;
......@@ -507,7 +507,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
SMsgSendInfo* body = buildConnectMsg(pRequest, connType);
int64_t transporterId = 0;
asyncSendMsgToServer(NULL, pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
tsem_wait(&pRequest->body.rspSem);
if (pRequest->code != TSDB_CODE_SUCCESS) {
......
......@@ -589,7 +589,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0;
asyncSendMsgToServer(NULL, tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
if (!async) {
tsem_wait(&pParam->rspSem);
......@@ -666,7 +666,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0;
asyncSendMsgToServer(NULL, tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
// avoid double free if msg is sent
buf = NULL;
......@@ -773,7 +773,7 @@ TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbNa
SEpSet epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0;
asyncSendMsgToServer(NULL, pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
tsem_wait(&pRequest->body.rspSem);
......@@ -1046,7 +1046,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
tscDebug("consumer %ld ask ep", tmq->consumerId);
int64_t transporterId = 0;
asyncSendMsgToServer(NULL, tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
if (!async) {
tsem_wait(&pParam->rspSem);
......@@ -1198,7 +1198,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t waitTime) {
tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu", tmq->consumerId,
pTopic->topicName, pVg->vgId, tmq->epoch, pVg->currentOffset, pReq->reqId);
/*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/
asyncSendMsgToServer(NULL, tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
pVg->pollCnt++;
tmq->pollCnt++;
}
......
......@@ -2813,8 +2813,7 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
pMsgSendInfo->fp = loadRemoteDataCallback;
int64_t transporterId = 0;
int32_t code = asyncSendMsgToServer(pExchangeInfo->pMsgCb, pExchangeInfo->pTransporter, &pSource->addr.epSet,
&transporterId, pMsgSendInfo);
int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
return TSDB_CODE_SUCCESS;
}
......
......@@ -136,7 +136,8 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code)
return 0;
}
int32_t asyncSendMsgToServerExt(SMsgCb *pMsgCb, void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo, bool persistHandle, void *rpcCtx) {
int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo,
bool persistHandle, void* rpcCtx) {
char* pMsg = rpcMallocCont(pInfo->msgInfo.len);
if (NULL == pMsg) {
qError("0x%" PRIx64 " msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType));
......@@ -159,20 +160,12 @@ int32_t asyncSendMsgToServerExt(SMsgCb *pMsgCb, void* pTransporter, SEpSet* epSe
assert(pInfo->fp != NULL);
if (pMsgCb != NULL) {
// todo in multi-process mode
ASSERT(pTransporterId == NULL || *pTransporterId == 0);
ASSERT(rpcCtx == NULL);
tmsgSendReq(pMsgCb, epSet, &rpcMsg);
} else {
ASSERT(pTransporter != NULL);
rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, rpcCtx);
}
rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, rpcCtx);
return TSDB_CODE_SUCCESS;
}
int32_t asyncSendMsgToServer(SMsgCb *pMsgCb, void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) {
return asyncSendMsgToServerExt(pMsgCb, pTransporter, epSet, pTransporterId, pInfo, false, NULL);
int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) {
return asyncSendMsgToServerExt(pTransporter, epSet, pTransporterId, pInfo, false, NULL);
}
char *jobTaskStatusStr(int32_t status) {
......
......@@ -40,9 +40,8 @@ enum {
};
typedef struct SSchTrans {
void *transInst;
void *transHandle;
SMsgCb *pMsgCb;
void *transInst;
void *transHandle;
} SSchTrans;
typedef struct SSchHbTrans {
......
......@@ -1850,7 +1850,7 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet
trans->transInst, trans->transHandle);
int64_t transporterId = 0;
code = asyncSendMsgToServerExt(trans->pMsgCb, trans->transInst, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx);
code = asyncSendMsgToServerExt(trans->transInst, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx);
if (code) {
SCH_ERR_JRET(code);
}
......@@ -1940,7 +1940,7 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) {
qDebug("start to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d", trans.transInst, trans.transHandle,
nodeEpId->ep.fqdn, nodeEpId->ep.port);
code = asyncSendMsgToServerExt(trans.pMsgCb, trans.transInst, &epSet, &transporterId, pMsgSendInfo, true, &rpcCtx);
code = asyncSendMsgToServerExt(trans.transInst, &epSet, &transporterId, pMsgSendInfo, true, &rpcCtx);
if (code) {
qError("fail to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d, error:%x - %s", trans.transInst,
trans.transHandle, nodeEpId->ep.fqdn, nodeEpId->ep.port, code, tstrerror(code));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册