clientHb.c 17.8 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "clientInt.h"
L
Liu Jicong 已提交
17
#include "trpc.h"
D
dapan1121 已提交
18 19
#include "catalog.h"
#include "clientLog.h"
L
Liu Jicong 已提交
20 21 22 23 24 25

static SClientHbMgr clientHbMgr = {0};

static int32_t hbCreateThread();
static void    hbStopThread();

D
dapan1121 已提交
26
static int32_t hbMqHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) {
L
Liu Jicong 已提交
27 28 29
  return 0;
}

D
dapan1121 已提交
30 31 32 33
static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
  int32_t msgLen = 0;
  int32_t code = 0;

S
Shengliang Guan 已提交
34 35 36 37 38
  SUseDbBatchRsp batchUseRsp = {0};
  if (tDeserializeSUseDbBatchRsp(value, valueLen, &batchUseRsp) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }
D
dapan1121 已提交
39

S
Shengliang Guan 已提交
40 41 42
  int32_t numOfBatchs = taosArrayGetSize(batchUseRsp.pArray);
  for (int32_t i = 0; i < numOfBatchs; ++i) {
    SUseDbRsp *rsp = taosArrayGet(batchUseRsp.pArray, i);
D
dapan1121 已提交
43
    tscDebug("hb db rsp, db:%s, vgVersion:%d, uid:%"PRIx64, rsp->db, rsp->vgVersion, rsp->uid);
D
dapan1121 已提交
44 45
    
    if (rsp->vgVersion < 0) {
D
dapan1121 已提交
46
      code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid);
D
dapan1121 已提交
47 48 49 50
    } else {
      SDBVgroupInfo vgInfo = {0};
      vgInfo.vgVersion = rsp->vgVersion;
      vgInfo.hashMethod = rsp->hashMethod;
D
dapan1121 已提交
51 52
      vgInfo.vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
      if (NULL == vgInfo.vgHash) {
D
dapan1121 已提交
53 54 55 56
        tscError("hash init[%d] failed", rsp->vgNum);
        return TSDB_CODE_TSC_OUT_OF_MEMORY;
      }

S
Shengliang Guan 已提交
57 58 59
      for (int32_t j = 0; j < rsp->vgNum; ++j) {
        SVgroupInfo *pInfo = taosArrayGet(rsp->pVgroupInfos, j);
        if (taosHashPut(vgInfo.vgHash, &pInfo->vgId, sizeof(int32_t), pInfo, sizeof(SVgroupInfo)) != 0) {
D
dapan1121 已提交
60
          tscError("hash push failed, errno:%d", errno);
D
dapan1121 已提交
61
          taosHashCleanup(vgInfo.vgHash);
D
dapan1121 已提交
62 63
          return TSDB_CODE_TSC_OUT_OF_MEMORY;
        }
S
Shengliang Guan 已提交
64 65
      }

D
dapan1121 已提交
66
      code = catalogUpdateDBVgroup(pCatalog, rsp->db, rsp->uid, &vgInfo);
D
dapan1121 已提交
67
      if (code) {
D
dapan1121 已提交
68
        taosHashCleanup(vgInfo.vgHash);
D
dapan1121 已提交
69 70 71 72 73 74 75 76 77 78 79 80 81
      }
    }

    if (code) {
      return code;
    }

    msgLen += sizeof(SUseDbRsp) + rsp->vgNum * sizeof(SVgroupInfo);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
  int32_t msgLen = 0;
  int32_t code = 0;
  int32_t schemaNum = 0;
  
  while (msgLen < valueLen) {
    STableMetaRsp *rsp = (STableMetaRsp *)((char *)value + msgLen);

    rsp->numOfColumns = ntohl(rsp->numOfColumns);
    rsp->suid = be64toh(rsp->suid);
    
    if (rsp->numOfColumns < 0) {
      schemaNum = 0;
      
      tscDebug("hb remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);

D
dapan1121 已提交
98
      catalogRemoveSTableMeta(pCatalog, rsp->dbFName, rsp->stbName, rsp->suid);
D
dapan 已提交
99
    } else {
D
dapan1121 已提交
100 101
      tscDebug("hb update stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);

D
dapan 已提交
102
      rsp->numOfTags = ntohl(rsp->numOfTags);
D
dapan1121 已提交
103 104 105 106 107 108
      rsp->sversion = ntohl(rsp->sversion);
      rsp->tversion = ntohl(rsp->tversion);
      rsp->tuid = be64toh(rsp->tuid);
      rsp->vgId = ntohl(rsp->vgId);

      SSchema* pSchema = rsp->pSchema;
D
dapan 已提交
109 110 111
      
      schemaNum = rsp->numOfColumns + rsp->numOfTags;

D
dapan1121 已提交
112 113 114
      for (int i = 0; i < schemaNum; ++i) {
        pSchema->bytes = ntohl(pSchema->bytes);
        pSchema->colId = ntohl(pSchema->colId);
D
dapan 已提交
115

D
dapan1121 已提交
116
        pSchema++;
D
dapan 已提交
117 118
      }

D
dapan1121 已提交
119 120 121 122 123 124
      if (rsp->pSchema[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
        tscError("invalid colId[%d] for the first column in table meta rsp msg", rsp->pSchema[0].colId);
        return TSDB_CODE_TSC_INVALID_VALUE;
      }      

      catalogUpdateSTableMeta(pCatalog, rsp);
D
dapan 已提交
125 126 127 128 129 130 131 132 133
    }

    msgLen += sizeof(STableMetaRsp) + schemaNum * sizeof(SSchema);
  }

  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
134 135 136 137 138 139 140 141
static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) {
  SHbConnInfo * info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey));
  if (NULL == info) {
    tscWarn("fail to get connInfo, may be dropped, connId:%d, type:%d", pRsp->connKey.connId, pRsp->connKey.hbType);
    return TSDB_CODE_SUCCESS;
  }

  int32_t kvNum = pRsp->info ? taosArrayGetSize(pRsp->info) : 0;
D
dapan1121 已提交
142 143 144

  tscDebug("hb got %d rsp kv", kvNum);
  
D
dapan1121 已提交
145 146 147
  for (int32_t i = 0; i < kvNum; ++i) {
    SKv *kv = taosArrayGet(pRsp->info, i);
    switch (kv->key) {
D
dapan1121 已提交
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
      case HEARTBEAT_KEY_DBINFO: {
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid hb db info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }

        int64_t *clusterId = (int64_t *)info->param;
        struct SCatalog *pCatalog = NULL;
        
        int32_t code = catalogGetHandle(*clusterId, &pCatalog);
        if (code != TSDB_CODE_SUCCESS) {
          tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", *clusterId, tstrerror(code));
          break;
        }

        hbProcessDBInfoRsp(kv->value, kv->valueLen, pCatalog);
D
dapan1121 已提交
164
        break;
D
dapan1121 已提交
165
      }
D
dapan 已提交
166 167 168 169 170
      case HEARTBEAT_KEY_STBINFO:{
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid hb stb info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }
D
dapan1121 已提交
171

D
dapan 已提交
172 173 174 175 176 177 178 179 180 181
        int64_t *clusterId = (int64_t *)info->param;
        struct SCatalog *pCatalog = NULL;
        
        int32_t code = catalogGetHandle(*clusterId, &pCatalog);
        if (code != TSDB_CODE_SUCCESS) {
          tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", *clusterId, tstrerror(code));
          break;
        }

        hbProcessStbInfoRsp(kv->value, kv->valueLen, pCatalog);
D
dapan1121 已提交
182
        break;
D
dapan 已提交
183
      }
D
dapan1121 已提交
184 185 186 187 188 189 190 191 192
      default:
        tscError("invalid hb key type:%d", kv->key);
        break;
    }
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
193
static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code) {
D
dapan1121 已提交
194
  static int32_t emptyRspNum = 0;
L
Liu Jicong 已提交
195
  if (code != 0) {
D
dapan1121 已提交
196
    tfree(param);
L
Liu Jicong 已提交
197 198
    return -1;
  }
S
Shengliang Guan 已提交
199

D
dapan1121 已提交
200
  char *key = (char *)param;
D
dapan1121 已提交
201
  SClientHbBatchRsp pRsp = {0};
S
Shengliang Guan 已提交
202
  tDeserializeSClientHbBatchRsp(pMsg->pData, pMsg->len, &pRsp);
D
dapan1121 已提交
203
  
D
dapan1121 已提交
204
  int32_t rspNum = taosArrayGetSize(pRsp.rsps);
D
dapan1121 已提交
205 206 207

  SAppInstInfo** pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
  if (pInst == NULL || NULL == *pInst) {
D
dapan1121 已提交
208 209 210
    tscError("cluster not exist, key:%s", key);    
    tfree(param);
    tFreeClientHbBatchRsp(&pRsp);
D
dapan1121 已提交
211 212 213
    return -1;
  }

D
dapan1121 已提交
214 215 216
  tfree(param);

  if (rspNum) {
D
dapan 已提交
217
    tscDebug("hb got %d rsp, %d empty rsp received before", rspNum, atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0));
D
dapan1121 已提交
218 219 220 221 222
  } else {
    atomic_add_fetch_32(&emptyRspNum, 1);
  }

  for (int32_t i = 0; i < rspNum; ++i) {
D
dapan1121 已提交
223
    SClientHbRsp* rsp = taosArrayGet(pRsp.rsps, i);
D
dapan1121 已提交
224 225 226 227 228
    code = (*clientHbMgr.rspHandle[rsp->connKey.hbType])((*pInst)->pAppHbMgr, rsp);
    if (code) {
      break;
    }
  }
