提交 fefd2506 编写于 作者: dengyihao's avatar dengyihao

fix rpc bug

上级 124b8b66
...@@ -275,7 +275,7 @@ static void dmCloseNodes(SDnode *pDnode) { ...@@ -275,7 +275,7 @@ static void dmCloseNodes(SDnode *pDnode) {
static void dmProcessProcHandle(void *handle) { static void dmProcessProcHandle(void *handle) {
dWarn("handle:%p, the child process dies and send an offline rsp", handle); dWarn("handle:%p, the child process dies and send an offline rsp", handle);
SRpcMsg rpcMsg = {.handle = handle, .code = TSDB_CODE_NODE_OFFLINE, .refId = -1}; SRpcMsg rpcMsg = {.handle = handle, .code = TSDB_CODE_NODE_OFFLINE};
rpcSendResponse(&rpcMsg); rpcSendResponse(&rpcMsg);
} }
......
...@@ -38,6 +38,7 @@ static void bmSendErrorRsps(STaosQall *qall, int32_t numOfMsgs, int32_t code) { ...@@ -38,6 +38,7 @@ static void bmSendErrorRsps(STaosQall *qall, int32_t numOfMsgs, int32_t code) {
static inline void bmSendRsp(SNodeMsg *pMsg, int32_t code) { static inline void bmSendRsp(SNodeMsg *pMsg, int32_t code) {
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle,
.ahandle = pMsg->rpcMsg.ahandle, .ahandle = pMsg->rpcMsg.ahandle,
.refId = pMsg->rpcMsg.refId,
.code = code, .code = code,
.pCont = pMsg->pRsp, .pCont = pMsg->pRsp,
.contLen = pMsg->rspLen}; .contLen = pMsg->rspLen};
...@@ -101,7 +102,7 @@ static void bmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -101,7 +102,7 @@ static void bmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
} }
int32_t bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SBnodeMgmt *pMgmt = pWrapper->pMgmt; SBnodeMgmt * pMgmt = pWrapper->pMgmt;
SMultiWorker *pWorker = &pMgmt->writeWorker; SMultiWorker *pWorker = &pMgmt->writeWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
...@@ -110,7 +111,7 @@ int32_t bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -110,7 +111,7 @@ int32_t bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
} }
int32_t bmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t bmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SBnodeMgmt *pMgmt = pWrapper->pMgmt; SBnodeMgmt * pMgmt = pWrapper->pMgmt;
SSingleWorker *pWorker = &pMgmt->monitorWorker; SSingleWorker *pWorker = &pMgmt->monitorWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
static inline void mmSendRsp(SNodeMsg *pMsg, int32_t code) { static inline void mmSendRsp(SNodeMsg *pMsg, int32_t code) {
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle,
.ahandle = pMsg->rpcMsg.ahandle, .ahandle = pMsg->rpcMsg.ahandle,
.refId = pMsg->rpcMsg.refId,
.code = code, .code = code,
.pCont = pMsg->pRsp, .pCont = pMsg->pRsp,
.contLen = pMsg->rspLen}; .contLen = pMsg->rspLen};
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
static inline void qmSendRsp(SNodeMsg *pMsg, int32_t code) { static inline void qmSendRsp(SNodeMsg *pMsg, int32_t code) {
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle,
.ahandle = pMsg->rpcMsg.ahandle, .ahandle = pMsg->rpcMsg.ahandle,
.refId = pMsg->rpcMsg.refId,
.code = code, .code = code,
.pCont = pMsg->pRsp, .pCont = pMsg->pRsp,
.contLen = pMsg->rspLen}; .contLen = pMsg->rspLen};
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
static inline void smSendRsp(SNodeMsg *pMsg, int32_t code) { static inline void smSendRsp(SNodeMsg *pMsg, int32_t code) {
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle,
.ahandle = pMsg->rpcMsg.ahandle, .ahandle = pMsg->rpcMsg.ahandle,
.refId = pMsg->rpcMsg.refId,
.code = code, .code = code,
.pCont = pMsg->pRsp, .pCont = pMsg->pRsp,
.contLen = pMsg->rspLen}; .contLen = pMsg->rspLen};
...@@ -149,7 +150,7 @@ static FORCE_INLINE int32_t smGetSWTypeFromMsg(SRpcMsg *pMsg) { ...@@ -149,7 +150,7 @@ static FORCE_INLINE int32_t smGetSWTypeFromMsg(SRpcMsg *pMsg) {
} }
int32_t smProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t smProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SSnodeMgmt *pMgmt = pWrapper->pMgmt; SSnodeMgmt * pMgmt = pWrapper->pMgmt;
SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0); SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0);
if (pWorker == NULL) { if (pWorker == NULL) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
...@@ -162,7 +163,7 @@ int32_t smProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -162,7 +163,7 @@ int32_t smProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
} }
int32_t smProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t smProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SSnodeMgmt *pMgmt = pWrapper->pMgmt; SSnodeMgmt * pMgmt = pWrapper->pMgmt;
SSingleWorker *pWorker = &pMgmt->monitorWorker; SSingleWorker *pWorker = &pMgmt->monitorWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
...@@ -171,7 +172,7 @@ int32_t smProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -171,7 +172,7 @@ int32_t smProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
} }
int32_t smProcessUniqueMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t smProcessUniqueMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SSnodeMgmt *pMgmt = pWrapper->pMgmt; SSnodeMgmt * pMgmt = pWrapper->pMgmt;
int32_t index = smGetSWIdFromMsg(&pMsg->rpcMsg); int32_t index = smGetSWIdFromMsg(&pMsg->rpcMsg);
SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index); SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index);
if (pWorker == NULL) { if (pWorker == NULL) {
...@@ -185,7 +186,7 @@ int32_t smProcessUniqueMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -185,7 +186,7 @@ int32_t smProcessUniqueMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
} }
int32_t smProcessSharedMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t smProcessSharedMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SSnodeMgmt *pMgmt = pWrapper->pMgmt; SSnodeMgmt * pMgmt = pWrapper->pMgmt;
SSingleWorker *pWorker = &pMgmt->sharedWorker; SSingleWorker *pWorker = &pMgmt->sharedWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
static inline void vmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { static inline void vmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) {
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle,
.ahandle = pMsg->rpcMsg.ahandle, .ahandle = pMsg->rpcMsg.ahandle,
.refId = pMsg->rpcMsg.refId,
.code = code, .code = code,
.pCont = pMsg->pRsp, .pCont = pMsg->pRsp,
.contLen = pMsg->rspLen}; .contLen = pMsg->rspLen};
...@@ -112,7 +113,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -112,7 +113,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
numOfMsgs = taosArrayGetSize(pArray); numOfMsgs = taosArrayGetSize(pArray);
for (int32_t i = 0; i < numOfMsgs; i++) { for (int32_t i = 0; i < numOfMsgs; i++) {
SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i); SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
SRpcMsg *pRpc = &pMsg->rpcMsg; SRpcMsg * pRpc = &pMsg->rpcMsg;
SRpcMsg rsp; SRpcMsg rsp;
rsp.pCont = NULL; rsp.pCont = NULL;
...@@ -120,6 +121,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -120,6 +121,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
rsp.code = 0; rsp.code = 0;
rsp.handle = pRpc->handle; rsp.handle = pRpc->handle;
rsp.ahandle = pRpc->ahandle; rsp.ahandle = pRpc->ahandle;
rsp.refId = pRpc->refId;
int32_t code = vnodeProcessWriteReq(pVnode->pImpl, pRpc, version++, &rsp); int32_t code = vnodeProcessWriteReq(pVnode->pImpl, pRpc, version++, &rsp);
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
...@@ -147,7 +149,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -147,7 +149,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle; SVnodeObj *pVnode = pInfo->ahandle;
SNodeMsg *pMsg = NULL; SNodeMsg * pMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pMsg); taosGetQitem(qall, (void **)&pMsg);
...@@ -160,7 +162,7 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -160,7 +162,7 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle; SVnodeObj *pVnode = pInfo->ahandle;
SNodeMsg *pMsg = NULL; SNodeMsg * pMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pMsg); taosGetQitem(qall, (void **)&pMsg);
...@@ -173,7 +175,7 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf ...@@ -173,7 +175,7 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle; SVnodeObj *pVnode = pInfo->ahandle;
SNodeMsg *pMsg = NULL; SNodeMsg * pMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pMsg); taosGetQitem(qall, (void **)&pMsg);
...@@ -190,7 +192,7 @@ static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -190,7 +192,7 @@ static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
} }
static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueType qtype) { static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueType qtype) {
SRpcMsg *pRpc = &pMsg->rpcMsg; SRpcMsg * pRpc = &pMsg->rpcMsg;
SMsgHead *pHead = pRpc->pCont; SMsgHead *pHead = pRpc->pCont;
pHead->contLen = ntohl(pHead->contLen); pHead->contLen = ntohl(pHead->contLen);
pHead->vgId = ntohl(pHead->vgId); pHead->vgId = ntohl(pHead->vgId);
...@@ -259,7 +261,7 @@ int32_t vmProcessMergeMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -259,7 +261,7 @@ int32_t vmProcessMergeMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
} }
int32_t vmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t vmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt; SVnodesMgmt * pMgmt = pWrapper->pMgmt;
SSingleWorker *pWorker = &pMgmt->mgmtWorker; SSingleWorker *pWorker = &pMgmt->mgmtWorker;
dTrace("msg:%p, will be written to vnode-mgmt queue, worker:%s", pMsg, pWorker->name); dTrace("msg:%p, will be written to vnode-mgmt queue, worker:%s", pMsg, pWorker->name);
taosWriteQitem(pWorker->queue, pMsg); taosWriteQitem(pWorker->queue, pMsg);
...@@ -267,7 +269,7 @@ int32_t vmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -267,7 +269,7 @@ int32_t vmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
} }
int32_t vmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t vmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt; SVnodesMgmt * pMgmt = pWrapper->pMgmt;
SSingleWorker *pWorker = &pMgmt->monitorWorker; SSingleWorker *pWorker = &pMgmt->monitorWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
...@@ -277,7 +279,7 @@ int32_t vmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -277,7 +279,7 @@ int32_t vmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueType qtype) { static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueType qtype) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt; SVnodesMgmt *pMgmt = pWrapper->pMgmt;
SMsgHead *pHead = pRpc->pCont; SMsgHead * pHead = pRpc->pCont;
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
if (pVnode == NULL) return -1; if (pVnode == NULL) return -1;
......
...@@ -135,6 +135,7 @@ typedef struct { ...@@ -135,6 +135,7 @@ typedef struct {
int32_t failedTimes; int32_t failedTimes;
void* rpcHandle; void* rpcHandle;
void* rpcAHandle; void* rpcAHandle;
int64_t rpcRefId;
void* rpcRsp; void* rpcRsp;
int32_t rpcRspLen; int32_t rpcRspLen;
SArray* redoLogs; SArray* redoLogs;
......
...@@ -193,9 +193,9 @@ TRANS_ENCODE_OVER: ...@@ -193,9 +193,9 @@ TRANS_ENCODE_OVER:
static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRow *pRow = NULL; SSdbRow * pRow = NULL;
STrans *pTrans = NULL; STrans * pTrans = NULL;
char *pData = NULL; char * pData = NULL;
int32_t dataLen = 0; int32_t dataLen = 0;
int8_t sver = 0; int8_t sver = 0;
int32_t redoLogNum = 0; int32_t redoLogNum = 0;
...@@ -456,7 +456,7 @@ static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) { ...@@ -456,7 +456,7 @@ static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) {
} }
static STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId) { static STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId) {
SSdb *pSdb = pMnode->pSdb; SSdb * pSdb = pMnode->pSdb;
STrans *pTrans = sdbAcquire(pSdb, SDB_TRANS, &transId); STrans *pTrans = sdbAcquire(pSdb, SDB_TRANS, &transId);
if (pTrans == NULL) { if (pTrans == NULL) {
terrno = TSDB_CODE_MND_TRANS_NOT_EXIST; terrno = TSDB_CODE_MND_TRANS_NOT_EXIST;
...@@ -484,6 +484,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const S ...@@ -484,6 +484,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const S
pTrans->createdTime = taosGetTimestampMs(); pTrans->createdTime = taosGetTimestampMs();
pTrans->rpcHandle = pReq->handle; pTrans->rpcHandle = pReq->handle;
pTrans->rpcAHandle = pReq->ahandle; pTrans->rpcAHandle = pReq->ahandle;
pTrans->rpcRefId = pReq->refId;
pTrans->redoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *)); pTrans->redoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
pTrans->undoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *)); pTrans->undoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
pTrans->commitLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *)); pTrans->commitLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
...@@ -625,7 +626,7 @@ static int32_t mndCheckTransCanBeStartedInParallel(SMnode *pMnode, STrans *pNewT ...@@ -625,7 +626,7 @@ static int32_t mndCheckTransCanBeStartedInParallel(SMnode *pMnode, STrans *pNewT
if (mndIsBasicTrans(pNewTrans)) return 0; if (mndIsBasicTrans(pNewTrans)) return 0;
STrans *pTrans = NULL; STrans *pTrans = NULL;
void *pIter = NULL; void * pIter = NULL;
int32_t code = 0; int32_t code = 0;
while (1) { while (1) {
...@@ -703,6 +704,7 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { ...@@ -703,6 +704,7 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
pNew->rpcHandle = pTrans->rpcHandle; pNew->rpcHandle = pTrans->rpcHandle;
pNew->rpcAHandle = pTrans->rpcAHandle; pNew->rpcAHandle = pTrans->rpcAHandle;
pNew->rpcRefId = pTrans->rpcRefId;
pNew->rpcRsp = pTrans->rpcRsp; pNew->rpcRsp = pTrans->rpcRsp;
pNew->rpcRspLen = pTrans->rpcRspLen; pNew->rpcRspLen = pTrans->rpcRspLen;
pTrans->rpcRsp = NULL; pTrans->rpcRsp = NULL;
...@@ -767,6 +769,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { ...@@ -767,6 +769,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
SRpcMsg rspMsg = {.handle = pTrans->rpcHandle, SRpcMsg rspMsg = {.handle = pTrans->rpcHandle,
.code = pTrans->code, .code = pTrans->code,
.ahandle = pTrans->rpcAHandle, .ahandle = pTrans->rpcAHandle,
.refId = pTrans->rpcRefId,
.pCont = rpcCont, .pCont = rpcCont,
.contLen = pTrans->rpcRspLen}; .contLen = pTrans->rpcRspLen};
tmsgSendRsp(&rspMsg); tmsgSendRsp(&rspMsg);
...@@ -827,7 +830,7 @@ HANDLE_ACTION_RSP_OVER: ...@@ -827,7 +830,7 @@ HANDLE_ACTION_RSP_OVER:
} }
static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray) { static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray) {
SSdb *pSdb = pMnode->pSdb; SSdb * pSdb = pMnode->pSdb;
int32_t arraySize = taosArrayGetSize(pArray); int32_t arraySize = taosArrayGetSize(pArray);
if (arraySize == 0) return 0; if (arraySize == 0) return 0;
...@@ -1202,11 +1205,11 @@ static int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) { ...@@ -1202,11 +1205,11 @@ static int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) {
} }
static int32_t mndProcessKillTransReq(SNodeMsg *pReq) { static int32_t mndProcessKillTransReq(SNodeMsg *pReq) {
SMnode *pMnode = pReq->pNode; SMnode * pMnode = pReq->pNode;
SKillTransReq killReq = {0}; SKillTransReq killReq = {0};
int32_t code = -1; int32_t code = -1;
SUserObj *pUser = NULL; SUserObj * pUser = NULL;
STrans *pTrans = NULL; STrans * pTrans = NULL;
if (tDeserializeSKillTransReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &killReq) != 0) { if (tDeserializeSKillTransReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &killReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
...@@ -1246,7 +1249,7 @@ KILL_OVER: ...@@ -1246,7 +1249,7 @@ KILL_OVER:
void mndTransPullup(SMnode *pMnode) { void mndTransPullup(SMnode *pMnode) {
STrans *pTrans = NULL; STrans *pTrans = NULL;
void *pIter = NULL; void * pIter = NULL;
while (1) { while (1) {
pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, pIter, (void **)&pTrans); pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, pIter, (void **)&pTrans);
...@@ -1261,11 +1264,11 @@ void mndTransPullup(SMnode *pMnode) { ...@@ -1261,11 +1264,11 @@ void mndTransPullup(SMnode *pMnode) {
static int32_t mndRetrieveTrans(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { static int32_t mndRetrieveTrans(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->pNode; SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb; SSdb * pSdb = pMnode->pSdb;
int32_t numOfRows = 0; int32_t numOfRows = 0;
STrans *pTrans = NULL; STrans *pTrans = NULL;
int32_t cols = 0; int32_t cols = 0;
char *pWrite; char * pWrite;
while (numOfRows < rows) { while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_TRANS, pShow->pIter, (void **)&pTrans); pShow->pIter = sdbFetch(pSdb, SDB_TRANS, pShow->pIter, (void **)&pTrans);
......
此差异已折叠。
...@@ -120,8 +120,8 @@ typedef struct SQWTaskCtx { ...@@ -120,8 +120,8 @@ typedef struct SQWTaskCtx {
int8_t events[QW_EVENT_MAX]; int8_t events[QW_EVENT_MAX];
qTaskInfo_t taskHandle; void *taskHandle;
DataSinkHandle sinkHandle; void *sinkHandle;
} SQWTaskCtx; } SQWTaskCtx;
typedef struct SQWSchStatus { typedef struct SQWSchStatus {
......
#include "qworker.h" #include "qworkerMsg.h"
#include "tcommon.h" #include "dataSinkMgt.h"
#include "executor.h" #include "executor.h"
#include "planner.h" #include "planner.h"
#include "query.h" #include "query.h"
#include "qworker.h"
#include "qworkerInt.h" #include "qworkerInt.h"
#include "qworkerMsg.h" #include "tcommon.h"
#include "tmsg.h" #include "tmsg.h"
#include "tname.h" #include "tname.h"
#include "dataSinkMgt.h"
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) { int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) {
int32_t msgSize = sizeof(SRetrieveTableRsp) + length; int32_t msgSize = sizeof(SRetrieveTableRsp) + length;
...@@ -26,8 +25,6 @@ int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) { ...@@ -26,8 +25,6 @@ int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete) { void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete) {
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg; SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
...@@ -39,7 +36,6 @@ void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete) ...@@ -39,7 +36,6 @@ void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete)
rsp->numOfRows = htonl(input->numOfRows); rsp->numOfRows = htonl(input->numOfRows);
} }
void qwFreeFetchRsp(void *msg) { void qwFreeFetchRsp(void *msg) {
if (msg) { if (msg) {
rpcFreeCont(msg); rpcFreeCont(msg);
...@@ -50,13 +46,14 @@ int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code) { ...@@ -50,13 +46,14 @@ int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code) {
SQueryTableRsp rsp = {.code = code}; SQueryTableRsp rsp = {.code = code};
int32_t contLen = tSerializeSQueryTableRsp(NULL, 0, &rsp); int32_t contLen = tSerializeSQueryTableRsp(NULL, 0, &rsp);
void *msg = rpcMallocCont(contLen); void * msg = rpcMallocCont(contLen);
tSerializeSQueryTableRsp(msg, contLen, &rsp); tSerializeSQueryTableRsp(msg, contLen, &rsp);
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.msgType = TDMT_VND_QUERY_RSP, .msgType = TDMT_VND_QUERY_RSP,
.handle = pConn->handle, .handle = pConn->handle,
.ahandle = pConn->ahandle, .ahandle = pConn->ahandle,
.refId = pConn->refId,
.pCont = msg, .pCont = msg,
.contLen = contLen, .contLen = contLen,
.code = code, .code = code,
...@@ -74,6 +71,7 @@ int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code) { ...@@ -74,6 +71,7 @@ int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code) {
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.msgType = TDMT_VND_RES_READY_RSP, .msgType = TDMT_VND_RES_READY_RSP,
.handle = pConn->handle, .handle = pConn->handle,
.refId = pConn->refId,
.ahandle = NULL, .ahandle = NULL,
.pCont = pRsp, .pCont = pRsp,
.contLen = sizeof(*pRsp), .contLen = sizeof(*pRsp),
...@@ -89,13 +87,14 @@ int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo, ...@@ -89,13 +87,14 @@ int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo,
SExplainRsp rsp = {.numOfPlans = num, .subplanInfo = execInfo}; SExplainRsp rsp = {.numOfPlans = num, .subplanInfo = execInfo};
int32_t contLen = tSerializeSExplainRsp(NULL, 0, &rsp); int32_t contLen = tSerializeSExplainRsp(NULL, 0, &rsp);
void *pRsp = rpcMallocCont(contLen); void * pRsp = rpcMallocCont(contLen);
tSerializeSExplainRsp(pRsp, contLen, &rsp); tSerializeSExplainRsp(pRsp, contLen, &rsp);
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.msgType = TDMT_VND_EXPLAIN_RSP, .msgType = TDMT_VND_EXPLAIN_RSP,
.handle = pConn->handle, .handle = pConn->handle,
.ahandle = pConn->ahandle, .ahandle = pConn->ahandle,
.refId = pConn->refId,
.pCont = pRsp, .pCont = pRsp,
.contLen = contLen, .contLen = contLen,
.code = 0, .code = 0,
...@@ -108,13 +107,14 @@ int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo, ...@@ -108,13 +107,14 @@ int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo,
int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) { int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) {
int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus); int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus);
void *pRsp = rpcMallocCont(contLen); void * pRsp = rpcMallocCont(contLen);
tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus); tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus);
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.msgType = TDMT_VND_QUERY_HEARTBEAT_RSP, .msgType = TDMT_VND_QUERY_HEARTBEAT_RSP,
.handle = pConn->handle, .handle = pConn->handle,
.ahandle = pConn->ahandle, .ahandle = pConn->ahandle,
.refId = pConn->refId,
.pCont = pRsp, .pCont = pRsp,
.contLen = contLen, .contLen = contLen,
.code = code, .code = code,
...@@ -136,6 +136,7 @@ int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int3 ...@@ -136,6 +136,7 @@ int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int3
.msgType = TDMT_VND_FETCH_RSP, .msgType = TDMT_VND_FETCH_RSP,
.handle = pConn->handle, .handle = pConn->handle,
.ahandle = pConn->ahandle, .ahandle = pConn->ahandle,
.refId = pConn->refId,
.pCont = pRsp, .pCont = pRsp,
.contLen = sizeof(*pRsp) + dataLength, .contLen = sizeof(*pRsp) + dataLength,
.code = code, .code = code,
...@@ -154,6 +155,7 @@ int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code) { ...@@ -154,6 +155,7 @@ int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code) {
.msgType = TDMT_VND_CANCEL_TASK_RSP, .msgType = TDMT_VND_CANCEL_TASK_RSP,
.handle = pConn->handle, .handle = pConn->handle,
.ahandle = pConn->ahandle, .ahandle = pConn->ahandle,
.refId = pConn->refId,
.pCont = pRsp, .pCont = pRsp,
.contLen = sizeof(*pRsp), .contLen = sizeof(*pRsp),
.code = code, .code = code,
...@@ -171,6 +173,7 @@ int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code) { ...@@ -171,6 +173,7 @@ int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code) {
.msgType = TDMT_VND_DROP_TASK_RSP, .msgType = TDMT_VND_DROP_TASK_RSP,
.handle = pConn->handle, .handle = pConn->handle,
.ahandle = pConn->ahandle, .ahandle = pConn->ahandle,
.refId = pConn->refId,
.pCont = pRsp, .pCont = pRsp,
.contLen = sizeof(*pRsp), .contLen = sizeof(*pRsp),
.code = code, .code = code,
...@@ -220,12 +223,13 @@ int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) { ...@@ -220,12 +223,13 @@ int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) {
showRsp.tableMeta.numOfColumns = cols; showRsp.tableMeta.numOfColumns = cols;
int32_t bufLen = tSerializeSShowRsp(NULL, 0, &showRsp); int32_t bufLen = tSerializeSShowRsp(NULL, 0, &showRsp);
void *pBuf = rpcMallocCont(bufLen); void * pBuf = rpcMallocCont(bufLen);
tSerializeSShowRsp(pBuf, bufLen, &showRsp); tSerializeSShowRsp(pBuf, bufLen, &showRsp);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.handle = pMsg->handle, .handle = pMsg->handle,
.ahandle = pMsg->ahandle, .ahandle = pMsg->ahandle,
.refId = pMsg->refId,
.pCont = pBuf, .pCont = pBuf,
.contLen = bufLen, .contLen = bufLen,
.code = code, .code = code,
...@@ -235,7 +239,7 @@ int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) { ...@@ -235,7 +239,7 @@ int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchReq) { int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq *pFetchReq) {
SVShowTablesFetchRsp *pRsp = (SVShowTablesFetchRsp *)rpcMallocCont(sizeof(SVShowTablesFetchRsp)); SVShowTablesFetchRsp *pRsp = (SVShowTablesFetchRsp *)rpcMallocCont(sizeof(SVShowTablesFetchRsp));
int32_t handle = htonl(pFetchReq->id); int32_t handle = htonl(pFetchReq->id);
...@@ -243,6 +247,7 @@ int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchRe ...@@ -243,6 +247,7 @@ int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchRe
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.handle = pMsg->handle, .handle = pMsg->handle,
.ahandle = pMsg->ahandle, .ahandle = pMsg->ahandle,
.refId = pMsg->refId,
.pCont = pRsp, .pCont = pRsp,
.contLen = sizeof(*pRsp), .contLen = sizeof(*pRsp),
.code = 0, .code = 0,
...@@ -253,7 +258,7 @@ int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchRe ...@@ -253,7 +258,7 @@ int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchRe
} }
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
SQueryContinueReq * req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq)); SQueryContinueReq *req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq));
if (NULL == req) { if (NULL == req) {
QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(SQueryContinueReq)); QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(SQueryContinueReq));
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
...@@ -268,6 +273,7 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { ...@@ -268,6 +273,7 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
.handle = pConn->handle, .handle = pConn->handle,
.ahandle = pConn->ahandle, .ahandle = pConn->ahandle,
.msgType = TDMT_VND_QUERY_CONTINUE, .msgType = TDMT_VND_QUERY_CONTINUE,
.refId = pConn->refId,
.pCont = req, .pCont = req,
.contLen = sizeof(SQueryContinueReq), .contLen = sizeof(SQueryContinueReq),
.code = 0, .code = 0,
...@@ -285,9 +291,8 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { ...@@ -285,9 +291,8 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
STaskDropReq * req = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq)); STaskDropReq *req = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq));
if (NULL == req) { if (NULL == req) {
QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq)); QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq));
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
...@@ -302,6 +307,7 @@ int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { ...@@ -302,6 +307,7 @@ int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
SRpcMsg pMsg = { SRpcMsg pMsg = {
.handle = pConn->handle, .handle = pConn->handle,
.ahandle = pConn->ahandle, .ahandle = pConn->ahandle,
.refId = pConn->refId,
.msgType = TDMT_VND_DROP_TASK, .msgType = TDMT_VND_DROP_TASK,
.pCont = req, .pCont = req,
.contLen = sizeof(STaskDropReq), .contLen = sizeof(STaskDropReq),
...@@ -313,7 +319,6 @@ int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { ...@@ -313,7 +319,6 @@ int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
...@@ -343,8 +348,9 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -343,8 +348,9 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen}; SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen};
qwMsg.connInfo.handle = pMsg->handle; qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle; qwMsg.connInfo.ahandle = pMsg->ahandle;
qwMsg.connInfo.refId = pMsg->refId;
char* sql = strndup(msg->msg, msg->sqlLen); char *sql = strndup(msg->msg, msg->sqlLen);
QW_SCH_TASK_DLOG("processQuery start, node:%p, handle:%p, sql:%s", node, pMsg->handle, sql); QW_SCH_TASK_DLOG("processQuery start, node:%p, handle:%p, sql:%s", node, pMsg->handle, sql);
taosMemoryFreeClear(sql); taosMemoryFreeClear(sql);
...@@ -361,8 +367,8 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -361,8 +367,8 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
bool queryDone = false; bool queryDone = false;
SQueryContinueReq *msg = (SQueryContinueReq *)pMsg->pCont; SQueryContinueReq *msg = (SQueryContinueReq *)pMsg->pCont;
bool needStop = false; bool needStop = false;
SQWTaskCtx *handles = NULL; SQWTaskCtx * handles = NULL;
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; SQWorkerMgmt * mgmt = (SQWorkerMgmt *)qWorkerMgmt;
if (NULL == msg || pMsg->contLen < sizeof(*msg)) { if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen); QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
...@@ -377,6 +383,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -377,6 +383,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0}; SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
qwMsg.connInfo.handle = pMsg->handle; qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle; qwMsg.connInfo.ahandle = pMsg->ahandle;
qwMsg.connInfo.refId = pMsg->refId;
QW_SCH_TASK_DLOG("processCQuery start, node:%p, handle:%p", node, pMsg->handle); QW_SCH_TASK_DLOG("processCQuery start, node:%p, handle:%p", node, pMsg->handle);
...@@ -387,7 +394,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -387,7 +394,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
} }
...@@ -411,6 +418,7 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){ ...@@ -411,6 +418,7 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0}; SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
qwMsg.connInfo.handle = pMsg->handle; qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle; qwMsg.connInfo.ahandle = pMsg->ahandle;
qwMsg.connInfo.refId = pMsg->refId;
QW_SCH_TASK_DLOG("processReady start, node:%p, handle:%p", node, pMsg->handle); QW_SCH_TASK_DLOG("processReady start, node:%p, handle:%p", node, pMsg->handle);
...@@ -439,11 +447,11 @@ int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -439,11 +447,11 @@ int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SSchedulerStatusRsp *sStatus = NULL; SSchedulerStatusRsp *sStatus = NULL;
//QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->sId, &sStatus)); // QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->sId, &sStatus));
_return: _return:
//QW_ERR_RET(qwBuildAndSendStatusRsp(pMsg, sStatus)); // QW_ERR_RET(qwBuildAndSendStatusRsp(pMsg, sStatus));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -473,6 +481,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -473,6 +481,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0}; SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
qwMsg.connInfo.handle = pMsg->handle; qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle; qwMsg.connInfo.ahandle = pMsg->ahandle;
qwMsg.connInfo.refId = pMsg->refId;
QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->handle); QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->handle);
...@@ -493,7 +502,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -493,7 +502,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
} }
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; SQWorkerMgmt * mgmt = (SQWorkerMgmt *)qWorkerMgmt;
int32_t code = 0; int32_t code = 0;
STaskCancelReq *msg = pMsg->pCont; STaskCancelReq *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen < sizeof(*msg)) { if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
...@@ -514,8 +523,9 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -514,8 +523,9 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0}; SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
qwMsg.connInfo.handle = pMsg->handle; qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle; qwMsg.connInfo.ahandle = pMsg->ahandle;
qwMsg.connInfo.refId = pMsg->refId;
//QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId)); // QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId));
_return: _return:
...@@ -552,6 +562,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -552,6 +562,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code}; SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code};
qwMsg.connInfo.handle = pMsg->handle; qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle; qwMsg.connInfo.ahandle = pMsg->ahandle;
qwMsg.connInfo.refId = pMsg->refId;
if (TSDB_CODE_RPC_NETWORK_UNAVAIL == pMsg->code) { if (TSDB_CODE_RPC_NETWORK_UNAVAIL == pMsg->code) {
QW_SCH_TASK_DLOG("receive drop task due to network broken, error:%s", tstrerror(pMsg->code)); QW_SCH_TASK_DLOG("receive drop task due to network broken, error:%s", tstrerror(pMsg->code));
...@@ -573,7 +584,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -573,7 +584,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t code = 0; int32_t code = 0;
SSchedulerHbReq req = {0}; SSchedulerHbReq req = {0};
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; SQWorkerMgmt * mgmt = (SQWorkerMgmt *)qWorkerMgmt;
if (NULL == pMsg->pCont) { if (NULL == pMsg->pCont) {
QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen); QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen);
...@@ -590,6 +601,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -590,6 +601,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0}; SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
qwMsg.connInfo.handle = pMsg->handle; qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle; qwMsg.connInfo.ahandle = pMsg->ahandle;
qwMsg.connInfo.refId = pMsg->refId;
QW_SCH_DLOG("processHb start, node:%p, handle:%p", node, pMsg->handle); QW_SCH_DLOG("processHb start, node:%p, handle:%p", node, pMsg->handle);
......
...@@ -327,6 +327,10 @@ void transQueueClear(STransQueue* queue); ...@@ -327,6 +327,10 @@ void transQueueClear(STransQueue* queue);
*/ */
void transQueueDestroy(STransQueue* queue); void transQueueDestroy(STransQueue* queue);
/*
* init global func
*/
void transThreadOnce();
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -16,6 +16,8 @@ ...@@ -16,6 +16,8 @@
#include "transComm.h" #include "transComm.h"
// static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;
int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) { int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) {
T_MD5_CTX context; T_MD5_CTX context;
int ret = -1; int ret = -1;
...@@ -361,5 +363,10 @@ void transQueueDestroy(STransQueue* queue) { ...@@ -361,5 +363,10 @@ void transQueueDestroy(STransQueue* queue) {
transQueueClear(queue); transQueueClear(queue);
taosArrayDestroy(queue->q); taosArrayDestroy(queue->q);
} }
// int32_t transGetExHandle() {
// static
//}
// void transThreadOnce() {
// taosThreadOnce(&transModuleInit, );
//}
#endif #endif
...@@ -17,6 +17,10 @@ ...@@ -17,6 +17,10 @@
#include "transComm.h" #include "transComm.h"
static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;
static char* notify = "a";
typedef struct { typedef struct {
int notifyCount; // int notifyCount; //
int init; // init or not int init; // init or not
...@@ -97,8 +101,6 @@ typedef struct SExHandle { ...@@ -97,8 +101,6 @@ typedef struct SExHandle {
SWorkThrdObj* pThrd; SWorkThrdObj* pThrd;
} SExHandle; } SExHandle;
static const char* notify = "a";
static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); static void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
...@@ -138,9 +140,11 @@ static void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd); ...@@ -138,9 +140,11 @@ static void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd);
static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease, static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease,
uvHandleRegister}; uvHandleRegister};
static int exHandlesMgt; static int32_t exHandlesMgt;
void uvInitExHandleMgt();
void uvOpenExHandleMgt(int size); void uvOpenExHandleMgt(int size);
void uvCloseExHandleMgt();
int64_t uvAddExHandle(void* p); int64_t uvAddExHandle(void* p);
int32_t uvRemoveExHandle(int64_t refId); int32_t uvRemoveExHandle(int64_t refId);
int32_t uvReleaseExHandle(int64_t refId); int32_t uvReleaseExHandle(int64_t refId);
...@@ -189,18 +193,35 @@ static bool addHandleToAcceptloop(void* arg); ...@@ -189,18 +193,35 @@ static bool addHandleToAcceptloop(void* arg);
do { \ do { \
if (thrd->quit) { \ if (thrd->quit) { \
tTrace("worker thread already quit, ignore msg"); \ tTrace("worker thread already quit, ignore msg"); \
goto _return; \ goto _return1; \
} \ } \
} while (0) } while (0)
#define ASYNC_CHECK_HANDLE(exh1, refId) \ #define ASYNC_CHECK_HANDLE(exh1, refId) \
do { \ do { \
if (refId != -1) { \ if (refId > 0) { \
tTrace("server handle step1"); \
SExHandle* exh2 = uvAcquireExHandle(refId); \ SExHandle* exh2 = uvAcquireExHandle(refId); \
if (exh2 == NULL || exh1 != exh2) { \ if (exh2 == NULL || exh1 != exh2) { \
tTrace("server conn %p except, may already freed, ignore msg", exh2 ? exh2->handle : NULL); \ tTrace("server handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, \
goto _return; \ exh1->refId, refId); \
tTrace("server handle step2"); \
goto _return1; \
} \
} else if (refId == 0) { \
tTrace("server handle step3"); \
SExHandle* exh2 = uvAcquireExHandle(refId); \
tTrace("server handle %p except, may already freed, ignore msg, ", exh1); \
if (exh2 != NULL && exh1 != exh2) { \
tTrace("server handle step4"); \
tTrace("server handle %p except, may already freed, ignore msg, ", exh1); \
goto _return1; \
} else { \
refId = exh1->refId; \
} \ } \
} else if (refId == -1) { \
tTrace("server handle step5"); \
goto _return2; \
} \ } \
} while (0) } while (0)
...@@ -269,9 +290,13 @@ static void uvHandleReq(SSrvConn* pConn) { ...@@ -269,9 +290,13 @@ static void uvHandleReq(SSrvConn* pConn) {
// 1. server application should not send resp on handle // 1. server application should not send resp on handle
// 2. once send out data, cli conn released to conn pool immediately // 2. once send out data, cli conn released to conn pool immediately
// 3. not mixed with persist // 3. not mixed with persist
transMsg.handle = (void*)uvAcquireExHandle(pConn->refId); transMsg.handle = (void*)uvAcquireExHandle(pConn->refId);
tTrace("server handle %p conn: %p translated to app, refId: %" PRIu64 "", transMsg.handle, pConn, pConn->refId);
transMsg.refId = pConn->refId;
assert(transMsg.handle != NULL);
if (pHead->noResp == 1) { if (pHead->noResp == 1) {
// transMsg.refId = -1; transMsg.refId = -1;
} }
uvReleaseExHandle(pConn->refId); uvReleaseExHandle(pConn->refId);
...@@ -444,6 +469,7 @@ static void destroySmsg(SSrvMsg* smsg) { ...@@ -444,6 +469,7 @@ static void destroySmsg(SSrvMsg* smsg) {
taosMemoryFree(smsg); taosMemoryFree(smsg);
} }
static void destroyAllConn(SWorkThrdObj* pThrd) { static void destroyAllConn(SWorkThrdObj* pThrd) {
tTrace("thread %p destroy all conn ", pThrd);
while (!QUEUE_IS_EMPTY(&pThrd->conn)) { while (!QUEUE_IS_EMPTY(&pThrd->conn)) {
queue* h = QUEUE_HEAD(&pThrd->conn); queue* h = QUEUE_HEAD(&pThrd->conn);
QUEUE_REMOVE(h); QUEUE_REMOVE(h);
...@@ -477,19 +503,25 @@ void uvWorkerAsyncCb(uv_async_t* handle) { ...@@ -477,19 +503,25 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
continue; continue;
} }
// release handle to rpc init // release handle to rpc init
if (msg->type == Quit) {
(*transAsyncHandle[msg->type])(msg, pThrd);
continue;
} else {
STransMsg transMsg = msg->msg; STransMsg transMsg = msg->msg;
SExHandle* exh1 = transMsg.handle; SExHandle* exh1 = transMsg.handle;
int64_t refId = transMsg.refId; int64_t refId = transMsg.refId;
SExHandle* exh2 = uvAcquireExHandle(refId); SExHandle* exh2 = uvAcquireExHandle(refId);
if (exh2 == NULL || exh1 != exh2) { if (exh2 == NULL || exh1 != exh2) {
tTrace("server handle %p except msg, ignore it", exh1);
uvReleaseExHandle(refId); uvReleaseExHandle(refId);
destroySmsg(msg); destroySmsg(msg);
continue; continue;
} }
msg->pConn = exh1->handle; msg->pConn = exh1->handle;
(*transAsyncHandle[msg->type])(msg, pThrd); (*transAsyncHandle[msg->type])(msg, pThrd);
} }
}
} }
static void uvWalkCb(uv_handle_t* handle, void* arg) { static void uvWalkCb(uv_handle_t* handle, void* arg) {
if (!uv_is_closing(handle)) { if (!uv_is_closing(handle)) {
...@@ -718,7 +750,7 @@ static SSrvConn* createConn(void* hThrd) { ...@@ -718,7 +750,7 @@ static SSrvConn* createConn(void* hThrd) {
pConn->refId = exh->refId; pConn->refId = exh->refId;
transRefSrvHandle(pConn); transRefSrvHandle(pConn);
tTrace("server conn %p created", pConn); tTrace("server handle %p, conn %p created, refId: %" PRId64 "", exh, pConn, pConn->refId);
return pConn; return pConn;
} }
...@@ -767,7 +799,8 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, ...@@ -767,7 +799,8 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
srv->port = port; srv->port = port;
uv_loop_init(srv->loop); uv_loop_init(srv->loop);
uvOpenExHandleMgt(10000); taosThreadOnce(&transModuleInit, uvInitExHandleMgt);
// uvOpenExHandleMgt(10000);
for (int i = 0; i < srv->numOfThreads; i++) { for (int i = 0; i < srv->numOfThreads; i++) {
SWorkThrdObj* thrd = (SWorkThrdObj*)taosMemoryCalloc(1, sizeof(SWorkThrdObj)); SWorkThrdObj* thrd = (SWorkThrdObj*)taosMemoryCalloc(1, sizeof(SWorkThrdObj));
...@@ -813,10 +846,19 @@ End: ...@@ -813,10 +846,19 @@ End:
transCloseServer(srv); transCloseServer(srv);
return NULL; return NULL;
} }
void uvInitExHandleMgt() {
// init exhandle mgt
uvOpenExHandleMgt(10000);
}
void uvOpenExHandleMgt(int size) { void uvOpenExHandleMgt(int size) {
// added into once later // added into once later
exHandlesMgt = taosOpenRef(size, uvDestoryExHandle); exHandlesMgt = taosOpenRef(size, uvDestoryExHandle);
} }
void uvCloseExHandleMgt() {
// close ref
taosCloseRef(exHandlesMgt);
}
int64_t uvAddExHandle(void* p) { int64_t uvAddExHandle(void* p) {
// acquire extern handle // acquire extern handle
return taosAddRef(exHandlesMgt, p); return taosAddRef(exHandlesMgt, p);
...@@ -932,6 +974,8 @@ void transCloseServer(void* arg) { ...@@ -932,6 +974,8 @@ void transCloseServer(void* arg) {
taosMemoryFree(srv->pipe); taosMemoryFree(srv->pipe);
taosMemoryFree(srv); taosMemoryFree(srv);
// uvCloseExHandleMgt();
} }
void transRefSrvHandle(void* handle) { void transRefSrvHandle(void* handle) {
...@@ -955,14 +999,14 @@ void transUnrefSrvHandle(void* handle) { ...@@ -955,14 +999,14 @@ void transUnrefSrvHandle(void* handle) {
void transReleaseSrvHandle(void* handle) { void transReleaseSrvHandle(void* handle) {
SExHandle* exh = handle; SExHandle* exh = handle;
// TODO(yihaoDeng): not safy here,
int64_t refId = exh->refId; int64_t refId = exh->refId;
ASYNC_CHECK_HANDLE(exh, refId); ASYNC_CHECK_HANDLE(exh, refId);
SWorkThrdObj* pThrd = exh->pThrd; SWorkThrdObj* pThrd = exh->pThrd;
ASYNC_ERR_JRET(pThrd); ASYNC_ERR_JRET(pThrd);
STransMsg tmsg = {.code = 0, .handle = exh, .ahandle = NULL, .refId = exh->refId}; STransMsg tmsg = {.code = 0, .handle = exh, .ahandle = NULL, .refId = refId};
SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg)); SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));
srvMsg->msg = tmsg; srvMsg->msg = tmsg;
...@@ -972,50 +1016,73 @@ void transReleaseSrvHandle(void* handle) { ...@@ -972,50 +1016,73 @@ void transReleaseSrvHandle(void* handle) {
transSendAsync(pThrd->asyncPool, &srvMsg->q); transSendAsync(pThrd->asyncPool, &srvMsg->q);
uvReleaseExHandle(refId); uvReleaseExHandle(refId);
return; return;
_return: _return1:
uvReleaseExHandle(refId); uvReleaseExHandle(refId);
return;
_return2:
return;
} }
void transSendResponse(const STransMsg* msg) { void transSendResponse(const STransMsg* msg) {
SExHandle* exh = msg->handle; SExHandle* exh = msg->handle;
int64_t refId = msg->refId; int64_t refId = msg->refId;
ASYNC_CHECK_HANDLE(exh, refId); ASYNC_CHECK_HANDLE(exh, refId);
STransMsg tmsg = *msg;
tmsg.refId = refId;
SWorkThrdObj* pThrd = exh->pThrd; SWorkThrdObj* pThrd = exh->pThrd;
ASYNC_ERR_JRET(pThrd); ASYNC_ERR_JRET(pThrd);
SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg)); SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));
srvMsg->msg = *msg; srvMsg->msg = tmsg;
srvMsg->type = Normal; srvMsg->type = Normal;
tTrace("server conn %p start to send resp (1/2)", exh->handle); tTrace("server conn %p start to send resp (1/2)", exh->handle);
transSendAsync(pThrd->asyncPool, &srvMsg->q); transSendAsync(pThrd->asyncPool, &srvMsg->q);
uvReleaseExHandle(refId); uvReleaseExHandle(refId);
return; return;
_return: _return1:
rpcFreeCont(msg->pCont);
uvReleaseExHandle(refId); uvReleaseExHandle(refId);
return;
_return2:
rpcFreeCont(msg->pCont);
return;
} }
void transRegisterMsg(const STransMsg* msg) { void transRegisterMsg(const STransMsg* msg) {
SExHandle* exh = NULL; SExHandle* exh = msg->handle;
int64_t refId = msg->refId; int64_t refId = msg->refId;
ASYNC_CHECK_HANDLE(exh, refId); ASYNC_CHECK_HANDLE(exh, refId);
STransMsg tmsg = *msg;
tmsg.refId = refId;
SWorkThrdObj* pThrd = exh->pThrd; SWorkThrdObj* pThrd = exh->pThrd;
ASYNC_ERR_JRET(pThrd); ASYNC_ERR_JRET(pThrd);
SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg)); SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));
srvMsg->msg = *msg; srvMsg->msg = tmsg;
srvMsg->type = Register; srvMsg->type = Register;
tTrace("server conn %p start to register brokenlink callback", exh->handle); tTrace("server conn %p start to register brokenlink callback", exh->handle);
transSendAsync(pThrd->asyncPool, &srvMsg->q); transSendAsync(pThrd->asyncPool, &srvMsg->q);
uvReleaseExHandle(refId); uvReleaseExHandle(refId);
return; return;
_return: _return1:
rpcFreeCont(msg->pCont);
uvReleaseExHandle(refId); uvReleaseExHandle(refId);
return;
_return2:
rpcFreeCont(msg->pCont);
} }
int transGetConnInfo(void* thandle, STransHandleInfo* pInfo) { int transGetConnInfo(void* thandle, STransHandleInfo* pInfo) {
if (thandle == NULL) {
tTrace("invalid handle %p, failed to Get Conn info", thandle);
return -1;
}
SExHandle* ex = thandle; SExHandle* ex = thandle;
SSrvConn* pConn = ex->handle; SSrvConn* pConn = ex->handle;
struct sockaddr_in addr = pConn->addr;
struct sockaddr_in addr = pConn->addr;
pInfo->clientIp = (uint32_t)(addr.sin_addr.s_addr); pInfo->clientIp = (uint32_t)(addr.sin_addr.s_addr);
pInfo->clientPort = ntohs(addr.sin_port); pInfo->clientPort = ntohs(addr.sin_port);
tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user)); tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册