clientHb.c 26.6 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

D
dapan1121 已提交
16
#include "catalog.h"
L
fix  
Liu Jicong 已提交
17
#include "clientInt.h"
D
dapan1121 已提交
18
#include "clientLog.h"
L
Liu Jicong 已提交
19
#include "scheduler.h"
L
fix  
Liu Jicong 已提交
20
#include "trpc.h"
L
Liu Jicong 已提交
21 22 23 24 25 26

static SClientHbMgr clientHbMgr = {0};

static int32_t hbCreateThread();
static void    hbStopThread();

L
Liu Jicong 已提交
27 28
static int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { return 0; }

L
Liu Jicong 已提交
29
static int32_t hbMqHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { return 0; }
L
Liu Jicong 已提交
30

D
dapan 已提交
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
  int32_t code = 0;

  SUserAuthBatchRsp batchRsp = {0};
  if (tDeserializeSUserAuthBatchRsp(value, valueLen, &batchRsp) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int32_t numOfBatchs = taosArrayGetSize(batchRsp.pArray);
  for (int32_t i = 0; i < numOfBatchs; ++i) {
    SGetUserAuthRsp *rsp = taosArrayGet(batchRsp.pArray, i);
    tscDebug("hb user auth rsp, user:%s, version:%d", rsp->user, rsp->version);

    catalogUpdateUserAuthInfo(pCatalog, rsp);
  }

D
dapan1121 已提交
48
  taosArrayDestroy(batchRsp.pArray);
D
dapan 已提交
49 50 51
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
52 53 54
static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
  int32_t code = 0;

S
Shengliang Guan 已提交
55 56 57 58 59
  SUseDbBatchRsp batchUseRsp = {0};
  if (tDeserializeSUseDbBatchRsp(value, valueLen, &batchUseRsp) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }
D
dapan1121 已提交
60

S
Shengliang Guan 已提交
61 62 63
  int32_t numOfBatchs = taosArrayGetSize(batchUseRsp.pArray);
  for (int32_t i = 0; i < numOfBatchs; ++i) {
    SUseDbRsp *rsp = taosArrayGet(batchUseRsp.pArray, i);
L
fix  
Liu Jicong 已提交
64 65
    tscDebug("hb db rsp, db:%s, vgVersion:%d, uid:%" PRIx64, rsp->db, rsp->vgVersion, rsp->uid);

D
dapan1121 已提交
66
    if (rsp->vgVersion < 0) {
D
dapan1121 已提交
67
      code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid);
D
dapan1121 已提交
68
    } else {
D
dapan1121 已提交
69 70 71 72 73 74 75 76 77 78
      SDBVgInfo *vgInfo = taosMemoryCalloc(1, sizeof(SDBVgInfo));
      if (NULL == vgInfo) {
        return TSDB_CODE_TSC_OUT_OF_MEMORY;
      }
      
      vgInfo->vgVersion = rsp->vgVersion;
      vgInfo->hashMethod = rsp->hashMethod;
      vgInfo->vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
      if (NULL == vgInfo->vgHash) {
        taosMemoryFree(vgInfo);
D
dapan1121 已提交
79 80 81 82
        tscError("hash init[%d] failed", rsp->vgNum);
        return TSDB_CODE_TSC_OUT_OF_MEMORY;
      }

S
Shengliang Guan 已提交
83 84
      for (int32_t j = 0; j < rsp->vgNum; ++j) {
        SVgroupInfo *pInfo = taosArrayGet(rsp->pVgroupInfos, j);
D
dapan1121 已提交
85
        if (taosHashPut(vgInfo->vgHash, &pInfo->vgId, sizeof(int32_t), pInfo, sizeof(SVgroupInfo)) != 0) {
D
dapan1121 已提交
86
          tscError("hash push failed, errno:%d", errno);
D
dapan1121 已提交
87 88
          taosHashCleanup(vgInfo->vgHash);
          taosMemoryFree(vgInfo);
D
dapan1121 已提交
89 90
          return TSDB_CODE_TSC_OUT_OF_MEMORY;
        }
L
fix  
Liu Jicong 已提交
91 92
      }

D
dapan1121 已提交
93
      catalogUpdateDBVgInfo(pCatalog, rsp->db, rsp->uid, vgInfo);
D
dapan1121 已提交
94 95 96 97 98 99 100
    }

    if (code) {
      return code;
    }
  }

S
Shengliang Guan 已提交
101
  tFreeSUseDbBatchRsp(&batchUseRsp);
D
dapan1121 已提交
102 103 104
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
105 106 107
static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
  int32_t code = 0;

