clientHb.c 17.3 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "clientInt.h"
L
Liu Jicong 已提交
17
#include "trpc.h"
D
dapan1121 已提交
18 19
#include "catalog.h"
#include "clientLog.h"
L
Liu Jicong 已提交
20 21 22 23 24 25

// File-scope heartbeat manager state used by the functions in this file.
static SClientHbMgr clientHbMgr = {0};

// Forward declarations of the background-thread helpers defined later in this file.
static int32_t hbCreateThread();
static void    hbStopThread();

D
dapan1121 已提交
26
/*
 * Response handler for MQ-type heartbeats.
 *
 * Placeholder: both arguments are ignored and 0 is always returned.
 */
static int32_t hbMqHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) {
  (void)pAppHbMgr;  // not used yet
  (void)pRsp;       // not used yet
  return 0;
}

D
dapan1121 已提交
30 31 32
static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
  int32_t code = 0;

S
Shengliang Guan 已提交
33 34 35 36 37
  SUseDbBatchRsp batchUseRsp = {0};
  if (tDeserializeSUseDbBatchRsp(value, valueLen, &batchUseRsp) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }
D
dapan1121 已提交
38

S
Shengliang Guan 已提交
39 40 41
  int32_t numOfBatchs = taosArrayGetSize(batchUseRsp.pArray);
  for (int32_t i = 0; i < numOfBatchs; ++i) {
    SUseDbRsp *rsp = taosArrayGet(batchUseRsp.pArray, i);
D
dapan1121 已提交
42
    tscDebug("hb db rsp, db:%s, vgVersion:%d, uid:%"PRIx64, rsp->db, rsp->vgVersion, rsp->uid);
D
dapan1121 已提交
43 44
    
    if (rsp->vgVersion < 0) {
D
dapan1121 已提交
45
      code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid);
D
dapan1121 已提交
46
    } else {
D
dapan1121 已提交
47
      SDBVgInfo vgInfo = {0};
D
dapan1121 已提交
48 49
      vgInfo.vgVersion = rsp->vgVersion;
      vgInfo.hashMethod = rsp->hashMethod;
D
dapan1121 已提交
50 51
      vgInfo.vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
      if (NULL == vgInfo.vgHash) {
D
dapan1121 已提交
52 53 54 55
        tscError("hash init[%d] failed", rsp->vgNum);
        return TSDB_CODE_TSC_OUT_OF_MEMORY;
      }

S
Shengliang Guan 已提交
56 57 58
      for (int32_t j = 0; j < rsp->vgNum; ++j) {
        SVgroupInfo *pInfo = taosArrayGet(rsp->pVgroupInfos, j);
        if (taosHashPut(vgInfo.vgHash, &pInfo->vgId, sizeof(int32_t), pInfo, sizeof(SVgroupInfo)) != 0) {
D
dapan1121 已提交
59
          tscError("hash push failed, errno:%d", errno);
D
dapan1121 已提交
60
          taosHashCleanup(vgInfo.vgHash);
D
dapan1121 已提交
61 62 63 64
          return TSDB_CODE_TSC_OUT_OF_MEMORY;
        }
      }  
      
D
dapan1121 已提交
65
      catalogUpdateDBVgInfo(pCatalog, rsp->db, rsp->uid, &vgInfo);
D
dapan1121 已提交
66 67 68 69 70 71 72
    }

    if (code) {
      return code;
    }
  }

S
Shengliang Guan 已提交
73
  tFreeSUseDbBatchRsp(&batchUseRsp);
D
dapan1121 已提交
74 75 76
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
77 78 79
static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
  int32_t code = 0;

