clientHb.c 30.9 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

D
dapan1121 已提交
16
#include "catalog.h"
L
fix  
Liu Jicong 已提交
17
#include "clientInt.h"
D
dapan1121 已提交
18
#include "clientLog.h"
L
Liu Jicong 已提交
19
#include "scheduler.h"
L
fix  
Liu Jicong 已提交
20
#include "trpc.h"
L
Liu Jicong 已提交
21 22 23 24 25 26

/* Process-wide heartbeat manager state (locks, per-cluster app managers, handler tables). */
static SClientHbMgr clientHbMgr = {0};

/*
 * Heartbeat thread lifecycle helpers. They are static, so their definitions live in this
 * translation unit. `(void)` gives them a real prototype; empty parentheses would declare
 * functions with unspecified parameters (obsolescent since C89, changed meaning in C23).
 */
static int32_t hbCreateThread(void);
static void    hbStopThread(void);

L
Liu Jicong 已提交
27 28
/*
 * Request hook for TMQ connections: contributes nothing to the heartbeat
 * request and always reports success (0).
 */
static int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) {
  (void)connKey;
  (void)param;
  (void)req;
  return 0;
}

L
Liu Jicong 已提交
29
/*
 * Response hook for TMQ connections: ignores the per-connection heartbeat
 * response and always reports success (0).
 */
static int32_t hbMqHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
  (void)pAppHbMgr;
  (void)pRsp;
  return 0;
}
L
Liu Jicong 已提交
30

