tmsgcb.c 3.0 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "tmsgcb.h"
18
#include "taoserror.h"
S
Shengliang Guan 已提交
19

S
shm  
Shengliang Guan 已提交
20 21
/*
 * Process-wide default callback table for this translation unit.
 * Written by tmsgSetDefaultMsgCb() and read by the helpers in this file that
 * do not take an explicit SMsgCb argument (tmsgSendRsp, tmsgSendRedirectRsp,
 * tmsgSendMnodeRecv, tmsgReleaseHandle, tmsgReportStartup).
 * Access is not synchronized anywhere in this file.
 */
static SMsgCb tsDefaultMsgCb;

22
void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb) {
S
Shengliang Guan 已提交
23 24 25
  // if (tsDefaultMsgCb.pWrapper == NULL) {
  tsDefaultMsgCb = *pMsgCb;
  //}
26
}
S
shm  
Shengliang Guan 已提交
27

S
Shengliang Guan 已提交
28
/*
 * Hand pReq to the queue handler that pMsgCb holds for qtype.
 *
 * pMsgCb: callback table; its queueFps[] entry for qtype is the handler and
 *         its pMgmt member is passed to the handler as the first argument.
 * qtype:  used directly as the index into queueFps[]; no range check is
 *         performed here.
 * pReq:   message forwarded unchanged to the handler.
 *
 * Returns whatever the handler returns. If no handler is registered for
 * qtype, sets terrno to TSDB_CODE_INVALID_PTR and returns -1.
 */
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) {
  PutToQueueFp putFp = pMsgCb->queueFps[qtype];

  if (putFp == NULL) {
    terrno = TSDB_CODE_INVALID_PTR;
    return -1;
  }

  return putFp(pMsgCb->pMgmt, pReq);
}

S
Shengliang Guan 已提交
38
/*
 * Query a queue size through the qsizeFp callback of pMsgCb.
 *
 * pMsgCb: callback table; qsizeFp is the callback and pMgmt is passed to it
 *         as the first argument.
 * vgId:   forwarded unchanged to the callback.
 * qtype:  forwarded unchanged to the callback.
 *
 * Returns the callback's result. If qsizeFp is NULL, sets terrno to
 * TSDB_CODE_INVALID_PTR and returns -1.
 */
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) {
  GetQueueSizeFp sizeFp = pMsgCb->qsizeFp;

  if (sizeFp == NULL) {
    terrno = TSDB_CODE_INVALID_PTR;
    return -1;
  }

  return sizeFp(pMsgCb->pMgmt, vgId, qtype);
}

S
shm  
Shengliang Guan 已提交
48
/*
 * Send pReq to epSet through the sendReqFp callback of pMsgCb.
 *
 * pMsgCb: callback table; sendReqFp is the callback and pWrapper is passed
 *         to it as the first argument.
 * epSet:  forwarded unchanged to the callback.
 * pReq:   forwarded unchanged to the callback.
 *
 * Returns the callback's result. If sendReqFp is NULL, sets terrno to
 * TSDB_CODE_INVALID_PTR and returns -1.
 */
int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq) {
  SendReqFp sendFp = pMsgCb->sendReqFp;

  if (sendFp == NULL) {
    terrno = TSDB_CODE_INVALID_PTR;
    return -1;
  }

  return sendFp(pMsgCb->pWrapper, epSet, pReq);
}

58
/*
 * Send pRsp through the sendRspFp callback of the default callback table.
 *
 * pRsp: response forwarded unchanged to the callback, together with the
 *       default table's pWrapper as the first argument.
 *
 * Returns nothing. If sendRspFp is NULL, the only effect is that terrno is
 * set to TSDB_CODE_INVALID_PTR.
 */
void tmsgSendRsp(SRpcMsg* pRsp) {
  SendRspFp sendFp = tsDefaultMsgCb.sendRspFp;

  if (sendFp == NULL) {
    terrno = TSDB_CODE_INVALID_PTR;
    return;
  }

  /* Plain call: a void function must not use `return <expression>;` in C. */
  (*sendFp)(tsDefaultMsgCb.pWrapper, pRsp);
}
S
Shengliang Guan 已提交
66

67
/*
 * Send a redirect response through the sendRedirectRspFp callback of the
 * default callback table.
 *
 * pRsp:      forwarded unchanged to the callback.
 * pNewEpSet: forwarded unchanged to the callback.
 *
 * The default table's pWrapper is passed as the callback's first argument.
 * If sendRedirectRspFp is NULL, the only effect is that terrno is set to
 * TSDB_CODE_INVALID_PTR.
 */
void tmsgSendRedirectRsp(SRpcMsg* pRsp, const SEpSet* pNewEpSet) {
  SendRedirectRspFp redirectFp = tsDefaultMsgCb.sendRedirectRspFp;

  if (redirectFp == NULL) {
    terrno = TSDB_CODE_INVALID_PTR;
    return;
  }

  redirectFp(tsDefaultMsgCb.pWrapper, pRsp, pNewEpSet);
}

S
Shengliang Guan 已提交
76 77 78 79 80 81 82 83 84
/*
 * Invoke the sendMnodeRecvFp callback of the default callback table.
 *
 * pReq: forwarded unchanged to the callback.
 * pRsp: forwarded unchanged to the callback.
 *
 * The default table's pWrapper is passed as the callback's first argument.
 * If sendMnodeRecvFp is NULL, the only effect is that terrno is set to
 * TSDB_CODE_INVALID_PTR.
 */
void tmsgSendMnodeRecv(SRpcMsg* pReq, SRpcMsg* pRsp) {
  SendMnodeRecvFp recvFp = tsDefaultMsgCb.sendMnodeRecvFp;

  if (recvFp == NULL) {
    terrno = TSDB_CODE_INVALID_PTR;
    return;
  }

  recvFp(tsDefaultMsgCb.pWrapper, pReq, pRsp);
}

S
Shengliang Guan 已提交
85
/*
 * Invoke the registerBrokenLinkArgFp callback of pMsgCb with pMsg.
 *
 * pMsgCb: callback table; registerBrokenLinkArgFp is the callback and
 *         pWrapper is passed to it as the first argument.
 * pMsg:   forwarded unchanged to the callback.
 *
 * If registerBrokenLinkArgFp is NULL, the only effect is that terrno is set
 * to TSDB_CODE_INVALID_PTR.
 */
void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg) {
  RegisterBrokenLinkArgFp registerFp = pMsgCb->registerBrokenLinkArgFp;

  if (registerFp == NULL) {
    terrno = TSDB_CODE_INVALID_PTR;
    return;
  }

  registerFp(pMsgCb->pWrapper, pMsg);
}

S
shm  
Shengliang Guan 已提交
94
void tmsgReleaseHandle(void* handle, int8_t type) {
95 96 97 98 99 100
  ReleaseHandleFp fp = tsDefaultMsgCb.releaseHandleFp;
  if (fp != NULL) {
    (*fp)(tsDefaultMsgCb.pWrapper, handle, type);
  } else {
    terrno = TSDB_CODE_INVALID_PTR;
  }
S
Shengliang Guan 已提交
101 102 103
}

void tmsgReportStartup(const char* name, const char* desc) {
104 105 106 107 108 109
  ReportStartup fp = tsDefaultMsgCb.reportStartupFp;
  if (fp != NULL && tsDefaultMsgCb.pWrapper != NULL) {
    (*fp)(tsDefaultMsgCb.pWrapper, name, desc);
  } else {
    terrno = TSDB_CODE_INVALID_PTR;
  }
S
Shengliang Guan 已提交
110
}