clientHb.c 26.7 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

D
dapan1121 已提交
16
#include "catalog.h"
L
fix  
Liu Jicong 已提交
17
#include "clientInt.h"
D
dapan1121 已提交
18
#include "clientLog.h"
L
Liu Jicong 已提交
19
#include "scheduler.h"
L
fix  
Liu Jicong 已提交
20
#include "trpc.h"
L
Liu Jicong 已提交
21 22 23 24 25 26

static SClientHbMgr clientHbMgr = {0};

static int32_t hbCreateThread();
static void    hbStopThread();

L
Liu Jicong 已提交
27 28
static int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { return 0; }

L
Liu Jicong 已提交
29
static int32_t hbMqHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { return 0; }
L
Liu Jicong 已提交
30

D
dapan 已提交
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
  int32_t code = 0;

  SUserAuthBatchRsp batchRsp = {0};
  if (tDeserializeSUserAuthBatchRsp(value, valueLen, &batchRsp) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int32_t numOfBatchs = taosArrayGetSize(batchRsp.pArray);
  for (int32_t i = 0; i < numOfBatchs; ++i) {
    SGetUserAuthRsp *rsp = taosArrayGet(batchRsp.pArray, i);
    tscDebug("hb user auth rsp, user:%s, version:%d", rsp->user, rsp->version);

    catalogUpdateUserAuthInfo(pCatalog, rsp);
  }

D
dapan1121 已提交
48
  taosArrayDestroy(batchRsp.pArray);
D
dapan 已提交
49 50 51
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
52 53 54
static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
  int32_t code = 0;

S
Shengliang Guan 已提交
55 56 57 58 59
  SUseDbBatchRsp batchUseRsp = {0};
  if (tDeserializeSUseDbBatchRsp(value, valueLen, &batchUseRsp) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }
D
dapan1121 已提交
60

S
Shengliang Guan 已提交
61 62 63
  int32_t numOfBatchs = taosArrayGetSize(batchUseRsp.pArray);
  for (int32_t i = 0; i < numOfBatchs; ++i) {
    SUseDbRsp *rsp = taosArrayGet(batchUseRsp.pArray, i);
L
fix  
Liu Jicong 已提交
64 65
    tscDebug("hb db rsp, db:%s, vgVersion:%d, uid:%" PRIx64, rsp->db, rsp->vgVersion, rsp->uid);

D
dapan1121 已提交
66
    if (rsp->vgVersion < 0) {
D
dapan1121 已提交
67
      code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid);
D
dapan1121 已提交
68
    } else {
D
dapan1121 已提交
69 70 71 72 73 74 75 76 77 78
      SDBVgInfo *vgInfo = taosMemoryCalloc(1, sizeof(SDBVgInfo));
      if (NULL == vgInfo) {
        return TSDB_CODE_TSC_OUT_OF_MEMORY;
      }
      
      vgInfo->vgVersion = rsp->vgVersion;
      vgInfo->hashMethod = rsp->hashMethod;
      vgInfo->vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
      if (NULL == vgInfo->vgHash) {
        taosMemoryFree(vgInfo);
D
dapan1121 已提交
79 80 81 82
        tscError("hash init[%d] failed", rsp->vgNum);
        return TSDB_CODE_TSC_OUT_OF_MEMORY;
      }

S
Shengliang Guan 已提交
83 84
      for (int32_t j = 0; j < rsp->vgNum; ++j) {
        SVgroupInfo *pInfo = taosArrayGet(rsp->pVgroupInfos, j);
D
dapan1121 已提交
85
        if (taosHashPut(vgInfo->vgHash, &pInfo->vgId, sizeof(int32_t), pInfo, sizeof(SVgroupInfo)) != 0) {
D
dapan1121 已提交
86
          tscError("hash push failed, errno:%d", errno);
D
dapan1121 已提交
87 88
          taosHashCleanup(vgInfo->vgHash);
          taosMemoryFree(vgInfo);
D
dapan1121 已提交
89 90
          return TSDB_CODE_TSC_OUT_OF_MEMORY;
        }
L
fix  
Liu Jicong 已提交
91 92
      }

D
dapan1121 已提交
93
      catalogUpdateDBVgInfo(pCatalog, rsp->db, rsp->uid, vgInfo);
D
dapan1121 已提交
94 95 96 97 98 99 100
    }

    if (code) {
      return code;
    }
  }

S
Shengliang Guan 已提交
101
  tFreeSUseDbBatchRsp(&batchUseRsp);
D
dapan1121 已提交
102 103 104
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
105 106 107
static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
  int32_t code = 0;