D
dapan 已提交
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
  int32_t code = 0;

  SUserAuthBatchRsp batchRsp = {0};
  if (tDeserializeSUserAuthBatchRsp(value, valueLen, &batchRsp) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int32_t numOfBatchs = taosArrayGetSize(batchRsp.pArray);
  for (int32_t i = 0; i < numOfBatchs; ++i) {
    SGetUserAuthRsp *rsp = taosArrayGet(batchRsp.pArray, i);
    tscDebug("hb user auth rsp, user:%s, version:%d", rsp->user, rsp->version);

    catalogUpdateUserAuthInfo(pCatalog, rsp);
  }

D
dapan1121 已提交
48
  taosArrayDestroy(batchRsp.pArray);
D
dapan 已提交
49 50 51
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
/*
 * Build a freshly allocated SDBVgInfo (vgroup hash keyed by vgId) from a use-db response.
 *
 * pInfo: out-parameter. On success it receives the new object, and ownership passes
 *        to the caller. On any failure it is set to NULL.
 * rsp:   use-db response providing the version/hash settings and rsp->vgNum
 *        SVgroupInfo entries in rsp->pVgroupInfos.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY on allocation or hash-insert
 * failure, or TSDB_CODE_INVALID_MSG when fewer than rsp->vgNum vgroups are present.
 * All cleanup happens once, at _return. The previous version freed vgInfo on the
 * error paths and then freed it again there (use-after-free plus double free).
 */
static int32_t hbGenerateVgInfoFromRsp(SDBVgInfo **pInfo, SUseDbRsp *rsp) {
  int32_t    code = TSDB_CODE_SUCCESS;
  SDBVgInfo *vgInfo = taosMemoryCalloc(1, sizeof(SDBVgInfo));
  if (NULL == vgInfo) {
    *pInfo = NULL;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  vgInfo->vgVersion = rsp->vgVersion;
  vgInfo->stateTs = rsp->stateTs;
  vgInfo->hashMethod = rsp->hashMethod;
  vgInfo->hashPrefix = rsp->hashPrefix;
  vgInfo->hashSuffix = rsp->hashSuffix;
  vgInfo->vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (NULL == vgInfo->vgHash) {
    tscError("hash init[%d] failed", rsp->vgNum);
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _return;
  }

  for (int32_t j = 0; j < rsp->vgNum; ++j) {
    // Named pVg so it does not shadow the pInfo out-parameter.
    SVgroupInfo *pVg = taosArrayGet(rsp->pVgroupInfos, j);
    if (NULL == pVg) {
      // vgNum claims more entries than pVgroupInfos holds: malformed response.
      tscError("vgroup %d missing in use db rsp, vgNum:%d", j, rsp->vgNum);
      code = TSDB_CODE_INVALID_MSG;
      goto _return;
    }

    if (taosHashPut(vgInfo->vgHash, &pVg->vgId, sizeof(int32_t), pVg, sizeof(SVgroupInfo)) != 0) {
      tscError("hash push failed, errno:%d", errno);
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _return;
    }
  }

_return:
  if (TSDB_CODE_SUCCESS != code) {
    if (NULL != vgInfo->vgHash) {
      taosHashCleanup(vgInfo->vgHash);
    }
    taosMemoryFreeClear(vgInfo);
  }

  *pInfo = vgInfo;
  return code;
}

D
dapan1121 已提交
94 95 96
static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
  int32_t code = 0;

D
dapan1121 已提交
97 98
  SDbHbBatchRsp batchRsp = {0};
  if (tDeserializeSDbHbBatchRsp(value, valueLen, &batchRsp) != 0) {
S
Shengliang Guan 已提交
99 100 101
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }
D
dapan1121 已提交
102

D
dapan1121 已提交
103
  int32_t numOfBatchs = taosArrayGetSize(batchRsp.pArray);
S
Shengliang Guan 已提交
104
  for (int32_t i = 0; i < numOfBatchs; ++i) {
D
dapan1121 已提交
105 106 107 108 109 110 111 112 113 114
    SDbHbRsp *rsp = taosArrayGet(batchRsp.pArray, i);
    if (rsp->useDbRsp) {
      tscDebug("hb use db rsp, db:%s, vgVersion:%d, stateTs:%" PRId64 ", uid:%" PRIx64, 
        rsp->useDbRsp->db, rsp->useDbRsp->vgVersion, rsp->useDbRsp->stateTs, rsp->useDbRsp->uid);
        
      if (rsp->useDbRsp->vgVersion < 0) {
        code = catalogRemoveDB(pCatalog, rsp->useDbRsp->db, rsp->useDbRsp->uid);
      } else {
        SDBVgInfo *vgInfo = NULL;
        code = hbGenerateVgInfoFromRsp(&vgInfo, rsp->useDbRsp);
D
dapan1121 已提交
115
        if (TSDB_CODE_SUCCESS != code) {
D
dapan1121 已提交
116
          goto _return;
D
dapan1121 已提交
117
        }
D
dapan1121 已提交
118 119 120 121 122 123 124 125 126 127 128
      
        catalogUpdateDBVgInfo(pCatalog, rsp->useDbRsp->db, rsp->useDbRsp->uid, vgInfo);
      
        if (IS_SYS_DBNAME(rsp->useDbRsp->db)) {
          code = hbGenerateVgInfoFromRsp(&vgInfo, rsp->useDbRsp);
          if (TSDB_CODE_SUCCESS != code) {
            goto _return;
          }
      
          catalogUpdateDBVgInfo(pCatalog, (rsp->useDbRsp->db[0] == 'i') ? TSDB_PERFORMANCE_SCHEMA_DB : TSDB_INFORMATION_SCHEMA_DB, rsp->useDbRsp->uid, vgInfo);
        }
D
dapan1121 已提交
129
      }
D
dapan1121 已提交
130 131
    }

D
dapan1121 已提交
132 133 134 135
    if (rsp->cfgRsp) {
      tscDebug("hb db cfg rsp, db:%s, cfgVersion:%d", rsp->cfgRsp->db, rsp->cfgRsp->cfgVersion);
      catalogUpdateDbCfg(pCatalog, rsp->cfgRsp->db, rsp->cfgRsp->dbId, rsp->cfgRsp);
      rsp->cfgRsp = NULL;
D
dapan1121 已提交
136 137 138
    }
  }

D
dapan1121 已提交
139 140
_return:

D
dapan1121 已提交
141
  tFreeSDbHbBatchRsp(&batchRsp);
D
dapan1121 已提交
142
  return code;
D
dapan1121 已提交
143 144
}

D
dapan 已提交
145 146 147
static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
  int32_t code = 0;

D
dapan1121 已提交
148 149
  SSTbHbRsp hbRsp = {0};
  if (tDeserializeSSTbHbRsp(value, valueLen, &hbRsp) != 0) {
S
Shengliang Guan 已提交
150 151 152 153
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

D
dapan1121 已提交
154 155 156
  int32_t numOfMeta = taosArrayGetSize(hbRsp.pMetaRsp);
  for (int32_t i = 0; i < numOfMeta; ++i) {
    STableMetaRsp *rsp = taosArrayGet(hbRsp.pMetaRsp, i);
S
Shengliang Guan 已提交
157

D
dapan 已提交
158 159
    if (rsp->numOfColumns < 0) {
      tscDebug("hb remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
D
dapan1121 已提交
160
      catalogRemoveStbMeta(pCatalog, rsp->dbFName, rsp->dbId, rsp->stbName, rsp->suid);
D
dapan 已提交
161
    } else {
D
dapan1121 已提交
162
      tscDebug("hb update stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
S
Shengliang Guan 已提交
163
      if (rsp->pSchemas[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
164
        tscError("invalid colId[%" PRIi16 "] for the first column in table meta rsp msg", rsp->pSchemas[0].colId);
D
dapan1121 已提交
165
        tFreeSSTbHbRsp(&hbRsp);
D
dapan1121 已提交
166
        return TSDB_CODE_TSC_INVALID_VALUE;
D
dapan 已提交
167 168
      }

D
dapan1121 已提交
169
      catalogUpdateTableMeta(pCatalog, rsp);
D
dapan 已提交
170 171 172
    }
  }

D
dapan1121 已提交
173 174 175 176 177 178 179 180 181 182
  int32_t numOfIndex = taosArrayGetSize(hbRsp.pIndexRsp);
  for (int32_t i = 0; i < numOfIndex; ++i) {
    STableIndexRsp *rsp = taosArrayGet(hbRsp.pIndexRsp, i);

    catalogUpdateTableIndex(pCatalog, rsp);
  }

  taosArrayDestroy(hbRsp.pIndexRsp);
  hbRsp.pIndexRsp = NULL;

D
dapan1121 已提交
183
  tFreeSSTbHbRsp(&hbRsp);
D
dapan 已提交
184 185 186
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
187
static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
D
dapan1121 已提交
188
  SClientHbReq *pReq = taosHashAcquire(pAppHbMgr->activeInfo, &pRsp->connKey, sizeof(SClientHbKey));
D
dapan1121 已提交
189 190
  if (NULL == pReq) {
    tscWarn("pReq to get activeInfo, may be dropped, refId:%" PRIx64 ", type:%d", pRsp->connKey.tscRid,
L
Liu Jicong 已提交
191
            pRsp->connKey.connType);
D
dapan1121 已提交
192 193 194
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
195 196 197 198
  if (pRsp->query) {
    STscObj *pTscObj = (STscObj *)acquireTscObj(pRsp->connKey.tscRid);
    if (NULL == pTscObj) {
      tscDebug("tscObj rid %" PRIx64 " not exist", pRsp->connKey.tscRid);
199
    } else {
D
dapan1121 已提交
200
      if (pRsp->query->totalDnodes > 1 && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &pRsp->query->epSet)) {
201 202 203 204 205 206 207
        SEpSet *pOrig = &pTscObj->pAppInfo->mgmtEp.epSet;
        SEp    *pOrigEp = &pOrig->eps[pOrig->inUse];
        SEp    *pNewEp = &pRsp->query->epSet.eps[pRsp->query->epSet.inUse];
        tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in hb", pOrig->inUse, pOrig->numOfEps,
                 pOrigEp->fqdn, pOrigEp->port, pRsp->query->epSet.inUse, pRsp->query->epSet.numOfEps, pNewEp->fqdn,
                 pNewEp->port);

D
dapan1121 已提交
208 209
        updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pRsp->query->epSet);
      }
210

D
dapan1121 已提交
211 212
      pTscObj->pAppInfo->totalDnodes = pRsp->query->totalDnodes;
      pTscObj->pAppInfo->onlineDnodes = pRsp->query->onlineDnodes;
D
dapan1121 已提交
213
      pTscObj->connId = pRsp->query->connId;
wmmhello's avatar
wmmhello 已提交
214
      tscTrace("conn %u hb rsp, dnodes %d/%d", pTscObj->connId, pTscObj->pAppInfo->onlineDnodes,
dengyihao's avatar
dengyihao 已提交
215
               pTscObj->pAppInfo->totalDnodes);
L
Liu Jicong 已提交
216

D
dapan1121 已提交
217
      if (pRsp->query->killRid) {
D
dapan1121 已提交
218
        tscDebug("request rid %" PRIx64 " need to be killed now", pRsp->query->killRid);
D
dapan1121 已提交
219 220 221 222 223 224 225 226
        SRequestObj *pRequest = acquireRequest(pRsp->query->killRid);
        if (NULL == pRequest) {
          tscDebug("request 0x%" PRIx64 " not exist to kill", pRsp->query->killRid);
        } else {
          taos_stop_query((TAOS_RES *)pRequest);
          releaseRequest(pRsp->query->killRid);
        }
      }
L
Liu Jicong 已提交
227

D
dapan1121 已提交
228
      if (pRsp->query->killConnection) {
229
        taos_close_internal(pTscObj);
D
dapan1121 已提交
230 231
      }

D
dapan1121 已提交
232 233 234 235
      if (pRsp->query->pQnodeList) {
        updateQnodeList(pTscObj->pAppInfo, pRsp->query->pQnodeList);
      }

D
dapan1121 已提交
236 237 238
      releaseTscObj(pRsp->connKey.tscRid);
    }
  }
L
Liu Jicong 已提交
239

D
dapan1121 已提交
240
  int32_t kvNum = pRsp->info ? taosArrayGetSize(pRsp->info) : 0;
D
dapan1121 已提交
241 242

  tscDebug("hb got %d rsp kv", kvNum);
L
fix  
Liu Jicong 已提交
243

D
dapan1121 已提交
244 245 246
  for (int32_t i = 0; i < kvNum; ++i) {
    SKv *kv = taosArrayGet(pRsp->info, i);
    switch (kv->key) {
D
dapan 已提交
247 248 249 250 251 252 253 254
      case HEARTBEAT_KEY_USER_AUTHINFO: {
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid hb user auth info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }

        struct SCatalog *pCatalog = NULL;

D
dapan1121 已提交
255
        int32_t code = catalogGetHandle(pReq->clusterId, &pCatalog);
D
dapan 已提交
256
        if (code != TSDB_CODE_SUCCESS) {
D
dapan1121 已提交
257
          tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code));
D
dapan 已提交
258 259 260 261 262 263
          break;
        }

        hbProcessUserAuthInfoRsp(kv->value, kv->valueLen, pCatalog);
        break;
      }
D
dapan1121 已提交
264 265 266 267 268 269 270
      case HEARTBEAT_KEY_DBINFO: {
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid hb db info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }

        struct SCatalog *pCatalog = NULL;
L
fix  
Liu Jicong 已提交
271

D
dapan1121 已提交
272
        int32_t code = catalogGetHandle(pReq->clusterId, &pCatalog);
D
dapan1121 已提交
273
        if (code != TSDB_CODE_SUCCESS) {
D
dapan1121 已提交
274
          tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code));
D
dapan1121 已提交
275 276 277 278
          break;
        }

        hbProcessDBInfoRsp(kv->value, kv->valueLen, pCatalog);
