tmsgcb.c 2.9 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "tmsgcb.h"
18
#include "taoserror.h"
S
Shengliang Guan 已提交
19

S
shm  
Shengliang Guan 已提交
20 21 22 23
static SMsgCb tsDefaultMsgCb;

void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb) { tsDefaultMsgCb = *pMsgCb; }

S
Shengliang Guan 已提交
24
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) {
25 26
  PutToQueueFp fp = pMsgCb->queueFps[qtype];
  if (fp != NULL) {
S
Shengliang 已提交
27
    return (*fp)(pMsgCb->pMgmt, pReq);
28 29 30 31
  } else {
    terrno = TSDB_CODE_INVALID_PTR;
    return -1;
  }
S
Shengliang Guan 已提交
32 33
}

S
Shengliang Guan 已提交
34
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) {
35 36
  GetQueueSizeFp fp = pMsgCb->qsizeFp;
  if (fp != NULL) {
S
Shengliang 已提交
37
    return (*fp)(pMsgCb->pMgmt, vgId, qtype);
38 39 40 41
  } else {
    terrno = TSDB_CODE_INVALID_PTR;
    return -1;
  }
S
Shengliang Guan 已提交
42 43
}

S
shm  
Shengliang Guan 已提交
44
int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq) {
45 46 47 48 49 50 51
  SendReqFp fp = pMsgCb->sendReqFp;
  if (fp != NULL) {
    return (*fp)(pMsgCb->pWrapper, epSet, pReq);
  } else {
    terrno = TSDB_CODE_INVALID_PTR;
    return -1;
  }
S
Shengliang Guan 已提交
52 53
}

54 55 56 57 58 59 60 61
void tmsgSendRsp(const SRpcMsg* pRsp) {
  SendRspFp fp = tsDefaultMsgCb.sendRspFp;
  if (fp != NULL) {
    return (*fp)(tsDefaultMsgCb.pWrapper, pRsp);
  } else {
    terrno = TSDB_CODE_INVALID_PTR;
  }
}
S
Shengliang Guan 已提交
62

M
Minghao Li 已提交
63
void tmsgSendRedirectRsp(const SRpcMsg* pRsp, const SEpSet* pNewEpSet) {
64 65 66 67 68 69
  SendRedirectRspFp fp = tsDefaultMsgCb.sendRedirectRspFp;
  if (fp != NULL) {
    (*fp)(tsDefaultMsgCb.pWrapper, pRsp, pNewEpSet);
  } else {
    terrno = TSDB_CODE_INVALID_PTR;
  }
M
Minghao Li 已提交
70 71
}

S
Shengliang Guan 已提交
72 73 74 75 76 77 78 79 80
void tmsgSendMnodeRecv(SRpcMsg* pReq, SRpcMsg* pRsp) {
  SendMnodeRecvFp fp = tsDefaultMsgCb.sendMnodeRecvFp;
  if (fp != NULL) {
    (*fp)(tsDefaultMsgCb.pWrapper, pReq, pRsp);
  } else {
    terrno = TSDB_CODE_INVALID_PTR;
  }
}

S
Shengliang Guan 已提交
81
void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg) {
82 83 84 85 86 87
  RegisterBrokenLinkArgFp fp = pMsgCb->registerBrokenLinkArgFp;
  if (fp != NULL) {
    (*fp)(pMsgCb->pWrapper, pMsg);
  } else {
    terrno = TSDB_CODE_INVALID_PTR;
  }
S
shm  
Shengliang Guan 已提交
88 89
}

S
shm  
Shengliang Guan 已提交
90
void tmsgReleaseHandle(void* handle, int8_t type) {
91 92 93 94 95 96
  ReleaseHandleFp fp = tsDefaultMsgCb.releaseHandleFp;
  if (fp != NULL) {
    (*fp)(tsDefaultMsgCb.pWrapper, handle, type);
  } else {
    terrno = TSDB_CODE_INVALID_PTR;
  }
S
Shengliang Guan 已提交
97 98 99
}

void tmsgReportStartup(const char* name, const char* desc) {
100 101 102 103 104 105
  ReportStartup fp = tsDefaultMsgCb.reportStartupFp;
  if (fp != NULL && tsDefaultMsgCb.pWrapper != NULL) {
    (*fp)(tsDefaultMsgCb.pWrapper, name, desc);
  } else {
    terrno = TSDB_CODE_INVALID_PTR;
  }
S
Shengliang Guan 已提交
106
}