D
dapan1121 已提交
108 109
  SSTbHbRsp hbRsp = {0};
  if (tDeserializeSSTbHbRsp(value, valueLen, &hbRsp) != 0) {
S
Shengliang Guan 已提交
110 111 112 113
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

D
dapan1121 已提交
114 115 116
  int32_t numOfMeta = taosArrayGetSize(hbRsp.pMetaRsp);
  for (int32_t i = 0; i < numOfMeta; ++i) {
    STableMetaRsp *rsp = taosArrayGet(hbRsp.pMetaRsp, i);
S
Shengliang Guan 已提交
117

D
dapan 已提交
118 119
    if (rsp->numOfColumns < 0) {
      tscDebug("hb remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
D
dapan1121 已提交
120
      catalogRemoveStbMeta(pCatalog, rsp->dbFName, rsp->dbId, rsp->stbName, rsp->suid);
D
dapan 已提交
121
    } else {
D
dapan1121 已提交
122
      tscDebug("hb update stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
S
Shengliang Guan 已提交
123
      if (rsp->pSchemas[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
124
        tscError("invalid colId[%" PRIi16 "] for the first column in table meta rsp msg", rsp->pSchemas[0].colId);
D
dapan1121 已提交
125
        tFreeSSTbHbRsp(&hbRsp);
D
dapan1121 已提交
126
        return TSDB_CODE_TSC_INVALID_VALUE;
D
dapan 已提交
127 128
      }

D
dapan1121 已提交
129
      catalogUpdateTableMeta(pCatalog, rsp);
D
dapan 已提交
130 131 132
    }
  }

D
dapan1121 已提交
133 134 135 136 137 138 139 140 141 142
  int32_t numOfIndex = taosArrayGetSize(hbRsp.pIndexRsp);
  for (int32_t i = 0; i < numOfIndex; ++i) {
    STableIndexRsp *rsp = taosArrayGet(hbRsp.pIndexRsp, i);

    catalogUpdateTableIndex(pCatalog, rsp);
  }

  taosArrayDestroy(hbRsp.pIndexRsp);
  hbRsp.pIndexRsp = NULL;

D
dapan1121 已提交
143
  tFreeSSTbHbRsp(&hbRsp);
D
dapan 已提交
144 145 146
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
147
static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
D
dapan1121 已提交
148 149 150
  SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &pRsp->connKey, sizeof(SClientHbKey));
  if (NULL == pReq) {
    tscWarn("pReq to get activeInfo, may be dropped, refId:%" PRIx64 ", type:%d", pRsp->connKey.tscRid,
L
Liu Jicong 已提交
151
            pRsp->connKey.connType);
D
dapan1121 已提交
152 153 154
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
155 156 157 158
  if (pRsp->query) {
    STscObj *pTscObj = (STscObj *)acquireTscObj(pRsp->connKey.tscRid);
    if (NULL == pTscObj) {
      tscDebug("tscObj rid %" PRIx64 " not exist", pRsp->connKey.tscRid);
D
dapan1121 已提交
159
    } else {      
D
dapan1121 已提交
160
      if (pRsp->query->totalDnodes > 1 && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &pRsp->query->epSet)) {
D
dapan1121 已提交
161 162 163 164 165 166 167
        SEpSet* pOrig = &pTscObj->pAppInfo->mgmtEp.epSet;
        SEp* pOrigEp = &pOrig->eps[pOrig->inUse];
        SEp* pNewEp = &pRsp->query->epSet.eps[pRsp->query->epSet.inUse];
        tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in hb", 
            pOrig->inUse, pOrig->numOfEps, pOrigEp->fqdn, pOrigEp->port, 
            pRsp->query->epSet.inUse, pRsp->query->epSet.numOfEps, pNewEp->fqdn, pNewEp->port);
            
D
dapan1121 已提交
168 169
        updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pRsp->query->epSet);
      }
D
dapan1121 已提交
170 171 172
      
      pTscObj->pAppInfo->totalDnodes = pRsp->query->totalDnodes;
      pTscObj->pAppInfo->onlineDnodes = pRsp->query->onlineDnodes;
D
dapan1121 已提交
173
      pTscObj->connId = pRsp->query->connId;
L
Liu Jicong 已提交
174

D
dapan1121 已提交
175
      if (pRsp->query->killRid) {
D
dapan1121 已提交
176
        tscDebug("request rid %" PRIx64 " need to be killed now", pRsp->query->killRid);
D
dapan1121 已提交
177 178 179 180 181 182 183 184
        SRequestObj *pRequest = acquireRequest(pRsp->query->killRid);
        if (NULL == pRequest) {
          tscDebug("request 0x%" PRIx64 " not exist to kill", pRsp->query->killRid);
        } else {
          taos_stop_query((TAOS_RES *)pRequest);
          releaseRequest(pRsp->query->killRid);
        }
      }
L
Liu Jicong 已提交
185

D
dapan1121 已提交
186
      if (pRsp->query->killConnection) {
187
        taos_close_internal(pTscObj);
D
dapan1121 已提交
188 189
      }

D
dapan1121 已提交
190 191 192 193
      if (pRsp->query->pQnodeList) {
        updateQnodeList(pTscObj->pAppInfo, pRsp->query->pQnodeList);
      }

D
dapan1121 已提交
194 195 196
      releaseTscObj(pRsp->connKey.tscRid);
    }
  }
L
Liu Jicong 已提交
197

D
dapan1121 已提交
198
  int32_t kvNum = pRsp->info ? taosArrayGetSize(pRsp->info) : 0;
D
dapan1121 已提交
199 200

  tscDebug("hb got %d rsp kv", kvNum);
L
fix  
Liu Jicong 已提交
201

D
dapan1121 已提交
202 203 204
  for (int32_t i = 0; i < kvNum; ++i) {
    SKv *kv = taosArrayGet(pRsp->info, i);
    switch (kv->key) {
D
dapan 已提交
205 206 207 208 209 210 211 212
      case HEARTBEAT_KEY_USER_AUTHINFO: {
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid hb user auth info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }

        struct SCatalog *pCatalog = NULL;

D
dapan1121 已提交
213
        int32_t code = catalogGetHandle(pReq->clusterId, &pCatalog);
D
dapan 已提交
214
        if (code != TSDB_CODE_SUCCESS) {
D
dapan1121 已提交
215
          tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code));
D
dapan 已提交
216 217 218 219 220 221
          break;
        }

        hbProcessUserAuthInfoRsp(kv->value, kv->valueLen, pCatalog);
        break;
      }