D
dapan1121 已提交
279
        break;
D
dapan1121 已提交
280
      }
L
fix  
Liu Jicong 已提交
281
      case HEARTBEAT_KEY_STBINFO: {
D
dapan 已提交
282 283 284 285
        if (kv->valueLen <= 0 || NULL == kv->value) {
          tscError("invalid hb stb info, len:%d, value:%p", kv->valueLen, kv->value);
          break;
        }
D
dapan1121 已提交
286

D
dapan 已提交
287
        struct SCatalog *pCatalog = NULL;
L
fix  
Liu Jicong 已提交
288

D
dapan1121 已提交
289
        int32_t code = catalogGetHandle(pReq->clusterId, &pCatalog);
D
dapan 已提交
290
        if (code != TSDB_CODE_SUCCESS) {
D
dapan1121 已提交
291
          tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code));
D
dapan 已提交
292 293 294 295
          break;
        }

        hbProcessStbInfoRsp(kv->value, kv->valueLen, pCatalog);
D
dapan1121 已提交
296
        break;
D
dapan 已提交
297
      }
D
dapan1121 已提交
298 299 300 301 302 303
      default:
        tscError("invalid hb key type:%d", kv->key);
        break;
    }
  }

D
dapan1121 已提交
304 305
  taosHashRelease(pAppHbMgr->activeInfo, pReq);

D
dapan1121 已提交
306 307 308
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
309
/*
 * RPC completion callback for a heartbeat batch request.
 *
 * param: points to the int32_t index of the app heartbeat manager in clientHbMgr.appHbMgrs.
 * pMsg:  response buffer. pMsg->pData and pMsg->pEpSet are freed here on every path.
 * code:  transport result of the request.
 *
 * The response is deserialized and client/server clock skew is checked against
 * timestampDeltaLimit. Then, under clientHbMgr.lock, dnode availability is
 * recorded when an error occurred, and each per-connection response is
 * dispatched to the handler registered for its connection type.
 *
 * Returns the resulting error code, or -1 if the app heartbeat manager no longer exists.
 *
 * Fixes over the previous version:
 *  - A batch that fails to deserialize is not dispatched (it may be partially filled).
 *  - Timestamps are handled as int64 (previously truncated to int32 and passed to abs(int)).
 *  - connType comes from the network and is bounds-checked before indexing rspHandle[].
 */
static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
  if (0 == atomic_load_8(&clientHbMgr.inited)) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    return code;
  }

  static int32_t    emptyRspNum = 0;
  int32_t           idx = *(int32_t *)param;
  SClientHbBatchRsp pRsp = {0};
  bool              rspDecoded = false;

  if (TSDB_CODE_SUCCESS == code) {
    if (tDeserializeSClientHbBatchRsp(pMsg->pData, pMsg->len, &pRsp) != 0) {
      tscError("failed to deserialize hb batch rsp");
      code = TSDB_CODE_INVALID_MSG;
    } else {
      rspDecoded = true;

      int64_t now = taosGetTimestampSec();
      int64_t delta = now - pRsp.svrTimestamp;
      if (delta < 0) {
        delta = -delta;
      }
      if (delta > timestampDeltaLimit) {
        code = TSDB_CODE_TIME_UNSYNCED;
        tscError("time diff: %" PRId64 "s is too big", delta);
      }
    }
  }

  // Never dispatch entries from a batch that failed to decode.
  int32_t rspNum = rspDecoded ? (int32_t)taosArrayGetSize(pRsp.rsps) : 0;

  taosThreadMutexLock(&clientHbMgr.lock);

  SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, idx);
  if (pAppHbMgr == NULL) {
    taosThreadMutexUnlock(&clientHbMgr.lock);
    tscError("appHbMgr not exist, idx:%d", idx);
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    tFreeClientHbBatchRsp(&pRsp);
    return -1;
  }

  SAppInstInfo *pInst = pAppHbMgr->pAppInstInfo;

  if (code != 0) {
    pInst->onlineDnodes = pInst->totalDnodes ? 0 : -1;
    tscDebug("hb rsp error %s, update server status %d/%d", tstrerror(code), pInst->onlineDnodes,
             pInst->totalDnodes);
  }

  // emptyRspNum is only touched while clientHbMgr.lock is held.
  if (rspNum) {
    tscDebug("hb got %d rsp, %d empty rsp received before", rspNum,
             atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0));
  } else {
    atomic_add_fetch_32(&emptyRspNum, 1);
  }

  const int32_t numOfHandles = (int32_t)(sizeof(clientHbMgr.rspHandle) / sizeof(clientHbMgr.rspHandle[0]));
  for (int32_t i = 0; i < rspNum; ++i) {
    SClientHbRsp *rsp = taosArrayGet(pRsp.rsps, i);
    int32_t       connType = rsp->connKey.connType;

    // connType is server-supplied: reject values with no registered handler.
    if (connType < 0 || connType >= numOfHandles || NULL == clientHbMgr.rspHandle[connType]) {
      tscError("invalid hb rsp conn type:%d", connType);
      continue;
    }

    code = (*clientHbMgr.rspHandle[connType])(pAppHbMgr, rsp);
    if (code) {
      break;
    }
  }

  taosThreadMutexUnlock(&clientHbMgr.lock);

  tFreeClientHbBatchRsp(&pRsp);

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);
  return code;
}