S
Shengliang Guan 已提交
80 81 82 83 84 85 86 87 88 89
  STableMetaBatchRsp batchMetaRsp = {0};
  if (tDeserializeSTableMetaBatchRsp(value, valueLen, &batchMetaRsp) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int32_t numOfBatchs = taosArrayGetSize(batchMetaRsp.pArray);
  for (int32_t i = 0; i < numOfBatchs; ++i) {
    STableMetaRsp *rsp = taosArrayGet(batchMetaRsp.pArray, i);

D
dapan 已提交
90 91
    if (rsp->numOfColumns < 0) {
      tscDebug("hb remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
D
dapan1121 已提交
92
      catalogRemoveStbMeta(pCatalog, rsp->dbFName, rsp->dbId, rsp->stbName, rsp->suid);
D
dapan 已提交
93
    } else {
D
dapan1121 已提交
94
      tscDebug("hb update stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
S
Shengliang Guan 已提交
95 96 97
      if (rsp->pSchemas[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
        tscError("invalid colId[%d] for the first column in table meta rsp msg", rsp->pSchemas[0].colId);
        tFreeSTableMetaBatchRsp(&batchMetaRsp);
D
dapan1121 已提交
98
        return TSDB_CODE_TSC_INVALID_VALUE;
S
Shengliang Guan 已提交
99
      }
D
dapan1121 已提交
100 101

      catalogUpdateSTableMeta(pCatalog, rsp);
D
dapan 已提交
102 103 104
    }
  }

S
Shengliang Guan 已提交
105
  tFreeSTableMetaBatchRsp(&batchMetaRsp);
D
dapan 已提交
106 107 108
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
109 110 111 112 113 114 115 116
static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) {
  SHbConnInfo * info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey));
  if (NULL == info) {
    tscWarn("fail to get connInfo, may be dropped, connId:%d, type:%d", pRsp->connKey.connId, pRsp->connKey.hbType);
    return TSDB_CODE_SUCCESS;
  }

  int32_t kvNum = pRsp->info ? taosArrayGetSize(pRsp->info) : 0;
D
dapan1121 已提交
117 118 119

  tscDebug("hb got %d rsp kv", kvNum);
  
D
dapan1121 已提交
120 121 122
  for (int32_t i = 0; i < kvNum; ++i) {
    SKv *kv = taosArrayGet(pRsp->info, i);
    switch (kv->key) {
D
dapan1121 已提交
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
      case HEARTBEAT_KEY_DBINFO: {
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid hb db info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }

        int64_t *clusterId = (int64_t *)info->param;
        struct SCatalog *pCatalog = NULL;
        
        int32_t code = catalogGetHandle(*clusterId, &pCatalog);
        if (code != TSDB_CODE_SUCCESS) {
          tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", *clusterId, tstrerror(code));
          break;
        }

        hbProcessDBInfoRsp(kv->value, kv->valueLen, pCatalog);
D
dapan1121 已提交
139
        break;
D
dapan1121 已提交
140
      }
D
dapan 已提交
141 142 143 144 145
      case HEARTBEAT_KEY_STBINFO:{
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid hb stb info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }
D
dapan1121 已提交
146

D
dapan 已提交
147 148 149 150 151 152 153 154 155 156
        int64_t *clusterId = (int64_t *)info->param;
        struct SCatalog *pCatalog = NULL;
        
        int32_t code = catalogGetHandle(*clusterId, &pCatalog);
        if (code != TSDB_CODE_SUCCESS) {
          tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", *clusterId, tstrerror(code));
          break;
        }

        hbProcessStbInfoRsp(kv->value, kv->valueLen, pCatalog);
D
dapan1121 已提交
157
        break;
D
dapan 已提交
158
      }
D
dapan1121 已提交
159 160 161 162 163 164 165 166 167
      default:
        tscError("invalid hb key type:%d", kv->key);
        break;
    }
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
168
static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code) {
D
dapan1121 已提交
169
  static int32_t emptyRspNum = 0;
L
Liu Jicong 已提交
170
  if (code != 0) {
D
dapan1121 已提交
171
    tfree(param);
L
Liu Jicong 已提交
172 173
    return -1;
  }
S
Shengliang Guan 已提交
174

D
dapan1121 已提交
175
  char *key = (char *)param;
D
dapan1121 已提交
176
  SClientHbBatchRsp pRsp = {0};
S
Shengliang Guan 已提交
177
  tDeserializeSClientHbBatchRsp(pMsg->pData, pMsg->len, &pRsp);
D
dapan1121 已提交
178
  
D
dapan1121 已提交
179
  int32_t rspNum = taosArrayGetSize(pRsp.rsps);
D
dapan1121 已提交
180 181 182

  SAppInstInfo** pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
  if (pInst == NULL || NULL == *pInst) {
D
dapan1121 已提交
183 184 185
    tscError("cluster not exist, key:%s", key);    
    tfree(param);
    tFreeClientHbBatchRsp(&pRsp);
D
dapan1121 已提交
186 187 188
    return -1;
  }

D
dapan1121 已提交
189 190 191
  tfree(param);

  if (rspNum) {
D
dapan 已提交
192
    tscDebug("hb got %d rsp, %d empty rsp received before", rspNum, atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0));
D
dapan1121 已提交
193 194 195 196 197
  } else {
    atomic_add_fetch_32(&emptyRspNum, 1);
  }

  for (int32_t i = 0; i < rspNum; ++i) {
D
dapan1121 已提交
198
    SClientHbRsp* rsp = taosArrayGet(pRsp.rsps, i);
D
dapan1121 已提交
199 200 201 202 203
    code = (*clientHbMgr.rspHandle[rsp->connKey.hbType])((*pInst)->pAppHbMgr, rsp);
    if (code) {
      break;
    }
  }