D
dapan1121 已提交
222 223 224 225 226 227 228
      case HEARTBEAT_KEY_DBINFO: {
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid hb db info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }

        struct SCatalog *pCatalog = NULL;
L
fix  
Liu Jicong 已提交
229

D
dapan1121 已提交
230
        int32_t code = catalogGetHandle(pReq->clusterId, &pCatalog);
D
dapan1121 已提交
231
        if (code != TSDB_CODE_SUCCESS) {
D
dapan1121 已提交
232
          tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code));
D
dapan1121 已提交
233 234 235 236
          break;
        }

        hbProcessDBInfoRsp(kv->value, kv->valueLen, pCatalog);
D
dapan1121 已提交
237
        break;
D
dapan1121 已提交
238
      }
L
fix  
Liu Jicong 已提交
239
      case HEARTBEAT_KEY_STBINFO: {
D
dapan 已提交
240 241 242 243
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid hb stb info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }
D
dapan1121 已提交
244

D
dapan 已提交
245
        struct SCatalog *pCatalog = NULL;
L
fix  
Liu Jicong 已提交
246

D
dapan1121 已提交
247
        int32_t code = catalogGetHandle(pReq->clusterId, &pCatalog);
D
dapan 已提交
248
        if (code != TSDB_CODE_SUCCESS) {
D
dapan1121 已提交
249
          tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code));
D
dapan 已提交
250 251 252 253
          break;
        }

        hbProcessStbInfoRsp(kv->value, kv->valueLen, pCatalog);
D
dapan1121 已提交
254
        break;
D
dapan 已提交
255
      }
D
dapan1121 已提交
256 257 258 259 260 261 262 263 264
      default:
        tscError("invalid hb key type:%d", kv->key);
        break;
    }
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
265
static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
D
dapan1121 已提交
266
  static int32_t emptyRspNum = 0;
L
fix  
Liu Jicong 已提交
267
  char             *key = (char *)param;
D
dapan1121 已提交
268
  SClientHbBatchRsp pRsp = {0};
D
dapan1121 已提交
269 270 271 272
  if (TSDB_CODE_SUCCESS == code) {
    tDeserializeSClientHbBatchRsp(pMsg->pData, pMsg->len, &pRsp);
  }
  
D
dapan1121 已提交
273
  int32_t rspNum = taosArrayGetSize(pRsp.rsps);
D
dapan1121 已提交
274

D
dapan1121 已提交
275 276
  taosThreadMutexLock(&appInfo.mutex);

L
fix  
Liu Jicong 已提交
277
  SAppInstInfo **pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
D
dapan1121 已提交
278
  if (pInst == NULL || NULL == *pInst) {
D
dapan1121 已提交
279
    taosThreadMutexUnlock(&appInfo.mutex);
L
fix  
Liu Jicong 已提交
280
    tscError("cluster not exist, key:%s", key);
wafwerar's avatar
wafwerar 已提交
281
    taosMemoryFreeClear(param);
D
dapan1121 已提交
282
    tFreeClientHbBatchRsp(&pRsp);
D
dapan1121 已提交
283 284 285
    return -1;
  }

wafwerar's avatar
wafwerar 已提交
286
  taosMemoryFreeClear(param);
D
dapan1121 已提交
287

D
dapan1121 已提交
288 289 290 291
  if (code != 0) {
    (*pInst)->onlineDnodes = 0;
  }

D
dapan1121 已提交
292
  if (rspNum) {
L
fix  
Liu Jicong 已提交
293 294
    tscDebug("hb got %d rsp, %d empty rsp received before", rspNum,
             atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0));
D
dapan1121 已提交
295 296 297 298 299
  } else {
    atomic_add_fetch_32(&emptyRspNum, 1);
  }

  for (int32_t i = 0; i < rspNum; ++i) {
L
fix  
Liu Jicong 已提交
300
    SClientHbRsp *rsp = taosArrayGet(pRsp.rsps, i);
D
dapan1121 已提交
301
    code = (*clientHbMgr.rspHandle[rsp->connKey.connType])((*pInst)->pAppHbMgr, rsp);
D
dapan1121 已提交
302 303 304 305
    if (code) {
      break;
    }
  }
D
dapan1121 已提交
306

D
dapan1121 已提交
307 308
  taosThreadMutexUnlock(&appInfo.mutex);

D
dapan1121 已提交
309
  tFreeClientHbBatchRsp(&pRsp);
L
fix  
Liu Jicong 已提交
310

D
dapan1121 已提交
311
  return code;
L
Liu Jicong 已提交
312 313
}

D
dapan1121 已提交
314
int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
L
Liu Jicong 已提交
315
  int64_t    now = taosGetTimestampUs();
D
dapan1121 已提交
316
  SQueryDesc desc = {0};
L
Liu Jicong 已提交
317
  int32_t    code = 0;
D
dapan1121 已提交
318

L
Liu Jicong 已提交
319
  void *pIter = taosHashIterate(pObj->pRequests, NULL);