D
dapan1121 已提交
108 109
  SSTbHbRsp hbRsp = {0};
  if (tDeserializeSSTbHbRsp(value, valueLen, &hbRsp) != 0) {
S
Shengliang Guan 已提交
110 111 112 113
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

D
dapan1121 已提交
114 115 116
  int32_t numOfMeta = taosArrayGetSize(hbRsp.pMetaRsp);
  for (int32_t i = 0; i < numOfMeta; ++i) {
    STableMetaRsp *rsp = taosArrayGet(hbRsp.pMetaRsp, i);
S
Shengliang Guan 已提交
117

D
dapan 已提交
118 119
    if (rsp->numOfColumns < 0) {
      tscDebug("hb remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
D
dapan1121 已提交
120
      catalogRemoveStbMeta(pCatalog, rsp->dbFName, rsp->dbId, rsp->stbName, rsp->suid);
D
dapan 已提交
121
    } else {
D
dapan1121 已提交
122
      tscDebug("hb update stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
S
Shengliang Guan 已提交
123
      if (rsp->pSchemas[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
124
        tscError("invalid colId[%" PRIi16 "] for the first column in table meta rsp msg", rsp->pSchemas[0].colId);
D
dapan1121 已提交
125
        tFreeSSTbHbRsp(&hbRsp);
D
dapan1121 已提交
126
        return TSDB_CODE_TSC_INVALID_VALUE;
D
dapan 已提交
127 128
      }

D
dapan1121 已提交
129
      catalogUpdateTableMeta(pCatalog, rsp);
D
dapan 已提交
130 131 132
    }
  }

D
dapan1121 已提交
133 134 135 136 137 138 139 140 141 142
  int32_t numOfIndex = taosArrayGetSize(hbRsp.pIndexRsp);
  for (int32_t i = 0; i < numOfIndex; ++i) {
    STableIndexRsp *rsp = taosArrayGet(hbRsp.pIndexRsp, i);

    catalogUpdateTableIndex(pCatalog, rsp);
  }

  taosArrayDestroy(hbRsp.pIndexRsp);
  hbRsp.pIndexRsp = NULL;

D
dapan1121 已提交
143
  tFreeSSTbHbRsp(&hbRsp);
D
dapan 已提交
144 145 146
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
147
static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
D
dapan1121 已提交
148 149 150
  SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &pRsp->connKey, sizeof(SClientHbKey));
  if (NULL == pReq) {
    tscWarn("pReq to get activeInfo, may be dropped, refId:%" PRIx64 ", type:%d", pRsp->connKey.tscRid,
L
Liu Jicong 已提交
151
            pRsp->connKey.connType);
D
dapan1121 已提交
152 153 154
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
155 156 157 158
  if (pRsp->query) {
    STscObj *pTscObj = (STscObj *)acquireTscObj(pRsp->connKey.tscRid);
    if (NULL == pTscObj) {
      tscDebug("tscObj rid %" PRIx64 " not exist", pRsp->connKey.tscRid);
D
dapan1121 已提交
159
    } else {      
D
dapan1121 已提交
160
      if (pRsp->query->totalDnodes > 1 && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &pRsp->query->epSet)) {
D
dapan1121 已提交
161 162 163 164 165 166 167
        SEpSet* pOrig = &pTscObj->pAppInfo->mgmtEp.epSet;
        SEp* pOrigEp = &pOrig->eps[pOrig->inUse];
        SEp* pNewEp = &pRsp->query->epSet.eps[pRsp->query->epSet.inUse];
        tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in hb", 
            pOrig->inUse, pOrig->numOfEps, pOrigEp->fqdn, pOrigEp->port, 
            pRsp->query->epSet.inUse, pRsp->query->epSet.numOfEps, pNewEp->fqdn, pNewEp->port);
            
D
dapan1121 已提交
168 169
        updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pRsp->query->epSet);
      }
D
dapan1121 已提交
170 171 172
      
      pTscObj->pAppInfo->totalDnodes = pRsp->query->totalDnodes;
      pTscObj->pAppInfo->onlineDnodes = pRsp->query->onlineDnodes;
D
dapan1121 已提交
173
      pTscObj->connId = pRsp->query->connId;
L
Liu Jicong 已提交
174

D
dapan1121 已提交
175
      if (pRsp->query->killRid) {
D
dapan1121 已提交
176
        tscDebug("request rid %" PRIx64 " need to be killed now", pRsp->query->killRid);
D
dapan1121 已提交
177 178 179 180 181 182 183 184
        SRequestObj *pRequest = acquireRequest(pRsp->query->killRid);
        if (NULL == pRequest) {
          tscDebug("request 0x%" PRIx64 " not exist to kill", pRsp->query->killRid);
        } else {
          taos_stop_query((TAOS_RES *)pRequest);
          releaseRequest(pRsp->query->killRid);
        }
      }
L
Liu Jicong 已提交
185

D
dapan1121 已提交
186
      if (pRsp->query->killConnection) {
187
        taos_close_internal(pTscObj);
D
dapan1121 已提交
188 189
      }

D
dapan1121 已提交
190 191 192 193
      if (pRsp->query->pQnodeList) {
        updateQnodeList(pTscObj->pAppInfo, pRsp->query->pQnodeList);
      }

D
dapan1121 已提交
194 195 196
      releaseTscObj(pRsp->connKey.tscRid);
    }
  }
L
Liu Jicong 已提交
197

D
dapan1121 已提交
198
  int32_t kvNum = pRsp->info ? taosArrayGetSize(pRsp->info) : 0;
D
dapan1121 已提交
199 200

  tscDebug("hb got %d rsp kv", kvNum);
L
fix  
Liu Jicong 已提交
201

D
dapan1121 已提交
202 203 204
  for (int32_t i = 0; i < kvNum; ++i) {
    SKv *kv = taosArrayGet(pRsp->info, i);
    switch (kv->key) {
D
dapan 已提交
205 206 207 208 209 210 211 212
      case HEARTBEAT_KEY_USER_AUTHINFO: {
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid hb user auth info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }

        struct SCatalog *pCatalog = NULL;

D
dapan1121 已提交
213
        int32_t code = catalogGetHandle(pReq->clusterId, &pCatalog);
D
dapan 已提交
214
        if (code != TSDB_CODE_SUCCESS) {
D
dapan1121 已提交
215
          tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code));
D
dapan 已提交
216 217 218 219 220 221
          break;
        }

        hbProcessUserAuthInfoRsp(kv->value, kv->valueLen, pCatalog);
        break;
      }
