mndProfile.c 24.6 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17 18
#include "mndProfile.h"
#include "mndDb.h"
S
Shengliang Guan 已提交
19
#include "mndDnode.h"
S
Shengliang Guan 已提交
20
#include "mndMnode.h"
D
dapan1121 已提交
21
#include "mndQnode.h"
S
Shengliang Guan 已提交
22
#include "mndShow.h"
S
Shengliang Guan 已提交
23
#include "mndStb.h"
S
Shengliang Guan 已提交
24
#include "mndUser.h"
S
Shengliang Guan 已提交
25
#include "tglobal.h"
S
version  
Shengliang Guan 已提交
26
#include "version.h"
S
Shengliang Guan 已提交
27

S
Shengliang Guan 已提交
28
typedef struct {
L
Liu Jicong 已提交
29 30 31 32 33 34 35 36 37 38 39 40 41
  uint32_t id;
  int8_t   connType;
  char     user[TSDB_USER_LEN];
  char     app[TSDB_APP_NAME_LEN];  // app name that invokes taosc
  int64_t  appStartTimeMs;          // app start time
  int32_t  pid;                     // pid of app that invokes taosc
  uint32_t ip;
  uint16_t port;
  int8_t   killed;
  int64_t  loginTimeMs;
  int64_t  lastAccessTimeMs;
  uint64_t killId;
  int32_t  numOfQueries;
S
Shengliang Guan 已提交
42
  SArray  *pQueries;  // SArray<SQueryDesc>
S
Shengliang Guan 已提交
43 44
} SConnObj;

L
Liu Jicong 已提交
45 46
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
                               int32_t pid, const char *app, int64_t startTime);
S
Shengliang Guan 已提交
47
static void      mndFreeConn(SConnObj *pConn);
D
dapan1121 已提交
48
static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId);
S
Shengliang Guan 已提交
49
static void      mndReleaseConn(SMnode *pMnode, SConnObj *pConn);
S
Shengliang Guan 已提交
50
static void     *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter);
S
Shengliang Guan 已提交
51
static void      mndCancelGetNextConn(SMnode *pMnode, void *pIter);
S
Shengliang Guan 已提交
52 53 54 55 56 57
static int32_t   mndProcessHeartBeatReq(SRpcMsg *pReq);
static int32_t   mndProcessConnectReq(SRpcMsg *pReq);
static int32_t   mndProcessKillQueryReq(SRpcMsg *pReq);
static int32_t   mndProcessKillConnReq(SRpcMsg *pReq);
static int32_t   mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static int32_t   mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
S
Shengliang Guan 已提交
58 59 60 61 62
static void      mndCancelGetNextQuery(SMnode *pMnode, void *pIter);

/**
 * Initialise the connection-profile manager of an mnode.
 *
 * Creates the connection cache (entries are released through mndFreeConn) and
 * registers the handlers for the heartbeat, connect, kill-query and
 * kill-connection messages.
 *
 * @param pMnode  mnode whose profileMgmt is initialised.
 * @return 0 on success; -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the cache
 *         cannot be created.
 */
int32_t mndInitProfile(SMnode *pMnode) {
  SProfileMgmt *pProfile = &pMnode->profileMgmt;

  // period handed to the cache, in ms
  int32_t checkPeriodMs = tsShellActivityTimer * 2 * 1000;

  pProfile->cache = taosCacheInit(TSDB_DATA_TYPE_INT, checkPeriodMs, true, (__cache_free_fn_t)mndFreeConn, "conn");
  if (pProfile->cache != NULL) {
    mndSetMsgHandle(pMnode, TDMT_MND_HEARTBEAT, mndProcessHeartBeatReq);
    mndSetMsgHandle(pMnode, TDMT_MND_CONNECT, mndProcessConnectReq);
    mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq);
    mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq);

    // Only the iterator-free hooks are registered for the two show tables; the
    // retrieve handlers (mndRetrieveConns / mndRetrieveQueries) are currently
    // not hooked up.
    mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn);
    mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery);
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  mError("failed to alloc profile cache since %s", terrstr());
  return -1;
}

/**
 * Tear down the connection cache created by mndInitProfile().
 * Safe to call when the cache was never created or was already cleaned up.
 */