D
dapan1121 已提交
229 230

  tFreeClientHbBatchRsp(&pRsp);
D
dapan1121 已提交
231 232
  
  return code;
L
Liu Jicong 已提交
233 234
}

D
dapan1121 已提交
235 236 237 238 239 240 241 242 243 244
int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
  SDbVgVersion *dbs = NULL;
  uint32_t dbNum = 0;
  int32_t code = 0;

  code = catalogGetExpiredDBs(pCatalog, &dbs, &dbNum);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

D
dapan1121 已提交
245 246 247 248
  if (dbNum <= 0) {
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
249 250 251 252 253 254 255
  for (int32_t i = 0; i < dbNum; ++i) {
    SDbVgVersion *db = &dbs[i];
    db->dbId = htobe64(db->dbId);
    db->vgVersion = htonl(db->vgVersion);
  }

  SKv kv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = sizeof(SDbVgVersion) * dbNum, .value = dbs};
D
dapan1121 已提交
256 257 258

  tscDebug("hb got %d expired db, valueLen:%d", dbNum, kv.valueLen);

D
dapan1121 已提交
259 260 261 262 263
  taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294
int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
  SSTableMetaVersion *stbs = NULL;
  uint32_t stbNum = 0;
  int32_t code = 0;

  code = catalogGetExpiredSTables(pCatalog, &stbs, &stbNum);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  if (stbNum <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  for (int32_t i = 0; i < stbNum; ++i) {
    SSTableMetaVersion *stb = &stbs[i];
    stb->suid = htobe64(stb->suid);
    stb->sversion = htons(stb->sversion);
    stb->tversion = htons(stb->tversion);    
  }

  SKv kv = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = sizeof(SSTableMetaVersion) * stbNum, .value = stbs};

  tscDebug("hb got %d expired stb, valueLen:%d", stbNum, kv.valueLen);

  taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));

  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