D
dapan1121 已提交
320
  while (pIter != NULL) {
L
Liu Jicong 已提交
321
    int64_t     *rid = pIter;
D
dapan1121 已提交
322
    SRequestObj *pRequest = acquireRequest(*rid);
D
dapan1121 已提交
323
    if (NULL == pRequest || pRequest->killed) {
D
dapan1121 已提交
324
      pIter = taosHashIterate(pObj->pRequests, pIter);
D
dapan1121 已提交
325 326 327 328
      continue;
    }

    tstrncpy(desc.sql, pRequest->sqlstr, sizeof(desc.sql));
329
    desc.stime = pRequest->metric.start / 1000;
L
Liu Jicong 已提交
330
    desc.queryId = pRequest->requestId;
D
dapan1121 已提交
331
    desc.useconds = now - pRequest->metric.start;
L
Liu Jicong 已提交
332
    desc.reqRid = pRequest->self;
333
    desc.stableQuery = pRequest->stableQuery;
D
dapan1121 已提交
334
    taosGetFqdn(desc.fqdn);
D
dapan1121 已提交
335
    desc.subPlanNum = pRequest->body.subplanNum;
D
dapan1121 已提交
336 337 338 339 340 341 342 343 344 345 346 347

    if (desc.subPlanNum) {
      desc.subDesc = taosArrayInit(desc.subPlanNum, sizeof(SQuerySubDesc));
      if (NULL == desc.subDesc) {
        releaseRequest(*rid);
        return TSDB_CODE_QRY_OUT_OF_MEMORY;
      }

      code = schedulerGetTasksStatus(pRequest->body.queryJob, desc.subDesc);
      if (code) {
        taosArrayDestroy(desc.subDesc);
        desc.subDesc = NULL;
348
        desc.subPlanNum = 0;
D
dapan1121 已提交
349
      }
D
dapan1121 已提交
350 351
    } else {
      desc.subDesc = NULL;
D
dapan1121 已提交
352 353
    }

L
Liu Jicong 已提交
354
    releaseRequest(*rid);
D
dapan1121 已提交
355
    taosArrayPush(hbBasic->queryDesc, &desc);
L
Liu Jicong 已提交
356

D
dapan1121 已提交
357 358 359 360 361 362 363 364 365 366 367 368
    pIter = taosHashIterate(pObj->pRequests, pIter);
  }

  return TSDB_CODE_SUCCESS;
}

int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
  STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);
  if (NULL == pTscObj) {
    tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid);
    return TSDB_CODE_QRY_APP_ERROR;
  }
L
Liu Jicong 已提交
369

D
dapan1121 已提交
370 371 372 373 374 375
  SQueryHbReqBasic *hbBasic = (SQueryHbReqBasic *)taosMemoryCalloc(1, sizeof(SQueryHbReqBasic));
  if (NULL == hbBasic) {
    tscError("calloc %d failed", (int32_t)sizeof(SQueryHbReqBasic));
    releaseTscObj(connKey->tscRid);
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }
376 377 378 379 380 381 382 383 384 385
  
  hbBasic->connId = pTscObj->connId;

  int32_t numOfQueries = pTscObj->pRequests ? taosHashGetSize(pTscObj->pRequests) : 0;
  if (numOfQueries <= 0) {
    req->query = hbBasic;
    releaseTscObj(connKey->tscRid);
    tscDebug("no queries on connection");
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
386 387 388 389 390 391 392 393

  hbBasic->queryDesc = taosArrayInit(numOfQueries, sizeof(SQueryDesc));
  if (NULL == hbBasic->queryDesc) {
    tscWarn("taosArrayInit %d queryDesc failed", numOfQueries);
    releaseTscObj(connKey->tscRid);
    taosMemoryFree(hbBasic);
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }
L
Liu Jicong 已提交
394

D
dapan1121 已提交
395 396 397 398 399 400 401 402 403 404 405 406 407 408

  int32_t code = hbBuildQueryDesc(hbBasic, pTscObj);
  if (code) {
    releaseTscObj(connKey->tscRid);
    taosMemoryFree(hbBasic);
    return code;
  }

  req->query = hbBasic;
  releaseTscObj(connKey->tscRid);

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435
int32_t hbGetExpiredUserInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
  SUserAuthVersion *users = NULL;
  uint32_t          userNum = 0;
  int32_t           code = 0;

  code = catalogGetExpiredUsers(pCatalog, &users, &userNum);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  if (userNum <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  for (int32_t i = 0; i < userNum; ++i) {
    SUserAuthVersion *user = &users[i];
    user->version = htonl(user->version);
  }

  SKv kv = {
      .key = HEARTBEAT_KEY_USER_AUTHINFO,
      .valueLen = sizeof(SUserAuthVersion) * userNum,
      .value = users,
  };

  tscDebug("hb got %d expired users, valueLen:%d", userNum, kv.valueLen);

D
dapan1121 已提交
436 437 438 439
  if (NULL == req->info) {
    req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
  }
  
D
dapan 已提交
440 441 442 443 444 445
  taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));

  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
446 447
int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
  SDbVgVersion *dbs = NULL;
L
fix  
Liu Jicong 已提交
448 449
  uint32_t      dbNum = 0;
  int32_t       code = 0;
D
dapan1121 已提交
450 451 452 453 454 455

  code = catalogGetExpiredDBs(pCatalog, &dbs, &dbNum);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