D
dapan1121 已提交
204 205

  tFreeClientHbBatchRsp(&pRsp);
D
dapan1121 已提交
206 207
  
  return code;
L
Liu Jicong 已提交
208 209
}

D
dapan1121 已提交
210 211 212 213 214 215 216 217 218 219
/*
 * Attach the catalog's expired-database list to a heartbeat request.
 *
 * connKey  : connection key (not used here).
 * pCatalog : catalog queried through catalogGetExpiredDBs().
 * req      : request whose info hash receives a HEARTBEAT_KEY_DBINFO entry.
 *
 * The dbId and vgVersion fields are converted with htobe64()/htonl() before
 * the array is stored as the kv value. When storing fails, or when the list
 * is empty, the array is released here so it does not leak.
 *
 * Returns TSDB_CODE_SUCCESS, the error from catalogGetExpiredDBs(), or
 * TSDB_CODE_TSC_OUT_OF_MEMORY when the entry cannot be stored.
 */
int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
  SDbVgVersion *dbs = NULL;
  uint32_t dbNum = 0;

  int32_t code = catalogGetExpiredDBs(pCatalog, &dbs, &dbNum);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  if (dbNum == 0) {
    tfree(dbs);
    return TSDB_CODE_SUCCESS;
  }

  for (uint32_t i = 0; i < dbNum; ++i) {
    SDbVgVersion *db = &dbs[i];
    db->dbId = htobe64(db->dbId);
    db->vgVersion = htonl(db->vgVersion);
  }

  SKv kv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = sizeof(SDbVgVersion) * dbNum, .value = dbs};

  tscDebug("hb got %u expired db, valueLen:%d", dbNum, kv.valueLen);

  if (taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)) != 0) {
    tscError("hb failed to attach expired db info");
    tfree(dbs);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269
/*
 * Attach the catalog's expired-super-table list to a heartbeat request.
 *
 * connKey  : connection key (not used here).
 * pCatalog : catalog queried through catalogGetExpiredSTables().
 * req      : request whose info hash receives a HEARTBEAT_KEY_STBINFO entry.
 *
 * The suid, sversion and tversion fields are converted with
 * htobe64()/htons() before the array is stored as the kv value. When storing
 * fails, or when the list is empty, the array is released here so it does not
 * leak.
 *
 * Returns TSDB_CODE_SUCCESS, the error from catalogGetExpiredSTables(), or
 * TSDB_CODE_TSC_OUT_OF_MEMORY when the entry cannot be stored.
 */
int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
  SSTableMetaVersion *stbs = NULL;
  uint32_t stbNum = 0;

  int32_t code = catalogGetExpiredSTables(pCatalog, &stbs, &stbNum);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  if (stbNum == 0) {
    tfree(stbs);
    return TSDB_CODE_SUCCESS;
  }

  for (uint32_t i = 0; i < stbNum; ++i) {
    SSTableMetaVersion *stb = &stbs[i];
    stb->suid = htobe64(stb->suid);
    stb->sversion = htons(stb->sversion);
    stb->tversion = htons(stb->tversion);
  }

  SKv kv = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = sizeof(SSTableMetaVersion) * stbNum, .value = stbs};

  tscDebug("hb got %u expired stb, valueLen:%d", stbNum, kv.valueLen);

  if (taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)) != 0) {
    tscError("hb failed to attach expired stb info");
    tfree(stbs);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
270 271 272 273 274 275 276 277 278 279 280 281 282 283 284
/*
 * Request handler for query-type heartbeats.
 *
 * param points at the int64_t cluster id of the connection. The catalog for
 * that cluster is looked up, then the expired database list and the expired
 * super-table list are added to req, in that order.
 *
 * Returns TSDB_CODE_SUCCESS, or the code of the first step that failed.
 */
int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req) {
  struct SCatalog *pCatalog = NULL;
  int64_t         *pClusterId = (int64_t *)param;

  int32_t rc = catalogGetHandle(*pClusterId, &pCatalog);
  if (TSDB_CODE_SUCCESS != rc) {
    tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", *pClusterId, tstrerror(rc));
    return rc;
  }

  rc = hbGetExpiredDBInfo(connKey, pCatalog, req);
  if (TSDB_CODE_SUCCESS == rc) {
    // only collect super-table info once the db info was gathered successfully
    rc = hbGetExpiredStbInfo(connKey, pCatalog, req);
  }

  return rc;
}

