clientHb.c 24.1 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

D
dapan1121 已提交
16
#include "catalog.h"
L
fix  
Liu Jicong 已提交
17
#include "clientInt.h"
D
dapan1121 已提交
18
#include "clientLog.h"
L
Liu Jicong 已提交
19
#include "scheduler.h"
L
fix  
Liu Jicong 已提交
20
#include "trpc.h"
L
Liu Jicong 已提交
21 22 23 24 25 26

/* Process-wide heartbeat manager; static storage duration makes it all-zero at startup. */
static SClientHbMgr clientHbMgr;

/* Background heartbeat thread control, defined further down in this file. */
static int32_t hbCreateThread(void);
static void    hbStopThread(void);

L
Liu Jicong 已提交
27 28
/* Request hook for TMQ connections: attaches nothing to the heartbeat and always reports success. */
static int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) {
  (void)connKey;
  (void)param;
  (void)req;
  return 0;
}

L
Liu Jicong 已提交
29
/* Response hook for TMQ connections: ignores the response and always reports success. */
static int32_t hbMqHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
  (void)pAppHbMgr;
  (void)pRsp;
  return 0;
}
L
Liu Jicong 已提交
30

D
dapan 已提交
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
/*
 * Decode the user-auth section of a heartbeat response and push every entry
 * into the catalog.
 *
 * @param value     serialized SUserAuthBatchRsp payload
 * @param valueLen  payload length in bytes
 * @param pCatalog  catalog handle of the connection's cluster
 * @return TSDB_CODE_SUCCESS; -1 with terrno = TSDB_CODE_INVALID_MSG when the
 *         payload cannot be deserialized.
 */
static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
  SUserAuthBatchRsp batchRsp = {0};
  if (tDeserializeSUserAuthBatchRsp(value, valueLen, &batchRsp) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int32_t numOfBatchs = taosArrayGetSize(batchRsp.pArray);
  for (int32_t i = 0; i < numOfBatchs; ++i) {
    SGetUserAuthRsp *rsp = taosArrayGet(batchRsp.pArray, i);
    tscDebug("hb user auth rsp, user:%s, version:%d", rsp->user, rsp->version);

    // Best-effort: the result of each catalog update is not checked, later entries are still applied.
    catalogUpdateUserAuthInfo(pCatalog, rsp);
  }

  tFreeSUserAuthBatchRsp(&batchRsp);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
52 53 54
static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
  int32_t code = 0;

S
Shengliang Guan 已提交
55 56 57 58 59
  SUseDbBatchRsp batchUseRsp = {0};
  if (tDeserializeSUseDbBatchRsp(value, valueLen, &batchUseRsp) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }
D
dapan1121 已提交
60

S
Shengliang Guan 已提交
61 62 63
  int32_t numOfBatchs = taosArrayGetSize(batchUseRsp.pArray);
  for (int32_t i = 0; i < numOfBatchs; ++i) {
    SUseDbRsp *rsp = taosArrayGet(batchUseRsp.pArray, i);
L
fix  
Liu Jicong 已提交
64 65
    tscDebug("hb db rsp, db:%s, vgVersion:%d, uid:%" PRIx64, rsp->db, rsp->vgVersion, rsp->uid);

D
dapan1121 已提交
66
    if (rsp->vgVersion < 0) {
D
dapan1121 已提交
67
      code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid);
D
dapan1121 已提交
68
    } else {
D
dapan1121 已提交
69
      SDBVgInfo vgInfo = {0};
D
dapan1121 已提交
70 71
      vgInfo.vgVersion = rsp->vgVersion;
      vgInfo.hashMethod = rsp->hashMethod;
D
dapan1121 已提交
72 73
      vgInfo.vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
      if (NULL == vgInfo.vgHash) {
D
dapan1121 已提交
74 75 76 77
        tscError("hash init[%d] failed", rsp->vgNum);
        return TSDB_CODE_TSC_OUT_OF_MEMORY;
      }

S
Shengliang Guan 已提交
78 79 80
      for (int32_t j = 0; j < rsp->vgNum; ++j) {
        SVgroupInfo *pInfo = taosArrayGet(rsp->pVgroupInfos, j);
        if (taosHashPut(vgInfo.vgHash, &pInfo->vgId, sizeof(int32_t), pInfo, sizeof(SVgroupInfo)) != 0) {
D
dapan1121 已提交
81
          tscError("hash push failed, errno:%d", errno);
D
dapan1121 已提交
82
          taosHashCleanup(vgInfo.vgHash);
D
dapan1121 已提交
83 84
          return TSDB_CODE_TSC_OUT_OF_MEMORY;
        }
L
fix  
Liu Jicong 已提交
85 86
      }

D
dapan1121 已提交
87
      catalogUpdateDBVgInfo(pCatalog, rsp->db, rsp->uid, &vgInfo);
D
dapan1121 已提交
88 89 90 91 92 93 94
    }

    if (code) {
      return code;
    }
  }

S
Shengliang Guan 已提交
95
  tFreeSUseDbBatchRsp(&batchUseRsp);
D
dapan1121 已提交
96 97 98
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
99 100 101
static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
  int32_t code = 0;