void mndCleanupProfile(SMnode *pMnode) {
  SProfileMgmt *pProfile = &pMnode->profileMgmt;
  if (pProfile->cache == NULL) return;

  taosCacheCleanup(pProfile->cache);
  pProfile->cache = NULL;
}

L
Liu Jicong 已提交
93 94
/**
 * Create a connection object and put it into the profile cache.
 *
 * The connection id is produced by mndGenerateUid() from the concatenation of
 * user, ip, port, pid and app. The object is cached with a keep time of
 * 3 * tsShellActivityTimer * 1000.
 *
 * @param pMnode     owning mnode.
 * @param user       user name, copied into the object (at most TSDB_USER_LEN).
 * @param connType   connection type reported by the client.
 * @param ip         client ip.
 * @param port       client port.
 * @param pid        pid of the client application.
 * @param app        client application name (at most TSDB_APP_NAME_LEN).
 * @param startTime  application start time; 0 means "use the current time".
 * @return the cached object, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY.
 *         Callers in this file hand the result to mndReleaseConn() when done.
 */
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
                               int32_t pid, const char *app, int64_t startTime) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  char    connStr[255] = {0};
  int32_t len = snprintf(connStr, sizeof(connStr), "%s%d%d%d%s", user, ip, port, pid, app);
  // snprintf reports the length it *wanted* to write; never pass more than what
  // is actually stored in connStr to the uid generator.
  if (len < 0) {
    len = 0;
  } else if (len >= (int32_t)sizeof(connStr)) {
    len = (int32_t)sizeof(connStr) - 1;
  }
  int32_t connId = mndGenerateUid(connStr, len);
  if (startTime == 0) startTime = taosGetTimestampMs();

  SConnObj connObj = {.id = connId,
                      .connType = connType,
                      .appStartTimeMs = startTime,
                      .pid = pid,
                      .ip = ip,
                      .port = port,
                      .killed = 0,
                      .loginTimeMs = taosGetTimestampMs(),
                      .lastAccessTimeMs = 0,
                      .killId = 0,
                      .numOfQueries = 0,
                      .pQueries = NULL};

  connObj.lastAccessTimeMs = connObj.loginTimeMs;
  tstrncpy(connObj.user, user, TSDB_USER_LEN);
  tstrncpy(connObj.app, app, TSDB_APP_NAME_LEN);

  int32_t   keepTime = tsShellActivityTimer * 3;
  SConnObj *pConn = taosCachePut(pMgmt->cache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), keepTime * 1000);
  if (pConn == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // argument order must follow the format: reason first, then user
    mError("conn:%d, failed to put into cache since %s, user:%s", connId, terrstr(), user);
    return NULL;
  }

  mTrace("conn:%u, is created, data:%p user:%s", pConn->id, pConn, user);
  return pConn;
}

/**
 * Free callback registered with the connection cache (see taosCacheInit in
 * mndInitProfile): destroys the query list owned by the connection.
 */
static void mndFreeConn(SConnObj *pConn) {
  // each element is freed with tFreeClientHbQueryDesc
  taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc);

  mTrace("conn:%u, is destroyed, data:%p", pConn->id, pConn);
}

D
dapan1121 已提交
137
/**
 * Look up a connection by id in the profile cache.
 *
 * On a hit, lastAccessTimeMs is set to the current ms timestamp plus
 * 3 * tsShellActivityTimer * 1000. Pair every successful call with
 * mndReleaseConn().
 *
 * @return the cached object, or NULL when no entry exists for connId.
 */
static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId) {
  SProfileMgmt *pProfile = &pMnode->profileMgmt;
  SConnObj     *pFound = taosCacheAcquireByKey(pProfile->cache, &connId, sizeof(connId));

  if (pFound != NULL) {
    int32_t keepTime = tsShellActivityTimer * 3;
    pFound->lastAccessTimeMs = keepTime * 1000 + (uint64_t)taosGetTimestampMs();

    mTrace("conn:%u, acquired from cache, data:%p", pFound->id, pFound);
    return pFound;
  }

  mDebug("conn:%u, already destroyed", connId);
  return NULL;
}