D
dapan1121 已提交
456 457 458 459
  if (dbNum <= 0) {
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
460 461 462 463
  for (int32_t i = 0; i < dbNum; ++i) {
    SDbVgVersion *db = &dbs[i];
    db->dbId = htobe64(db->dbId);
    db->vgVersion = htonl(db->vgVersion);
D
dapan 已提交
464
    db->numOfTable = htonl(db->numOfTable);
D
dapan1121 已提交
465 466
  }

L
Liu Jicong 已提交
467 468 469 470 471
  SKv kv = {
      .key = HEARTBEAT_KEY_DBINFO,
      .valueLen = sizeof(SDbVgVersion) * dbNum,
      .value = dbs,
  };
D
dapan1121 已提交
472 473 474

  tscDebug("hb got %d expired db, valueLen:%d", dbNum, kv.valueLen);

D
dapan1121 已提交
475 476 477 478
  if (NULL == req->info) {
    req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
  }

D
dapan1121 已提交
479 480 481 482 483
  taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
484
int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
D
dapan1121 已提交
485
  SSTableVersion *stbs = NULL;
L
fix  
Liu Jicong 已提交
486 487
  uint32_t            stbNum = 0;
  int32_t             code = 0;
D
dapan 已提交
488 489 490 491 492 493 494 495 496 497 498

  code = catalogGetExpiredSTables(pCatalog, &stbs, &stbNum);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  if (stbNum <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  for (int32_t i = 0; i < stbNum; ++i) {
D
dapan1121 已提交
499
    SSTableVersion *stb = &stbs[i];
D
dapan 已提交
500 501
    stb->suid = htobe64(stb->suid);
    stb->sversion = htons(stb->sversion);
L
fix  
Liu Jicong 已提交
502
    stb->tversion = htons(stb->tversion);
D
dapan1121 已提交
503
    stb->smaVer = htonl(stb->smaVer);
D
dapan 已提交
504 505
  }

L
Liu Jicong 已提交
506 507
  SKv kv = {
      .key = HEARTBEAT_KEY_STBINFO,
D
dapan1121 已提交
508
      .valueLen = sizeof(SSTableVersion) * stbNum,
L
Liu Jicong 已提交
509 510
      .value = stbs,
  };
D
dapan 已提交
511 512 513

  tscDebug("hb got %d expired stb, valueLen:%d", stbNum, kv.valueLen);

D
dapan1121 已提交
514 515 516 517
  if (NULL == req->info) {
    req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
  }

D
dapan 已提交
518 519 520 521 522
  taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
523 524 525 526 527 528 529 530 531 532 533 534 535 536 537
int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) {
  SAppHbReq* pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId));
  if (NULL != pApp) {
    memcpy(&req->app, pApp, sizeof(*pApp));
  } else {
    memset(&req->app.summary, 0, sizeof(req->app.summary));
    req->app.pid = taosGetPId();
    req->app.appId = clientHbMgr.appId;
    taosGetAppName(req->app.name, NULL);    
  }

  return TSDB_CODE_SUCCESS;
}


L
fix  
Liu Jicong 已提交
538 539
int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) {
  int64_t         *clusterId = (int64_t *)param;
D
dapan1121 已提交
540 541 542 543
  struct SCatalog *pCatalog = NULL;

  int32_t code = catalogGetHandle(*clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
L
fix  
Liu Jicong 已提交
544
    tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", *clusterId, tstrerror(code));
D
dapan1121 已提交
545 546
    return code;
  }
L
fix  
Liu Jicong 已提交
547

D
dapan1121 已提交
548 549
  hbGetAppInfo(*clusterId, req);

D
dapan1121 已提交
550
  hbGetQueryBasicInfo(connKey, req);
L
Liu Jicong 已提交
551

D
dapan 已提交
552 553 554 555 556
  code = hbGetExpiredUserInfo(connKey, pCatalog, req);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

D
dapan1121 已提交
557 558 559 560 561
  code = hbGetExpiredDBInfo(connKey, pCatalog, req);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

D
dapan 已提交
562 563 564 565 566
  code = hbGetExpiredStbInfo(connKey, pCatalog, req);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

D
dapan1121 已提交
567 568 569 570
  return TSDB_CODE_SUCCESS;
}

void hbMgrInitMqHbHandle() {
D
dapan1121 已提交
571 572
  clientHbMgr.reqHandle[CONN_TYPE__QUERY] = hbQueryHbReqHandle;
  clientHbMgr.reqHandle[CONN_TYPE__TMQ] = hbMqHbReqHandle;
L
Liu Jicong 已提交
573

D
dapan1121 已提交
574 575
  clientHbMgr.rspHandle[CONN_TYPE__QUERY] = hbQueryHbRspHandle;
  clientHbMgr.rspHandle[CONN_TYPE__TMQ] = hbMqHbRspHandle;
L
Liu Jicong 已提交
576 577
}

L
Liu Jicong 已提交
578 579
static FORCE_INLINE void hbMgrInitHandle() {
  // init all handle
D
dapan1121 已提交
580
  hbMgrInitMqHbHandle();
L
Liu Jicong 已提交
581 582
}

L
fix  
Liu Jicong 已提交
583
SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
wafwerar's avatar
wafwerar 已提交
584
  SClientHbBatchReq *pBatchReq = taosMemoryCalloc(1, sizeof(SClientHbBatchReq));
L
Liu Jicong 已提交
585
  if (pBatchReq == NULL) {
L
Liu Jicong 已提交
586 587 588
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }
L
Liu Jicong 已提交
589
  int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
L
Liu Jicong 已提交
590
  pBatchReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq));
L
Liu Jicong 已提交
591

D
dapan1121 已提交
592
  int32_t code = 0;
L
fix  
Liu Jicong 已提交
593
  void   *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL);