D
dapan1121 已提交
375
/*
 * Append one SQueryDesc to hbBasic->queryDesc for every live, not-killed request
 * of pObj that has a query job. Each descriptor carries the SQL text, start time,
 * elapsed microseconds and the sub-task status array.
 *
 * hbBasic: its queryDesc array must already be allocated. On failure the
 *          descriptors pushed so far stay in it for the caller to free.
 * pObj:    connection whose pRequests hash is scanned.
 *
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY. Early exits now cancel
 * the hash iteration (taosHashCancelIterate) so the held entry is released, and
 * a failed push no longer leaks the sub-task array.
 */
int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
  int64_t    now = taosGetTimestampUs();
  SQueryDesc desc = {0};

  void *pIter = taosHashIterate(pObj->pRequests, NULL);
  if (NULL != pIter) {
    // The local FQDN is the same for every request; resolve it once instead of per request.
    taosGetFqdn(desc.fqdn);
  }

  while (pIter != NULL) {
    int64_t     *rid = pIter;
    SRequestObj *pRequest = acquireRequest(*rid);
    if (NULL == pRequest) {
      pIter = taosHashIterate(pObj->pRequests, pIter);
      continue;
    }

    if (pRequest->killed || 0 == pRequest->body.queryJob) {
      releaseRequest(*rid);
      pIter = taosHashIterate(pObj->pRequests, pIter);
      continue;
    }

    tstrncpy(desc.sql, pRequest->sqlstr, sizeof(desc.sql));
    desc.stime = pRequest->metric.start / 1000;
    desc.queryId = pRequest->requestId;
    desc.useconds = now - pRequest->metric.start;
    desc.reqRid = pRequest->self;
    desc.stableQuery = pRequest->stableQuery;
    desc.subPlanNum = pRequest->body.subplanNum;
    desc.subDesc = NULL;

    if (desc.subPlanNum) {
      desc.subDesc = taosArrayInit(desc.subPlanNum, sizeof(SQuerySubDesc));
      if (NULL == desc.subDesc) {
        releaseRequest(*rid);
        taosHashCancelIterate(pObj->pRequests, pIter);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      // Task status is best effort: on failure the descriptor is reported without sub-tasks.
      if (schedulerGetTasksStatus(pRequest->body.queryJob, desc.subDesc) != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(desc.subDesc);
        desc.subDesc = NULL;
      }
      desc.subPlanNum = desc.subDesc ? taosArrayGetSize(desc.subDesc) : 0;
    }

    releaseRequest(*rid);

    if (NULL == taosArrayPush(hbBasic->queryDesc, &desc)) {
      if (NULL != desc.subDesc) {
        taosArrayDestroy(desc.subDesc);
      }
      taosHashCancelIterate(pObj->pRequests, pIter);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    pIter = taosHashIterate(pObj->pRequests, pIter);
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Attach a freshly allocated SQueryHbReqBasic (connId plus running-query
 * descriptors) to req->query for the connection identified by connKey->tscRid.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_APP_ERROR when the connection object is
 * gone, TSDB_CODE_OUT_OF_MEMORY on allocation failure, or the error from
 * hbBuildQueryDesc. On failure req->query is left untouched and everything
 * allocated here is released.
 */
int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
  STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);
  if (NULL == pTscObj) {
    tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid);
    return TSDB_CODE_APP_ERROR;
  }

  SQueryHbReqBasic *pBasic = (SQueryHbReqBasic *)taosMemoryCalloc(1, sizeof(SQueryHbReqBasic));
  if (NULL == pBasic) {
    tscError("calloc %d failed", (int32_t)sizeof(SQueryHbReqBasic));
    releaseTscObj(connKey->tscRid);
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pBasic->connId = pTscObj->connId;

  int32_t numOfQueries = (NULL != pTscObj->pRequests) ? taosHashGetSize(pTscObj->pRequests) : 0;

  if (numOfQueries > 0) {
    pBasic->queryDesc = taosArrayInit(numOfQueries, sizeof(SQueryDesc));
    if (NULL == pBasic->queryDesc) {
      tscWarn("taosArrayInit %d queryDesc failed", numOfQueries);
      releaseTscObj(connKey->tscRid);
      taosMemoryFree(pBasic);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    int32_t buildRet = hbBuildQueryDesc(pBasic, pTscObj);
    if (TSDB_CODE_SUCCESS != buildRet) {
      releaseTscObj(connKey->tscRid);
      if (NULL != pBasic->queryDesc) {
        taosArrayDestroyEx(pBasic->queryDesc, tFreeClientHbQueryDesc);
      }
      taosMemoryFree(pBasic);
      return buildRet;
    }
  }

  req->query = pBasic;
  releaseTscObj(connKey->tscRid);

  if (numOfQueries <= 0) {
    tscDebug("no queries on connection");
  }
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
479 480 481 482 483 484 485 486 487 488 489
/*
 * Ask the catalog for users whose cached auth info is expired and, if any, add
 * them to req->info under HEARTBEAT_KEY_USER_AUTHINFO. Versions are converted to
 * network byte order. The users buffer is stored by pointer, so on success the
 * hash entry owns it.
 *
 * Returns the catalogGetExpiredUsers error, TSDB_CODE_OUT_OF_MEMORY if the info
 * hash cannot be created or the entry cannot be stored (the buffer is freed then,
 * instead of leaking as before), or TSDB_CODE_SUCCESS.
 */
int32_t hbGetExpiredUserInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
  SUserAuthVersion *users = NULL;
  uint32_t          userNum = 0;

  int32_t code = catalogGetExpiredUsers(pCatalog, &users, &userNum);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  if (0 == userNum) {
    taosMemoryFree(users);
    return TSDB_CODE_SUCCESS;
  }

  for (uint32_t i = 0; i < userNum; ++i) {
    users[i].version = htonl(users[i].version);
  }

  SKv kv = {
      .key = HEARTBEAT_KEY_USER_AUTHINFO,
      .valueLen = sizeof(SUserAuthVersion) * userNum,
      .value = users,
  };

  tscDebug("hb got %d expired users, valueLen:%d", userNum, kv.valueLen);

  if (NULL == req->info) {
    req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
    if (NULL == req->info) {
      tscError("failed to init hb req info hash for expired users");
      taosMemoryFree(users);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  if (taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)) != 0) {
    tscError("failed to put expired users into hb req, key:%d", kv.key);
    taosMemoryFree(users);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
516
/*
 * Ask the catalog for databases whose cached info is expired and, if any, add
 * them to req->info under HEARTBEAT_KEY_DBINFO. Numeric fields are converted to
 * network byte order. The dbs buffer is stored by pointer, so on success the
 * hash entry owns it.
 *
 * Returns the catalogGetExpiredDBs error, TSDB_CODE_OUT_OF_MEMORY if the info
 * hash cannot be created or the entry cannot be stored (the buffer is freed then,
 * instead of leaking as before), or TSDB_CODE_SUCCESS.
 */
int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
  SDbCacheInfo *dbs = NULL;
  uint32_t      dbNum = 0;

  int32_t code = catalogGetExpiredDBs(pCatalog, &dbs, &dbNum);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  if (0 == dbNum) {
    taosMemoryFree(dbs);
    return TSDB_CODE_SUCCESS;
  }

  for (uint32_t i = 0; i < dbNum; ++i) {
    SDbCacheInfo *db = &dbs[i];
    tscDebug("the %dth expired dbFName:%s, dbId:%" PRId64 ", vgVersion:%d, cfgVersion:%d, numOfTable:%d, startTs:%" PRId64, 
      (int32_t)i, db->dbFName, db->dbId, db->vgVersion, db->cfgVersion, db->numOfTable, db->stateTs);

    // Convert to network byte order for the wire.
    db->dbId = htobe64(db->dbId);
    db->vgVersion = htonl(db->vgVersion);
    db->cfgVersion = htonl(db->cfgVersion);
    db->numOfTable = htonl(db->numOfTable);
    db->stateTs = htobe64(db->stateTs);
  }

  SKv kv = {
      .key = HEARTBEAT_KEY_DBINFO,
      .valueLen = sizeof(SDbCacheInfo) * dbNum,
      .value = dbs,
  };

  tscDebug("hb got %d expired db, valueLen:%d", dbNum, kv.valueLen);

  if (NULL == req->info) {
    req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
    if (NULL == req->info) {
      tscError("failed to init hb req info hash for expired db");
      taosMemoryFree(dbs);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  if (taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)) != 0) {
    tscError("failed to put expired db into hb req, key:%d", kv.key);
    taosMemoryFree(dbs);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
560
/*
 * Ask the catalog for super tables whose cached meta is expired and, if any, add
 * them to req->info under HEARTBEAT_KEY_STBINFO. Version fields are converted to
 * network byte order. The stbs buffer is stored by pointer, so on success the
 * hash entry owns it.
 *
 * Returns the catalogGetExpiredSTables error, TSDB_CODE_OUT_OF_MEMORY if the info
 * hash cannot be created or the entry cannot be stored (the buffer is freed then,
 * instead of leaking as before), or TSDB_CODE_SUCCESS.
 */
int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
  SSTableVersion *stbs = NULL;
  uint32_t        stbNum = 0;

  int32_t code = catalogGetExpiredSTables(pCatalog, &stbs, &stbNum);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  if (0 == stbNum) {
    taosMemoryFree(stbs);
    return TSDB_CODE_SUCCESS;
  }

  for (uint32_t i = 0; i < stbNum; ++i) {
    SSTableVersion *stb = &stbs[i];
    stb->suid = htobe64(stb->suid);
    stb->sversion = htons(stb->sversion);
    stb->tversion = htons(stb->tversion);
    stb->smaVer = htonl(stb->smaVer);
  }

  SKv kv = {
      .key = HEARTBEAT_KEY_STBINFO,
      .valueLen = sizeof(SSTableVersion) * stbNum,
      .value = stbs,
  };

  tscDebug("hb got %d expired stb, valueLen:%d", stbNum, kv.valueLen);

  if (NULL == req->info) {
    req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
    if (NULL == req->info) {
      tscError("failed to init hb req info hash for expired stb");
      taosMemoryFree(stbs);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  if (taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)) != 0) {
    tscError("failed to put expired stb into hb req, key:%d", kv.key);
    taosMemoryFree(stbs);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
600
/*
 * Fill req->app for the given cluster.
 *
 * If clientHbMgr.appSummary has an entry for clusterId, the whole SAppHbReq is
 * copied. Otherwise the summary is zeroed and pid, appId and the application
 * name are filled from the current process. Always returns TSDB_CODE_SUCCESS.
 */
int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) {
  SAppHbReq *pSummary = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId));

  if (NULL == pSummary) {
    memset(&req->app.summary, 0, sizeof(req->app.summary));
    req->app.pid = taosGetPId();
    req->app.appId = clientHbMgr.appId;
    taosGetAppName(req->app.name, NULL);
    return TSDB_CODE_SUCCESS;
  }

  memcpy(&req->app, pSummary, sizeof(*pSummary));
  return TSDB_CODE_SUCCESS;
}

L
fix  
Liu Jicong 已提交
614 615
/*
 * Request hook for query connections: populate one heartbeat request.
 *
 * param points to the int64_t cluster id. The request receives app summary info,
 * running-query info (both best effort, results unchecked) and the expired user,
 * db and stb cache entries from that cluster's catalog.
 *
 * Returns the first failing catalog/expired-info error code, else TSDB_CODE_SUCCESS.
 */
int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) {
  int64_t         *pClusterId = (int64_t *)param;
  struct SCatalog *pCatalog = NULL;

  int32_t code = catalogGetHandle(*pClusterId, &pCatalog);
  if (TSDB_CODE_SUCCESS != code) {
    tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", *pClusterId, tstrerror(code));
    return code;
  }

  (void)hbGetAppInfo(*pClusterId, req);
  (void)hbGetQueryBasicInfo(connKey, req);

  code = hbGetExpiredUserInfo(connKey, pCatalog, req);
  if (TSDB_CODE_SUCCESS == code) {
    code = hbGetExpiredDBInfo(connKey, pCatalog, req);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = hbGetExpiredStbInfo(connKey, pCatalog, req);
  }

  return code;
}

646 647
static FORCE_INLINE void hbMgrInitHandle() {
  // init all handle
D
dapan1121 已提交
648 649
  clientHbMgr.reqHandle[CONN_TYPE__QUERY] = hbQueryHbReqHandle;
  clientHbMgr.reqHandle[CONN_TYPE__TMQ] = hbMqHbReqHandle;
L
Liu Jicong 已提交
650

D
dapan1121 已提交
651 652
  clientHbMgr.rspHandle[CONN_TYPE__QUERY] = hbQueryHbRspHandle;
  clientHbMgr.rspHandle[CONN_TYPE__TMQ] = hbMqHbRspHandle;
L
Liu Jicong 已提交
653 654
}

L
fix  
Liu Jicong 已提交
655
/*
 * Build one heartbeat batch request for an app heartbeat manager.
 *
 * Every entry of pAppHbMgr->activeInfo is copied into the batch and then
 * populated by the request hook registered for its connection type. Hook failures
 * are tolerated: the entry stays in the batch and the scan continues.
 *
 * Returns the new batch (the caller frees it with tFreeClientHbBatchReq), or NULL
 * when there is nothing to send or on allocation failure (terrno set to OOM).
 * Fixes: the reqs array allocation is checked, the hash iteration is cancelled on
 * every early exit, and a failed taosArrayPush is no longer dereferenced.
 */
SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
  SClientHbBatchReq *pBatchReq = taosMemoryCalloc(1, sizeof(SClientHbBatchReq));
  if (pBatchReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
  pBatchReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq));
  if (NULL == pBatchReq->reqs) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pBatchReq);
    return NULL;
  }

  void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL);

  // NOTE(review): only the first active entry's connection is acquired for the whole
  // scan; presumably this keeps the owning instance alive meanwhile. Confirm intent.
  int64_t       rid = -1;
  SClientHbReq *pFirst = pIter;
  if (NULL != pFirst) {
    rid = pFirst->connKey.tscRid;
  }

  STscObj *pTscObj = (STscObj *)acquireTscObj(rid);
  if (pTscObj == NULL) {
    if (NULL != pIter) {
      taosHashCancelIterate(pAppHbMgr->activeInfo, pIter);
    }
    tFreeClientHbBatchReq(pBatchReq);
    return NULL;
  }

  while (pIter != NULL) {
    SClientHbReq *pOneReq = taosArrayPush(pBatchReq->reqs, pIter);
    if (NULL == pOneReq) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      taosHashCancelIterate(pAppHbMgr->activeInfo, pIter);
      releaseTscObj(rid);
      tFreeClientHbBatchReq(pBatchReq);
      return NULL;
    }

    (void)(*clientHbMgr.reqHandle[pOneReq->connKey.connType])(&pOneReq->connKey, &pOneReq->clusterId, pOneReq);

    pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
  }

  releaseTscObj(rid);

  return pBatchReq;
}