/**
 * Give a connection obtained from mndCreateConn()/mndAcquireConn() back to the
 * cache. A NULL pConn is ignored.
 */
static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) {
  if (pConn != NULL) {
    mTrace("conn:%u, released from cache, data:%p", pConn->id, pConn);

    // taosCacheRelease receives the address of the local copy of the pointer,
    // so the caller's own pointer variable is not modified by this call.
    taosCacheRelease(pMnode->profileMgmt.cache, (void **)&pConn, false);
  }
}

H
Haojun Liao 已提交
161
/**
 * Advance a cache iterator and return the connection it now points at.
 *
 * When the iterator is exhausted it is destroyed here and NULL is returned;
 * the caller must not use pIter afterwards.
 */
static void *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter) {
  if (!taosCacheIterNext(pIter)) {
    taosCacheDestroyIter(pIter);
    return NULL;
  }

  size_t    ignoredLen = 0;
  SConnObj *pNext = taosCacheIterGetData(pIter, &ignoredLen);
  return pNext;
}

/**
 * Free-iterator hook for the connections show table: destroys a cache iterator
 * that has not been run to exhaustion. NULL is ignored.
 */
static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) {
  if (pIter == NULL) return;
  taosCacheDestroyIter(pIter);
}

S
Shengliang Guan 已提交
180 181
/**
 * Handle TDMT_MND_CONNECT: authenticate the user, optionally validate the
 * requested database, create a connection object and build the SConnectRsp.
 *
 * @return 0 on success with pReq->info.rsp / rspLen filled in;
 *         TSDB_CODE_RPC_AUTH_FAILURE when the password does not match;
 *         -1 for every other failure.
 */
static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SConnectReq connReq = {0};
  SUserObj   *pUser = NULL;
  SDbObj     *pDb = NULL;
  SConnObj   *pConn = NULL;
  char        ip[30] = {0};
  int32_t     code = -1;

  if (tDeserializeSConnectReq(pReq->pCont, pReq->contLen, &connReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto CONN_OVER;
  }

  taosIp2String(pReq->conn.clientIp, ip);

  pUser = mndAcquireUser(pMnode, pReq->conn.user);
  if (pUser == NULL) {
    mError("user:%s, failed to login while acquire user since %s", pReq->conn.user, terrstr());
    goto CONN_OVER;
  }

  if (strncmp(connReq.passwd, pUser->pass, TSDB_PASSWORD_LEN - 1) != 0) {
    // NOTE(review): the supplied credential is written to the log here —
    // consider dropping it from the message.
    mError("user:%s, failed to auth while acquire user, input:%s", pReq->conn.user, connReq.passwd);
    code = TSDB_CODE_RPC_AUTH_FAILURE;
    goto CONN_OVER;
  }

  // A database name in the request must resolve to "<acctId><delimiter><db>".
  if (connReq.db[0] != 0) {
    char dbFName[TSDB_DB_FNAME_LEN];
    snprintf(dbFName, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, connReq.db);

    pDb = mndAcquireDb(pMnode, dbFName);
    if (pDb == NULL) {
      terrno = TSDB_CODE_MND_INVALID_DB;
      mError("user:%s, failed to login from %s while use db:%s since %s", pReq->conn.user, ip, connReq.db, terrstr());
      goto CONN_OVER;
    }
  }

  pConn = mndCreateConn(pMnode, pReq->conn.user, connReq.connType, pReq->conn.clientIp, pReq->conn.clientPort,
                        connReq.pid, connReq.app, connReq.startTime);
  if (pConn == NULL) {
    mError("user:%s, failed to login from %s while create connection since %s", pReq->conn.user, ip, terrstr());
    goto CONN_OVER;
  }

  // NOTE(review): this acquire is not matched by a second mndReleaseConn()
  // below — confirm whether the extra reference is intended.
  mndAcquireConn(pMnode, pConn->id);

  SConnectRsp connectRsp = {0};
  connectRsp.acctId = pUser->acctId;
  connectRsp.superUser = pUser->superUser;
  connectRsp.clusterId = pMnode->clusterId;
  connectRsp.connId = pConn->id;
  connectRsp.connType = connReq.connType;
  connectRsp.dnodeNum = mndGetDnodeSize(pMnode);
  snprintf(connectRsp.sVersion, sizeof(connectRsp.sVersion), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo,
           gitinfo);
  mndGetMnodeEpSet(pMnode, &connectRsp.epSet);

  // First pass sizes the encoded response, second pass writes it.
  int32_t rspLen = tSerializeSConnectRsp(NULL, 0, &connectRsp);
  if (rspLen < 0) goto CONN_OVER;

  void *rspBuf = rpcMallocCont(rspLen);
  if (rspBuf == NULL) goto CONN_OVER;
  tSerializeSConnectRsp(rspBuf, rspLen, &connectRsp);

  pReq->info.rspLen = rspLen;
  pReq->info.rsp = rspBuf;

  mDebug("user:%s, login from %s:%d, conn:%u, app:%s", pReq->conn.user, ip, pConn->port, pConn->id, connReq.app);

  code = 0;

CONN_OVER:
  // All three pointers may still be NULL when an early step failed.
  mndReleaseUser(pMnode, pUser);
  mndReleaseDb(pMnode, pDb);
  mndReleaseConn(pMnode, pConn);

  return code;
}