S
Shengliang Guan 已提交
102 103 104 105 106 107 108 109 110 111
  STableMetaBatchRsp batchMetaRsp = {0};
  if (tDeserializeSTableMetaBatchRsp(value, valueLen, &batchMetaRsp) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int32_t numOfBatchs = taosArrayGetSize(batchMetaRsp.pArray);
  for (int32_t i = 0; i < numOfBatchs; ++i) {
    STableMetaRsp *rsp = taosArrayGet(batchMetaRsp.pArray, i);

D
dapan 已提交
112 113
    if (rsp->numOfColumns < 0) {
      tscDebug("hb remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
D
dapan1121 已提交
114
      catalogRemoveStbMeta(pCatalog, rsp->dbFName, rsp->dbId, rsp->stbName, rsp->suid);
D
dapan 已提交
115
    } else {
D
dapan1121 已提交
116
      tscDebug("hb update stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
S
Shengliang Guan 已提交
117
      if (rsp->pSchemas[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
118
        tscError("invalid colId[%" PRIi16 "] for the first column in table meta rsp msg", rsp->pSchemas[0].colId);
S
Shengliang Guan 已提交
119
        tFreeSTableMetaBatchRsp(&batchMetaRsp);
D
dapan1121 已提交
120
        return TSDB_CODE_TSC_INVALID_VALUE;
D
dapan 已提交
121 122
      }

D
dapan1121 已提交
123
      catalogUpdateSTableMeta(pCatalog, rsp);
D
dapan 已提交
124 125 126
    }
  }

S
Shengliang Guan 已提交
127
  tFreeSTableMetaBatchRsp(&batchMetaRsp);
D
dapan 已提交
128 129 130
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
131
static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
L
fix  
Liu Jicong 已提交
132
  SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey));
D
dapan1121 已提交
133
  if (NULL == info) {
L
Liu Jicong 已提交
134 135
    tscWarn("fail to get connInfo, may be dropped, refId:%" PRIx64 ", type:%d", pRsp->connKey.tscRid,
            pRsp->connKey.connType);
D
dapan1121 已提交
136 137 138
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
139 140 141 142 143 144 145
  if (pRsp->query) {
    STscObj *pTscObj = (STscObj *)acquireTscObj(pRsp->connKey.tscRid);
    if (NULL == pTscObj) {
      tscDebug("tscObj rid %" PRIx64 " not exist", pRsp->connKey.tscRid);
    } else {
      updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pRsp->query->epSet);
      pTscObj->connId = pRsp->query->connId;
L
Liu Jicong 已提交
146

D
dapan1121 已提交
147 148 149 150 151 152 153 154 155
      if (pRsp->query->killRid) {
        SRequestObj *pRequest = acquireRequest(pRsp->query->killRid);
        if (NULL == pRequest) {
          tscDebug("request 0x%" PRIx64 " not exist to kill", pRsp->query->killRid);
        } else {
          taos_stop_query((TAOS_RES *)pRequest);
          releaseRequest(pRsp->query->killRid);
        }
      }
L
Liu Jicong 已提交
156

D
dapan1121 已提交
157 158 159 160 161 162 163
      if (pRsp->query->killConnection) {
        taos_close(pTscObj);
      }

      releaseTscObj(pRsp->connKey.tscRid);
    }
  }
L
Liu Jicong 已提交
164

D
dapan1121 已提交
165
  int32_t kvNum = pRsp->info ? taosArrayGetSize(pRsp->info) : 0;
D
dapan1121 已提交
166 167

  tscDebug("hb got %d rsp kv", kvNum);
L
fix  
Liu Jicong 已提交
168

D
dapan1121 已提交
169 170 171
  for (int32_t i = 0; i < kvNum; ++i) {
    SKv *kv = taosArrayGet(pRsp->info, i);
    switch (kv->key) {
D
dapan 已提交
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189
      case HEARTBEAT_KEY_USER_AUTHINFO: {
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid hb user auth info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }

        int64_t         *clusterId = (int64_t *)info->param;
        struct SCatalog *pCatalog = NULL;

        int32_t code = catalogGetHandle(*clusterId, &pCatalog);
        if (code != TSDB_CODE_SUCCESS) {
          tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", *clusterId, tstrerror(code));
          break;
        }

        hbProcessUserAuthInfoRsp(kv->value, kv->valueLen, pCatalog);
        break;
      }
D
dapan1121 已提交
190 191 192 193 194 195
      case HEARTBEAT_KEY_DBINFO: {
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid hb db info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }

L
fix  
Liu Jicong 已提交
196
        int64_t         *clusterId = (int64_t *)info->param;
D
dapan1121 已提交
197
        struct SCatalog *pCatalog = NULL;
L
fix  
Liu Jicong 已提交
198

D
dapan1121 已提交
199 200
        int32_t code = catalogGetHandle(*clusterId, &pCatalog);
        if (code != TSDB_CODE_SUCCESS) {
L
fix  
Liu Jicong 已提交
201
          tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", *clusterId, tstrerror(code));
D
dapan1121 已提交
202 203 204 205
          break;
        }

        hbProcessDBInfoRsp(kv->value, kv->valueLen, pCatalog);
D
dapan1121 已提交
206
        break;
D
dapan1121 已提交
207
      }
L
fix  
Liu Jicong 已提交
208
      case HEARTBEAT_KEY_STBINFO: {
D
dapan 已提交
209 210 211 212
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid hb stb info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }
D
dapan1121 已提交
213

L
fix  
Liu Jicong 已提交
214
        int64_t         *clusterId = (int64_t *)info->param;
D
dapan 已提交
215
        struct SCatalog *pCatalog = NULL;
L
fix  
Liu Jicong 已提交
216

D
dapan 已提交
217 218
        int32_t code = catalogGetHandle(*clusterId, &pCatalog);
        if (code != TSDB_CODE_SUCCESS) {
L
fix  
Liu Jicong 已提交
219
          tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", *clusterId, tstrerror(code));
D
dapan 已提交
220 221 222 223
          break;
        }

        hbProcessStbInfoRsp(kv->value, kv->valueLen, pCatalog);
D
dapan1121 已提交
224
        break;
D
dapan 已提交
225
      }
D
dapan1121 已提交
226 227 228 229 230 231 232 233 234
      default:
        tscError("invalid hb key type:%d", kv->key);
        break;
    }
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
235
static int32_t hbAsyncCallBack(void *param, const SDataBuf *pMsg, int32_t code) {
D
dapan1121 已提交
236
  static int32_t emptyRspNum = 0;
L
Liu Jicong 已提交
237
  if (code != 0) {
wafwerar's avatar
wafwerar 已提交
238
    taosMemoryFreeClear(param);
L
Liu Jicong 已提交
239 240
    return -1;
  }
S
Shengliang Guan 已提交
241

L
fix  
Liu Jicong 已提交
242
  char             *key = (char *)param;
D
dapan1121 已提交
243
  SClientHbBatchRsp pRsp = {0};
S
Shengliang Guan 已提交
244
  tDeserializeSClientHbBatchRsp(pMsg->pData, pMsg->len, &pRsp);
L
fix  
Liu Jicong 已提交
245

D
dapan1121 已提交
246
  int32_t rspNum = taosArrayGetSize(pRsp.rsps);
D
dapan1121 已提交
247

L
fix  
Liu Jicong 已提交
248
  SAppInstInfo **pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
D
dapan1121 已提交
249
  if (pInst == NULL || NULL == *pInst) {
L
fix  
Liu Jicong 已提交
250
    tscError("cluster not exist, key:%s", key);
wafwerar's avatar
wafwerar 已提交
251
    taosMemoryFreeClear(param);
D
dapan1121 已提交
252
    tFreeClientHbBatchRsp(&pRsp);
D
dapan1121 已提交
253 254 255
    return -1;
  }

wafwerar's avatar
wafwerar 已提交
256
  taosMemoryFreeClear(param);
D
dapan1121 已提交
257 258

  if (rspNum) {
L
fix  
Liu Jicong 已提交
259 260
    tscDebug("hb got %d rsp, %d empty rsp received before", rspNum,
             atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0));
D
dapan1121 已提交
261 262 263 264 265
  } else {
    atomic_add_fetch_32(&emptyRspNum, 1);
  }

  for (int32_t i = 0; i < rspNum; ++i) {
L
fix  
Liu Jicong 已提交
266
    SClientHbRsp *rsp = taosArrayGet(pRsp.rsps, i);
D
dapan1121 已提交
267
    code = (*clientHbMgr.rspHandle[rsp->connKey.connType])((*pInst)->pAppHbMgr, rsp);
D
dapan1121 已提交
268 269 270 271
    if (code) {
      break;
    }
  }