L
Liu Jicong 已提交
594
  while (pIter != NULL) {
L
fix  
Liu Jicong 已提交
595
    SClientHbReq *pOneReq = pIter;
D
dapan1121 已提交
596

D
dapan1121 已提交
597 598
    pOneReq = taosArrayPush(pBatchReq->reqs, pOneReq);

D
dapan1121 已提交
599 600 601 602
    code = (*clientHbMgr.reqHandle[pOneReq->connKey.connType])(&pOneReq->connKey, &pOneReq->clusterId, pOneReq);
    if (code) {
      pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
      continue;
D
dapan1121 已提交
603 604
    }

D
dapan1121 已提交
605
    //hbClearClientHbReq(pOneReq);
L
Liu Jicong 已提交
606

L
Liu Jicong 已提交
607
    pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
L
Liu Jicong 已提交
608 609
  }

L
Liu Jicong 已提交
610 611 612 613
  //  if (code) {
  //    taosArrayDestroyEx(pBatchReq->reqs, hbFreeReq);
  //    taosMemoryFreeClear(pBatchReq);
  //  }
L
Liu Jicong 已提交
614

L
Liu Jicong 已提交
615
  return pBatchReq;
L
Liu Jicong 已提交
616 617
}

wafwerar's avatar
wafwerar 已提交
618 619 620 621
void hbThreadFuncUnexpectedStopped(void) {
  atomic_store_8(&clientHbMgr.threadStop, 2);
}

D
dapan1121 已提交
622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641
void hbMergeSummary(SAppClusterSummary* dst, SAppClusterSummary* src) {
  dst->numOfInsertsReq += src->numOfInsertsReq;
  dst->numOfInsertRows += src->numOfInsertRows;
  dst->insertElapsedTime += src->insertElapsedTime;
  dst->insertBytes += src->insertBytes;
  dst->fetchBytes += src->fetchBytes;
  dst->queryElapsedTime += src->queryElapsedTime;
  dst->numOfSlowQueries += src->numOfSlowQueries;
  dst->totalRequests += src->totalRequests;
  dst->currentRequests += src->currentRequests;
}

int32_t hbGatherAppInfo(void) {
  SAppHbReq req = {0};
  int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
  if (sz > 0) {
    req.pid = taosGetPId();
    req.appId = clientHbMgr.appId;
    taosGetAppName(req.name, NULL);
  }
D
dapan1121 已提交
642 643

  taosHashClear(clientHbMgr.appSummary);
D
dapan1121 已提交
644 645 646 647 648 649 650
  
  for (int32_t i = 0; i < sz; ++i) {
    SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
    uint64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId;
    SAppHbReq* pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId));
    if (NULL == pApp) {
      memcpy(&req.summary, &pAppHbMgr->pAppInstInfo->summary, sizeof(req.summary));
D
dapan1121 已提交
651
      req.startTime = pAppHbMgr->startTime;
D
dapan1121 已提交
652 653 654 655 656 657 658 659 660 661 662 663 664 665
      taosHashPut(clientHbMgr.appSummary, &clusterId, sizeof(clusterId), &req, sizeof(req));
    } else {
      if (pAppHbMgr->startTime < pApp->startTime) {
        pApp->startTime = pAppHbMgr->startTime;
      }
      
      hbMergeSummary(&pApp->summary, &pAppHbMgr->pAppInstInfo->summary);
    }
  }

  return TSDB_CODE_SUCCESS;
}


L
fix  
Liu Jicong 已提交
666
static void *hbThreadFunc(void *param) {
L
Liu Jicong 已提交
667
  setThreadName("hb");
wafwerar's avatar
wafwerar 已提交
668
#ifdef WINDOWS
wafwerar's avatar
wafwerar 已提交
669 670 671
  if (taosCheckCurrentInDll()) {
    atexit(hbThreadFuncUnexpectedStopped);
  }
wafwerar's avatar
wafwerar 已提交
672
#endif
L
Liu Jicong 已提交
673
  while (1) {
674
    int8_t threadStop = atomic_val_compare_exchange_8(&clientHbMgr.threadStop, 1, 2);
L
fix  
Liu Jicong 已提交
675
    if (1 == threadStop) {
L
Liu Jicong 已提交
676 677 678
      break;
    }

wafwerar's avatar
wafwerar 已提交
679
    taosThreadMutexLock(&clientHbMgr.lock);
680

L
Liu Jicong 已提交
681
    int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
D
dapan1121 已提交
682 683 684 685
    if (sz > 0) {
      hbGatherAppInfo();
    }
    
L
fix  
Liu Jicong 已提交
686 687
    for (int i = 0; i < sz; i++) {
      SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
L
Liu Jicong 已提交
688

L
Liu Jicong 已提交
689 690 691 692
      int32_t connCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
      if (connCnt == 0) {
        continue;
      }
L
fix  
Liu Jicong 已提交
693
      SClientHbBatchReq *pReq = hbGatherAllInfo(pAppHbMgr);
L
Liu Jicong 已提交
694 695 696
      if (pReq == NULL) {
        continue;
      }
L
fix  
Liu Jicong 已提交
697
      int   tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq);
wafwerar's avatar
wafwerar 已提交
698
      void *buf = taosMemoryMalloc(tlen);
L
Liu Jicong 已提交
699
      if (buf == NULL) {
D
dapan1121 已提交
700
        terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
D
dapan1121 已提交
701 702
        tFreeClientHbBatchReq(pReq);
        //hbClearReqInfo(pAppHbMgr);
L
Liu Jicong 已提交
703 704
        break;
      }
L
fix  
Liu Jicong 已提交
705

S
Shengliang Guan 已提交
706
      tSerializeSClientHbBatchReq(buf, tlen, pReq);
wafwerar's avatar
wafwerar 已提交
707
      SMsgSendInfo *pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
708

L
Liu Jicong 已提交
709 710
      if (pInfo == NULL) {
        terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
D
dapan1121 已提交
711 712
        tFreeClientHbBatchReq(pReq);
        //hbClearReqInfo(pAppHbMgr);
wafwerar's avatar
wafwerar 已提交
713
        taosMemoryFree(buf);
L
Liu Jicong 已提交
714 715
        break;
      }
L
Liu Jicong 已提交
716
      pInfo->fp = hbAsyncCallBack;
L
Liu Jicong 已提交
717 718 719
      pInfo->msgInfo.pData = buf;
      pInfo->msgInfo.len = tlen;
      pInfo->msgType = TDMT_MND_HEARTBEAT;
D
dapan1121 已提交
720
      pInfo->param = strdup(pAppHbMgr->key);
L
Liu Jicong 已提交
721 722
      pInfo->requestId = generateRequestId();
      pInfo->requestObjRefId = 0;
L
Liu Jicong 已提交
723 724

      SAppInstInfo *pAppInstInfo = pAppHbMgr->pAppInstInfo;
L
fix  
Liu Jicong 已提交
725 726
      int64_t       transporterId = 0;
      SEpSet        epSet = getEpSet_s(&pAppInstInfo->mgmtEp);
L
Liu Jicong 已提交
727
      asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo);
D
dapan1121 已提交
728 729
      tFreeClientHbBatchReq(pReq);
      //hbClearReqInfo(pAppHbMgr);
L
Liu Jicong 已提交
730 731 732

      atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
    }