D
dapan1121 已提交
261 262
/**
 * Replace the query list stored on a connection with the one carried by a
 * heartbeat request.
 *
 * Ownership of pBasic->queryDesc moves to pConn (the request field is set to
 * NULL so the request's own cleanup does not free it); the previous list of
 * the connection is destroyed.
 *
 * @return TSDB_CODE_SUCCESS always.
 */
static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) {
  taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc);

  pConn->pQueries = pBasic->queryDesc;
  pBasic->queryDesc = NULL;

  // Count from the list just taken over: pBasic->queryDesc is NULL by now, so
  // reading it here would always yield 0.
  pConn->numOfQueries = pConn->pQueries ? taosArrayGetSize(pConn->pQueries) : 0;

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
272
/**
 * Build the heartbeat response for a CONN_TYPE__TMQ connection.
 *
 * Not implemented: always returns NULL, so mndProcessHeartBeatReq() adds no
 * response entry for such requests. The previous draft implementation was
 * compiled out with `#if 0` and has been removed; it leaked its allocations on
 * every error path and indexed the vgroup list with the outer loop variable.
 *
 * @return NULL.
 */
static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) {
  (void)pMnode;
  (void)pReq;
  return NULL;
}

L
Liu Jicong 已提交
337 338
/**
 * Process one CONN_TYPE__QUERY heartbeat request and append its response to
 * pBatchRsp->rsps.
 *
 * Query part (only when pHbReq->query is set): the connection is looked up by
 * id and re-created when it is no longer cached; its query list is replaced by
 * the one in the request; a SQueryHbRspBasic carrying the connection id, a
 * pending kill id, the dnode count, the mnode ep set and the qnode list is
 * attached to the response.
 *
 * Info part: every key/value in pHbReq->info is validated by the matching
 * mndValidate*Info() function and non-empty results are added to the response.
 *
 * @return TSDB_CODE_SUCCESS when a response was appended; -1 with terrno set
 *         when the connection is killed / cannot be created or memory runs out
 *         (no response is appended in that case).
 */