696
/*
 * Exit hook (registered via atexit on Windows when running inside a DLL).
 * It marks the heartbeat thread as stopped unexpectedly by storing 2 in threadStop.
 */
void hbThreadFuncUnexpectedStopped(void) {
  atomic_store_8(&clientHbMgr.threadStop, 2);
}
wafwerar's avatar
wafwerar 已提交
697

698
/*
 * Accumulate every counter of src into the matching counter of dst.
 * src is only read.
 */
void hbMergeSummary(SAppClusterSummary *dst, SAppClusterSummary *src) {
  // Request counters.
  dst->totalRequests += src->totalRequests;
  dst->currentRequests += src->currentRequests;
  dst->numOfSlowQueries += src->numOfSlowQueries;

  // Insert statistics.
  dst->numOfInsertsReq += src->numOfInsertsReq;
  dst->numOfInsertRows += src->numOfInsertRows;
  dst->insertElapsedTime += src->insertElapsedTime;
  dst->insertBytes += src->insertBytes;

  // Fetch/query statistics.
  dst->fetchBytes += src->fetchBytes;
  dst->queryElapsedTime += src->queryElapsedTime;
}

int32_t hbGatherAppInfo(void) {
  SAppHbReq req = {0};
712
  int       sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
D
dapan1121 已提交
713 714 715 716 717
  if (sz > 0) {
    req.pid = taosGetPId();
    req.appId = clientHbMgr.appId;
    taosGetAppName(req.name, NULL);
  }
D
dapan1121 已提交
718 719

  taosHashClear(clientHbMgr.appSummary);
720

D
dapan1121 已提交
721 722
  for (int32_t i = 0; i < sz; ++i) {
    SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
dengyihao's avatar
dengyihao 已提交
723 724
    if (pAppHbMgr == NULL) continue;

725 726
    uint64_t   clusterId = pAppHbMgr->pAppInstInfo->clusterId;
    SAppHbReq *pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId));