733

wafwerar's avatar
wafwerar 已提交
734
    taosThreadMutexUnlock(&clientHbMgr.lock);
L
fix  
Liu Jicong 已提交
735

L
Liu Jicong 已提交
736
    taosMsleep(HEARTBEAT_INTERVAL);
L
Liu Jicong 已提交
737 738 739 740 741
  }
  return NULL;
}

static int32_t hbCreateThread() {
wafwerar's avatar
wafwerar 已提交
742 743 744
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
L
Liu Jicong 已提交
745

D
dapan1121 已提交
746 747 748
  if (taosThreadCreate(&clientHbMgr.thread, &thAttr, hbThreadFunc, NULL) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
L
Liu Jicong 已提交
749
  }
D
dapan1121 已提交
750
  taosThreadAttrDestroy(&thAttr);
L
Liu Jicong 已提交
751 752 753
  return 0;
}

L
Liu Jicong 已提交
754
static void hbStopThread() {
D
dapan1121 已提交
755 756 757
  if (0 == atomic_load_8(&clientHbMgr.inited)) {
    return;
  }
758
  if (atomic_val_compare_exchange_8(&clientHbMgr.threadStop, 0, 1)) {
D
dapan1121 已提交
759
    tscDebug("hb thread already stopped");
760 761
    return;
  }
L
fix  
Liu Jicong 已提交
762

763
  while (2 != atomic_load_8(&clientHbMgr.threadStop)) {
wafwerar's avatar
wafwerar 已提交
764
    taosUsleep(10);
765
  }
D
dapan1121 已提交
766

L
fix  
Liu Jicong 已提交
767
  tscDebug("hb thread stopped");
L
Liu Jicong 已提交
768 769
}

L
fix  
Liu Jicong 已提交
770
SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) {
L
Liu Jicong 已提交
771
  hbMgrInit();
wafwerar's avatar
wafwerar 已提交
772
  SAppHbMgr *pAppHbMgr = taosMemoryMalloc(sizeof(SAppHbMgr));
L
Liu Jicong 已提交
773 774 775 776 777 778
  if (pAppHbMgr == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  // init stat
  pAppHbMgr->startTime = taosGetTimestampMs();
L
Liu Jicong 已提交
779 780 781
  pAppHbMgr->connKeyCnt = 0;
  pAppHbMgr->reportCnt = 0;
  pAppHbMgr->reportBytes = 0;
D
dapan1121 已提交
782
  pAppHbMgr->key = strdup(key);
L
Liu Jicong 已提交
783

L
Liu Jicong 已提交
784 785
  // init app info
  pAppHbMgr->pAppInstInfo = pAppInstInfo;
L
Liu Jicong 已提交
786 787 788

  // init hash info
  pAppHbMgr->activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
L
Liu Jicong 已提交
789 790 791

  if (pAppHbMgr->activeInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
wafwerar's avatar
wafwerar 已提交
792
    taosMemoryFree(pAppHbMgr);
L
Liu Jicong 已提交
793 794
    return NULL;
  }
H
Haojun Liao 已提交
795

796
  // taosHashSetFreeFp(pAppHbMgr->activeInfo, tFreeClientHbReq);
L
Liu Jicong 已提交
797

wafwerar's avatar
wafwerar 已提交
798
  taosThreadMutexLock(&clientHbMgr.lock);
L
Liu Jicong 已提交
799
  taosArrayPush(clientHbMgr.appHbMgrs, &pAppHbMgr);
wafwerar's avatar
wafwerar 已提交
800
  taosThreadMutexUnlock(&clientHbMgr.lock);
L
fix  
Liu Jicong 已提交
801

L
Liu Jicong 已提交
802 803 804
  return pAppHbMgr;
}

D
dapan1121 已提交
805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833
void hbFreeAppHbMgr(SAppHbMgr *pTarget) {
  void *pIter = taosHashIterate(pTarget->activeInfo, NULL);
  while (pIter != NULL) {
    SClientHbReq *pOneReq = pIter;
    tFreeClientHbReq(pOneReq);
    pIter = taosHashIterate(pTarget->activeInfo, pIter);
  }
  taosHashCleanup(pTarget->activeInfo);
  pTarget->activeInfo = NULL;
  
  taosMemoryFree(pTarget->key);
  taosMemoryFree(pTarget);
}

void hbRemoveAppHbMrg(SAppHbMgr **pAppHbMgr) {
  taosThreadMutexLock(&clientHbMgr.lock);
  int32_t mgrSize = taosArrayGetSize(clientHbMgr.appHbMgrs);
  for (int32_t i = 0; i < mgrSize; ++i) {
    SAppHbMgr *pItem = taosArrayGetP(clientHbMgr.appHbMgrs, i);
    if (pItem == *pAppHbMgr) {
      hbFreeAppHbMgr(*pAppHbMgr);
      *pAppHbMgr = NULL;
      taosArrayRemove(clientHbMgr.appHbMgrs, i);
      break;
    }
  }
  taosThreadMutexUnlock(&clientHbMgr.lock);
}

D
dapan1121 已提交
834
void appHbMgrCleanup(void) {
L
Liu Jicong 已提交
835 836
  int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
  for (int i = 0; i < sz; i++) {
L
fix  
Liu Jicong 已提交
837
    SAppHbMgr *pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i);
D
dapan1121 已提交
838
    hbFreeAppHbMgr(pTarget);
L
Liu Jicong 已提交
839 840 841 842
  }
}