static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHbReq *pHbReq,
                                        SClientHbBatchRsp *pBatchRsp) {
  SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL};

  if (pHbReq->query) {
    SQueryHbReqBasic *pBasic = pHbReq->query;

    SRpcConnInfo connInfo = pMsg->conn;

    SConnObj *pConn = mndAcquireConn(pMnode, pBasic->connId);
    if (pConn == NULL) {
      pConn = mndCreateConn(pMnode, connInfo.user, CONN_TYPE__QUERY, connInfo.clientIp, connInfo.clientPort,
                            pBasic->pid, pBasic->app, 0);
      if (pConn == NULL) {
        mError("user:%s, conn:%u is freed and failed to create new since %s", connInfo.user, pBasic->connId, terrstr());
        return -1;
      } else {
        mDebug("user:%s, conn:%u is freed and create a new conn:%u", connInfo.user, pBasic->connId, pConn->id);
      }
    } else if (pConn->killed) {
      mError("user:%s, conn:%u is already killed", connInfo.user, pConn->id);
      mndReleaseConn(pMnode, pConn);
      terrno = TSDB_CODE_MND_INVALID_CONNECTION;
      return -1;
    }

    SQueryHbRspBasic *rspBasic = taosMemoryCalloc(1, sizeof(SQueryHbRspBasic));
    if (rspBasic == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      // Log while the reference is still held: pConn must not be read after
      // it has been released.
      mError("user:%s, conn:%u failed to process hb while since %s", pConn->user, pBasic->connId, terrstr());
      mndReleaseConn(pMnode, pConn);
      return -1;
    }

    mndSaveQueryList(pConn, pBasic);
    if (pConn->killed != 0) {
      rspBasic->killConnection = 1;
    }

    // A pending kill request is delivered once and then cleared.
    if (pConn->killId != 0) {
      rspBasic->killRid = pConn->killId;
      pConn->killId = 0;
    }

    rspBasic->connId = pConn->id;
    rspBasic->totalDnodes = mndGetDnodeSize(pMnode);
    rspBasic->onlineDnodes = 1;  // TODO
    mndGetMnodeEpSet(pMnode, &rspBasic->epSet);

    mndCreateQnodeList(pMnode, &rspBasic->pQnodeList, -1);

    mndReleaseConn(pMnode, pConn);

    hbRsp.query = rspBasic;
  }

  // Check the pointer before asking the hash table for its size.
  int32_t kvNum = (pHbReq->info != NULL) ? taosHashGetSize(pHbReq->info) : 0;
  if (kvNum <= 0) {
    taosArrayPush(pBatchRsp->rsps, &hbRsp);
    return TSDB_CODE_SUCCESS;
  }

  hbRsp.info = taosArrayInit(kvNum, sizeof(SKv));
  if (NULL == hbRsp.info) {
    mError("taosArrayInit %d rsp kv failed", kvNum);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tFreeClientHbRsp(&hbRsp);
    return -1;
  }

  void *pIter = taosHashIterate(pHbReq->info, NULL);
  while (pIter != NULL) {
    SKv *kv = pIter;

    switch (kv->key) {
      case HEARTBEAT_KEY_USER_AUTHINFO: {
        void   *rspMsg = NULL;
        int32_t rspLen = 0;
        mndValidateUserAuthInfo(pMnode, kv->value, kv->valueLen / sizeof(SUserAuthVersion), &rspMsg, &rspLen);
        if (rspMsg && rspLen > 0) {
          SKv kv1 = {.key = HEARTBEAT_KEY_USER_AUTHINFO, .valueLen = rspLen, .value = rspMsg};
          taosArrayPush(hbRsp.info, &kv1);
        }
        break;
      }
      case HEARTBEAT_KEY_DBINFO: {
        void   *rspMsg = NULL;
        int32_t rspLen = 0;
        mndValidateDbInfo(pMnode, kv->value, kv->valueLen / sizeof(SDbVgVersion), &rspMsg, &rspLen);
        if (rspMsg && rspLen > 0) {
          SKv kv1 = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg};
          taosArrayPush(hbRsp.info, &kv1);
        }
        break;
      }
      case HEARTBEAT_KEY_STBINFO: {
        void   *rspMsg = NULL;
        int32_t rspLen = 0;
        mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableVersion), &rspMsg, &rspLen);
        if (rspMsg && rspLen > 0) {
          SKv kv1 = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg};
          taosArrayPush(hbRsp.info, &kv1);
        }
        break;
      }
      default:
        // Unknown keys do not abort the heartbeat; they only mark the status.
        mError("invalid kv key:%d", kv->key);
        hbRsp.status = TSDB_CODE_MND_APP_ERROR;
        break;
    }

    pIter = taosHashIterate(pHbReq->info, pIter);
  }

  taosArrayPush(pBatchRsp->rsps, &hbRsp);

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
457 458
/**
 * Handle TDMT_MND_HEARTBEAT: decode the batch, process every contained
 * heartbeat by connection type and encode the batch response into
 * pReq->info.rsp / rspLen.
 *
 * A failure while processing a single heartbeat does not fail the batch; that
 * heartbeat simply gets no entry in the response.
 *
 * @return 0 on success; -1 when the request cannot be decoded
 *         (terrno = TSDB_CODE_INVALID_MSG) or the response cannot be
 *         allocated/encoded.
 */