D
dapan1121 已提交
727 728
    if (NULL == pApp) {
      memcpy(&req.summary, &pAppHbMgr->pAppInstInfo->summary, sizeof(req.summary));
D
dapan1121 已提交
729
      req.startTime = pAppHbMgr->startTime;
D
dapan1121 已提交
730 731 732 733 734
      taosHashPut(clientHbMgr.appSummary, &clusterId, sizeof(clusterId), &req, sizeof(req));
    } else {
      if (pAppHbMgr->startTime < pApp->startTime) {
        pApp->startTime = pAppHbMgr->startTime;
      }
735

D
dapan1121 已提交
736 737 738 739 740 741 742
      hbMergeSummary(&pApp->summary, &pAppHbMgr->pAppInstInfo->summary);
    }
  }

  return TSDB_CODE_SUCCESS;
}

L
fix  
Liu Jicong 已提交
743
static void *hbThreadFunc(void *param) {
L
Liu Jicong 已提交
744
  setThreadName("hb");
wafwerar's avatar
wafwerar 已提交
745
#ifdef WINDOWS
wafwerar's avatar
wafwerar 已提交
746 747 748
  if (taosCheckCurrentInDll()) {
    atexit(hbThreadFuncUnexpectedStopped);
  }
wafwerar's avatar
wafwerar 已提交
749
#endif
L
Liu Jicong 已提交
750
  while (1) {
751
    if (1 == clientHbMgr.threadStop) {
L
Liu Jicong 已提交
752 753 754
      break;
    }

wafwerar's avatar
wafwerar 已提交
755
    taosThreadMutexLock(&clientHbMgr.lock);
756

L
Liu Jicong 已提交
757
    int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
D
dapan1121 已提交
758 759 760
    if (sz > 0) {
      hbGatherAppInfo();
    }
761

dengyihao's avatar
dengyihao 已提交
762
    SArray *mgr = taosArrayInit(sz, sizeof(void *));
L
fix  
Liu Jicong 已提交
763 764
    for (int i = 0; i < sz; i++) {
      SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
dengyihao's avatar
dengyihao 已提交
765 766 767
      if (pAppHbMgr == NULL) {
        continue;
      }
L
Liu Jicong 已提交
768

L
Liu Jicong 已提交
769 770
      int32_t connCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
      if (connCnt == 0) {
dengyihao's avatar
dengyihao 已提交
771
        taosArrayPush(mgr, &pAppHbMgr);
L
Liu Jicong 已提交
772 773
        continue;
      }
L
fix  
Liu Jicong 已提交
774
      SClientHbBatchReq *pReq = hbGatherAllInfo(pAppHbMgr);
dengyihao's avatar
dengyihao 已提交
775 776
      if (pReq == NULL || taosArrayGetP(clientHbMgr.appHbMgrs, i) == NULL) {
        tFreeClientHbBatchReq(pReq);
L
Liu Jicong 已提交
777 778
        continue;
      }
L
fix  
Liu Jicong 已提交
779
      int   tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq);
wafwerar's avatar
wafwerar 已提交
780
      void *buf = taosMemoryMalloc(tlen);
L
Liu Jicong 已提交
781
      if (buf == NULL) {
S
Shengliang Guan 已提交
782
        terrno = TSDB_CODE_OUT_OF_MEMORY;
D
dapan1121 已提交
783
        tFreeClientHbBatchReq(pReq);
dengyihao's avatar
dengyihao 已提交
784
        // hbClearReqInfo(pAppHbMgr);
dengyihao's avatar
dengyihao 已提交
785
        taosArrayPush(mgr, &pAppHbMgr);
L
Liu Jicong 已提交
786 787
        break;
      }
L
fix  
Liu Jicong 已提交
788

S
Shengliang Guan 已提交
789
      tSerializeSClientHbBatchReq(buf, tlen, pReq);
wafwerar's avatar
wafwerar 已提交
790
      SMsgSendInfo *pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
791

L
Liu Jicong 已提交
792
      if (pInfo == NULL) {
S
Shengliang Guan 已提交
793
        terrno = TSDB_CODE_OUT_OF_MEMORY;
D
dapan1121 已提交
794
        tFreeClientHbBatchReq(pReq);
dengyihao's avatar
dengyihao 已提交
795
        // hbClearReqInfo(pAppHbMgr);
wafwerar's avatar
wafwerar 已提交
796
        taosMemoryFree(buf);
dengyihao's avatar
dengyihao 已提交
797
        taosArrayPush(mgr, &pAppHbMgr);
L
Liu Jicong 已提交
798 799
        break;
      }
L
Liu Jicong 已提交
800
      pInfo->fp = hbAsyncCallBack;
L
Liu Jicong 已提交
801 802 803
      pInfo->msgInfo.pData = buf;
      pInfo->msgInfo.len = tlen;
      pInfo->msgType = TDMT_MND_HEARTBEAT;
D
dapan1121 已提交
804 805
      pInfo->param = taosMemoryMalloc(sizeof(int32_t));
      *(int32_t *)pInfo->param = i;
dengyihao's avatar
dengyihao 已提交
806
      pInfo->paramFreeFp = taosMemoryFree;
L
Liu Jicong 已提交
807 808
      pInfo->requestId = generateRequestId();
      pInfo->requestObjRefId = 0;
L
Liu Jicong 已提交
809 810

      SAppInstInfo *pAppInstInfo = pAppHbMgr->pAppInstInfo;
L
fix  
Liu Jicong 已提交
811 812
      int64_t       transporterId = 0;
      SEpSet        epSet = getEpSet_s(&pAppInstInfo->mgmtEp);
L
Liu Jicong 已提交
813
      asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo);
D
dapan1121 已提交
814
      tFreeClientHbBatchReq(pReq);
dengyihao's avatar
dengyihao 已提交
815
      // hbClearReqInfo(pAppHbMgr);
L
Liu Jicong 已提交
816 817

      atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
dengyihao's avatar
dengyihao 已提交
818
      taosArrayPush(mgr, &pAppHbMgr);
L
Liu Jicong 已提交
819
    }