D
dapan1121 已提交
272 273

  tFreeClientHbBatchRsp(&pRsp);
L
fix  
Liu Jicong 已提交
274

D
dapan1121 已提交
275
  return code;
L
Liu Jicong 已提交
276 277
}

D
dapan1121 已提交
278
/*
 * Append one SQueryDesc per request tracked by the connection to hbBasic->queryDesc.
 *
 * Requests whose rid can no longer be acquired are skipped. For requests with a
 * DAG, per-subplan task status is fetched into desc.subDesc. If the scheduler
 * call fails, the query is still reported, just without subDesc.
 *
 * @param hbBasic  destination; hbBasic->queryDesc must be an initialized SArray of SQueryDesc
 * @param pObj     connection whose pRequests hash (values are int64_t request rids) is walked
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY if a subDesc array cannot be allocated.
 */
int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
  int64_t now = taosGetTimestampUs();

  void *pIter = taosHashIterate(pObj->pRequests, NULL);
  while (pIter != NULL) {
    int64_t     *rid = pIter;
    SRequestObj *pRequest = acquireRequest(*rid);
    if (NULL == pRequest) {
      // Advance before skipping; continuing without advancing would spin on this entry forever.
      pIter = taosHashIterate(pObj->pRequests, pIter);
      continue;
    }

    // Fresh descriptor per request, so a subDesc pushed for an earlier request is
    // never carried into (and shared by) a later entry.
    SQueryDesc desc = {0};
    tstrncpy(desc.sql, pRequest->sqlstr, sizeof(desc.sql));
    desc.stime = pRequest->metric.start;
    desc.queryId = pRequest->requestId;
    desc.useconds = now - pRequest->metric.start;
    desc.reqRid = pRequest->self;
    desc.pid = hbBasic->pid;
    taosGetFqdn(desc.fqdn);
    desc.subPlanNum = pRequest->body.pDag ? pRequest->body.pDag->numOfSubplans : 0;

    if (desc.subPlanNum) {
      desc.subDesc = taosArrayInit(desc.subPlanNum, sizeof(SQuerySubDesc));
      if (NULL == desc.subDesc) {
        releaseRequest(*rid);
        // Leaving the iteration early: cancel it so the hash iterator is released.
        taosHashCancelIterate(pObj->pRequests, pIter);
        return TSDB_CODE_QRY_OUT_OF_MEMORY;
      }

      if (schedulerGetTasksStatus(pRequest->body.queryJob, desc.subDesc) != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(desc.subDesc);
        desc.subDesc = NULL;
      }
    }

    releaseRequest(*rid);
    if (NULL == taosArrayPush(hbBasic->queryDesc, &desc)) {
      // desc was not stored, so nothing else owns its subDesc.
      taosArrayDestroy(desc.subDesc);
    }

    pIter = taosHashIterate(pObj->pRequests, pIter);
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Build the query-info section (SQueryHbReqBasic) of a heartbeat request for
 * one connection and store it in req->query.
 *
 * @param connKey  connection key; connKey->tscRid identifies the STscObj
 * @param req      heartbeat request to fill; req->query is set only on success
 * @return TSDB_CODE_SUCCESS on success.
 *         TSDB_CODE_QRY_APP_ERROR if the connection is gone or has no requests.
 *         TSDB_CODE_QRY_OUT_OF_MEMORY, or hbBuildQueryDesc's error, on allocation failure.
 */
int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
  STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);
  if (NULL == pTscObj) {
    tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid);
    return TSDB_CODE_QRY_APP_ERROR;
  }

  int32_t numOfQueries = pTscObj->pRequests ? taosHashGetSize(pTscObj->pRequests) : 0;
  if (numOfQueries <= 0) {
    releaseTscObj(connKey->tscRid);
    tscDebug("no queries on connection");
    return TSDB_CODE_QRY_APP_ERROR;
  }

  SQueryHbReqBasic *hbBasic = (SQueryHbReqBasic *)taosMemoryCalloc(1, sizeof(SQueryHbReqBasic));
  if (NULL == hbBasic) {
    tscError("calloc %d failed", (int32_t)sizeof(SQueryHbReqBasic));
    releaseTscObj(connKey->tscRid);
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  hbBasic->queryDesc = taosArrayInit(numOfQueries, sizeof(SQueryDesc));
  if (NULL == hbBasic->queryDesc) {
    tscWarn("taosArrayInit %d queryDesc failed", numOfQueries);
    releaseTscObj(connKey->tscRid);
    taosMemoryFree(hbBasic);
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  hbBasic->connId = pTscObj->connId;
  hbBasic->pid = taosGetPId();
  taosGetAppName(hbBasic->app, NULL);

  int32_t code = hbBuildQueryDesc(hbBasic, pTscObj);
  if (code) {
    releaseTscObj(connKey->tscRid);
    // Release the descriptor array as well as its owner; it was leaked before.
    // NOTE(review): subDesc arrays of entries already pushed are not released here.
    taosArrayDestroy(hbBasic->queryDesc);
    taosMemoryFree(hbBasic);
    return code;
  }

  req->query = hbBasic;
  releaseTscObj(connKey->tscRid);

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401
/*
 * Attach the catalog's expired user-auth versions to a heartbeat request as a
 * HEARTBEAT_KEY_USER_AUTHINFO kv. Version fields are converted with htonl
 * before being stored.
 *
 * @param connKey   unused; kept for a uniform hook signature
 * @param pCatalog  catalog to query for expired users
 * @param req       request whose info hash receives the kv (which then holds `users`)
 * @return the catalogGetExpiredUsers error, otherwise TSDB_CODE_SUCCESS.
 *         A failed insert into req->info is logged and the array freed (best-effort).
 */
int32_t hbGetExpiredUserInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
  SUserAuthVersion *users = NULL;
  uint32_t          userNum = 0;

  int32_t code = catalogGetExpiredUsers(pCatalog, &users, &userNum);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  if (0 == userNum) {
    return TSDB_CODE_SUCCESS;
  }

  for (uint32_t i = 0; i < userNum; ++i) {
    users[i].version = htonl(users[i].version);
  }

  SKv kv = {
      .key = HEARTBEAT_KEY_USER_AUTHINFO,
      .valueLen = sizeof(SUserAuthVersion) * userNum,
      .value = users,
  };

  tscDebug("hb got %u expired users, valueLen:%d", userNum, kv.valueLen);

  if (taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)) != 0) {
    // The kv was not stored, so nothing else will free the array.
    tscError("failed to put expired users into hb req, num:%u", userNum);
    taosMemoryFree(users);
  }

  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
