220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 1) /* 220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 2) * Copyright (c) 2019 TAOS Data, Inc. 220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 3) * 220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 4) * This program is free software: you can use, redistribute, and/or modify 220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 5) * it under the terms of the GNU Affero General Public License, version 3 220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 6) * or later ("AGPL"), as published by the Free Software Foundation. 220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 7) * 220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 8) * This program is distributed in the hope that it will be useful, but WITHOUT 220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 9) * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 10) * FITNESS FOR A PARTICULAR PURPOSE. 220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 11) * 220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 12) * You should have received a copy of the GNU Affero General Public License 220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 13) * along with this program. If not, see . 220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 14) */ 220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 15) 220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 16) #define _DEFAULT_SOURCE 220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 17) #include "tmsgcb.h" 2f42c2e7933 source/common/src/tmsgcb.c (Shengliang Guan 2022-05-04 22:00:04 +0800 18) #include "taoserror.h" 363cbc8985d source/libs/transport/src/tmsgcb.c (Benguang Zhao 2022-11-18 09:37:58 +0800 19) #include "transLog.h" 7afdd3603d3 source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-07-07 17:20:03 +0800 20) #include "trpc.h" 220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 21) 0164158256d source/common/src/tmsgcb.c (Shengliang Guan 2022-05-18 18:37:39 +0800 22) static SMsgCb defaultMsgCb; ac6b121348d source/common/src/tmsgcb.c (Shengliang Guan 2022-03-29 13:39:55 +0800 23) 0164158256d source/common/src/tmsgcb.c (Shengliang Guan 2022-05-18 18:37:39 +0800 24) void tmsgSetDefault(const SMsgCb* msgcb) { defaultMsgCb = *msgcb; } ac6b121348d source/common/src/tmsgcb.c (Shengliang Guan 2022-03-29 13:39:55 +0800 25) 0164158256d source/common/src/tmsgcb.c (Shengliang Guan 2022-05-18 18:37:39 +0800 26) int32_t tmsgPutToQueue(const SMsgCb* msgcb, EQueueType qtype, SRpcMsg* pMsg) { 363cbc8985d source/libs/transport/src/tmsgcb.c (Benguang Zhao 2022-11-18 09:37:58 +0800 27) ASSERT(msgcb != NULL); 7afdd3603d3 source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-07-07 17:20:03 +0800 28) int32_t code = (*msgcb->putToQueueFp)(msgcb->mgmt, qtype, pMsg); 7afdd3603d3 source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-07-07 17:20:03 +0800 29) if (code != 0) { 7afdd3603d3 source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-07-07 17:20:03 +0800 30) rpcFreeCont(pMsg->pCont); 7afdd3603d3 source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-07-07 17:20:03 +0800 31) pMsg->pCont = NULL; 7afdd3603d3 source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-07-07 17:20:03 +0800 32) } 7afdd3603d3 source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-07-07 17:20:03 +0800 33) return code; 220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 34) } 220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 35) 0164158256d source/common/src/tmsgcb.c (Shengliang Guan 2022-05-18 18:37:39 +0800 36) int32_t tmsgGetQueueSize(const SMsgCb* msgcb, int32_t vgId, EQueueType qtype) { a9b32dc3276 source/common/src/tmsgcb.c (Shengliang Guan 2022-06-02 18:26:29 +0800 37) return (*msgcb->qsizeFp)(msgcb->mgmt, vgId, qtype); b36356ae57a source/common/src/tmsgcb.c (Shengliang Guan 2022-03-22 15:53:29 +0800 38) } b36356ae57a source/common/src/tmsgcb.c (Shengliang Guan 2022-03-22 15:53:29 +0800 39) 7afdd3603d3 source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-07-07 17:20:03 +0800 40) int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg) { 7afdd3603d3 source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-07-07 17:20:03 +0800 41) int32_t code = (*defaultMsgCb.sendReqFp)(epSet, pMsg); 7afdd3603d3 source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-07-07 17:20:03 +0800 42) if (code != 0) { 7afdd3603d3 source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-07-07 17:20:03 +0800 43) rpcFreeCont(pMsg->pCont); 7afdd3603d3 source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-07-07 17:20:03 +0800 44) pMsg->pCont = NULL; 7afdd3603d3 source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-07-07 17:20:03 +0800 45) } 7afdd3603d3 source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-07-07 17:20:03 +0800 46) return code; 7afdd3603d3 source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-07-07 17:20:03 +0800 47) } 220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 48) 9c426540e7e source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-11-18 20:15:13 +0800 49) void tmsgSendRsp(SRpcMsg* pMsg) { 9c426540e7e source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-11-18 20:15:13 +0800 50) #if 1 9c426540e7e source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-11-18 20:15:13 +0800 51) rpcSendResponse(pMsg); 9c426540e7e source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-11-18 20:15:13 +0800 52) #else 9c426540e7e source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-11-18 20:15:13 +0800 53) return (*defaultMsgCb.sendRspFp)(pMsg); 9c426540e7e source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-11-18 20:15:13 +0800 54) #endif 9c426540e7e source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-11-18 20:15:13 +0800 55) } f82afcfe4dd source/common/src/tmsgcb.c (Shengliang Guan 2022-03-28 20:45:52 +0800 56) a9b32dc3276 source/common/src/tmsgcb.c (Shengliang Guan 2022-06-02 18:26:29 +0800 57) void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg) { (*defaultMsgCb.registerBrokenLinkArgFp)(pMsg); } 182a5ee4b5a source/common/src/tmsgcb.c (Shengliang Guan 2022-03-29 11:37:29 +0800 58) a9b32dc3276 source/common/src/tmsgcb.c (Shengliang Guan 2022-06-02 18:26:29 +0800 59) void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type) { (*defaultMsgCb.releaseHandleFp)(pHandle, type); } 7c588cc0747 source/common/src/tmsgcb.c (Shengliang Guan 2022-04-19 19:43:55 +0800 60) 226ccb4ec5a source/libs/transport/src/tmsgcb.c (Minglei Jin 2022-07-23 22:34:35 +0800 61) void tmsgReportStartup(const char* name, const char* desc) { (*defaultMsgCb.reportStartupFp)(name, desc); }