提交 c8865f3a 编写于 作者: S Shengliang Guan

refactor: remove rpc client in executor and scanoperator

上级 557531da
...@@ -22,6 +22,7 @@ extern "C" { ...@@ -22,6 +22,7 @@ extern "C" {
#include "query.h" #include "query.h"
#include "tcommon.h" #include "tcommon.h"
#include "tmsgcb.h"
typedef void* qTaskInfo_t; typedef void* qTaskInfo_t;
typedef void* DataSinkHandle; typedef void* DataSinkHandle;
...@@ -29,11 +30,12 @@ struct SRpcMsg; ...@@ -29,11 +30,12 @@ struct SRpcMsg;
struct SSubplan; struct SSubplan;
typedef struct SReadHandle { typedef struct SReadHandle {
void* reader; void* reader;
void* meta; void* meta;
void* config; void* config;
void* vnode; void* vnode;
void* mnd; void* mnd;
SMsgCb* pMsgCb;
} SReadHandle; } SReadHandle;
#define STREAM_DATA_TYPE_SUBMIT_BLOCK 0x1 #define STREAM_DATA_TYPE_SUBMIT_BLOCK 0x1
......
...@@ -24,6 +24,7 @@ extern "C" { ...@@ -24,6 +24,7 @@ extern "C" {
#include "thash.h" #include "thash.h"
#include "tlog.h" #include "tlog.h"
#include "tmsg.h" #include "tmsg.h"
#include "tmsgcb.h"
typedef enum { typedef enum {
JOB_TASK_STATUS_NULL = 0, JOB_TASK_STATUS_NULL = 0,
...@@ -149,7 +150,7 @@ int32_t cleanupTaskQueue(); ...@@ -149,7 +150,7 @@ int32_t cleanupTaskQueue();
*/ */
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code);
int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo, bool persistHandle, void *ctx); int32_t asyncSendMsgToServerExt(SMsgCb *pMsgCb, void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo, bool persistHandle, void *ctx);
/** /**
* Asynchronously send message to server, after the response received, the callback will be incured. * Asynchronously send message to server, after the response received, the callback will be incured.
...@@ -160,7 +161,7 @@ int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTra ...@@ -160,7 +161,7 @@ int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTra
* @param pInfo * @param pInfo
* @return * @return
*/ */
int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo); int32_t asyncSendMsgToServer(SMsgCb *pMsgCb, void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo);
int32_t queryBuildUseDbOutput(SUseDbOutput* pOut, SUseDbRsp* usedbRsp); int32_t queryBuildUseDbOutput(SUseDbOutput* pOut, SUseDbRsp* usedbRsp);
......
...@@ -621,7 +621,7 @@ static void *hbThreadFunc(void *param) { ...@@ -621,7 +621,7 @@ static void *hbThreadFunc(void *param) {
SAppInstInfo *pAppInstInfo = pAppHbMgr->pAppInstInfo; SAppInstInfo *pAppInstInfo = pAppHbMgr->pAppInstInfo;
int64_t transporterId = 0; int64_t transporterId = 0;
SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp); SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp);
asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo); asyncSendMsgToServer(NULL, pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo);
tFreeClientHbBatchReq(pReq, false); tFreeClientHbBatchReq(pReq, false);
hbClearReqInfo(pAppHbMgr); hbClearReqInfo(pAppHbMgr);
......
...@@ -219,7 +219,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) { ...@@ -219,7 +219,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest); SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
int64_t transporterId = 0; int64_t transporterId = 0;
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg); asyncSendMsgToServer(NULL, pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg);
tsem_wait(&pRequest->body.rspSem); tsem_wait(&pRequest->body.rspSem);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -504,7 +504,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t ...@@ -504,7 +504,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
SMsgSendInfo* body = buildConnectMsg(pRequest, connType); SMsgSendInfo* body = buildConnectMsg(pRequest, connType);
int64_t transporterId = 0; int64_t transporterId = 0;
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); asyncSendMsgToServer(NULL, pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
tsem_wait(&pRequest->body.rspSem); tsem_wait(&pRequest->body.rspSem);
if (pRequest->code != TSDB_CODE_SUCCESS) { if (pRequest->code != TSDB_CODE_SUCCESS) {
......
...@@ -589,7 +589,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in ...@@ -589,7 +589,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0; int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); asyncSendMsgToServer(NULL, tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
if (!async) { if (!async) {
tsem_wait(&pParam->rspSem); tsem_wait(&pParam->rspSem);
...@@ -666,7 +666,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { ...@@ -666,7 +666,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0; int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); asyncSendMsgToServer(NULL, tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
// avoid double free if msg is sent // avoid double free if msg is sent
buf = NULL; buf = NULL;
...@@ -773,7 +773,7 @@ TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbNa ...@@ -773,7 +773,7 @@ TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbNa
SEpSet epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); SEpSet epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0; int64_t transporterId = 0;
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); asyncSendMsgToServer(NULL, pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
tsem_wait(&pRequest->body.rspSem); tsem_wait(&pRequest->body.rspSem);
...@@ -1046,7 +1046,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { ...@@ -1046,7 +1046,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
tscDebug("consumer %ld ask ep", tmq->consumerId); tscDebug("consumer %ld ask ep", tmq->consumerId);
int64_t transporterId = 0; int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); asyncSendMsgToServer(NULL, tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
if (!async) { if (!async) {
tsem_wait(&pParam->rspSem); tsem_wait(&pParam->rspSem);
...@@ -1198,7 +1198,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t waitTime) { ...@@ -1198,7 +1198,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t waitTime) {
tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu", tmq->consumerId, tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu", tmq->consumerId,
pTopic->topicName, pVg->vgId, tmq->epoch, pVg->currentOffset, pReq->reqId); pTopic->topicName, pVg->vgId, tmq->epoch, pVg->currentOffset, pReq->reqId);
/*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/ /*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); asyncSendMsgToServer(NULL, tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
pVg->pollCnt++; pVg->pollCnt++;
tmq->pollCnt++; tmq->pollCnt++;
} }
......
...@@ -201,6 +201,7 @@ void mmInitMsgHandle(SMgmtWrapper *pWrapper) { ...@@ -201,6 +201,7 @@ void mmInitMsgHandle(SMgmtWrapper *pWrapper) {
dmSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, mmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, mmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, mmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, mmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_MND_SYSTABLE_RETRIEVE, mmProcessReadMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_SYSTABLE_RETRIEVE, mmProcessReadMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_MND_SYSTABLE_RETRIEVE_RSP, mmProcessReadMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_MND_STATUS, mmProcessReadMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_STATUS, mmProcessReadMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, mmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, mmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_MND_GRANT, mmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_GRANT, mmProcessWriteMsg, DEFAULT_HANDLE);
......
...@@ -262,7 +262,7 @@ void vmInitMsgHandle(SMgmtWrapper *pWrapper) { ...@@ -262,7 +262,7 @@ void vmInitMsgHandle(SMgmtWrapper *pWrapper) {
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY, vmProcessQueryMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_QUERY, vmProcessQueryMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, vmProcessQueryMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, vmProcessQueryMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_FETCH, vmProcessFetchMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_FETCH, vmProcessFetchMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, vmProcessFetchMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, vmProcessMgmtMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, vmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, vmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, vmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, vmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, vmProcessFetchMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, vmProcessFetchMsg, DEFAULT_HANDLE);
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "vmInt.h" #include "vmInt.h"
#include "qworker.h"
#include "sync.h" #include "sync.h"
#include "syncTools.h" #include "syncTools.h"
...@@ -50,6 +51,11 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { ...@@ -50,6 +51,11 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
case TDMT_DND_DROP_VNODE: case TDMT_DND_DROP_VNODE:
code = vmProcessDropVnodeReq(pMgmt, pMsg); code = vmProcessDropVnodeReq(pMgmt, pMsg);
break; break;
case TDMT_VND_FETCH_RSP:
// todo refactor
code = qWorkerProcessFetchRsp(NULL, NULL, &pMsg->rpcMsg);
pMsg->rpcMsg.pCont = NULL; // already freed in qworker
break;
default: default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
dError("msg:%p, not processed in vnode-mgmt queue", pMsg); dError("msg:%p, not processed in vnode-mgmt queue", pMsg);
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
int32_t mndProcessQueryMsg(SNodeMsg *pReq) { int32_t mndProcessQueryMsg(SNodeMsg *pReq) {
SMnode *pMnode = pReq->pNode; SMnode *pMnode = pReq->pNode;
SReadHandle handle = {.mnd = pMnode}; SReadHandle handle = {.mnd = pMnode, .pMsgCb = &pMnode->msgCb};
mTrace("msg:%p, in query queue is processing", pReq); mTrace("msg:%p, in query queue is processing", pReq);
switch (pReq->rpcMsg.msgType) { switch (pReq->rpcMsg.msgType) {
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndShow.h" #include "mndShow.h"
#include "systable.h" #include "systable.h"
#include "qworker.h"
#define SHOW_STEP_SIZE 100 #define SHOW_STEP_SIZE 100
...@@ -25,6 +26,7 @@ static SShowObj *mndAcquireShowObj(SMnode *pMnode, int64_t showId); ...@@ -25,6 +26,7 @@ static SShowObj *mndAcquireShowObj(SMnode *pMnode, int64_t showId);
static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove); static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove);
static bool mndCheckRetrieveFinished(SShowObj *pShow); static bool mndCheckRetrieveFinished(SShowObj *pShow);
static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq); static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq);
static int32_t mndProcessRetrieveSysTableRsp(SNodeMsg *pRsp);
int32_t mndInitShow(SMnode *pMnode) { int32_t mndInitShow(SMnode *pMnode) {
SShowMgmt *pMgmt = &pMnode->showMgmt; SShowMgmt *pMgmt = &pMnode->showMgmt;
...@@ -37,6 +39,7 @@ int32_t mndInitShow(SMnode *pMnode) { ...@@ -37,6 +39,7 @@ int32_t mndInitShow(SMnode *pMnode) {
} }
mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE, mndProcessRetrieveSysTableReq); mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE, mndProcessRetrieveSysTableReq);
mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE_RSP, mndProcessRetrieveSysTableRsp);
return 0; return 0;
} }
...@@ -175,6 +178,13 @@ static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove) { ...@@ -175,6 +178,13 @@ static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove) {
taosCacheRelease(pMgmt->cache, (void **)(&pShow), forceRemove); taosCacheRelease(pMgmt->cache, (void **)(&pShow), forceRemove);
} }
static int32_t mndProcessRetrieveSysTableRsp(SNodeMsg *pRsp) {
mTrace("mnode-systable-retrieve-rsp is received");
qWorkerProcessFetchRsp(NULL, NULL, &pRsp->rpcMsg);
pRsp->rpcMsg.pCont = NULL; // already freed in qworker
return 0;
}
static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq) { static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq) {
SMnode *pMnode = pReq->pNode; SMnode *pMnode = pReq->pNode;
SShowMgmt *pMgmt = &pMnode->showMgmt; SShowMgmt *pMgmt = &pMnode->showMgmt;
......
...@@ -51,7 +51,7 @@ int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) { return 0; } ...@@ -51,7 +51,7 @@ int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) { return 0; }
int32_t qndProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg) { int32_t qndProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg) {
qTrace("message in qnode query queue is processing"); qTrace("message in qnode query queue is processing");
SReadHandle handle = {0}; SReadHandle handle = {.pMsgCb = &pQnode->msgCb};
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_VND_QUERY: { case TDMT_VND_QUERY: {
......
...@@ -34,6 +34,7 @@ ...@@ -34,6 +34,7 @@
#include "tlockfree.h" #include "tlockfree.h"
#include "tlosertree.h" #include "tlosertree.h"
#include "tmallocator.h" #include "tmallocator.h"
#include "tmsgcb.h"
#include "tskiplist.h" #include "tskiplist.h"
#include "tstream.h" #include "tstream.h"
#include "ttime.h" #include "ttime.h"
...@@ -120,7 +121,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId); ...@@ -120,7 +121,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId);
// sma // sma
int32_t tsdbRegisterRSma(STsdb* pTsdb, SMeta* pMeta, SVCreateStbReq* pReq); int32_t tsdbRegisterRSma(STsdb* pTsdb, SMeta* pMeta, SVCreateStbReq* pReq, SMsgCb* pMsgCb);
int32_t tsdbFetchTbUidList(STsdb* pTsdb, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid); int32_t tsdbFetchTbUidList(STsdb* pTsdb, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid);
int32_t tsdbUpdateTbUidList(STsdb* pTsdb, STbUidStore* pUidStore); int32_t tsdbUpdateTbUidList(STsdb* pTsdb, STbUidStore* pUidStore);
void tsdbUidStoreDestory(STbUidStore* pStore); void tsdbUidStoreDestory(STbUidStore* pStore);
......
...@@ -378,6 +378,7 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu ...@@ -378,6 +378,7 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu
SReadHandle handle = { SReadHandle handle = {
.reader = pReadHandle, .reader = pReadHandle,
.meta = pTq->pVnode->pMeta, .meta = pTq->pVnode->pMeta,
.pMsgCb = &pTq->pVnode->msgCb,
}; };
pTopic->buffer.output[j].pReadHandle = pReadHandle; pTopic->buffer.output[j].pReadHandle = pReadHandle;
pTopic->buffer.output[j].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle); pTopic->buffer.output[j].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle);
...@@ -857,6 +858,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -857,6 +858,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
SReadHandle handle = { SReadHandle handle = {
.reader = pExec->pExecReader[i], .reader = pExec->pExecReader[i],
.meta = pTq->pVnode->pMeta, .meta = pTq->pVnode->pMeta,
.pMsgCb = &pTq->pVnode->msgCb,
}; };
pExec->task[i] = qCreateStreamExecTaskInfo(pExec->qmsg, &handle); pExec->task[i] = qCreateStreamExecTaskInfo(pExec->qmsg, &handle);
ASSERT(pExec->task[i]); ASSERT(pExec->task[i]);
...@@ -897,6 +899,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) { ...@@ -897,6 +899,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
SReadHandle handle = { SReadHandle handle = {
.reader = pStreamReader, .reader = pStreamReader,
.meta = pTq->pVnode->pMeta, .meta = pTq->pVnode->pMeta,
.pMsgCb = &pTq->pVnode->msgCb,
}; };
pTask->exec.runners[i].inputHandle = pStreamReader; pTask->exec.runners[i].inputHandle = pStreamReader;
pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
......
...@@ -1698,7 +1698,7 @@ int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg) { ...@@ -1698,7 +1698,7 @@ int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg) {
* @param pReq * @param pReq
* @return int32_t * @return int32_t
*/ */
int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateStbReq *pReq) { int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateStbReq *pReq, SMsgCb *pMsgCb) {
if (!pReq->rollup) { if (!pReq->rollup) {
tsdbDebug("vgId:%d return directly since no rollup for stable %s %" PRIi64, REPO_ID(pTsdb), pReq->name, pReq->suid); tsdbDebug("vgId:%d return directly since no rollup for stable %s %" PRIi64, REPO_ID(pTsdb), pReq->name, pReq->suid);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1742,6 +1742,7 @@ int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateStbReq *pReq) { ...@@ -1742,6 +1742,7 @@ int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateStbReq *pReq) {
SReadHandle handle = { SReadHandle handle = {
.reader = pReadHandle, .reader = pReadHandle,
.meta = pMeta, .meta = pMeta,
.pMsgCb = pMsgCb,
}; };
if (param->qmsg1) { if (param->qmsg1) {
......
...@@ -142,9 +142,9 @@ _err: ...@@ -142,9 +142,9 @@ _err:
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
vTrace("message in vnode query queue is processing"); vTrace("message in vnode query queue is processing");
#if 0 #if 0
SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode}; SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
#endif #endif
SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode}; SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_VND_QUERY: case TDMT_VND_QUERY:
return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg); return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg);
...@@ -306,7 +306,7 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, ...@@ -306,7 +306,7 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq,
goto _err; goto _err;
} }
tsdbRegisterRSma(pVnode->pTsdb, pVnode->pMeta, &req); tsdbRegisterRSma(pVnode->pTsdb, pVnode->pMeta, &req, &pVnode->msgCb);
tCoderClear(&coder); tCoderClear(&coder);
return 0; return 0;
......
...@@ -320,6 +320,7 @@ typedef struct SExchangeInfo { ...@@ -320,6 +320,7 @@ typedef struct SExchangeInfo {
SArray* pSourceDataInfo; SArray* pSourceDataInfo;
tsem_t ready; tsem_t ready;
void* pTransporter; void* pTransporter;
SMsgCb* pMsgCb;
SSDataBlock* pResult; SSDataBlock* pResult;
bool seqLoadData; // sequential load data or not, false by default bool seqLoadData; // sequential load data or not, false by default
int32_t current; int32_t current;
...@@ -389,10 +390,7 @@ typedef struct SStreamBlockScanInfo { ...@@ -389,10 +390,7 @@ typedef struct SStreamBlockScanInfo {
} SStreamBlockScanInfo; } SStreamBlockScanInfo;
typedef struct SSysTableScanInfo { typedef struct SSysTableScanInfo {
union { SReadHandle readHandle;
void* pTransporter;
SReadHandle readHandle;
};
SRetrieveMetaTableRsp* pRsp; SRetrieveMetaTableRsp* pRsp;
SRetrieveTableReq req; SRetrieveTableReq req;
...@@ -659,7 +657,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR ...@@ -659,7 +657,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
char* pData, int16_t bytes, bool masterscan, uint64_t groupId, char* pData, int16_t bytes, bool masterscan, uint64_t groupId,
SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup); SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup);
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createExchangeOperatorInfo(SMsgCb *pMsgCb, const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCond* pCond, int32_t numOfOutput, int32_t dataLoadFlag, const uint8_t* scanInfo, SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCond* pCond, int32_t numOfOutput, int32_t dataLoadFlag, const uint8_t* scanInfo,
SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, SInterval* pInterval, double sampleRatio, SExecTaskInfo* pTaskInfo); SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, SInterval* pInterval, double sampleRatio, SExecTaskInfo* pTaskInfo);
......
...@@ -2810,7 +2810,8 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf ...@@ -2810,7 +2810,8 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
pMsgSendInfo->fp = loadRemoteDataCallback; pMsgSendInfo->fp = loadRemoteDataCallback;
int64_t transporterId = 0; int64_t transporterId = 0;
int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo); int32_t code = asyncSendMsgToServer(pExchangeInfo->pMsgCb, pExchangeInfo->pTransporter, &pSource->addr.epSet,
&transporterId, pMsgSendInfo);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -3256,7 +3257,8 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) { ...@@ -3256,7 +3257,8 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createExchangeOperatorInfo(SMsgCb* pMsgCb, const SNodeList* pSources, SSDataBlock* pBlock,
SExecTaskInfo* pTaskInfo) {
SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo)); SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
...@@ -3299,29 +3301,7 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock ...@@ -3299,29 +3301,7 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock
pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL, pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL,
destroyExchangeOperatorInfo, NULL, NULL, NULL); destroyExchangeOperatorInfo, NULL, NULL, NULL);
pInfo->pMsgCb = pMsgCb;
#if 1
{ // todo refactor
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = 0;
rpcInit.label = "EX";
rpcInit.numOfThreads = 1;
rpcInit.cfp = qProcessFetchRsp;
rpcInit.sessions = tsMaxConnections;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.user = (char*)"root";
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.ckey = "key";
rpcInit.spi = 1;
rpcInit.secret = (char*)"dcc5bed04851fec854c035b2e40263b6";
pInfo->pTransporter = rpcOpen(&rpcInit);
if (pInfo->pTransporter == NULL) {
return NULL; // todo
}
}
#endif
return pOperator; return pOperator;
...@@ -4774,7 +4754,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4774,7 +4754,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode; SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createResDataBlock(pExchange->node.pOutputDataBlockDesc); SSDataBlock* pResBlock = createResDataBlock(pExchange->node.pOutputDataBlockDesc);
return createExchangeOperatorInfo(pExchange->pSrcEndPoints, pResBlock, pTaskInfo); return createExchangeOperatorInfo(pHandle->pMsgCb, pExchange->pSrcEndPoints, pResBlock, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
......
...@@ -1002,7 +1002,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { ...@@ -1002,7 +1002,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
pMsgSendInfo->fp = loadSysTableCallback; pMsgSendInfo->fp = loadSysTableCallback;
int64_t transporterId = 0; int64_t transporterId = 0;
int32_t code = asyncSendMsgToServer(pInfo->pTransporter, &pInfo->epSet, &transporterId, pMsgSendInfo); int32_t code = asyncSendMsgToServer(pInfo->readHandle.pMsgCb, NULL, &pInfo->epSet, &transporterId, pMsgSendInfo);
tsem_wait(&pInfo->ready); tsem_wait(&pInfo->ready);
if (pTaskInfo->code) { if (pTaskInfo->code) {
...@@ -1126,29 +1126,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe ...@@ -1126,29 +1126,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe
} else { } else {
tsem_init(&pInfo->ready, 0, 0); tsem_init(&pInfo->ready, 0, 0);
pInfo->epSet = epset; pInfo->epSet = epset;
pInfo->readHandle = *(SReadHandle*)readHandle;
#if 1
{ // todo refactor
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = 0;
rpcInit.label = "DB-META";
rpcInit.numOfThreads = 1;
rpcInit.cfp = qProcessFetchRsp;
rpcInit.sessions = tsMaxConnections;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.user = (char*)"root";
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.ckey = "key";
rpcInit.spi = 1;
rpcInit.secret = (char*)"dcc5bed04851fec854c035b2e40263b6";
pInfo->pTransporter = rpcOpen(&rpcInit);
if (pInfo->pTransporter == NULL) {
return NULL; // todo
}
}
#endif
} }
pOperator->name = "SysTableScanOperator"; pOperator->name = "SysTableScanOperator";
......
...@@ -136,7 +136,7 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) ...@@ -136,7 +136,7 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code)
return 0; return 0;
} }
int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo, bool persistHandle, void *rpcCtx) { int32_t asyncSendMsgToServerExt(SMsgCb *pMsgCb, void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo, bool persistHandle, void *rpcCtx) {
char* pMsg = rpcMallocCont(pInfo->msgInfo.len); char* pMsg = rpcMallocCont(pInfo->msgInfo.len);
if (NULL == pMsg) { if (NULL == pMsg) {
qError("0x%" PRIx64 " msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType)); qError("0x%" PRIx64 " msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType));
...@@ -159,12 +159,20 @@ int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTra ...@@ -159,12 +159,20 @@ int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTra
assert(pInfo->fp != NULL); assert(pInfo->fp != NULL);
rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, rpcCtx); if (pMsgCb != NULL) {
// todo in multi-process mode
ASSERT(pTransporterId == NULL || *pTransporterId == 0);
ASSERT(rpcCtx == NULL);
tmsgSendReq(pMsgCb, epSet, &rpcMsg);
} else {
ASSERT(pTransporter != NULL);
rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, rpcCtx);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) { int32_t asyncSendMsgToServer(SMsgCb *pMsgCb, void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) {
return asyncSendMsgToServerExt(pTransporter, epSet, pTransporterId, pInfo, false, NULL); return asyncSendMsgToServerExt(pMsgCb, pTransporter, epSet, pTransporterId, pInfo, false, NULL);
} }
char *jobTaskStatusStr(int32_t status) { char *jobTaskStatusStr(int32_t status) {
......
...@@ -40,8 +40,9 @@ enum { ...@@ -40,8 +40,9 @@ enum {
}; };
typedef struct SSchTrans { typedef struct SSchTrans {
void *transInst; void *transInst;
void *transHandle; void *transHandle;
SMsgCb *pMsgCb;
} SSchTrans; } SSchTrans;
typedef struct SSchHbTrans { typedef struct SSchHbTrans {
......
...@@ -1850,7 +1850,7 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet ...@@ -1850,7 +1850,7 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet
trans->transInst, trans->transHandle); trans->transInst, trans->transHandle);
int64_t transporterId = 0; int64_t transporterId = 0;
code = asyncSendMsgToServerExt(trans->transInst, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx); code = asyncSendMsgToServerExt(trans->pMsgCb, trans->transInst, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx);
if (code) { if (code) {
SCH_ERR_JRET(code); SCH_ERR_JRET(code);
} }
...@@ -1940,7 +1940,7 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) { ...@@ -1940,7 +1940,7 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) {
qDebug("start to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d", trans.transInst, trans.transHandle, qDebug("start to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d", trans.transInst, trans.transHandle,
nodeEpId->ep.fqdn, nodeEpId->ep.port); nodeEpId->ep.fqdn, nodeEpId->ep.port);
code = asyncSendMsgToServerExt(trans.transInst, &epSet, &transporterId, pMsgSendInfo, true, &rpcCtx); code = asyncSendMsgToServerExt(trans.pMsgCb, trans.transInst, &epSet, &transporterId, pMsgSendInfo, true, &rpcCtx);
if (code) { if (code) {
qError("fail to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d, error:%x - %s", trans.transInst, qError("fail to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d, error:%x - %s", trans.transInst,
trans.transHandle, nodeEpId->ep.fqdn, nodeEpId->ep.port, code, tstrerror(code)); trans.transHandle, nodeEpId->ep.fqdn, nodeEpId->ep.port, code, tstrerror(code));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册