402 403
/*
 * Attach the catalog's expired database versions to a heartbeat request as a
 * HEARTBEAT_KEY_DBINFO kv. dbId, vgVersion and numOfTable are converted to
 * big-endian/network order before being stored.
 *
 * @param connKey   unused; kept for a uniform hook signature
 * @param pCatalog  catalog to query for expired databases
 * @param req       request whose info hash receives the kv (which then holds `dbs`)
 * @return the catalogGetExpiredDBs error, otherwise TSDB_CODE_SUCCESS.
 *         A failed insert into req->info is logged and the array freed (best-effort).
 */
int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
  SDbVgVersion *dbs = NULL;
  uint32_t      dbNum = 0;

  int32_t code = catalogGetExpiredDBs(pCatalog, &dbs, &dbNum);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  if (0 == dbNum) {
    return TSDB_CODE_SUCCESS;
  }

  for (uint32_t i = 0; i < dbNum; ++i) {
    SDbVgVersion *db = &dbs[i];
    db->dbId = htobe64(db->dbId);
    db->vgVersion = htonl(db->vgVersion);
    db->numOfTable = htonl(db->numOfTable);
  }

  SKv kv = {
      .key = HEARTBEAT_KEY_DBINFO,
      .valueLen = sizeof(SDbVgVersion) * dbNum,
      .value = dbs,
  };

  tscDebug("hb got %u expired db, valueLen:%d", dbNum, kv.valueLen);

  if (taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)) != 0) {
    // The kv was not stored, so nothing else will free the array.
    tscError("failed to put expired dbs into hb req, num:%u", dbNum);
    taosMemoryFree(dbs);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
436 437
/*
 * Attach the catalog's expired super-table versions to a heartbeat request as
 * a HEARTBEAT_KEY_STBINFO kv. suid, sversion and tversion are converted to
 * big-endian/network order before being stored.
 *
 * @param connKey   unused; kept for a uniform hook signature
 * @param pCatalog  catalog to query for expired stables
 * @param req       request whose info hash receives the kv (which then holds `stbs`)
 * @return the catalogGetExpiredSTables error, otherwise TSDB_CODE_SUCCESS.
 *         A failed insert into req->info is logged and the array freed (best-effort).
 */
int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
  SSTableMetaVersion *stbs = NULL;
  uint32_t            stbNum = 0;

  int32_t code = catalogGetExpiredSTables(pCatalog, &stbs, &stbNum);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  if (0 == stbNum) {
    return TSDB_CODE_SUCCESS;
  }

  for (uint32_t i = 0; i < stbNum; ++i) {
    SSTableMetaVersion *stb = &stbs[i];
    stb->suid = htobe64(stb->suid);
    stb->sversion = htons(stb->sversion);
    stb->tversion = htons(stb->tversion);
  }

  SKv kv = {
      .key = HEARTBEAT_KEY_STBINFO,
      .valueLen = sizeof(SSTableMetaVersion) * stbNum,
      .value = stbs,
  };

  tscDebug("hb got %u expired stb, valueLen:%d", stbNum, kv.valueLen);

  if (taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)) != 0) {
    // The kv was not stored, so nothing else will free the array.
    tscError("failed to put expired stbs into hb req, num:%u", stbNum);
    taosMemoryFree(stbs);
  }

  return TSDB_CODE_SUCCESS;
}