295 296 297 298 299 300 301 302 303 304 305 306 307 308 309
int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req) {
  int64_t *clusterId = (int64_t *)param;
  struct SCatalog *pCatalog = NULL;

  int32_t code = catalogGetHandle(*clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", *clusterId, tstrerror(code));
    return code;
  }
  
  code = hbGetExpiredDBInfo(connKey, pCatalog, req);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

D
dapan 已提交
310 311 312 313 314
  code = hbGetExpiredStbInfo(connKey, pCatalog, req);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

D
dapan1121 已提交
315 316 317 318 319 320 321 322 323 324 325 326 327

  return TSDB_CODE_SUCCESS;
}

int32_t hbMqHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req) {

}

void hbMgrInitMqHbHandle() {
  clientHbMgr.reqHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbReqHandle;
  clientHbMgr.reqHandle[HEARTBEAT_TYPE_MQ] = hbMqHbReqHandle;
  clientHbMgr.rspHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbRspHandle;
  clientHbMgr.rspHandle[HEARTBEAT_TYPE_MQ] = hbMqHbRspHandle;
L
Liu Jicong 已提交
328 329
}

L
Liu Jicong 已提交
330 331
static FORCE_INLINE void hbMgrInitHandle() {
  // init all handle
D
dapan1121 已提交
332
  hbMgrInitMqHbHandle();
L
Liu Jicong 已提交
333 334
}