int hbMgrInit() {
L
Liu Jicong 已提交
843 844 845 846
  // init once
  int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1);
  if (old == 1) return 0;

D
dapan1121 已提交
847 848 849 850
  clientHbMgr.appId = tGenIdPI64();
  tscDebug("app %" PRIx64 " initialized", clientHbMgr.appId);
  
  clientHbMgr.appSummary = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
L
fix  
Liu Jicong 已提交
851
  clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void *));
wafwerar's avatar
wafwerar 已提交
852
  taosThreadMutexInit(&clientHbMgr.lock, NULL);
L
Liu Jicong 已提交
853 854 855 856 857

  // init handle funcs
  hbMgrInitHandle();

  // init backgroud thread
D
dapan1121 已提交
858
  hbCreateThread();
L
Liu Jicong 已提交
859

L
Liu Jicong 已提交
860 861 862 863
  return 0;
}

void hbMgrCleanUp() {
D
dapan1121 已提交
864
  hbStopThread();
L
fix  
Liu Jicong 已提交
865

L
Liu Jicong 已提交
866
  // destroy all appHbMgr
L
Liu Jicong 已提交
867 868 869
  int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0);
  if (old == 0) return;

wafwerar's avatar
wafwerar 已提交
870
  taosThreadMutexLock(&clientHbMgr.lock);
D
dapan1121 已提交
871
  appHbMgrCleanup();
L
fix  
Liu Jicong 已提交
872
  taosArrayDestroy(clientHbMgr.appHbMgrs);
wafwerar's avatar
wafwerar 已提交
873
  taosThreadMutexUnlock(&clientHbMgr.lock);
L
fix  
Liu Jicong 已提交
874

875
  clientHbMgr.appHbMgrs = NULL;
L
Liu Jicong 已提交
876 877
}

D
dapan1121 已提交
878
int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, int64_t clusterId) {
L
Liu Jicong 已提交
879
  // init hash in activeinfo
L
fix  
Liu Jicong 已提交
880
  void *data = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
L
Liu Jicong 已提交
881 882 883
  if (data != NULL) {
    return 0;
  }
D
dapan1121 已提交
884
  SClientHbReq hbReq = {0};
L
Liu Jicong 已提交
885
  hbReq.connKey = connKey;
D
dapan1121 已提交
886
  hbReq.clusterId = clusterId;
D
dapan1121 已提交
887
  //hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
L
fix  
Liu Jicong 已提交
888

L
Liu Jicong 已提交
889
  taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq));
L
fix  
Liu Jicong 已提交
890

L
Liu Jicong 已提交
891
  atomic_add_fetch_32(&pAppHbMgr->connKeyCnt, 1);
L
Liu Jicong 已提交
892 893 894
  return 0;
}

D
dapan1121 已提交
895
int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType) {
L
Liu Jicong 已提交
896
  SClientHbKey connKey = {
D
dapan1121 已提交
897
      .tscRid = tscRefId,
D
dapan1121 已提交
898
      .connType = connType,
L
Liu Jicong 已提交
899
  };
D
dapan1121 已提交
900

D
dapan1121 已提交
901 902
  switch (connType) {
    case CONN_TYPE__QUERY: {
D
dapan1121 已提交
903
      return hbRegisterConnImpl(pAppHbMgr, connKey, clusterId);
D
dapan1121 已提交
904
    }
D
dapan1121 已提交
905
    case CONN_TYPE__TMQ: {
L
Liu Jicong 已提交
906
      return 0;
D
dapan1121 已提交
907 908
    }
    default:
L
Liu Jicong 已提交
909
      return 0;
D
dapan1121 已提交
910 911 912
  }
}

L
fix  
Liu Jicong 已提交
913
void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) {
D
dapan1121 已提交
914 915
  SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
  if (pReq) {
D
dapan1121 已提交
916
    tFreeClientHbReq(pReq);
D
dapan1121 已提交
917 918 919
    taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
  }

D
dapan1121 已提交
920
  if (NULL == pReq) {
D
dapan1121 已提交
921 922
    return;
  }
L
Liu Jicong 已提交
923

L
Liu Jicong 已提交
924
  atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1);
L
Liu Jicong 已提交
925 926
}