D
dapan1121 已提交
222 223 224 225 226 227 228
      case HEARTBEAT_KEY_DBINFO: {
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid hb db info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }

        struct SCatalog *pCatalog = NULL;
L
fix  
Liu Jicong 已提交
229

D
dapan1121 已提交
230
        int32_t code = catalogGetHandle(pReq->clusterId, &pCatalog);
D
dapan1121 已提交
231
        if (code != TSDB_CODE_SUCCESS) {
D
dapan1121 已提交
232
          tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code));
D
dapan1121 已提交
233 234 235 236
          break;
        }

        hbProcessDBInfoRsp(kv->value, kv->valueLen, pCatalog);
D
dapan1121 已提交
237
        break;
D
dapan1121 已提交
238
      }
L
fix  
Liu Jicong 已提交
239
      case HEARTBEAT_KEY_STBINFO: {
D
dapan 已提交
240 241 242 243
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid hb stb info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }
D
dapan1121 已提交
244

D
dapan 已提交
245
        struct SCatalog *pCatalog = NULL;
L
fix  
Liu Jicong 已提交
246

D
dapan1121 已提交
247
        int32_t code = catalogGetHandle(pReq->clusterId, &pCatalog);
D
dapan 已提交
248
        if (code != TSDB_CODE_SUCCESS) {
D
dapan1121 已提交
249
          tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code));
D
dapan 已提交
250 251 252 253
          break;
        }

        hbProcessStbInfoRsp(kv->value, kv->valueLen, pCatalog);
D
dapan1121 已提交
254
        break;
D
dapan 已提交
255
      }
D
dapan1121 已提交
256 257 258 259 260 261 262 263 264
      default:
        tscError("invalid hb key type:%d", kv->key);
        break;
    }
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
265
static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
D
dapan1121 已提交
266
  static int32_t emptyRspNum = 0;
L
fix  
Liu Jicong 已提交
267
  char             *key = (char *)param;
D
dapan1121 已提交
268
  SClientHbBatchRsp pRsp = {0};
D
dapan1121 已提交
269 270 271 272
  if (TSDB_CODE_SUCCESS == code) {
    tDeserializeSClientHbBatchRsp(pMsg->pData, pMsg->len, &pRsp);
  }
  
D
dapan1121 已提交
273
  int32_t rspNum = taosArrayGetSize(pRsp.rsps);
D
dapan1121 已提交
274

D
dapan1121 已提交
275 276
  taosThreadMutexLock(&appInfo.mutex);

L
fix  
Liu Jicong 已提交
277
  SAppInstInfo **pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
D
dapan1121 已提交
278
  if (pInst == NULL || NULL == *pInst) {
D
dapan1121 已提交
279
    taosThreadMutexUnlock(&appInfo.mutex);
L
fix  
Liu Jicong 已提交
280
    tscError("cluster not exist, key:%s", key);
wafwerar's avatar
wafwerar 已提交
281
    taosMemoryFreeClear(param);
D
dapan1121 已提交
282
    tFreeClientHbBatchRsp(&pRsp);
D
dapan1121 已提交
283 284 285
    return -1;
  }

wafwerar's avatar
wafwerar 已提交
286
  taosMemoryFreeClear(param);
D
dapan1121 已提交
287

D
dapan1121 已提交
288 289 290 291
  if (code != 0) {
    (*pInst)->onlineDnodes = 0;
  }

D
dapan1121 已提交
292
  if (rspNum) {
L
fix  
Liu Jicong 已提交
293 294
    tscDebug("hb got %d rsp, %d empty rsp received before", rspNum,
             atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0));
D
dapan1121 已提交
295 296 297 298 299
  } else {
    atomic_add_fetch_32(&emptyRspNum, 1);
  }

  for (int32_t i = 0; i < rspNum; ++i) {
L
fix  
Liu Jicong 已提交
300
    SClientHbRsp *rsp = taosArrayGet(pRsp.rsps, i);
D
dapan1121 已提交
301
    code = (*clientHbMgr.rspHandle[rsp->connKey.connType])((*pInst)->pAppHbMgr, rsp);
D
dapan1121 已提交
302 303 304 305
    if (code) {
      break;
    }
  }
D
dapan1121 已提交
306

D
dapan1121 已提交
307 308
  taosThreadMutexUnlock(&appInfo.mutex);

D
dapan1121 已提交
309
  tFreeClientHbBatchRsp(&pRsp);
L
fix  
Liu Jicong 已提交
310

D
dapan1121 已提交
311
  return code;
L
Liu Jicong 已提交
312 313
}

D
dapan1121 已提交
314
int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
L
Liu Jicong 已提交
315
  int64_t    now = taosGetTimestampUs();
D
dapan1121 已提交
316
  SQueryDesc desc = {0};
L
Liu Jicong 已提交
317
  int32_t    code = 0;
D
dapan1121 已提交
318

L
Liu Jicong 已提交
319
  void *pIter = taosHashIterate(pObj->pRequests, NULL);