/*
 * Request handler for MQ-type heartbeats.
 *
 * Placeholder: nothing is added to the request. It must still return a value
 * because the result is read by the caller that dispatches through
 * clientHbMgr.reqHandle[]; 0 means success there.
 */
int32_t hbMqHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req) {
  (void)connKey;
  (void)param;
  (void)req;
  return 0;
}

/*
 * Fill the request/response handler tables of clientHbMgr for the query and
 * MQ heartbeat types.
 */
void hbMgrInitMqHbHandle() {
  // query heartbeats
  clientHbMgr.reqHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbReqHandle;
  clientHbMgr.rspHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbRspHandle;

  // MQ heartbeats
  clientHbMgr.reqHandle[HEARTBEAT_TYPE_MQ] = hbMqHbReqHandle;
  clientHbMgr.rspHandle[HEARTBEAT_TYPE_MQ] = hbMqHbRspHandle;
}

L
Liu Jicong 已提交
305 306
/* Install all heartbeat handlers; currently this only delegates to hbMgrInitMqHbHandle(). */
static FORCE_INLINE void hbMgrInitHandle() {
  hbMgrInitMqHbHandle();
}

D
dapan1121 已提交
310 311
/*
 * Element destructor for arrays of SClientHbReq: passes the request's info
 * hash to tFreeReqKvHash().
 */
void hbFreeReq(void *req) {
  tFreeReqKvHash(((SClientHbReq *)req)->info);
}



L
Liu Jicong 已提交
317
/*
 * Build a batch heartbeat request from every active connection of pAppHbMgr.
 *
 * For each entry in activeInfo that also has an entry in connInfo, the request
 * handler registered for its heartbeat type is invoked to fill in the entry;
 * the entry is then appended (by value) to the batch.
 *
 * Returns the heap-allocated batch, or NULL when allocation fails (terrno is
 * set) or a request handler reports an error.
 */
SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
  SClientHbBatchReq* pBatchReq = calloc(1, sizeof(SClientHbBatchReq));
  if (pBatchReq == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
  pBatchReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq));
  if (pBatchReq->reqs == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    free(pBatchReq);
    return NULL;
  }

  int32_t code = 0;
  void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL);
  while (pIter != NULL) {
    SClientHbReq* pOneReq = pIter;

    SHbConnInfo * info = taosHashGet(pAppHbMgr->connInfo, &pOneReq->connKey, sizeof(SClientHbKey));
    if (info) {
      code = (*clientHbMgr.reqHandle[pOneReq->connKey.hbType])(&pOneReq->connKey, info->param, pOneReq);
      if (code) {
        // leaving the loop early: the iterator has to be released explicitly
        taosHashCancelIterate(pAppHbMgr->activeInfo, pIter);
        break;
      }
    }

    taosArrayPush(pBatchReq->reqs, pOneReq);

    pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
  }

  if (code) {
    taosArrayDestroyEx(pBatchReq->reqs, hbFreeReq);
    tfree(pBatchReq);
    // return NULL explicitly instead of relying on what tfree() leaves in the pointer
    return NULL;
  }

  return pBatchReq;
}

D
dapan1121 已提交
353 354 355 356 357 358

/*
 * Reset the per-connection info hashes of pAppHbMgr: for every entry in
 * activeInfo, tFreeReqKvHash() is applied to the entry's info hash and the
 * hash is then emptied with taosHashClear().
 */
void hbClearReqInfo(SAppHbMgr *pAppHbMgr) {
  for (void *cursor = taosHashIterate(pAppHbMgr->activeInfo, NULL);
       cursor != NULL;
       cursor = taosHashIterate(pAppHbMgr->activeInfo, cursor)) {
    SClientHbReq *entry = cursor;

    tFreeReqKvHash(entry->info);
    taosHashClear(entry->info);
  }
}