static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq) {
  SMnode           *pMnode = pReq->info.node;
  SClientHbBatchReq batchReq = {0};
  SClientHbBatchRsp batchRsp = {0};
  int32_t           code = -1;
  int32_t           tlen = 0;
  void             *buf = NULL;

  if (tDeserializeSClientHbBatchReq(pReq->pCont, pReq->contLen, &batchReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto HB_OVER;
  }

  batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp));
  if (batchRsp.rsps == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto HB_OVER;
  }

  int32_t sz = taosArrayGetSize(batchReq.reqs);
  for (int32_t i = 0; i < sz; i++) {
    SClientHbReq *pHbReq = taosArrayGet(batchReq.reqs, i);
    if (pHbReq->connKey.connType == CONN_TYPE__QUERY) {
      // per-heartbeat errors are logged by the callee and intentionally do not
      // abort the batch
      (void)mndProcessQueryHeartBeat(pMnode, pReq, pHbReq, &batchRsp);
    } else if (pHbReq->connKey.connType == CONN_TYPE__TMQ) {
      SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq);
      if (pRsp != NULL) {
        taosArrayPush(batchRsp.rsps, pRsp);
        taosMemoryFree(pRsp);
      }
    }
  }

  // First pass sizes the encoded response, second pass writes it.
  tlen = tSerializeSClientHbBatchRsp(NULL, 0, &batchRsp);
  if (tlen < 0) goto HB_OVER;  // same handling as in mndProcessConnectReq

  buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto HB_OVER;
  }
  tSerializeSClientHbBatchRsp(buf, tlen, &batchRsp);

  pReq->info.rspLen = tlen;
  pReq->info.rsp = buf;
  code = 0;

HB_OVER:
  taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);
  if (batchRsp.rsps != NULL) {
    tFreeClientHbBatchRsp(&batchRsp);
  }

  return code;
}

S
Shengliang Guan 已提交
496 497
/**
 * Handle TDMT_MND_KILL_QUERY: record the query id to kill on the target
 * connection. The id is delivered to the client with its next heartbeat
 * response (see mndProcessQueryHeartBeat).
 *
 * Only super users may kill queries.
 *
 * @return 0 on success; -1 with terrno set when the requesting user cannot be
 *         acquired, lacks the right (TSDB_CODE_MND_NO_RIGHTS), the message is
 *         malformed (TSDB_CODE_INVALID_MSG) or the connection does not exist
 *         (TSDB_CODE_MND_INVALID_CONN_ID).
 */
static int32_t mndProcessKillQueryReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  SUserObj *pUser = mndAcquireUser(pMnode, pReq->conn.user);
  if (pUser == NULL) {
    // Nothing was killed, so do not report success; the reason is the terrno
    // left by mndAcquireUser, as in mndProcessConnectReq.
    mError("user:%s, failed to kill query while acquire user since %s", pReq->conn.user, terrstr());
    return -1;
  }
  if (!pUser->superUser) {
    mndReleaseUser(pMnode, pUser);
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
  }
  mndReleaseUser(pMnode, pUser);

  SKillQueryReq killReq = {0};
  if (tDeserializeSKillQueryReq(pReq->pCont, pReq->contLen, &killReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  mInfo("kill query msg is received, queryId:%d", killReq.queryId);

  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &killReq.connId, sizeof(int32_t));
  if (pConn == NULL) {
    mError("connId:%d, failed to kill queryId:%d, conn not exist", killReq.connId, killReq.queryId);
    terrno = TSDB_CODE_MND_INVALID_CONN_ID;
    return -1;
  }

  mInfo("connId:%d, queryId:%d is killed by user:%s", killReq.connId, killReq.queryId, pReq->conn.user);
  pConn->killId = killReq.queryId;
  taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
  return 0;
}