D
dapan1121 已提交
320
  while (pIter != NULL) {
L
Liu Jicong 已提交
321
    int64_t     *rid = pIter;
D
dapan1121 已提交
322
    SRequestObj *pRequest = acquireRequest(*rid);
D
dapan1121 已提交
323
    if (NULL == pRequest || pRequest->killed) {
D
dapan1121 已提交
324
      pIter = taosHashIterate(pObj->pRequests, pIter);
D
dapan1121 已提交
325 326 327 328
      continue;
    }

    tstrncpy(desc.sql, pRequest->sqlstr, sizeof(desc.sql));
329
    desc.stime = pRequest->metric.start / 1000;
L
Liu Jicong 已提交
330
    desc.queryId = pRequest->requestId;
D
dapan1121 已提交
331
    desc.useconds = now - pRequest->metric.start;
L
Liu Jicong 已提交
332
    desc.reqRid = pRequest->self;
333
    desc.stableQuery = pRequest->stableQuery;
D
dapan1121 已提交
334
    taosGetFqdn(desc.fqdn);
D
dapan1121 已提交
335
    desc.subPlanNum = pRequest->body.subplanNum;
D
dapan1121 已提交
336 337 338 339 340 341 342 343 344 345 346 347

    if (desc.subPlanNum) {
      desc.subDesc = taosArrayInit(desc.subPlanNum, sizeof(SQuerySubDesc));
      if (NULL == desc.subDesc) {
        releaseRequest(*rid);
        return TSDB_CODE_QRY_OUT_OF_MEMORY;
      }

      code = schedulerGetTasksStatus(pRequest->body.queryJob, desc.subDesc);
      if (code) {
        taosArrayDestroy(desc.subDesc);
        desc.subDesc = NULL;
348
        desc.subPlanNum = 0;
D
dapan1121 已提交
349
      }
D
dapan1121 已提交
350 351
    } else {
      desc.subDesc = NULL;
D
dapan1121 已提交
352 353
    }

L
Liu Jicong 已提交
354
    releaseRequest(*rid);
D
dapan1121 已提交
355
    taosArrayPush(hbBasic->queryDesc, &desc);
L
Liu Jicong 已提交
356

D
dapan1121 已提交
357 358 359 360 361 362 363 364 365 366 367 368
    pIter = taosHashIterate(pObj->pRequests, pIter);
  }

  return TSDB_CODE_SUCCESS;
}

int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
  STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);
  if (NULL == pTscObj) {
    tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid);
    return TSDB_CODE_QRY_APP_ERROR;
  }
L
Liu Jicong 已提交
369

D
dapan1121 已提交
370 371 372 373 374 375
  SQueryHbReqBasic *hbBasic = (SQueryHbReqBasic *)taosMemoryCalloc(1, sizeof(SQueryHbReqBasic));
  if (NULL == hbBasic) {
    tscError("calloc %d failed", (int32_t)sizeof(SQueryHbReqBasic));
    releaseTscObj(connKey->tscRid);
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }
376 377 378 379 380 381 382 383 384 385
  
  hbBasic->connId = pTscObj->connId;

  int32_t numOfQueries = pTscObj->pRequests ? taosHashGetSize(pTscObj->pRequests) : 0;
  if (numOfQueries <= 0) {
    req->query = hbBasic;
    releaseTscObj(connKey->tscRid);
    tscDebug("no queries on connection");
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
386 387 388 389 390 391 392 393

  hbBasic->queryDesc = taosArrayInit(numOfQueries, sizeof(SQueryDesc));
  if (NULL == hbBasic->queryDesc) {
    tscWarn("taosArrayInit %d queryDesc failed", numOfQueries);
    releaseTscObj(connKey->tscRid);
    taosMemoryFree(hbBasic);
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }
L
Liu Jicong 已提交
394

D
dapan1121 已提交
395 396 397 398 399 400 401 402 403 404 405 406 407 408

  int32_t code = hbBuildQueryDesc(hbBasic, pTscObj);
  if (code) {
    releaseTscObj(connKey->tscRid);
    taosMemoryFree(hbBasic);
    return code;
  }

  req->query = hbBasic;
  releaseTscObj(connKey->tscRid);

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435
int32_t hbGetExpiredUserInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
  SUserAuthVersion *users = NULL;
  uint32_t          userNum = 0;
  int32_t           code = 0;

  code = catalogGetExpiredUsers(pCatalog, &users, &userNum);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  if (userNum <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  for (int32_t i = 0; i < userNum; ++i) {
    SUserAuthVersion *user = &users[i];
    user->version = htonl(user->version);
  }

  SKv kv = {
      .key = HEARTBEAT_KEY_USER_AUTHINFO,
      .valueLen = sizeof(SUserAuthVersion) * userNum,
      .value = users,
  };

  tscDebug("hb got %d expired users, valueLen:%d", userNum, kv.valueLen);

D
dapan1121 已提交
436 437 438 439
  if (NULL == req->info) {
    req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
  }
  
D
dapan 已提交
440 441 442 443 444 445
  taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));

  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
446 447
int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
  SDbVgVersion *dbs = NULL;
L
fix  
Liu Jicong 已提交
448 449
  uint32_t      dbNum = 0;
  int32_t       code = 0;
D
dapan1121 已提交
450 451 452 453 454 455

  code = catalogGetExpiredDBs(pCatalog, &dbs, &dbNum);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