D
dapan1121 已提交
335 336
void hbFreeReq(void *req) {
  SClientHbReq *pReq = (SClientHbReq *)req;
D
dapan1121 已提交
337
  tFreeReqKvHash(pReq->info);
D
dapan1121 已提交
338 339 340 341
}



L
Liu Jicong 已提交
342
SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
L
Liu Jicong 已提交
343 344
  SClientHbBatchReq* pBatchReq = malloc(sizeof(SClientHbBatchReq));
  if (pBatchReq == NULL) {
L
Liu Jicong 已提交
345 346 347
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }
L
Liu Jicong 已提交
348
  int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
L
Liu Jicong 已提交
349
  pBatchReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq));
L
Liu Jicong 已提交
350

D
dapan1121 已提交
351
  int32_t code = 0;
L
Liu Jicong 已提交
352
  void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL);
L
Liu Jicong 已提交
353 354
  while (pIter != NULL) {
    SClientHbReq* pOneReq = pIter;
D
dapan1121 已提交
355 356 357 358 359 360 361 362 363 364

    SHbConnInfo * info = taosHashGet(pAppHbMgr->connInfo, &pOneReq->connKey, sizeof(SClientHbKey));
    if (info) {
      code = (*clientHbMgr.reqHandle[pOneReq->connKey.hbType])(&pOneReq->connKey, info->param, pOneReq);
      if (code) {
        taosHashCancelIterate(pAppHbMgr->activeInfo, pIter);
        break;
      }
    }

L
Liu Jicong 已提交
365
    taosArrayPush(pBatchReq->reqs, pOneReq);
L
Liu Jicong 已提交
366

L
Liu Jicong 已提交
367
    pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
L
Liu Jicong 已提交
368 369
  }

D
dapan1121 已提交
370 371 372
  if (code) {
    taosArrayDestroyEx(pBatchReq->reqs, hbFreeReq);
    tfree(pBatchReq);
L
Liu Jicong 已提交
373 374
  }

L
Liu Jicong 已提交
375
  return pBatchReq;
L
Liu Jicong 已提交
376 377
}

D
dapan1121 已提交
378 379 380 381 382 383

void hbClearReqInfo(SAppHbMgr *pAppHbMgr) {
  void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL);
  while (pIter != NULL) {
    SClientHbReq* pOneReq = pIter;

D
dapan1121 已提交
384
    tFreeReqKvHash(pOneReq->info);
D
dapan1121 已提交
385 386 387 388 389 390 391 392
    taosHashClear(pOneReq->info);

    pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
  }
}



L
Liu Jicong 已提交
393 394 395
static void* hbThreadFunc(void* param) {
  setThreadName("hb");
  while (1) {
396 397
    int8_t threadStop = atomic_val_compare_exchange_8(&clientHbMgr.threadStop, 1, 2);
    if(1 == threadStop) {
L
Liu Jicong 已提交
398 399 400
      break;
    }

401 402
    pthread_mutex_lock(&clientHbMgr.lock);

L
Liu Jicong 已提交
403 404
    int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
    for(int i = 0; i < sz; i++) {
L
Liu Jicong 已提交
405 406
      SAppHbMgr* pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);

L
Liu Jicong 已提交
407 408 409 410
      int32_t connCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
      if (connCnt == 0) {
        continue;
      }
L
Liu Jicong 已提交
411
      SClientHbBatchReq* pReq = hbGatherAllInfo(pAppHbMgr);
L
Liu Jicong 已提交
412 413 414
      if (pReq == NULL) {
        continue;
      }
S
Shengliang Guan 已提交
415
      int tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq);
L
Liu Jicong 已提交
416 417
      void *buf = malloc(tlen);
      if (buf == NULL) {
D
dapan1121 已提交
418 419 420
        terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
        tFreeClientHbBatchReq(pReq, false);
        hbClearReqInfo(pAppHbMgr);
L
Liu Jicong 已提交
421 422
        break;
      }
S
Shengliang Guan 已提交
423
      tSerializeSClientHbBatchReq(buf, tlen, pReq);
L
Liu Jicong 已提交
424 425 426
      SMsgSendInfo *pInfo = malloc(sizeof(SMsgSendInfo));
      if (pInfo == NULL) {
        terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
L
Liu Jicong 已提交
427
        tFreeClientHbBatchReq(pReq, false);
D
dapan1121 已提交
428
        hbClearReqInfo(pAppHbMgr);
L
Liu Jicong 已提交
429 430 431 432 433 434 435
        free(buf);
        break;
      }
      pInfo->fp = hbMqAsyncCallBack;
      pInfo->msgInfo.pData = buf;
      pInfo->msgInfo.len = tlen;
      pInfo->msgType = TDMT_MND_HEARTBEAT;
D
dapan1121 已提交
436
      pInfo->param = strdup(pAppHbMgr->key);
L
Liu Jicong 已提交
437 438
      pInfo->requestId = generateRequestId();
      pInfo->requestObjRefId = 0;
L
Liu Jicong 已提交
439 440

      SAppInstInfo *pAppInstInfo = pAppHbMgr->pAppInstInfo;
L
Liu Jicong 已提交
441
      int64_t transporterId = 0;
L
Liu Jicong 已提交
442
      SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp);
