clientHb.c 7.7 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "clientInt.h"
L
Liu Jicong 已提交
17
#include "trpc.h"
L
Liu Jicong 已提交
18 19 20 21 22 23

static SClientHbMgr clientHbMgr = {0};

static int32_t hbCreateThread();
static void    hbStopThread();

L
Liu Jicong 已提交
24
static int32_t hbMqHbRspHandle(SClientHbRsp* pRsp) {
L
Liu Jicong 已提交
25 26 27
  return 0;
}

L
Liu Jicong 已提交
28 29 30 31 32 33 34 35
static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code) {
  if (code != 0) {
    return -1;
  }
  SClientHbRsp* pRsp = (SClientHbRsp*) pMsg->pData;
  return hbMqHbRspHandle(pRsp);
}

L
Liu Jicong 已提交
36 37 38 39
void hbMgrInitMqHbRspHandle() {
  clientHbMgr.handle[HEARTBEAT_TYPE_MQ] = hbMqHbRspHandle;
}

L
Liu Jicong 已提交
40 41 42 43 44
static FORCE_INLINE void hbMgrInitHandle() {
  // init all handle
  hbMgrInitMqHbRspHandle();
}

L
Liu Jicong 已提交
45
SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
L
Liu Jicong 已提交
46 47
  SClientHbBatchReq* pBatchReq = malloc(sizeof(SClientHbBatchReq));
  if (pBatchReq == NULL) {
L
Liu Jicong 已提交
48 49 50
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }
L
Liu Jicong 已提交
51
  int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
L
Liu Jicong 已提交
52
  pBatchReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq));
L
Liu Jicong 已提交
53

L
Liu Jicong 已提交
54
  void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL);
L
Liu Jicong 已提交
55 56
  while (pIter != NULL) {
    SClientHbReq* pOneReq = pIter;
L
Liu Jicong 已提交
57
    taosArrayPush(pBatchReq->reqs, pOneReq);
L
Liu Jicong 已提交
58
    taosHashClear(pOneReq->info);
L
Liu Jicong 已提交
59

L
Liu Jicong 已提交
60
    pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
L
Liu Jicong 已提交
61 62
  }

L
Liu Jicong 已提交
63
  pIter = taosHashIterate(pAppHbMgr->getInfoFuncs, NULL);
L
Liu Jicong 已提交
64 65 66 67 68 69
  while (pIter != NULL) {
    FGetConnInfo getConnInfoFp = (FGetConnInfo)pIter;
    SClientHbKey connKey;
    taosHashCopyKey(pIter, &connKey);
    getConnInfoFp(connKey, NULL);

L
Liu Jicong 已提交
70
    pIter = taosHashIterate(pAppHbMgr->getInfoFuncs, pIter);
L
Liu Jicong 已提交
71 72
  }

L
Liu Jicong 已提交
73
  return pBatchReq;
L
Liu Jicong 已提交
74 75 76 77 78
}

static void* hbThreadFunc(void* param) {
  setThreadName("hb");
  while (1) {
L
Liu Jicong 已提交
79 80 81 82 83
    int8_t threadStop = atomic_load_8(&clientHbMgr.threadStop);
    if(threadStop) {
      break;
    }

L
Liu Jicong 已提交
84 85
    int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
    for(int i = 0; i < sz; i++) {
L
Liu Jicong 已提交
86 87
      SAppHbMgr* pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);

L
Liu Jicong 已提交
88 89 90 91
      int32_t connCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
      if (connCnt == 0) {
        continue;
      }
L
Liu Jicong 已提交
92
      SClientHbBatchReq* pReq = hbGatherAllInfo(pAppHbMgr);
L
Liu Jicong 已提交
93 94 95
      if (pReq == NULL) {
        continue;
      }
L
Liu Jicong 已提交
96 97 98 99 100 101
      int tlen = tSerializeSClientHbBatchReq(NULL, pReq);
      void *buf = malloc(tlen);
      if (buf == NULL) {
        //TODO: error handling
        break;
      }
L
Liu Jicong 已提交
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
      void *bufCopy = buf;
      tSerializeSClientHbBatchReq(&bufCopy, pReq);
      SMsgSendInfo *pInfo = malloc(sizeof(SMsgSendInfo));
      if (pInfo == NULL) {
        terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
        tFreeClientHbBatchReq(pReq);
        free(buf);
        break;
      }
      pInfo->fp = hbMqAsyncCallBack;
      pInfo->msgInfo.pData = buf;
      pInfo->msgInfo.len = tlen;
      pInfo->msgType = TDMT_MND_HEARTBEAT;
      pInfo->param = NULL;
      pInfo->requestId = generateRequestId();
      pInfo->requestObjRefId = 0;
L
Liu Jicong 已提交
118 119

      SAppInstInfo *pAppInstInfo = pAppHbMgr->pAppInstInfo;
L
Liu Jicong 已提交
120
      int64_t transporterId = 0;
L
Liu Jicong 已提交
121
      SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp);
L
Liu Jicong 已提交
122
      asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo);
L
Liu Jicong 已提交
123 124 125 126
      tFreeClientHbBatchReq(pReq);

      atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
    }
L
Liu Jicong 已提交
127
    taosMsleep(HEARTBEAT_INTERVAL);
L
Liu Jicong 已提交
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
  }
  return NULL;
}

static int32_t hbCreateThread() {
  pthread_attr_t thAttr;
  pthread_attr_init(&thAttr);
  pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);

  if (pthread_create(&clientHbMgr.thread, &thAttr, hbThreadFunc, NULL) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  pthread_attr_destroy(&thAttr);
  return 0;
}