D
dapan1121 已提交
456 457 458 459
  if (dbNum <= 0) {
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
460 461 462 463
  for (int32_t i = 0; i < dbNum; ++i) {
    SDbVgVersion *db = &dbs[i];
    db->dbId = htobe64(db->dbId);
    db->vgVersion = htonl(db->vgVersion);
D
dapan 已提交
464
    db->numOfTable = htonl(db->numOfTable);
D
dapan1121 已提交
465 466
  }

L
Liu Jicong 已提交
467 468 469 470 471
  SKv kv = {
      .key = HEARTBEAT_KEY_DBINFO,
      .valueLen = sizeof(SDbVgVersion) * dbNum,
      .value = dbs,
  };
D
dapan1121 已提交
472 473 474

  tscDebug("hb got %d expired db, valueLen:%d", dbNum, kv.valueLen);

D
dapan1121 已提交
475 476 477 478
  if (NULL == req->info) {
    req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
  }

D
dapan1121 已提交
479 480 481 482 483
  taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
484
int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
D
dapan1121 已提交
485
  SSTableVersion *stbs = NULL;
L
fix  
Liu Jicong 已提交
486 487
  uint32_t            stbNum = 0;
  int32_t             code = 0;
D
dapan 已提交
488 489 490 491 492 493 494 495 496 497 498

  code = catalogGetExpiredSTables(pCatalog, &stbs, &stbNum);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  if (stbNum <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  for (int32_t i = 0; i < stbNum; ++i) {
D
dapan1121 已提交
499
    SSTableVersion *stb = &stbs[i];
D
dapan 已提交
500 501
    stb->suid = htobe64(stb->suid);
    stb->sversion = htons(stb->sversion);
L
fix  
Liu Jicong 已提交
502
    stb->tversion = htons(stb->tversion);
D
dapan1121 已提交
503
    stb->smaVer = htonl(stb->smaVer);
D
dapan 已提交
504 505
  }

L
Liu Jicong 已提交
506 507
  SKv kv = {
      .key = HEARTBEAT_KEY_STBINFO,
D
dapan1121 已提交
508
      .valueLen = sizeof(SSTableVersion) * stbNum,
L
Liu Jicong 已提交
509 510
      .value = stbs,
  };
D
dapan 已提交
511 512 513

  tscDebug("hb got %d expired stb, valueLen:%d", stbNum, kv.valueLen);

D
dapan1121 已提交
514 515 516 517
  if (NULL == req->info) {
    req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
  }

D
dapan 已提交
518 519 520 521 522
  taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
523 524 525 526 527 528 529 530 531 532 533 534 535 536 537
int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) {
  SAppHbReq* pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId));
  if (NULL != pApp) {
    memcpy(&req->app, pApp, sizeof(*pApp));
  } else {
    memset(&req->app.summary, 0, sizeof(req->app.summary));
    req->app.pid = taosGetPId();
    req->app.appId = clientHbMgr.appId;
    taosGetAppName(req->app.name, NULL);    
  }

  return TSDB_CODE_SUCCESS;
}


L
fix  
Liu Jicong 已提交
538 539
int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) {
  int64_t         *clusterId = (int64_t *)param;
D
dapan1121 已提交
540 541 542 543
  struct SCatalog *pCatalog = NULL;

  int32_t code = catalogGetHandle(*clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
L
fix  
Liu Jicong 已提交
544
    tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", *clusterId, tstrerror(code));
D
dapan1121 已提交
545 546
    return code;
  }
L
fix  
Liu Jicong 已提交
547

D
dapan1121 已提交
548 549
  hbGetAppInfo(*clusterId, req);

D
dapan1121 已提交
550
  hbGetQueryBasicInfo(connKey, req);
L
Liu Jicong 已提交
551

D
dapan 已提交
552 553 554 555 556
  code = hbGetExpiredUserInfo(connKey, pCatalog, req);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

D
dapan1121 已提交
557 558 559 560 561
  code = hbGetExpiredDBInfo(connKey, pCatalog, req);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

D
dapan 已提交
562 563 564 565 566
  code = hbGetExpiredStbInfo(connKey, pCatalog, req);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

D
dapan1121 已提交
567 568 569 570
  return TSDB_CODE_SUCCESS;
}

void hbMgrInitMqHbHandle() {
D
dapan1121 已提交
571 572
  clientHbMgr.reqHandle[CONN_TYPE__QUERY] = hbQueryHbReqHandle;
  clientHbMgr.reqHandle[CONN_TYPE__TMQ] = hbMqHbReqHandle;
L
Liu Jicong 已提交
573

D
dapan1121 已提交
574 575
  clientHbMgr.rspHandle[CONN_TYPE__QUERY] = hbQueryHbRspHandle;
  clientHbMgr.rspHandle[CONN_TYPE__TMQ] = hbMqHbRspHandle;
L
Liu Jicong 已提交
576 577
}

L
Liu Jicong 已提交
578 579
static FORCE_INLINE void hbMgrInitHandle() {
  // init all handle
D
dapan1121 已提交
580
  hbMgrInitMqHbHandle();
L
Liu Jicong 已提交
581 582
}

L
fix  
Liu Jicong 已提交
583
SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
wafwerar's avatar
wafwerar 已提交
584
  SClientHbBatchReq *pBatchReq = taosMemoryCalloc(1, sizeof(SClientHbBatchReq));
L
Liu Jicong 已提交
585
  if (pBatchReq == NULL) {
L
Liu Jicong 已提交
586 587 588
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }
L
Liu Jicong 已提交
589
  int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
L
Liu Jicong 已提交
590
  pBatchReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq));
L
Liu Jicong 已提交
591

D
dapan1121 已提交
592
  int32_t code = 0;
L
fix  
Liu Jicong 已提交
593
  void   *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL);