L
Liu Jicong 已提交
443
      asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo);
D
dapan1121 已提交
444 445
      tFreeClientHbBatchReq(pReq, false);      
      hbClearReqInfo(pAppHbMgr);
L
Liu Jicong 已提交
446 447 448

      atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
    }
449 450 451

    pthread_mutex_unlock(&clientHbMgr.lock);
    
L
Liu Jicong 已提交
452
    taosMsleep(HEARTBEAT_INTERVAL);
L
Liu Jicong 已提交
453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469
  }
  return NULL;
}

static int32_t hbCreateThread() {
  pthread_attr_t thAttr;
  pthread_attr_init(&thAttr);
  pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);

  if (pthread_create(&clientHbMgr.thread, &thAttr, hbThreadFunc, NULL) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  pthread_attr_destroy(&thAttr);
  return 0;
}

L
Liu Jicong 已提交
470
static void hbStopThread() {
471
  if (atomic_val_compare_exchange_8(&clientHbMgr.threadStop, 0, 1)) {
D
dapan1121 已提交
472
    tscDebug("hb thread already stopped");
473 474 475 476 477 478
    return;
  }
  
  while (2 != atomic_load_8(&clientHbMgr.threadStop)) {
    usleep(10);
  }
D
dapan1121 已提交
479 480

  tscDebug("hb thread stopped");  
L
Liu Jicong 已提交
481 482
}

D
dapan1121 已提交
483
SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key) {
L
Liu Jicong 已提交
484
  hbMgrInit();
L
Liu Jicong 已提交
485 486 487 488 489 490 491
  SAppHbMgr* pAppHbMgr = malloc(sizeof(SAppHbMgr)); 
  if (pAppHbMgr == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  // init stat
  pAppHbMgr->startTime = taosGetTimestampMs();
L
Liu Jicong 已提交
492 493 494
  pAppHbMgr->connKeyCnt = 0;
  pAppHbMgr->reportCnt = 0;
  pAppHbMgr->reportBytes = 0;
D
dapan1121 已提交
495
  pAppHbMgr->key = strdup(key);
L
Liu Jicong 已提交
496

L
Liu Jicong 已提交
497 498
  // init app info
  pAppHbMgr->pAppInstInfo = pAppInstInfo;
L
Liu Jicong 已提交
499 500 501

  // init hash info
  pAppHbMgr->activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
L
Liu Jicong 已提交
502 503 504 505 506 507

  if (pAppHbMgr->activeInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    free(pAppHbMgr);
    return NULL;
  }
L
Liu Jicong 已提交
508 509
  pAppHbMgr->activeInfo->freeFp = tFreeClientHbReq;
  // init getInfoFunc
D
dapan1121 已提交
510
  pAppHbMgr->connInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
L
Liu Jicong 已提交
511

D
dapan1121 已提交
512
  if (pAppHbMgr->connInfo == NULL) {
L
Liu Jicong 已提交
513 514 515 516 517
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    free(pAppHbMgr);
    return NULL;
  }

518
  pthread_mutex_lock(&clientHbMgr.lock);
L
Liu Jicong 已提交
519
  taosArrayPush(clientHbMgr.appHbMgrs, &pAppHbMgr);
520 521
  pthread_mutex_unlock(&clientHbMgr.lock);
  
L
Liu Jicong 已提交
522 523 524
  return pAppHbMgr;
}

D
dapan1121 已提交
525
void appHbMgrCleanup(void) {
L
Liu Jicong 已提交
526 527 528 529
  pthread_mutex_lock(&clientHbMgr.lock);

  int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
  for (int i = 0; i < sz; i++) {
L
Liu Jicong 已提交
530
    SAppHbMgr* pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i);
D
dapan1121 已提交
531 532 533 534
    taosHashCleanup(pTarget->activeInfo);
    pTarget->activeInfo = NULL;
    taosHashCleanup(pTarget->connInfo);
    pTarget->connInfo = NULL;
L
Liu Jicong 已提交
535 536 537 538 539 540
  }

  pthread_mutex_unlock(&clientHbMgr.lock);
}