L
fix  
Liu Jicong 已提交
470 471
/*
 * Request hook for query connections.
 *
 * Fills `req` with running-query info (best-effort, its result is ignored),
 * then with the expired user, db and stable versions known to the catalog of
 * the cluster whose id `param` points to (int64_t *).
 *
 * @return TSDB_CODE_SUCCESS on success.
 *         The catalogGetHandle error if no catalog can be obtained.
 *         Otherwise the first error from the expired-info collectors; later collectors are skipped.
 */
int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) {
  int64_t          clusterId = *(int64_t *)param;
  struct SCatalog *pCatalog = NULL;

  int32_t code = catalogGetHandle(clusterId, &pCatalog);
  if (TSDB_CODE_SUCCESS != code) {
    tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", clusterId, tstrerror(code));
    return code;
  }

  (void)hbGetQueryBasicInfo(connKey, req);

  code = hbGetExpiredUserInfo(connKey, pCatalog, req);
  if (TSDB_CODE_SUCCESS == code) {
    code = hbGetExpiredDBInfo(connKey, pCatalog, req);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = hbGetExpiredStbInfo(connKey, pCatalog, req);
  }

  return code;
}

void hbMgrInitMqHbHandle() {
D
dapan1121 已提交
501 502
  clientHbMgr.reqHandle[CONN_TYPE__QUERY] = hbQueryHbReqHandle;
  clientHbMgr.reqHandle[CONN_TYPE__TMQ] = hbMqHbReqHandle;
L
Liu Jicong 已提交
503

D
dapan1121 已提交
504 505
  clientHbMgr.rspHandle[CONN_TYPE__QUERY] = hbQueryHbRspHandle;
  clientHbMgr.rspHandle[CONN_TYPE__TMQ] = hbMqHbRspHandle;
L
Liu Jicong 已提交
506 507
}

L
Liu Jicong 已提交
508 509
/* Populate clientHbMgr's handler tables; currently only the query/TMQ hooks exist. */
static FORCE_INLINE void hbMgrInitHandle() { hbMgrInitMqHbHandle(); }

D
dapan1121 已提交
513 514
/* Free the kv entries of a request's info hash via tFreeReqKvHash; the hash object itself is not cleaned up here. */
void hbFreeReq(void *req) { tFreeReqKvHash(((SClientHbReq *)req)->info); }

D
dapan1121 已提交
518 519 520 521 522
/* Detach query and info from a request without freeing them (used once its contents were copied into a batch). */
void hbClearClientHbReq(SClientHbReq *pReq) {
  pReq->info = NULL;
  pReq->query = NULL;
}

L
fix  
Liu Jicong 已提交
523
/*
 * Build one heartbeat batch for an app manager.
 *
 * Each active connection that has registered connInfo is first filled by its
 * per-type request hook. Connections whose hook fails are left out of this
 * round. Each included request is shallow-copied into the batch and its
 * query/info pointers are then detached from the active entry.
 *
 * NOTE(review): after detaching, the active entry's info hash is NULL, so later
 * rounds have nowhere to put kvs; ownership of the moved hashes should be confirmed.
 *
 * @return the new batch (caller frees), or NULL with terrno = TSDB_CODE_TSC_OUT_OF_MEMORY.
 */
SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
  SClientHbBatchReq *pBatchReq = taosMemoryCalloc(1, sizeof(SClientHbBatchReq));
  if (pBatchReq == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
  pBatchReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq));
  if (pBatchReq->reqs == NULL) {
    // Without this check the first taosArrayPush below would operate on NULL.
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    taosMemoryFree(pBatchReq);
    return NULL;
  }

  void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL);
  while (pIter != NULL) {
    SClientHbReq *pOneReq = pIter;

    SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pOneReq->connKey, sizeof(SClientHbKey));
    if (info) {
      int32_t code = (*clientHbMgr.reqHandle[pOneReq->connKey.connType])(&pOneReq->connKey, info->param, pOneReq);
      if (code) {
        // Skip this connection for the current round only.
        pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
        continue;
      }
    }

    taosArrayPush(pBatchReq->reqs, pOneReq);
    hbClearClientHbReq(pOneReq);

    pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
  }

  return pBatchReq;
}

D
dapan1121 已提交
560 561 562
/* Free the kvs of every active request of this app and clear (not clean up) each info hash. */
void hbClearReqInfo(SAppHbMgr *pAppHbMgr) {
  for (void *pCur = taosHashIterate(pAppHbMgr->activeInfo, NULL); pCur != NULL;
       pCur = taosHashIterate(pAppHbMgr->activeInfo, pCur)) {
    SClientHbReq *pReq = (SClientHbReq *)pCur;
    tFreeReqKvHash(pReq->info);
    taosHashClear(pReq->info);
  }
}

L
fix  
Liu Jicong 已提交
572
static void *hbThreadFunc(void *param) {
L
Liu Jicong 已提交
573 574
  setThreadName("hb");
  while (1) {
575
    int8_t threadStop = atomic_val_compare_exchange_8(&clientHbMgr.threadStop, 1, 2);
L
fix  
Liu Jicong 已提交
576
    if (1 == threadStop) {
L
Liu Jicong 已提交
577 578 579
      break;
    }

wafwerar's avatar
wafwerar 已提交
580
    taosThreadMutexLock(&clientHbMgr.lock);
581

L
Liu Jicong 已提交
582
    int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
L
fix  
Liu Jicong 已提交
583 584
    for (int i = 0; i < sz; i++) {
      SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
L
Liu Jicong 已提交
585

L
Liu Jicong 已提交
586 587 588 589
      int32_t connCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
      if (connCnt == 0) {
        continue;
      }
L
fix  
Liu Jicong 已提交
590
      SClientHbBatchReq *pReq = hbGatherAllInfo(pAppHbMgr);
L
Liu Jicong 已提交
591 592 593
      if (pReq == NULL) {
        continue;
      }
L
fix  
Liu Jicong 已提交
594
      int   tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq);
wafwerar's avatar
wafwerar 已提交
595
      void *buf = taosMemoryMalloc(tlen);
L
Liu Jicong 已提交
596
      if (buf == NULL) {
D
dapan1121 已提交
597 598 599
        terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
        tFreeClientHbBatchReq(pReq, false);
        hbClearReqInfo(pAppHbMgr);
L
Liu Jicong 已提交
600 601
        break;
      }
L
fix  
Liu Jicong 已提交
602

S
Shengliang Guan 已提交
603
      tSerializeSClientHbBatchReq(buf, tlen, pReq);
wafwerar's avatar
wafwerar 已提交
604
      SMsgSendInfo *pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
605

L
Liu Jicong 已提交
606 607
      if (pInfo == NULL) {
        terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
L
Liu Jicong 已提交
608
        tFreeClientHbBatchReq(pReq, false);
D
dapan1121 已提交
609
        hbClearReqInfo(pAppHbMgr);
wafwerar's avatar
wafwerar 已提交
610
        taosMemoryFree(buf);
L
Liu Jicong 已提交
611 612
        break;
      }
L
Liu Jicong 已提交
613
      pInfo->fp = hbAsyncCallBack;
L
Liu Jicong 已提交
614 615 616
      pInfo->msgInfo.pData = buf;
      pInfo->msgInfo.len = tlen;
      pInfo->msgType = TDMT_MND_HEARTBEAT;
D
dapan1121 已提交
617
      pInfo->param = strdup(pAppHbMgr->key);
L
Liu Jicong 已提交
618 619
      pInfo->requestId = generateRequestId();
      pInfo->requestObjRefId = 0;
L
Liu Jicong 已提交
620 621

      SAppInstInfo *pAppInstInfo = pAppHbMgr->pAppInstInfo;
L
fix  
Liu Jicong 已提交
622 623
      int64_t       transporterId = 0;
      SEpSet        epSet = getEpSet_s(&pAppInstInfo->mgmtEp);
L
Liu Jicong 已提交
624
      asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo);
L
fix  
Liu Jicong 已提交
625
      tFreeClientHbBatchReq(pReq, false);
D
dapan1121 已提交
626
      hbClearReqInfo(pAppHbMgr);
L
Liu Jicong 已提交
627 628 629

      atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
    }
630

wafwerar's avatar
wafwerar 已提交
631
    taosThreadMutexUnlock(&clientHbMgr.lock);
L
fix  
Liu Jicong 已提交
632

L
Liu Jicong 已提交
633
    taosMsleep(HEARTBEAT_INTERVAL);
L
Liu Jicong 已提交
634 635 636 637 638
  }
  return NULL;
}