L
Liu Jicong 已提交
594
  while (pIter != NULL) {
L
fix  
Liu Jicong 已提交
595
    SClientHbReq *pOneReq = pIter;
D
dapan1121 已提交
596

D
dapan1121 已提交
597 598
    pOneReq = taosArrayPush(pBatchReq->reqs, pOneReq);

D
dapan1121 已提交
599 600 601 602
    code = (*clientHbMgr.reqHandle[pOneReq->connKey.connType])(&pOneReq->connKey, &pOneReq->clusterId, pOneReq);
    if (code) {
      pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
      continue;
D
dapan1121 已提交
603 604
    }

D
dapan1121 已提交
605
    //hbClearClientHbReq(pOneReq);
L
Liu Jicong 已提交
606

L
Liu Jicong 已提交
607
    pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
L
Liu Jicong 已提交
608 609
  }

L
Liu Jicong 已提交
610 611 612 613
  //  if (code) {
  //    taosArrayDestroyEx(pBatchReq->reqs, hbFreeReq);
  //    taosMemoryFreeClear(pBatchReq);
  //  }
L
Liu Jicong 已提交
614

L
Liu Jicong 已提交
615
  return pBatchReq;
L
Liu Jicong 已提交
616 617
}

wafwerar's avatar
wafwerar 已提交
618 619 620 621
void hbThreadFuncUnexpectedStopped(void) {
  atomic_store_8(&clientHbMgr.threadStop, 2);
}

D
dapan1121 已提交
622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641
void hbMergeSummary(SAppClusterSummary* dst, SAppClusterSummary* src) {
  dst->numOfInsertsReq += src->numOfInsertsReq;
  dst->numOfInsertRows += src->numOfInsertRows;
  dst->insertElapsedTime += src->insertElapsedTime;
  dst->insertBytes += src->insertBytes;
  dst->fetchBytes += src->fetchBytes;
  dst->queryElapsedTime += src->queryElapsedTime;
  dst->numOfSlowQueries += src->numOfSlowQueries;
  dst->totalRequests += src->totalRequests;
  dst->currentRequests += src->currentRequests;
}

int32_t hbGatherAppInfo(void) {
  SAppHbReq req = {0};
  int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
  if (sz > 0) {
    req.pid = taosGetPId();
    req.appId = clientHbMgr.appId;
    taosGetAppName(req.name, NULL);
  }
D
dapan1121 已提交
642 643

  taosHashClear(clientHbMgr.appSummary);
D
dapan1121 已提交
644 645 646 647 648 649 650
  
  for (int32_t i = 0; i < sz; ++i) {
    SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
    uint64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId;
    SAppHbReq* pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId));
    if (NULL == pApp) {
      memcpy(&req.summary, &pAppHbMgr->pAppInstInfo->summary, sizeof(req.summary));
D
dapan1121 已提交
651
      req.startTime = pAppHbMgr->startTime;
D
dapan1121 已提交
652 653 654 655 656 657 658 659 660 661 662 663 664 665
      taosHashPut(clientHbMgr.appSummary, &clusterId, sizeof(clusterId), &req, sizeof(req));
    } else {
      if (pAppHbMgr->startTime < pApp->startTime) {
        pApp->startTime = pAppHbMgr->startTime;
      }
      
      hbMergeSummary(&pApp->summary, &pAppHbMgr->pAppInstInfo->summary);
    }
  }

  return TSDB_CODE_SUCCESS;
}


L
fix  
Liu Jicong 已提交
666
static void *hbThreadFunc(void *param) {
L
Liu Jicong 已提交
667
  setThreadName("hb");
wafwerar's avatar
wafwerar 已提交
668
#ifdef WINDOWS
wafwerar's avatar
wafwerar 已提交
669 670 671
  if (taosCheckCurrentInDll()) {
    atexit(hbThreadFuncUnexpectedStopped);
  }
wafwerar's avatar
wafwerar 已提交
672
#endif
L
Liu Jicong 已提交
673
  while (1) {
674
    if (1 == clientHbMgr.threadStop) {
L
Liu Jicong 已提交
675 676 677
      break;
    }

wafwerar's avatar
wafwerar 已提交
678
    taosThreadMutexLock(&clientHbMgr.lock);
679

L
Liu Jicong 已提交
680
    int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
D
dapan1121 已提交
681 682 683 684
    if (sz > 0) {
      hbGatherAppInfo();
    }
    
L
fix  
Liu Jicong 已提交
685 686
    for (int i = 0; i < sz; i++) {
      SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
L
Liu Jicong 已提交
687

L
Liu Jicong 已提交
688 689 690 691
      int32_t connCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
      if (connCnt == 0) {
        continue;
      }
L
fix  
Liu Jicong 已提交
692
      SClientHbBatchReq *pReq = hbGatherAllInfo(pAppHbMgr);
L
Liu Jicong 已提交
693 694 695
      if (pReq == NULL) {
        continue;
      }
L
fix  
Liu Jicong 已提交
696
      int   tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq);
wafwerar's avatar
wafwerar 已提交
697
      void *buf = taosMemoryMalloc(tlen);
L
Liu Jicong 已提交
698
      if (buf == NULL) {
D
dapan1121 已提交
699
        terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
D
dapan1121 已提交
700 701
        tFreeClientHbBatchReq(pReq);
        //hbClearReqInfo(pAppHbMgr);
L
Liu Jicong 已提交
702 703
        break;
      }
L
fix  
Liu Jicong 已提交
704

S
Shengliang Guan 已提交
705
      tSerializeSClientHbBatchReq(buf, tlen, pReq);
wafwerar's avatar
wafwerar 已提交
706
      SMsgSendInfo *pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
707

L
Liu Jicong 已提交
708 709
      if (pInfo == NULL) {
        terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
D
dapan1121 已提交
710 711
        tFreeClientHbBatchReq(pReq);
        //hbClearReqInfo(pAppHbMgr);
wafwerar's avatar
wafwerar 已提交
712
        taosMemoryFree(buf);
L
Liu Jicong 已提交
713 714
        break;
      }
L
Liu Jicong 已提交
715
      pInfo->fp = hbAsyncCallBack;
L
Liu Jicong 已提交
716 717 718
      pInfo->msgInfo.pData = buf;
      pInfo->msgInfo.len = tlen;
      pInfo->msgType = TDMT_MND_HEARTBEAT;
D
dapan1121 已提交
719
      pInfo->param = strdup(pAppHbMgr->key);
L
Liu Jicong 已提交
720 721
      pInfo->requestId = generateRequestId();
      pInfo->requestObjRefId = 0;
L
Liu Jicong 已提交
722 723

      SAppInstInfo *pAppInstInfo = pAppHbMgr->pAppInstInfo;
L
fix  
Liu Jicong 已提交
724 725
      int64_t       transporterId = 0;
      SEpSet        epSet = getEpSet_s(&pAppInstInfo->mgmtEp);
L
Liu Jicong 已提交
726
      asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo);
D
dapan1121 已提交
727 728
      tFreeClientHbBatchReq(pReq);
      //hbClearReqInfo(pAppHbMgr);
L
Liu Jicong 已提交
729 730 731

      atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
    }