int hbMgrInit() {
L
Liu Jicong 已提交
541 542 543 544
  // init once
  int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1);
  if (old == 1) return 0;

L
Liu Jicong 已提交
545 546
  clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void*));
  pthread_mutex_init(&clientHbMgr.lock, NULL);
L
Liu Jicong 已提交
547 548 549 550 551

  // init handle funcs
  hbMgrInitHandle();

  // init backgroud thread
L
Liu Jicong 已提交
552 553
  hbCreateThread();

L
Liu Jicong 已提交
554 555 556 557
  return 0;
}

void hbMgrCleanUp() {
L
Liu Jicong 已提交
558
  return;
559 560
  hbStopThread();
  
L
Liu Jicong 已提交
561
  // destroy all appHbMgr
L
Liu Jicong 已提交
562 563 564
  int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0);
  if (old == 0) return;

565
  pthread_mutex_lock(&clientHbMgr.lock);
D
dapan1121 已提交
566
  appHbMgrCleanup();
567 568 569 570
  taosArrayDestroy(clientHbMgr.appHbMgrs);  
  pthread_mutex_unlock(&clientHbMgr.lock);
  
  clientHbMgr.appHbMgrs = NULL;
L
Liu Jicong 已提交
571 572
}

D
dapan1121 已提交
573
int hbRegisterConnImpl(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, SHbConnInfo *info) {
L
Liu Jicong 已提交
574
  // init hash in activeinfo
L
Liu Jicong 已提交
575
  void* data = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
L
Liu Jicong 已提交
576 577 578 579 580 581
  if (data != NULL) {
    return 0;
  }
  SClientHbReq hbReq;
  hbReq.connKey = connKey;
  hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
D
dapan1121 已提交
582
  
L
Liu Jicong 已提交
583
  taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq));
D
dapan1121 已提交
584
 
L
Liu Jicong 已提交
585
  // init hash
D
dapan1121 已提交
586 587 588 589
  if (info != NULL) {
    SClientHbReq * pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
    info->req = pReq;
    taosHashPut(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey), info, sizeof(SHbConnInfo));
L
Liu Jicong 已提交
590 591
  }

L
Liu Jicong 已提交
592
  atomic_add_fetch_32(&pAppHbMgr->connKeyCnt, 1);
L
Liu Jicong 已提交
593 594 595
  return 0;
}

D
dapan1121 已提交
596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617
int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) {
  SClientHbKey connKey = {.connId = connId, .hbType = HEARTBEAT_TYPE_QUERY};
  SHbConnInfo info = {0};

  switch (hbType) {
    case HEARTBEAT_TYPE_QUERY: {
      int64_t *pClusterId = malloc(sizeof(int64_t));
      *pClusterId = clusterId;

      info.param = pClusterId;
      break;
    }
    case HEARTBEAT_TYPE_MQ: {
      break;
    }
    default:
      break;
  }
  
  return hbRegisterConnImpl(pAppHbMgr, connKey, &info);
}

L
Liu Jicong 已提交
618
void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey) {
D
dapan1121 已提交
619 620 621 622 623 624
  int32_t code = 0;
  code = taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
  code = taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey));
  if (code) {
    return;
  }
L
Liu Jicong 已提交
625
  atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1);
L
Liu Jicong 已提交
626 627
}

L
Liu Jicong 已提交
628
int hbAddConnInfo(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen) {
L
Liu Jicong 已提交
629
  // find req by connection id
L
Liu Jicong 已提交
630
  SClientHbReq* pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
L
Liu Jicong 已提交
631 632 633 634 635 636
  ASSERT(pReq != NULL);

  taosHashPut(pReq->info, key, keyLen, value, valueLen);

  return 0;
}