L
Liu Jicong 已提交
145 146 147 148
static void hbStopThread() {
  atomic_store_8(&clientHbMgr.threadStop, 1);
}

L
Liu Jicong 已提交
149 150
SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo) {
  hbMgrInit();
L
Liu Jicong 已提交
151 152 153 154 155 156 157 158
  SAppHbMgr* pAppHbMgr = malloc(sizeof(SAppHbMgr)); 
  if (pAppHbMgr == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  // init stat
  pAppHbMgr->startTime = taosGetTimestampMs();

L
Liu Jicong 已提交
159 160
  // init app info
  pAppHbMgr->pAppInstInfo = pAppInstInfo;
L
Liu Jicong 已提交
161 162 163

  // init hash info
  pAppHbMgr->activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
L
Liu Jicong 已提交
164 165 166 167 168 169

  if (pAppHbMgr->activeInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    free(pAppHbMgr);
    return NULL;
  }
L
Liu Jicong 已提交
170 171 172 173
  pAppHbMgr->activeInfo->freeFp = tFreeClientHbReq;
  // init getInfoFunc
  pAppHbMgr->getInfoFuncs = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);

L
Liu Jicong 已提交
174 175 176 177 178 179
  if (pAppHbMgr->getInfoFuncs == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    free(pAppHbMgr);
    return NULL;
  }

L
Liu Jicong 已提交
180 181 182 183 184 185 186 187 188
  taosArrayPush(clientHbMgr.appHbMgrs, &pAppHbMgr);
  return pAppHbMgr;
}

void appHbMgrCleanup(SAppHbMgr* pAppHbMgr) {
  pthread_mutex_lock(&clientHbMgr.lock);

  int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
  for (int i = 0; i < sz; i++) {
L
Liu Jicong 已提交
189
    SAppHbMgr* pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i);
L
Liu Jicong 已提交
190 191 192 193 194 195 196 197 198 199
    if (pAppHbMgr == pTarget) {
      taosHashCleanup(pTarget->activeInfo);
      taosHashCleanup(pTarget->getInfoFuncs);
    }
  }

  pthread_mutex_unlock(&clientHbMgr.lock);
}

int hbMgrInit() {
L
Liu Jicong 已提交
200 201 202 203
  // init once
  int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1);
  if (old == 1) return 0;

L
Liu Jicong 已提交
204 205
  clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void*));
  pthread_mutex_init(&clientHbMgr.lock, NULL);
L
Liu Jicong 已提交
206 207 208 209 210

  // init handle funcs
  hbMgrInitHandle();

  // init backgroud thread
L
Liu Jicong 已提交
211 212
  hbCreateThread();

L
Liu Jicong 已提交
213 214 215 216
  return 0;
}

void hbMgrCleanUp() {
L
Liu Jicong 已提交
217
  // destroy all appHbMgr
L
Liu Jicong 已提交
218 219 220
  int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0);
  if (old == 0) return;

L
Liu Jicong 已提交
221
  taosArrayDestroy(clientHbMgr.appHbMgrs);
L
Liu Jicong 已提交
222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240
}

int hbHandleRsp(SClientHbBatchRsp* hbRsp) {
  int64_t reqId = hbRsp->reqId;
  int64_t rspId = hbRsp->rspId;

  SArray* rsps = hbRsp->rsps;
  int32_t sz = taosArrayGetSize(rsps);
  for (int i = 0; i < sz; i++) {
    SClientHbRsp* pRsp = taosArrayGet(rsps, i);
    if (pRsp->connKey.hbType < HEARTBEAT_TYPE_MAX) {
      clientHbMgr.handle[pRsp->connKey.hbType](pRsp);
    } else {
      // discard rsp
    }
  }
  return 0;
}

L
Liu Jicong 已提交
241
int hbRegisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, FGetConnInfo func) {
L
Liu Jicong 已提交
242
  // init hash in activeinfo
L
Liu Jicong 已提交
243
  void* data = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
L
Liu Jicong 已提交
244 245 246 247 248 249
  if (data != NULL) {
    return 0;
  }
  SClientHbReq hbReq;
  hbReq.connKey = connKey;
  hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
L
Liu Jicong 已提交
250
  taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq));
L
Liu Jicong 已提交
251 252
  // init hash
  if (func != NULL) {
L
Liu Jicong 已提交
253
    taosHashPut(pAppHbMgr->getInfoFuncs, &connKey, sizeof(SClientHbKey), func, sizeof(FGetConnInfo));
L
Liu Jicong 已提交
254 255
  }

L
Liu Jicong 已提交
256
  atomic_add_fetch_32(&pAppHbMgr->connKeyCnt, 1);
L
Liu Jicong 已提交
257 258 259
  return 0;
}

L
Liu Jicong 已提交
260 261 262 263
void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey) {
  taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
  taosHashRemove(pAppHbMgr->getInfoFuncs, &connKey, sizeof(SClientHbKey));
  atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1);
L
Liu Jicong 已提交
264 265
}

L
Liu Jicong 已提交
266
int hbAddConnInfo(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen) {
L
Liu Jicong 已提交
267
  // find req by connection id
L
Liu Jicong 已提交
268
  SClientHbReq* pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
L
Liu Jicong 已提交
269 270 271 272 273 274
  ASSERT(pReq != NULL);

  taosHashPut(pReq->info, key, keyLen, value, valueLen);

  return 0;
}