732

wafwerar's avatar
wafwerar 已提交
733
    taosThreadMutexUnlock(&clientHbMgr.lock);
L
fix  
Liu Jicong 已提交
734

L
Liu Jicong 已提交
735
    taosMsleep(HEARTBEAT_INTERVAL);
L
Liu Jicong 已提交
736 737 738 739 740
  }
  return NULL;
}

static int32_t hbCreateThread() {
wafwerar's avatar
wafwerar 已提交
741 742 743
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
L
Liu Jicong 已提交
744

D
dapan1121 已提交
745 746 747
  if (taosThreadCreate(&clientHbMgr.thread, &thAttr, hbThreadFunc, NULL) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
L
Liu Jicong 已提交
748
  }
D
dapan1121 已提交
749
  taosThreadAttrDestroy(&thAttr);
L
Liu Jicong 已提交
750 751 752
  return 0;
}

L
Liu Jicong 已提交
753
static void hbStopThread() {
D
dapan1121 已提交
754 755 756
  if (0 == atomic_load_8(&clientHbMgr.inited)) {
    return;
  }
757
  if (atomic_val_compare_exchange_8(&clientHbMgr.threadStop, 0, 1)) {
D
dapan1121 已提交
758
    tscDebug("hb thread already stopped");
759 760
    return;
  }
L
fix  
Liu Jicong 已提交
761

762
  taosThreadJoin(clientHbMgr.thread, NULL);    
D
dapan1121 已提交
763

L
fix  
Liu Jicong 已提交
764
  tscDebug("hb thread stopped");
L
Liu Jicong 已提交
765 766
}

L
fix  
Liu Jicong 已提交
767
SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) {
L
Liu Jicong 已提交
768
  hbMgrInit();
wafwerar's avatar
wafwerar 已提交
769
  SAppHbMgr *pAppHbMgr = taosMemoryMalloc(sizeof(SAppHbMgr));
L
Liu Jicong 已提交
770 771 772 773 774 775
  if (pAppHbMgr == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  // init stat
  pAppHbMgr->startTime = taosGetTimestampMs();
L
Liu Jicong 已提交
776 777 778
  pAppHbMgr->connKeyCnt = 0;
  pAppHbMgr->reportCnt = 0;
  pAppHbMgr->reportBytes = 0;
D
dapan1121 已提交
779
  pAppHbMgr->key = strdup(key);
L
Liu Jicong 已提交
780

L
Liu Jicong 已提交
781 782
  // init app info
  pAppHbMgr->pAppInstInfo = pAppInstInfo;
L
Liu Jicong 已提交
783 784 785

  // init hash info
  pAppHbMgr->activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
L
Liu Jicong 已提交
786 787 788

  if (pAppHbMgr->activeInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
wafwerar's avatar
wafwerar 已提交
789
    taosMemoryFree(pAppHbMgr);
L
Liu Jicong 已提交
790 791
    return NULL;
  }
H
Haojun Liao 已提交
792

793
  // taosHashSetFreeFp(pAppHbMgr->activeInfo, tFreeClientHbReq);
L
Liu Jicong 已提交
794

wafwerar's avatar
wafwerar 已提交
795
  taosThreadMutexLock(&clientHbMgr.lock);
L
Liu Jicong 已提交
796
  taosArrayPush(clientHbMgr.appHbMgrs, &pAppHbMgr);
wafwerar's avatar
wafwerar 已提交
797
  taosThreadMutexUnlock(&clientHbMgr.lock);
L
fix  
Liu Jicong 已提交
798

L
Liu Jicong 已提交
799 800 801
  return pAppHbMgr;
}

D
dapan1121 已提交
802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830
void hbFreeAppHbMgr(SAppHbMgr *pTarget) {
  void *pIter = taosHashIterate(pTarget->activeInfo, NULL);
  while (pIter != NULL) {
    SClientHbReq *pOneReq = pIter;
    tFreeClientHbReq(pOneReq);
    pIter = taosHashIterate(pTarget->activeInfo, pIter);
  }
  taosHashCleanup(pTarget->activeInfo);
  pTarget->activeInfo = NULL;
  
  taosMemoryFree(pTarget->key);
  taosMemoryFree(pTarget);
}

void hbRemoveAppHbMrg(SAppHbMgr **pAppHbMgr) {
  taosThreadMutexLock(&clientHbMgr.lock);
  int32_t mgrSize = taosArrayGetSize(clientHbMgr.appHbMgrs);
  for (int32_t i = 0; i < mgrSize; ++i) {
    SAppHbMgr *pItem = taosArrayGetP(clientHbMgr.appHbMgrs, i);
    if (pItem == *pAppHbMgr) {
      hbFreeAppHbMgr(*pAppHbMgr);
      *pAppHbMgr = NULL;
      taosArrayRemove(clientHbMgr.appHbMgrs, i);
      break;
    }
  }
  taosThreadMutexUnlock(&clientHbMgr.lock);
}

D
dapan1121 已提交
831
void appHbMgrCleanup(void) {
L
Liu Jicong 已提交
832 833
  int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
  for (int i = 0; i < sz; i++) {
L
fix  
Liu Jicong 已提交
834
    SAppHbMgr *pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i);
D
dapan1121 已提交
835
    hbFreeAppHbMgr(pTarget);
L
Liu Jicong 已提交
836 837 838 839
  }
}