L
Liu Jicong 已提交
368 369 370
static void* hbThreadFunc(void* param) {
  setThreadName("hb");
  while (1) {
371 372
    int8_t threadStop = atomic_val_compare_exchange_8(&clientHbMgr.threadStop, 1, 2);
    if(1 == threadStop) {
L
Liu Jicong 已提交
373 374 375
      break;
    }

376 377
    pthread_mutex_lock(&clientHbMgr.lock);

L
Liu Jicong 已提交
378 379
    int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
    for(int i = 0; i < sz; i++) {
L
Liu Jicong 已提交
380 381
      SAppHbMgr* pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);

L
Liu Jicong 已提交
382 383 384 385
      int32_t connCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
      if (connCnt == 0) {
        continue;
      }
L
Liu Jicong 已提交
386
      SClientHbBatchReq* pReq = hbGatherAllInfo(pAppHbMgr);
L
Liu Jicong 已提交
387 388 389
      if (pReq == NULL) {
        continue;
      }
S
Shengliang Guan 已提交
390
      int tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq);
L
Liu Jicong 已提交
391 392
      void *buf = malloc(tlen);
      if (buf == NULL) {
D
dapan1121 已提交
393 394 395
        terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
        tFreeClientHbBatchReq(pReq, false);
        hbClearReqInfo(pAppHbMgr);
L
Liu Jicong 已提交
396 397
        break;
      }
S
Shengliang Guan 已提交
398
      tSerializeSClientHbBatchReq(buf, tlen, pReq);
L
Liu Jicong 已提交
399 400 401
      SMsgSendInfo *pInfo = malloc(sizeof(SMsgSendInfo));
      if (pInfo == NULL) {
        terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
L
Liu Jicong 已提交
402
        tFreeClientHbBatchReq(pReq, false);
D
dapan1121 已提交
403
        hbClearReqInfo(pAppHbMgr);
L
Liu Jicong 已提交
404 405 406 407 408 409 410
        free(buf);
        break;
      }
      pInfo->fp = hbMqAsyncCallBack;
      pInfo->msgInfo.pData = buf;
      pInfo->msgInfo.len = tlen;
      pInfo->msgType = TDMT_MND_HEARTBEAT;
D
dapan1121 已提交
411
      pInfo->param = strdup(pAppHbMgr->key);
L
Liu Jicong 已提交
412 413
      pInfo->requestId = generateRequestId();
      pInfo->requestObjRefId = 0;
L
Liu Jicong 已提交
414 415

      SAppInstInfo *pAppInstInfo = pAppHbMgr->pAppInstInfo;
L
Liu Jicong 已提交
416
      int64_t transporterId = 0;
L
Liu Jicong 已提交
417
      SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp);
L
Liu Jicong 已提交
418
      asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo);
D
dapan1121 已提交
419 420
      tFreeClientHbBatchReq(pReq, false);      
      hbClearReqInfo(pAppHbMgr);
L
Liu Jicong 已提交
421 422 423

      atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
    }
424 425 426

    pthread_mutex_unlock(&clientHbMgr.lock);
    
L
Liu Jicong 已提交
427
    taosMsleep(HEARTBEAT_INTERVAL);
L
Liu Jicong 已提交
428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444
  }
  return NULL;
}

/*
 * Start the background heartbeat thread (joinable) and store its id in
 * clientHbMgr.thread.
 *
 * Returns 0 on success, or -1 with terrno set from the error number returned
 * by pthread_create(). The attribute object is destroyed on both paths.
 */