static int32_t hbCreateThread() {
wafwerar's avatar
wafwerar 已提交
639 640 641
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
L
Liu Jicong 已提交
642

D
dapan1121 已提交
643 644 645
  if (taosThreadCreate(&clientHbMgr.thread, &thAttr, hbThreadFunc, NULL) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
L
Liu Jicong 已提交
646
  }
D
dapan1121 已提交
647
  taosThreadAttrDestroy(&thAttr);
L
Liu Jicong 已提交
648 649 650
  return 0;
}

L
Liu Jicong 已提交
651
static void hbStopThread() {
652
  if (atomic_val_compare_exchange_8(&clientHbMgr.threadStop, 0, 1)) {
D
dapan1121 已提交
653
    tscDebug("hb thread already stopped");
654 655
    return;
  }
L
fix  
Liu Jicong 已提交
656

657
  while (2 != atomic_load_8(&clientHbMgr.threadStop)) {
wafwerar's avatar
wafwerar 已提交
658
    taosUsleep(10);
659
  }
D
dapan1121 已提交
660

L
fix  
Liu Jicong 已提交
661
  tscDebug("hb thread stopped");
L
Liu Jicong 已提交
662 663
}

L
fix  
Liu Jicong 已提交
664
/*
 * Create the heartbeat manager for one app instance and register it in
 * clientHbMgr.appHbMgrs (under clientHbMgr.lock).
 *
 * @param pAppInstInfo  app instance the manager reports for (not owned)
 * @param key           instance key; copied with strdup
 * @return the new manager, or NULL with terrno set. Nothing is leaked on failure.
 */
SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) {
  if (hbMgrInit() != 0) {
    return NULL;
  }

  // calloc: every pointer starts NULL, which keeps the shared error path below safe.
  SAppHbMgr *pAppHbMgr = taosMemoryCalloc(1, sizeof(SAppHbMgr));
  if (pAppHbMgr == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  // init stat
  pAppHbMgr->startTime = taosGetTimestampMs();
  pAppHbMgr->connKeyCnt = 0;
  pAppHbMgr->reportCnt = 0;
  pAppHbMgr->reportBytes = 0;
  pAppHbMgr->key = strdup(key);
  if (NULL == pAppHbMgr->key) {
    goto _err;
  }

  // init app info
  pAppHbMgr->pAppInstInfo = pAppInstInfo;

  // init hash info
  pAppHbMgr->activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
  if (pAppHbMgr->activeInfo == NULL) {
    goto _err;
  }

  taosHashSetFreeFp(pAppHbMgr->activeInfo, tFreeClientHbReq);
  // init getInfoFunc
  pAppHbMgr->connInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
  if (pAppHbMgr->connInfo == NULL) {
    goto _err;
  }

  taosThreadMutexLock(&clientHbMgr.lock);
  void *pushed = taosArrayPush(clientHbMgr.appHbMgrs, &pAppHbMgr);
  taosThreadMutexUnlock(&clientHbMgr.lock);
  if (NULL == pushed) {
    taosHashCleanup(pAppHbMgr->connInfo);
    goto _err;
  }

  return pAppHbMgr;

_err:
  // Every failure above is an allocation failure.
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  taosHashCleanup(pAppHbMgr->activeInfo);
  taosMemoryFree(pAppHbMgr->key);
  taosMemoryFree(pAppHbMgr);
  return NULL;
}

D
dapan1121 已提交
707
void appHbMgrCleanup(void) {
L
Liu Jicong 已提交
708 709
  int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
  for (int i = 0; i < sz; i++) {
L
fix  
Liu Jicong 已提交
710
    SAppHbMgr *pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i);
L
Liu Jicong 已提交
711 712

    void *pIter = taosHashIterate(pTarget->activeInfo, NULL);
D
dapan1121 已提交
713 714 715 716 717
    while (pIter != NULL) {
      SClientHbReq *pOneReq = pIter;
      hbFreeReq(pOneReq);
      taosHashCleanup(pOneReq->info);
      pIter = taosHashIterate(pTarget->activeInfo, pIter);
L
Liu Jicong 已提交
718
    }
D
dapan1121 已提交
719 720
    taosHashCleanup(pTarget->activeInfo);
    pTarget->activeInfo = NULL;
D
dapan1121 已提交
721 722 723 724 725 726

    pIter = taosHashIterate(pTarget->connInfo, NULL);
    while (pIter != NULL) {
      SHbConnInfo *info = pIter;
      taosMemoryFree(info->param);
      pIter = taosHashIterate(pTarget->connInfo, pIter);
L
Liu Jicong 已提交
727
    }
D
dapan1121 已提交
728 729
    taosHashCleanup(pTarget->connInfo);
    pTarget->connInfo = NULL;
D
dapan1121 已提交
730 731 732

    taosMemoryFree(pTarget->key);
    taosMemoryFree(pTarget);
L
Liu Jicong 已提交
733 734 735 736
  }
}

int hbMgrInit() {
L
Liu Jicong 已提交
737 738 739 740
  // init once
  int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1);
  if (old == 1) return 0;

L
fix  
Liu Jicong 已提交
741
  clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void *));
wafwerar's avatar
wafwerar 已提交
742
  taosThreadMutexInit(&clientHbMgr.lock, NULL);
L
Liu Jicong 已提交
743 744 745 746 747

  // init handle funcs
  hbMgrInitHandle();

  // init backgroud thread