S
Shengliang Guan 已提交
530 531
/**
 * Handle TDMT_MND_KILL_CONN: mark the target connection as killed. Later
 * heartbeats for that connection are rejected by mndProcessQueryHeartBeat().
 *
 * Only super users may kill connections.
 *
 * @return TSDB_CODE_SUCCESS on success; -1 with terrno set when the requesting
 *         user cannot be acquired, lacks the right (TSDB_CODE_MND_NO_RIGHTS),
 *         the message is malformed (TSDB_CODE_INVALID_MSG) or the connection
 *         does not exist (TSDB_CODE_MND_INVALID_CONN_ID).
 */
static int32_t mndProcessKillConnReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  SUserObj *pUser = mndAcquireUser(pMnode, pReq->conn.user);
  if (pUser == NULL) {
    // Nothing was killed, so do not report success; the reason is the terrno
    // left by mndAcquireUser, as in mndProcessConnectReq.
    mError("user:%s, failed to kill connection while acquire user since %s", pReq->conn.user, terrstr());
    return -1;
  }
  if (!pUser->superUser) {
    mndReleaseUser(pMnode, pUser);
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
  }
  mndReleaseUser(pMnode, pUser);

  SKillConnReq killReq = {0};
  if (tDeserializeSKillConnReq(pReq->pCont, pReq->contLen, &killReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &killReq.connId, sizeof(int32_t));
  if (pConn == NULL) {
    mError("connId:%d, failed to kill connection, conn not exist", killReq.connId);
    terrno = TSDB_CODE_MND_INVALID_CONN_ID;
    return -1;
  }

  mInfo("connId:%d, is killed by user:%s", killReq.connId, pReq->conn.user);
  pConn->killed = 1;
  taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
562 563
/**
 * Retrieve handler for the connections show table.
 *
 * Walks the connection cache through pShow->pIter (created on first use) and
 * counts up to `rows` connections.
 *
 * NOTE: column serialisation is not implemented — the former implementation
 * was compiled out with `#if 0` and has been removed — so `data` is left
 * untouched and only the row count is produced.
 *
 * @param pReq   request; only pReq->info.node is used.
 * @param pShow  show state; pIter and numOfRows are updated.
 * @param data   output buffer (currently not written).
 * @param rows   maximum number of rows to produce.
 * @return number of connections visited by this call.
 */
static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  SMnode *pMnode = pReq->info.node;
  int32_t numOfRows = 0;

  (void)data;

  if (pShow->pIter == NULL) {
    SProfileMgmt *pMgmt = &pMnode->profileMgmt;
    pShow->pIter = taosCacheCreateIter(pMgmt->cache);
  }

  while (numOfRows < rows) {
    SConnObj *pConn = mndGetNextConn(pMnode, pShow->pIter);
    if (pConn == NULL) {
      // mndGetNextConn() has already destroyed the exhausted iterator; forget
      // it so that neither a later retrieve nor the free-iterator hook
      // (mndCancelGetNextConn) touches the freed object.
      pShow->pIter = NULL;
      break;
    }

    numOfRows++;
  }

  pShow->numOfRows += numOfRows;

  return numOfRows;
}

S
Shengliang Guan 已提交
622 623 624
/**
 * Retrieve handler for the queries show table.
 *
 * Not implemented: produces no rows and always returns 0. The former
 * implementation was compiled out with `#if 0` and has been removed; it
 * addressed the per-connection query list with pointer arithmetic on
 * SConnObj.pQueries, which is an SArray<SQueryDesc> and has to be accessed
 * through the array API instead.
 *
 * @return 0.
 */
static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  (void)pReq;
  (void)pShow;
  (void)data;
  (void)rows;

  return 0;
}

/**
 * Free-iterator hook for the queries show table: destroys a cache iterator
 * that has not been run to exhaustion. NULL is ignored.
 */
static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) {
  if (pIter == NULL) return;
  taosCacheDestroyIter(pIter);
}
S
Shengliang Guan 已提交
731

S
Shengliang Guan 已提交
732 733 734
/**
 * Number of objects currently held by the connection cache of this mnode.
 */
int32_t mndGetNumOfConnections(SMnode *pMnode) {
  return taosCacheGetNumOfObj(pMnode->profileMgmt.cache);
}