static int32_t hbCreateThread() {
  pthread_attr_t thAttr;
  pthread_attr_init(&thAttr);
  pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);

  // pthread_create() reports failure through its return value, not through errno.
  int ret = pthread_create(&clientHbMgr.thread, &thAttr, hbThreadFunc, NULL);
  pthread_attr_destroy(&thAttr);

  if (ret != 0) {
    terrno = TAOS_SYSTEM_ERROR(ret);
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
445
static void hbStopThread() {
446
  if (atomic_val_compare_exchange_8(&clientHbMgr.threadStop, 0, 1)) {
D
dapan1121 已提交
447
    tscDebug("hb thread already stopped");
448 449 450 451 452 453
    return;
  }
  
  while (2 != atomic_load_8(&clientHbMgr.threadStop)) {
    usleep(10);
  }
D
dapan1121 已提交
454 455

  tscDebug("hb thread stopped");  
L
Liu Jicong 已提交
456 457
}

D
dapan1121 已提交
458
/*
 * Create the heartbeat manager of one app instance and register it with the
 * global heartbeat manager (which is initialized on first use).
 *
 * pAppInstInfo : app instance the manager reports for (stored, not copied).
 * key          : key string of the instance; a private copy is kept.
 *
 * Returns the new manager, or NULL with terrno set when initialization or an
 * allocation fails. Everything allocated so far is released on failure.
 */
SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key) {
  if (hbMgrInit() != 0) {
    return NULL;
  }

  SAppHbMgr* pAppHbMgr = calloc(1, sizeof(SAppHbMgr));
  if (pAppHbMgr == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // init stat
  pAppHbMgr->startTime = taosGetTimestampMs();
  pAppHbMgr->connKeyCnt = 0;
  pAppHbMgr->reportCnt = 0;
  pAppHbMgr->reportBytes = 0;
  pAppHbMgr->key = strdup(key);
  if (pAppHbMgr->key == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    free(pAppHbMgr);
    return NULL;
  }

  // init app info
  pAppHbMgr->pAppInstInfo = pAppInstInfo;

  // init hash info
  pAppHbMgr->activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
  if (pAppHbMgr->activeInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    free(pAppHbMgr->key);
    free(pAppHbMgr);
    return NULL;
  }
  pAppHbMgr->activeInfo->freeFp = tFreeClientHbReq;

  // init getInfoFunc
  pAppHbMgr->connInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
  if (pAppHbMgr->connInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosHashCleanup(pAppHbMgr->activeInfo);
    free(pAppHbMgr->key);
    free(pAppHbMgr);
    return NULL;
  }

  pthread_mutex_lock(&clientHbMgr.lock);
  taosArrayPush(clientHbMgr.appHbMgrs, &pAppHbMgr);
  pthread_mutex_unlock(&clientHbMgr.lock);

  return pAppHbMgr;
}

D
dapan1121 已提交
500
void appHbMgrCleanup(void) {
L
Liu Jicong 已提交
501 502 503 504
  pthread_mutex_lock(&clientHbMgr.lock);

  int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
  for (int i = 0; i < sz; i++) {
L
Liu Jicong 已提交
505
    SAppHbMgr* pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i);
D
dapan1121 已提交
506 507 508 509
    taosHashCleanup(pTarget->activeInfo);
    pTarget->activeInfo = NULL;
    taosHashCleanup(pTarget->connInfo);
    pTarget->connInfo = NULL;
L
Liu Jicong 已提交
510 511 512 513 514 515
  }

  pthread_mutex_unlock(&clientHbMgr.lock);
}

int hbMgrInit() {
L
Liu Jicong 已提交
516 517 518 519
  // init once
  int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1);
  if (old == 1) return 0;

L
Liu Jicong 已提交
520 521
  clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void*));
  pthread_mutex_init(&clientHbMgr.lock, NULL);
L
Liu Jicong 已提交
522 523 524 525 526

  // init handle funcs
  hbMgrInitHandle();

  // init backgroud thread
L
Liu Jicong 已提交
527 528
  hbCreateThread();

L
Liu Jicong 已提交
529 530 531 532
  return 0;
}

/*
 * Tear down the global heartbeat manager.
 *
 * NOTE(review): the function returns immediately, so the teardown below never
 * runs. This looks like a deliberate switch-off and is kept as is; confirm
 * before re-enabling.
 *
 * Intended sequence once enabled: stop the background thread, flip the inited
 * flag back to 0 (returning if it already was 0), clean up every app manager
 * and destroy the manager array.
 */
void hbMgrCleanUp() {
  return;

  hbStopThread();

  // destroy all appHbMgr
  int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0);
  if (old == 0) return;

  // appHbMgrCleanup() acquires clientHbMgr.lock itself, so it has to be called
  // before the lock is taken here; the mutex is initialized with default
  // attributes and must not be locked twice by the same thread.
  appHbMgrCleanup();

  pthread_mutex_lock(&clientHbMgr.lock);
  taosArrayDestroy(clientHbMgr.appHbMgrs);
  clientHbMgr.appHbMgrs = NULL;
  pthread_mutex_unlock(&clientHbMgr.lock);
}