L
Liu Jicong 已提交
748
  /*hbCreateThread();*/
L
Liu Jicong 已提交
749

L
Liu Jicong 已提交
750 751 752 753
  return 0;
}

void hbMgrCleanUp() {
L
Liu Jicong 已提交
754
  // hbStopThread();
L
fix  
Liu Jicong 已提交
755

L
Liu Jicong 已提交
756
  // destroy all appHbMgr
L
Liu Jicong 已提交
757 758 759
  int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0);
  if (old == 0) return;

wafwerar's avatar
wafwerar 已提交
760
  taosThreadMutexLock(&clientHbMgr.lock);
D
dapan1121 已提交
761
  appHbMgrCleanup();
L
fix  
Liu Jicong 已提交
762
  taosArrayDestroy(clientHbMgr.appHbMgrs);
wafwerar's avatar
wafwerar 已提交
763
  taosThreadMutexUnlock(&clientHbMgr.lock);
L
fix  
Liu Jicong 已提交
764

765
  clientHbMgr.appHbMgrs = NULL;
L
Liu Jicong 已提交
766 767
}

L
fix  
Liu Jicong 已提交
768
/*
 * Register a connection with an app heartbeat manager.
 *
 * Creates an activeInfo entry with an empty kv hash. When `info` is given, it
 * also stores a copy of it in connInfo, with info->req pointing at the active
 * entry. Increments connKeyCnt once per newly created entry.
 *
 * @return 0 on success or if the key is already registered (nothing changes then).
 *         -1 with terrno = TSDB_CODE_OUT_OF_MEMORY on allocation/insert failure.
 *         On failure nothing stays registered.
 */
int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo *info) {
  // init hash in activeinfo
  void *data = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
  if (data != NULL) {
    return 0;
  }
  SClientHbReq hbReq = {0};
  hbReq.connKey = connKey;
  hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
  if (NULL == hbReq.info) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq)) != 0) {
    taosHashCleanup(hbReq.info);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  // init hash
  if (info != NULL) {
    SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
    info->req = pReq;
    if (taosHashPut(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey), info, sizeof(SHbConnInfo)) != 0) {
      // Roll back the active entry so registration is all-or-nothing.
      if (pReq != NULL) {
        taosHashCleanup(pReq->info);
        pReq->info = NULL;
      }
      taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }

  atomic_add_fetch_32(&pAppHbMgr->connKeyCnt, 1);
  return 0;
}

D
dapan1121 已提交
791
/*
 * Register a connection for heartbeats according to its type.
 *
 * Query connections get a connInfo whose param is a heap copy of clusterId.
 * TMQ and unknown types are not registered.
 *
 * @return 0 on success or for unregistered types.
 *         -1 with terrno set if the cluster-id copy cannot be allocated.
 *         Otherwise hbRegisterConnImpl's result; the copy is freed if it fails.
 */
int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType) {
  SClientHbKey connKey = {
      .tscRid = tscRefId,
      .connType = connType,
  };
  SHbConnInfo info = {0};

  switch (connType) {
    case CONN_TYPE__QUERY: {
      int64_t *pClusterId = taosMemoryMalloc(sizeof(int64_t));
      if (NULL == pClusterId) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return -1;
      }
      *pClusterId = clusterId;

      info.param = pClusterId;
      int code = hbRegisterConnImpl(pAppHbMgr, connKey, &info);
      if (code != 0) {
        // Registration failed, so connInfo does not own the copy.
        taosMemoryFree(pClusterId);
      }
      // NOTE(review): if the key was already registered, the impl returns 0 and
      // pClusterId is not stored anywhere.
      return code;
    }
    case CONN_TYPE__TMQ: {
      return 0;
    }
    default:
      return 0;
  }
}

L
fix  
Liu Jicong 已提交
814
/*
 * Remove a connection from an app heartbeat manager.
 *
 * Frees the active entry's kvs and info hash and its connInfo param, and removes
 * both hash entries. connKeyCnt is decremented whenever an active entry existed,
 * mirroring hbRegisterConnImpl, which increments per active entry created even
 * without connInfo.
 */
void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) {
  SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
  if (pReq) {
    hbFreeReq(pReq);
    taosHashCleanup(pReq->info);
    // activeInfo's free callback may run on removal; keep it off the destroyed hash.
    pReq->info = NULL;
    taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
  }

  SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey));
  if (info) {
    taosMemoryFree(info->param);
    taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey));
  }

  // pReq is only compared against NULL here, never dereferenced after removal.
  if (NULL == pReq) {
    return;
  }

  atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1);
}

L
fix  
Liu Jicong 已提交
835 836
/*
 * Store a key/value pair in the kv hash of a registered connection's heartbeat request.
 *
 * @return 0 on success.
 *         -1 if the connection is not registered, in builds where ASSERT is a no-op.
 *         Otherwise the taosHashPut result.
 */
int hbAddConnInfo(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, void *key, void *value, int32_t keyLen,
                  int32_t valueLen) {
  // find req by connection id
  SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
  ASSERT(pReq != NULL);
  if (NULL == pReq) {
    // ASSERT may compile away; never dereference a missing entry.
    tscError("conn not registered for hb, refId:%" PRIx64 ", type:%d", connKey.tscRid, connKey.connType);
    return -1;
  }

  return taosHashPut(pReq->info, key, keyLen, value, valueLen);
}