int hbMgrInit() {
L
Liu Jicong 已提交
840 841 842 843
  // init once
  int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1);
  if (old == 1) return 0;

D
dapan1121 已提交
844 845 846 847
  clientHbMgr.appId = tGenIdPI64();
  tscDebug("app %" PRIx64 " initialized", clientHbMgr.appId);
  
  clientHbMgr.appSummary = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
L
fix  
Liu Jicong 已提交
848
  clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void *));
wafwerar's avatar
wafwerar 已提交
849
  taosThreadMutexInit(&clientHbMgr.lock, NULL);
L
Liu Jicong 已提交
850 851 852 853 854

  // init handle funcs
  hbMgrInitHandle();

  // init backgroud thread
D
dapan1121 已提交
855
  hbCreateThread();
L
Liu Jicong 已提交
856

L
Liu Jicong 已提交
857 858 859 860
  return 0;
}

void hbMgrCleanUp() {
D
dapan1121 已提交
861
  hbStopThread();
L
fix  
Liu Jicong 已提交
862

L
Liu Jicong 已提交
863
  // destroy all appHbMgr
L
Liu Jicong 已提交
864 865 866
  int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0);
  if (old == 0) return;

wafwerar's avatar
wafwerar 已提交
867
  taosThreadMutexLock(&clientHbMgr.lock);
D
dapan1121 已提交
868
  appHbMgrCleanup();
L
fix  
Liu Jicong 已提交
869
  taosArrayDestroy(clientHbMgr.appHbMgrs);
wafwerar's avatar
wafwerar 已提交
870
  taosThreadMutexUnlock(&clientHbMgr.lock);
L
fix  
Liu Jicong 已提交
871

872
  clientHbMgr.appHbMgrs = NULL;
L
Liu Jicong 已提交
873 874
}

D
dapan1121 已提交
875
int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, int64_t clusterId) {
L
Liu Jicong 已提交
876
  // init hash in activeinfo
L
fix  
Liu Jicong 已提交
877
  void *data = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
L
Liu Jicong 已提交
878 879 880
  if (data != NULL) {
    return 0;
  }
D
dapan1121 已提交
881
  SClientHbReq hbReq = {0};
L
Liu Jicong 已提交
882
  hbReq.connKey = connKey;
D
dapan1121 已提交
883
  hbReq.clusterId = clusterId;
D
dapan1121 已提交
884
  //hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
L
fix  
Liu Jicong 已提交
885

L
Liu Jicong 已提交
886
  taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq));
L
fix  
Liu Jicong 已提交
887

L
Liu Jicong 已提交
888
  atomic_add_fetch_32(&pAppHbMgr->connKeyCnt, 1);
L
Liu Jicong 已提交
889 890 891
  return 0;
}

D
dapan1121 已提交
892
int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType) {
L
Liu Jicong 已提交
893
  SClientHbKey connKey = {
D
dapan1121 已提交
894
      .tscRid = tscRefId,
D
dapan1121 已提交
895
      .connType = connType,
L
Liu Jicong 已提交
896
  };
D
dapan1121 已提交
897

D
dapan1121 已提交
898 899
  switch (connType) {
    case CONN_TYPE__QUERY: {
D
dapan1121 已提交
900
      return hbRegisterConnImpl(pAppHbMgr, connKey, clusterId);
D
dapan1121 已提交
901
    }
D
dapan1121 已提交
902
    case CONN_TYPE__TMQ: {
L
Liu Jicong 已提交
903
      return 0;
D
dapan1121 已提交
904 905
    }
    default:
L
Liu Jicong 已提交
906
      return 0;
D
dapan1121 已提交
907 908 909
  }
}

L
fix  
Liu Jicong 已提交
910
void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) {
D
dapan1121 已提交
911 912
  SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
  if (pReq) {
D
dapan1121 已提交
913
    tFreeClientHbReq(pReq);
D
dapan1121 已提交
914 915 916
    taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
  }

D
dapan1121 已提交
917
  if (NULL == pReq) {
D
dapan1121 已提交
918 919
    return;
  }
L
Liu Jicong 已提交
920

L
Liu Jicong 已提交
921
  atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1);
L
Liu Jicong 已提交
922 923
}