D
dapan1121 已提交
548
/*
 * Register a connection with an app heartbeat manager.
 *
 * pAppHbMgr : manager to register with.
 * connKey   : key of the connection.
 * info      : optional per-connection info; when non-NULL its req field is
 *             pointed at the stored request and a copy is put into connInfo.
 *
 * Does nothing when connKey is already present in activeInfo. Otherwise a
 * request entry with a fresh info hash is stored under connKey and the
 * connection counter is incremented.
 *
 * Returns 0 on success (including the already-registered case), or -1 with
 * terrno set when the request entry cannot be created or stored.
 */
int hbRegisterConnImpl(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, SHbConnInfo *info) {
  // init hash in activeinfo
  void* data = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
  if (data != NULL) {
    return 0;
  }

  // zero-initialize: the whole struct is copied into the hash below
  SClientHbReq hbReq = {0};
  hbReq.connKey = connKey;
  hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
  if (hbReq.info == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return -1;
  }

  if (taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq)) != 0) {
    taosHashCleanup(hbReq.info);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return -1;
  }

  // init hash
  if (info != NULL) {
    SClientHbReq * pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
    info->req = pReq;
    taosHashPut(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey), info, sizeof(SHbConnInfo));
  }

  atomic_add_fetch_32(&pAppHbMgr->connKeyCnt, 1);
  return 0;
}

D
dapan1121 已提交
571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592
/*
 * Register a connection for heartbeats.
 *
 * pAppHbMgr : manager to register with.
 * connId    : connection id used in the connection key.
 * clusterId : cluster id; for HEARTBEAT_TYPE_QUERY a heap copy becomes the
 *             connection's handler parameter.
 * hbType    : heartbeat type, selects how the handler parameter is prepared.
 *
 * Returns the result of hbRegisterConnImpl(), 0 when the connection is already
 * registered, or -1 with terrno set when the parameter cannot be allocated.
 *
 * NOTE(review): the connection key is always built with HEARTBEAT_TYPE_QUERY,
 * regardless of hbType - confirm whether that is intended.
 */
int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) {
  SClientHbKey connKey = {.connId = connId, .hbType = HEARTBEAT_TYPE_QUERY};
  SHbConnInfo info = {0};

  // Already registered: return before allocating a parameter that would never be stored.
  if (taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)) != NULL) {
    return 0;
  }

  switch (hbType) {
    case HEARTBEAT_TYPE_QUERY: {
      int64_t *pClusterId = malloc(sizeof(int64_t));
      if (pClusterId == NULL) {
        terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
        return -1;
      }
      *pClusterId = clusterId;

      info.param = pClusterId;
      break;
    }
    case HEARTBEAT_TYPE_MQ: {
      break;
    }
    default:
      break;
  }

  int ret = hbRegisterConnImpl(pAppHbMgr, connKey, &info);
  if (ret != 0) {
    // registration failed, so nothing took over the parameter
    free(info.param);
  }
  return ret;
}

L
Liu Jicong 已提交
593
/*
 * Remove a connection from an app heartbeat manager.
 *
 * The entry is removed from both activeInfo and connInfo. The connection
 * counter is decremented only when the entry was actually removed from
 * activeInfo, mirroring registration, which increments the counter when an
 * entry is added to activeInfo (a connInfo entry is optional there).
 */
void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey) {
  int32_t code = taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));

  // best effort: a connection may have been registered without a connInfo entry
  taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey));

  if (code) {
    return;
  }
  atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1);
}

L
Liu Jicong 已提交
603
/*
 * Add one key/value pair to the pending heartbeat request of a connection.
 *
 * pAppHbMgr        : manager the connection is registered with.
 * connKey          : key of the connection.
 * key/keyLen       : key of the pair.
 * value/valueLen   : value of the pair.
 *
 * Returns 0 on success, -1 when the connection is not registered or the pair
 * cannot be stored.
 */
int hbAddConnInfo(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen) {
  // find req by connection id
  SClientHbReq* pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
  ASSERT(pReq != NULL);
  if (pReq == NULL) {
    // still reached if ASSERT is compiled out; avoid dereferencing NULL below
    return -1;
  }

  if (taosHashPut(pReq->info, key, keyLen, value, valueLen) != 0) {
    return -1;
  }

  return 0;
}