820

dengyihao's avatar
dengyihao 已提交
821 822 823
    taosArrayDestroy(clientHbMgr.appHbMgrs);
    clientHbMgr.appHbMgrs = mgr;

wafwerar's avatar
wafwerar 已提交
824
    taosThreadMutexUnlock(&clientHbMgr.lock);
L
fix  
Liu Jicong 已提交
825

L
Liu Jicong 已提交
826
    taosMsleep(HEARTBEAT_INTERVAL);
L
Liu Jicong 已提交
827 828 829 830 831
  }
  return NULL;
}

static int32_t hbCreateThread() {
wafwerar's avatar
wafwerar 已提交
832 833 834
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
L
Liu Jicong 已提交
835

D
dapan1121 已提交
836 837 838
  if (taosThreadCreate(&clientHbMgr.thread, &thAttr, hbThreadFunc, NULL) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
L
Liu Jicong 已提交
839
  }
D
dapan1121 已提交
840
  taosThreadAttrDestroy(&thAttr);
L
Liu Jicong 已提交
841 842 843
  return 0;
}

L
Liu Jicong 已提交
844
static void hbStopThread() {
D
dapan1121 已提交
845 846 847
  if (0 == atomic_load_8(&clientHbMgr.inited)) {
    return;
  }
848
  if (atomic_val_compare_exchange_8(&clientHbMgr.threadStop, 0, 1)) {
D
dapan1121 已提交
849
    tscDebug("hb thread already stopped");
850 851
    return;
  }
L
fix  
Liu Jicong 已提交
852

853 854 855 856 857 858
  // thread quit mode kill or inner exit from self-thread
  if (clientHbMgr.quitByKill) {
    taosThreadKill(clientHbMgr.thread, 0);
  } else {
    taosThreadJoin(clientHbMgr.thread, NULL);
  }
D
dapan1121 已提交
859

L
fix  
Liu Jicong 已提交
860
  tscDebug("hb thread stopped");
L
Liu Jicong 已提交
861 862
}

L
fix  
Liu Jicong 已提交
863
/*
 * Create an app heartbeat manager for pAppInstInfo, identified by a private copy
 * of `key`, and append it to clientHbMgr.appHbMgrs (its position is stored in idx).
 * Initializes the global heartbeat manager on first use.
 * Returns the new manager, or NULL with terrno set on failure; on failure nothing
 * is leaked and nothing is added to the list.
 */
SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) {
  if (hbMgrInit() != 0) {
    terrno = TSDB_CODE_TSC_INTERNAL_ERROR;
    return NULL;
  }

  // calloc so that fields not explicitly set below start zeroed, not indeterminate
  SAppHbMgr *pAppHbMgr = taosMemoryCalloc(1, sizeof(SAppHbMgr));
  if (pAppHbMgr == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // init stat
  pAppHbMgr->startTime = taosGetTimestampMs();
  pAppHbMgr->connKeyCnt = 0;
  pAppHbMgr->reportCnt = 0;
  pAppHbMgr->reportBytes = 0;
  pAppHbMgr->key = taosStrdup(key);
  if (pAppHbMgr->key == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  // init app info
  pAppHbMgr->pAppInstInfo = pAppInstInfo;

  // init hash info
  pAppHbMgr->activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
  if (pAppHbMgr->activeInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  taosThreadMutexLock(&clientHbMgr.lock);
  if (taosArrayPush(clientHbMgr.appHbMgrs, &pAppHbMgr) == NULL) {
    taosThreadMutexUnlock(&clientHbMgr.lock);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  pAppHbMgr->idx = taosArrayGetSize(clientHbMgr.appHbMgrs) - 1;
  taosThreadMutexUnlock(&clientHbMgr.lock);

  return pAppHbMgr;

_err:
  // activeInfo (if created) is still empty, so no per-entry cleanup is needed
  taosHashCleanup(pAppHbMgr->activeInfo);
  taosMemoryFree(pAppHbMgr->key);
  taosMemoryFree(pAppHbMgr);
  return NULL;
}

D
dapan1121 已提交
902 903 904 905 906 907 908 909 910
/*
 * Release an app heartbeat manager: free every SClientHbReq still stored in its
 * activeInfo hash, destroy the hash, then free the key string and the manager.
 * pTarget must be non-NULL.
 */
void hbFreeAppHbMgr(SAppHbMgr *pTarget) {
  for (void *pIter = taosHashIterate(pTarget->activeInfo, NULL); pIter != NULL;
       pIter = taosHashIterate(pTarget->activeInfo, pIter)) {
    tFreeClientHbReq((SClientHbReq *)pIter);
  }
  taosHashCleanup(pTarget->activeInfo);
  pTarget->activeInfo = NULL;

  taosMemoryFree(pTarget->key);
  taosMemoryFree(pTarget);
}

/*
 * Free *pAppHbMgr and clear its slot in clientHbMgr.appHbMgrs. The slot is set
 * to NULL rather than removed. *pAppHbMgr is set to NULL when the manager is found.
 * A NULL handle is ignored: the list can contain NULL slots, and matching one of
 * them would previously call hbFreeAppHbMgr(NULL) and crash.
 */
void hbRemoveAppHbMrg(SAppHbMgr **pAppHbMgr) {
  if (pAppHbMgr == NULL || *pAppHbMgr == NULL) {
    return;
  }

  taosThreadMutexLock(&clientHbMgr.lock);
  int32_t mgrSize = taosArrayGetSize(clientHbMgr.appHbMgrs);
  for (int32_t i = 0; i < mgrSize; ++i) {
    SAppHbMgr *pItem = taosArrayGetP(clientHbMgr.appHbMgrs, i);
    if (pItem == *pAppHbMgr) {
      hbFreeAppHbMgr(*pAppHbMgr);
      *pAppHbMgr = NULL;
      // copies the NULL just stored in *pAppHbMgr into slot i
      taosArraySet(clientHbMgr.appHbMgrs, i, pAppHbMgr);
      break;
    }
  }
  taosThreadMutexUnlock(&clientHbMgr.lock);
}

D
dapan1121 已提交
931
void appHbMgrCleanup(void) {
L
Liu Jicong 已提交
932 933
  int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
  for (int i = 0; i < sz; i++) {
L
fix  
Liu Jicong 已提交
934
    SAppHbMgr *pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i);
dengyihao's avatar
dengyihao 已提交
935
    if (pTarget == NULL) continue;
D
dapan1121 已提交
936
    hbFreeAppHbMgr(pTarget);
L
Liu Jicong 已提交
937 938 939 940
  }
}

int hbMgrInit() {
L
Liu Jicong 已提交
941 942 943 944
  // init once
  int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1);
  if (old == 1) return 0;

D
dapan1121 已提交
945 946
  clientHbMgr.appId = tGenIdPI64();
  tscDebug("app %" PRIx64 " initialized", clientHbMgr.appId);
947

D
dapan1121 已提交
948
  clientHbMgr.appSummary = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
L
fix  
Liu Jicong 已提交
949
  clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void *));
dengyihao's avatar
dengyihao 已提交
950 951

  TdThreadMutexAttr attr = {0};
dengyihao's avatar
dengyihao 已提交
952

dengyihao's avatar
dengyihao 已提交
953
  int ret = taosThreadMutexAttrInit(&attr);
954 955 956 957
  if(ret != 0){
    uError("hbMgrInit:taosThreadMutexAttrInit error")
    return ret;
  }
dengyihao's avatar
dengyihao 已提交
958

dengyihao's avatar
dengyihao 已提交
959
  ret = taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE);
960 961 962 963
  if(ret != 0){
    uError("hbMgrInit:taosThreadMutexAttrSetType error")
    return ret;
  }
dengyihao's avatar
dengyihao 已提交
964 965

  ret = taosThreadMutexInit(&clientHbMgr.lock, &attr);
966 967 968 969
  if(ret != 0){
    uError("hbMgrInit:taosThreadMutexInit error")
    return ret;
  }
dengyihao's avatar
dengyihao 已提交
970 971

  ret = taosThreadMutexAttrDestroy(&attr);
972 973 974 975
  if(ret != 0){
    uError("hbMgrInit:taosThreadMutexAttrDestroy error")
    return ret;
  }
L
Liu Jicong 已提交
976 977 978 979 980

  // init handle funcs
  hbMgrInitHandle();

  // init backgroud thread
D
dapan1121 已提交
981
  hbCreateThread();
L
Liu Jicong 已提交
982

L
Liu Jicong 已提交
983 984 985 986
  return 0;
}

void hbMgrCleanUp() {
D
dapan1121 已提交
987
  hbStopThread();
L
fix  
Liu Jicong 已提交
988

L
Liu Jicong 已提交
989
  // destroy all appHbMgr
L
Liu Jicong 已提交
990 991 992
  int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0);
  if (old == 0) return;

wafwerar's avatar
wafwerar 已提交
993
  taosThreadMutexLock(&clientHbMgr.lock);
D
dapan1121 已提交
994
  appHbMgrCleanup();
L
fix  
Liu Jicong 已提交
995
  taosArrayDestroy(clientHbMgr.appHbMgrs);
wafwerar's avatar
wafwerar 已提交
996
  taosThreadMutexUnlock(&clientHbMgr.lock);
L
fix  
Liu Jicong 已提交
997

998
  clientHbMgr.appHbMgrs = NULL;
L
Liu Jicong 已提交
999 1000
}

D
dapan1121 已提交
1001
/*
 * Add an empty SClientHbReq for connKey to pAppHbMgr->activeInfo and bump
 * connKeyCnt. Already-registered keys are left untouched.
 * Returns 0 on success (or when already registered). If the insert fails, it
 * returns the taosHashPut error code and does not count the connection.
 * Previously the counter was incremented even when the put failed.
 */
int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, int64_t clusterId) {
  // init hash in activeinfo
  if (taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)) != NULL) {
    return 0;
  }

  SClientHbReq hbReq = {0};
  hbReq.connKey = connKey;
  hbReq.clusterId = clusterId;

  int code = taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq));
  if (code != 0) {
    tscError("failed to register hb conn, tscRid:%" PRId64 ", code:%d", connKey.tscRid, code);
    return code;
  }

  atomic_add_fetch_32(&pAppHbMgr->connKeyCnt, 1);
  return 0;
}

D
dapan1121 已提交
1018
/*
 * Register a connection for heartbeating. Only CONN_TYPE__QUERY connections are
 * tracked (via hbRegisterConnImpl); CONN_TYPE__TMQ and any other type are
 * accepted as no-ops and return 0.
 */
int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType) {
  if (connType != CONN_TYPE__QUERY) {
    return 0;
  }

  SClientHbKey connKey = {.tscRid = tscRefId, .connType = connType};
  return hbRegisterConnImpl(pAppHbMgr, connKey, clusterId);
}

L
fix  
Liu Jicong 已提交
1036
/*
 * Drop connKey from pAppHbMgr->activeInfo: free the stored request, remove the
 * hash entry, release the acquired reference, and decrement connKeyCnt.
 * Unknown keys are ignored.
 */
void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) {
  SClientHbReq *pReq = taosHashAcquire(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
  if (NULL == pReq) {
    return;
  }

  tFreeClientHbReq(pReq);
  taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
  // the entry was acquired above, so it must be released after removal
  taosHashRelease(pAppHbMgr->activeInfo, pReq);

  atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1);
}
1050 1051 1052 1053 1054

// Set the heartbeat thread quit mode used by hbStopThread(): if quitByKill is
// non-zero the thread is passed to taosThreadKill(); otherwise hbStopThread()
// joins it after it exits on its own.
void taos_set_hb_quit(int8_t quitByKill) {
  clientHbMgr.quitByKill = quitByKill;
}