You need to sign in or sign up before continuing.
mndProfile.c 33.1 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17 18
#include "mndProfile.h"
#include "mndDb.h"
S
Shengliang Guan 已提交
19
#include "mndDnode.h"
S
Shengliang Guan 已提交
20
#include "mndMnode.h"
dengyihao's avatar
dengyihao 已提交
21
#include "mndPrivilege.h"
D
dapan1121 已提交
22
#include "mndQnode.h"
S
Shengliang Guan 已提交
23
#include "mndShow.h"
S
Shengliang Guan 已提交
24
#include "mndStb.h"
S
Shengliang Guan 已提交
25
#include "mndUser.h"
S
Shengliang Guan 已提交
26
#include "tglobal.h"
S
version  
Shengliang Guan 已提交
27
#include "version.h"
S
Shengliang Guan 已提交
28

S
Shengliang Guan 已提交
29
typedef struct {
L
Liu Jicong 已提交
30 31 32 33 34 35 36 37 38 39 40 41 42
  uint32_t id;
  int8_t   connType;
  char     user[TSDB_USER_LEN];
  char     app[TSDB_APP_NAME_LEN];  // app name that invokes taosc
  int64_t  appStartTimeMs;          // app start time
  int32_t  pid;                     // pid of app that invokes taosc
  uint32_t ip;
  uint16_t port;
  int8_t   killed;
  int64_t  loginTimeMs;
  int64_t  lastAccessTimeMs;
  uint64_t killId;
  int32_t  numOfQueries;
43
  SRWLatch queryLock;
S
Shengliang Guan 已提交
44
  SArray  *pQueries;  // SArray<SQueryDesc>
S
Shengliang Guan 已提交
45 46
} SConnObj;

D
dapan1121 已提交
47 48 49 50 51 52 53 54 55 56
typedef struct {
  int64_t            appId;
  uint32_t           ip;
  int32_t            pid;
  char               name[TSDB_APP_NAME_LEN];
  int64_t            startTime;
  SAppClusterSummary summary;
  int64_t            lastAccessTimeMs;
} SAppObj;

L
Liu Jicong 已提交
57 58
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
                               int32_t pid, const char *app, int64_t startTime);
S
Shengliang Guan 已提交
59
static void      mndFreeConn(SConnObj *pConn);
D
dapan1121 已提交
60
static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId);
S
Shengliang Guan 已提交
61
static void      mndReleaseConn(SMnode *pMnode, SConnObj *pConn);
S
Shengliang Guan 已提交
62
static void     *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter);
S
Shengliang Guan 已提交
63
static void      mndCancelGetNextConn(SMnode *pMnode, void *pIter);
S
Shengliang Guan 已提交
64 65 66 67
static int32_t   mndProcessHeartBeatReq(SRpcMsg *pReq);
static int32_t   mndProcessConnectReq(SRpcMsg *pReq);
static int32_t   mndProcessKillQueryReq(SRpcMsg *pReq);
static int32_t   mndProcessKillConnReq(SRpcMsg *pReq);
68 69
static int32_t   mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static int32_t   mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
S
Shengliang Guan 已提交
70
static void      mndCancelGetNextQuery(SMnode *pMnode, void *pIter);
D
dapan1121 已提交
71 72 73
static void      mndFreeApp(SAppObj *pApp);
static int32_t   mndRetrieveApps(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void      mndCancelGetNextApp(SMnode *pMnode, void *pIter);
D
dapan1121 已提交
74
static int32_t   mndProcessSvrVerReq(SRpcMsg *pReq);
S
Shengliang Guan 已提交
75 76 77 78

int32_t mndInitProfile(SMnode *pMnode) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

L
Liu Jicong 已提交
79
  // in ms
D
dapan1121 已提交
80
  int32_t checkTime = tsShellActivityTimer * 2 * 1000;
D
dapan1121 已提交
81
  pMgmt->connCache = taosCacheInit(TSDB_DATA_TYPE_UINT, checkTime, true, (__cache_free_fn_t)mndFreeConn, "conn");
D
dapan1121 已提交
82 83 84 85 86 87 88 89
  if (pMgmt->connCache == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to alloc profile cache since %s", terrstr());
    return -1;
  }

  pMgmt->appCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, checkTime, true, (__cache_free_fn_t)mndFreeApp, "app");
  if (pMgmt->appCache == NULL) {
S
Shengliang Guan 已提交
90 91 92 93 94
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to alloc profile cache since %s", terrstr());
    return -1;
  }

S
Shengliang Guan 已提交
95 96 97 98
  mndSetMsgHandle(pMnode, TDMT_MND_HEARTBEAT, mndProcessHeartBeatReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CONNECT, mndProcessConnectReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq);
D
dapan1121 已提交
99
  mndSetMsgHandle(pMnode, TDMT_MND_SERVER_VERSION, mndProcessSvrVerReq);
S
Shengliang Guan 已提交
100

101
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
S
Shengliang Guan 已提交
102
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn);
D
dapan1121 已提交
103
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
S
Shengliang Guan 已提交
104
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery);
D
dapan1121 已提交
105 106
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_APPS, mndRetrieveApps);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_APPS, mndCancelGetNextApp);
107

S
Shengliang Guan 已提交
108 109 110 111 112
  return 0;
}

void mndCleanupProfile(SMnode *pMnode) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
D
dapan1121 已提交
113 114 115 116 117 118 119 120
  if (pMgmt->connCache != NULL) {
    taosCacheCleanup(pMgmt->connCache);
    pMgmt->connCache = NULL;
  }

  if (pMgmt->appCache != NULL) {
    taosCacheCleanup(pMgmt->appCache);
    pMgmt->appCache = NULL;
S
Shengliang Guan 已提交
121 122 123
  }
}

L
Liu Jicong 已提交
124 125
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
                               int32_t pid, const char *app, int64_t startTime) {
S
Shengliang Guan 已提交
126 127
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
128 129
  char     connStr[255] = {0};
  int32_t  len = snprintf(connStr, sizeof(connStr), "%s%d%d%d%s", user, ip, port, pid, app);
D
dapan1121 已提交
130
  uint32_t connId = mndGenerateUid(connStr, len);
S
Shengliang Guan 已提交
131
  if (startTime == 0) startTime = taosGetTimestampMs();
S
Shengliang Guan 已提交
132

S
Shengliang Guan 已提交
133 134 135 136 137 138 139 140 141 142 143 144 145 146
  SConnObj connObj = {
      .id = connId,
      .connType = connType,
      .appStartTimeMs = startTime,
      .pid = pid,
      .ip = ip,
      .port = port,
      .killed = 0,
      .loginTimeMs = taosGetTimestampMs(),
      .lastAccessTimeMs = 0,
      .killId = 0,
      .numOfQueries = 0,
      .pQueries = NULL,
  };
S
Shengliang Guan 已提交
147

S
Shengliang Guan 已提交
148
  connObj.lastAccessTimeMs = connObj.loginTimeMs;
S
shm  
Shengliang Guan 已提交
149
  tstrncpy(connObj.user, user, TSDB_USER_LEN);
S
Shengliang Guan 已提交
150 151
  tstrncpy(connObj.app, app, TSDB_APP_NAME_LEN);

S
Shengliang Guan 已提交
152
  int32_t   keepTime = tsShellActivityTimer * 3;
S
Shengliang Guan 已提交
153 154
  SConnObj *pConn =
      taosCachePut(pMgmt->connCache, &connId, sizeof(uint32_t), &connObj, sizeof(connObj), keepTime * 1000);
S
Shengliang Guan 已提交
155 156
  if (pConn == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
shm  
Shengliang Guan 已提交
157
    mError("conn:%d, failed to put into cache since %s, user:%s", connId, user, terrstr());
S
Shengliang Guan 已提交
158 159
    return NULL;
  } else {
D
dapan1121 已提交
160
    mTrace("conn:%u, is created, data:%p user:%s", pConn->id, pConn, user);
S
Shengliang Guan 已提交
161 162
    return pConn;
  }
S
Shengliang Guan 已提交
163 164 165
}

static void mndFreeConn(SConnObj *pConn) {
166
  taosWLockLatch(&pConn->queryLock);
D
dapan1121 已提交
167
  taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc);
168
  taosWUnLockLatch(&pConn->queryLock);
169

D
dapan1121 已提交
170
  mTrace("conn:%u, is destroyed, data:%p", pConn->id, pConn);
S
Shengliang Guan 已提交
171 172
}

D
dapan1121 已提交
173
static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId) {
S
Shengliang Guan 已提交
174 175
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

D
dapan1121 已提交
176
  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->connCache, &connId, sizeof(connId));
S
Shengliang Guan 已提交
177
  if (pConn == NULL) {
D
dapan1121 已提交
178
    mDebug("conn:%u, already destroyed", connId);
S
Shengliang Guan 已提交
179 180 181
    return NULL;
  }

D
dapan1121 已提交
182
  pConn->lastAccessTimeMs = taosGetTimestampMs();
D
dapan1121 已提交
183
  mTrace("conn:%u, acquired from cache, data:%p", pConn->id, pConn);
S
Shengliang Guan 已提交
184 185 186 187 188
  return pConn;
}

static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) {
  if (pConn == NULL) return;
D
dapan1121 已提交
189
  mTrace("conn:%u, released from cache, data:%p", pConn->id, pConn);
S
Shengliang Guan 已提交
190

S
Shengliang Guan 已提交
191
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
D
dapan1121 已提交
192
  taosCacheRelease(pMgmt->connCache, (void **)&pConn, false);
S
Shengliang Guan 已提交
193 194
}

H
Haojun Liao 已提交
195
void *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter) {
L
Liu Jicong 已提交
196 197
  SConnObj *pConn = NULL;
  bool      hasNext = taosCacheIterNext(pIter);
H
Haojun Liao 已提交
198 199 200 201 202
  if (hasNext) {
    size_t dataLen = 0;
    pConn = taosCacheIterGetData(pIter, &dataLen);
  } else {
    taosCacheDestroyIter(pIter);
S
Shengliang Guan 已提交
203 204
  }

H
Haojun Liao 已提交
205
  return pConn;
S
Shengliang Guan 已提交
206 207 208
}

static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) {
H
Haojun Liao 已提交
209 210 211
  if (pIter != NULL) {
    taosCacheDestroyIter(pIter);
  }
S
Shengliang Guan 已提交
212 213
}

S
Shengliang Guan 已提交
214
static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
S
Shengliang Guan 已提交
215 216 217 218 219 220
  SMnode         *pMnode = pReq->info.node;
  SUserObj       *pUser = NULL;
  SDbObj         *pDb = NULL;
  SConnObj       *pConn = NULL;
  int32_t         code = -1;
  SConnectReq     connReq = {0};
S
Shengliang Guan 已提交
221
  char            ip[24] = {0};
S
Shengliang Guan 已提交
222
  const STraceId *trace = &pReq->info.traceId;
S
Shengliang Guan 已提交
223

S
Shengliang Guan 已提交
224
  if (tDeserializeSConnectReq(pReq->pCont, pReq->contLen, &connReq) != 0) {
S
Shengliang Guan 已提交
225
    terrno = TSDB_CODE_INVALID_MSG;
S
Shengliang Guan 已提交
226
    goto _OVER;
S
Shengliang Guan 已提交
227
  }
S
Shengliang Guan 已提交
228

229
  taosIp2String(pReq->info.conn.clientIp, ip);
230 231 232 233
  if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONNECT) != 0) {
    mGError("user:%s, failed to login from %s since %s", pReq->info.conn.user, ip, terrstr());
    goto _OVER;
  }
S
Shengliang Guan 已提交
234

235
  pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
236
  if (pUser == NULL) {
S
Shengliang Guan 已提交
237 238
    mGError("user:%s, failed to login from %s while acquire user since %s", pReq->info.conn.user, ip, terrstr());
    goto _OVER;
239
  }
S
Shengliang Guan 已提交
240 241 242

  if (strncmp(connReq.passwd, pUser->pass, TSDB_PASSWORD_LEN - 1) != 0) {
    mGError("user:%s, failed to login from %s since invalid pass, input:%s", pReq->info.conn.user, ip, connReq.passwd);
dengyihao's avatar
dengyihao 已提交
243
    code = TSDB_CODE_RPC_AUTH_FAILURE;
S
Shengliang Guan 已提交
244 245 246
    goto _OVER;
  }

S
Shengliang Guan 已提交
247
  if (connReq.db[0]) {
S
Shengliang Guan 已提交
248
    char db[TSDB_DB_FNAME_LEN] = {0};
S
Shengliang Guan 已提交
249 250
    snprintf(db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, connReq.db);
    pDb = mndAcquireDb(pMnode, db);
S
Shengliang Guan 已提交
251 252
    if (pDb == NULL) {
      terrno = TSDB_CODE_MND_INVALID_DB;
S
Shengliang Guan 已提交
253 254
      mGError("user:%s, failed to login from %s while use db:%s since %s", pReq->info.conn.user, ip, connReq.db,
              terrstr());
255 256 257 258
      goto _OVER;
    }

    if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_READ_OR_WRITE_DB, pDb) != 0) {
S
Shengliang Guan 已提交
259
      goto _OVER;
S
Shengliang Guan 已提交
260 261 262
    }
  }

263 264
  pConn = mndCreateConn(pMnode, pReq->info.conn.user, connReq.connType, pReq->info.conn.clientIp,
                        pReq->info.conn.clientPort, connReq.pid, connReq.app, connReq.startTime);
S
Shengliang Guan 已提交
265
  if (pConn == NULL) {
S
Shengliang Guan 已提交
266
    mGError("user:%s, failed to login from %s while create connection since %s", pReq->info.conn.user, ip, terrstr());
S
Shengliang Guan 已提交
267
    goto _OVER;
S
Shengliang Guan 已提交
268 269
  }

S
Shengliang Guan 已提交
270 271 272 273 274
  SConnectRsp connectRsp = {0};
  connectRsp.acctId = pUser->acctId;
  connectRsp.superUser = pUser->superUser;
  connectRsp.clusterId = pMnode->clusterId;
  connectRsp.connId = pConn->id;
L
Liu Jicong 已提交
275
  connectRsp.connType = connReq.connType;
D
dapan 已提交
276
  connectRsp.dnodeNum = mndGetDnodeSize(pMnode);
dengyihao's avatar
dengyihao 已提交
277
  connectRsp.svrTimestamp = taosGetTimestampSec();
278

D
dapan1121 已提交
279 280
  strcpy(connectRsp.sVer, version);
  snprintf(connectRsp.sDetailVer, sizeof(connectRsp.sDetailVer), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo,
S
Shengliang Guan 已提交
281 282
           gitinfo);
  mndGetMnodeEpSet(pMnode, &connectRsp.epSet);
283

S
Shengliang Guan 已提交
284
  int32_t contLen = tSerializeSConnectRsp(NULL, 0, &connectRsp);
S
Shengliang Guan 已提交
285
  if (contLen < 0) goto _OVER;
S
Shengliang Guan 已提交
286
  void *pRsp = rpcMallocCont(contLen);
S
Shengliang Guan 已提交
287
  if (pRsp == NULL) goto _OVER;
S
Shengliang Guan 已提交
288
  tSerializeSConnectRsp(pRsp, contLen, &connectRsp);
S
Shengliang Guan 已提交
289

S
Shengliang Guan 已提交
290 291
  pReq->info.rspLen = contLen;
  pReq->info.rsp = pRsp;
292

S
Shengliang Guan 已提交
293
  mGDebug("user:%s, login from %s:%d, conn:%u, app:%s", pReq->info.conn.user, ip, pConn->port, pConn->id, connReq.app);
294 295 296

  code = 0;

S
Shengliang Guan 已提交
297
_OVER:
298 299 300 301 302 303

  mndReleaseUser(pMnode, pUser);
  mndReleaseDb(pMnode, pDb);
  mndReleaseConn(pMnode, pConn);

  return code;
S
Shengliang Guan 已提交
304 305
}

D
dapan1121 已提交
306
static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) {
307 308
  taosWLockLatch(&pConn->queryLock);

D
dapan1121 已提交
309
  taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc);
S
Shengliang Guan 已提交
310

D
dapan1121 已提交
311
  pConn->pQueries = pBasic->queryDesc;
312
  pConn->numOfQueries = pBasic->queryDesc ? taosArrayGetSize(pBasic->queryDesc) : 0;
D
dapan1121 已提交
313
  pBasic->queryDesc = NULL;
L
Liu Jicong 已提交
314

315 316 317
  mDebug("queries updated in conn %d, num:%d", pConn->id, pConn->numOfQueries);

  taosWUnLockLatch(&pConn->queryLock);
S
Shengliang Guan 已提交
318 319 320 321

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
322
static SAppObj *mndCreateApp(SMnode *pMnode, uint32_t clientIp, SAppHbReq *pReq) {
D
dapan1121 已提交
323 324 325 326 327 328 329 330 331 332 333
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  SAppObj app;
  app.appId = pReq->appId;
  app.ip = clientIp;
  app.pid = pReq->pid;
  strcpy(app.name, pReq->name);
  app.startTime = pReq->startTime;
  memcpy(&app.summary, &pReq->summary, sizeof(pReq->summary));
  app.lastAccessTimeMs = taosGetTimestampMs();

S
Shengliang Guan 已提交
334
  const int32_t keepTime = tsShellActivityTimer * 3;
D
dapan1121 已提交
335 336 337 338 339 340
  SAppObj *pApp = taosCachePut(pMgmt->appCache, &pReq->appId, sizeof(pReq->appId), &app, sizeof(app), keepTime * 1000);
  if (pApp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to app %" PRIx64 " into cache since %s", pReq->appId, terrstr());
    return NULL;
  }
S
Shengliang Guan 已提交
341

D
dapan1121 已提交
342 343 344 345
  mTrace("app %" PRIx64 " is put into cache", pReq->appId);
  return pApp;
}

S
Shengliang Guan 已提交
346
static void mndFreeApp(SAppObj *pApp) { mTrace("app %" PRIx64 " is destroyed", pApp->appId); }
D
dapan1121 已提交
347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372

static SAppObj *mndAcquireApp(SMnode *pMnode, int64_t appId) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  SAppObj *pApp = taosCacheAcquireByKey(pMgmt->appCache, &appId, sizeof(appId));
  if (pApp == NULL) {
    mDebug("app %" PRIx64 " not in cache", appId);
    return NULL;
  }

  pApp->lastAccessTimeMs = (uint64_t)taosGetTimestampMs();

  mTrace("app %" PRIx64 " acquired from cache", appId);
  return pApp;
}

static void mndReleaseApp(SMnode *pMnode, SAppObj *pApp) {
  if (pApp == NULL) return;
  mTrace("release app %" PRIx64 " to cache", pApp->appId);

  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
  taosCacheRelease(pMgmt->appCache, (void **)&pApp, false);
}

void *mndGetNextApp(SMnode *pMnode, SCacheIter *pIter) {
  SAppObj *pApp = NULL;
S
Shengliang Guan 已提交
373
  bool     hasNext = taosCacheIterNext(pIter);
D
dapan1121 已提交
374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389
  if (hasNext) {
    size_t dataLen = 0;
    pApp = taosCacheIterGetData(pIter, &dataLen);
  } else {
    taosCacheDestroyIter(pIter);
  }

  return pApp;
}

static void mndCancelGetNextApp(SMnode *pMnode, void *pIter) {
  if (pIter != NULL) {
    taosCacheDestroyIter(pIter);
  }
}

S
Shengliang Guan 已提交
390
static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) {
L
Liu Jicong 已提交
391
#if 0
wafwerar's avatar
wafwerar 已提交
392
  SClientHbRsp* pRsp = taosMemoryMalloc(sizeof(SClientHbRsp));
L
Liu Jicong 已提交
393 394 395 396 397 398 399 400 401 402 403 404 405 406 407
  if (pRsp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  pRsp->connKey = pReq->connKey;
  SMqHbBatchRsp batchRsp;
  batchRsp.batchRsps = taosArrayInit(0, sizeof(SMqHbRsp));
  if (batchRsp.batchRsps == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  SClientHbKey connKey = pReq->connKey;
  SHashObj* pObj =  pReq->info;
  SKv* pKv = taosHashGet(pObj, "mq-tmp", strlen("mq-tmp") + 1);
  if (pKv == NULL) {
wafwerar's avatar
wafwerar 已提交
408
    taosMemoryFree(pRsp);
L
Liu Jicong 已提交
409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440
    return NULL;
  }
  SMqHbMsg mqHb;
  taosDecodeSMqMsg(pKv->value, &mqHb);
  /*int64_t clientUid = htonl(pKv->value);*/
  /*if (mqHb.epoch )*/
  int sz = taosArrayGetSize(mqHb.pTopics);
  SMqConsumerObj* pConsumer = mndAcquireConsumer(pMnode, mqHb.consumerId); 
  for (int i = 0; i < sz; i++) {
    SMqHbOneTopicBatchRsp innerBatchRsp;
    innerBatchRsp.rsps = taosArrayInit(sz, sizeof(SMqHbRsp));
    if (innerBatchRsp.rsps == NULL) {
      //TODO
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
    SMqHbTopicInfo* topicInfo = taosArrayGet(mqHb.pTopics, i);
    SMqConsumerTopic* pConsumerTopic = taosHashGet(pConsumer->topicHash, topicInfo->name, strlen(topicInfo->name)+1);
    if (pConsumerTopic->epoch != topicInfo->epoch) {
      //add new vgids into rsp
      int vgSz = taosArrayGetSize(topicInfo->pVgInfo);
      for (int j = 0; j < vgSz; j++) {
        SMqHbRsp innerRsp;
        SMqHbVgInfo* pVgInfo = taosArrayGet(topicInfo->pVgInfo, i);
        SVgObj* pVgObj = mndAcquireVgroup(pMnode, pVgInfo->vgId);
        innerRsp.epSet = mndGetVgroupEpset(pMnode, pVgObj);
        taosArrayPush(innerBatchRsp.rsps, &innerRsp);
      }
    }
    taosArrayPush(batchRsp.batchRsps, &innerBatchRsp);
  }
  int32_t tlen = taosEncodeSMqHbBatchRsp(NULL, &batchRsp);
wafwerar's avatar
wafwerar 已提交
441
  void* buf = taosMemoryMalloc(tlen);
L
Liu Jicong 已提交
442 443 444 445 446 447 448 449 450
  if (buf == NULL) {
    //TODO
    return NULL;
  }
  void* abuf = buf;
  taosEncodeSMqHbBatchRsp(&abuf, &batchRsp);
  pRsp->body = buf;
  pRsp->bodyLen = tlen;
  return pRsp;
L
Liu Jicong 已提交
451 452
#endif
  return NULL;
L
Liu Jicong 已提交
453 454
}

D
dapan1121 已提交
455
static int32_t mndUpdateAppInfo(SMnode *pMnode, SClientHbReq *pHbReq, SRpcConnInfo *connInfo) {
S
Shengliang Guan 已提交
456 457
  SAppHbReq *pReq = &pHbReq->app;
  SAppObj   *pApp = mndAcquireApp(pMnode, pReq->appId);
D
dapan1121 已提交
458 459 460 461 462 463
  if (pApp == NULL) {
    pApp = mndCreateApp(pMnode, connInfo->clientIp, pReq);
    if (pApp == NULL) {
      mError("failed to create new app %" PRIx64 " since %s", pReq->appId, terrstr());
      return -1;
    } else {
S
Shengliang Guan 已提交
464
      mDebug("a new app %" PRIx64 " is created", pReq->appId);
S
Shengliang Guan 已提交
465
      mndReleaseApp(pMnode, pApp);
D
dapan1121 已提交
466 467 468 469 470 471 472 473 474 475 476
      return TSDB_CODE_SUCCESS;
    }
  }

  memcpy(&pApp->summary, &pReq->summary, sizeof(pReq->summary));

  mndReleaseApp(pMnode, pApp);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
477 478 479 480 481
static int32_t mndGetOnlineDnodeNum(SMnode *pMnode, int32_t *num) {
  SSdb      *pSdb = pMnode->pSdb;
  SDnodeObj *pDnode = NULL;
  int64_t    curMs = taosGetTimestampMs();
  void      *pIter = NULL;
482

D
dapan1121 已提交
483 484 485
  while (true) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;
486

D
dapan1121 已提交
487 488 489 490
    bool online = mndIsDnodeOnline(pDnode, curMs);
    if (online) {
      (*num)++;
    }
491

D
dapan1121 已提交
492 493 494 495 496 497
    sdbRelease(pSdb, pDnode);
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
498 499
static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHbReq *pHbReq,
                                        SClientHbBatchRsp *pBatchRsp) {
D
dapan1121 已提交
500
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
L
Liu Jicong 已提交
501
  SClientHbRsp  hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL};
S
Shengliang Guan 已提交
502
  SRpcConnInfo  connInfo = pMsg->info.conn;
D
dapan1121 已提交
503 504

  mndUpdateAppInfo(pMnode, pHbReq, &connInfo);
D
dapan1121 已提交
505 506 507 508 509

  if (pHbReq->query) {
    SQueryHbReqBasic *pBasic = pHbReq->query;

    SConnObj *pConn = mndAcquireConn(pMnode, pBasic->connId);
L
Liu Jicong 已提交
510 511
    if (pConn == NULL) {
      pConn = mndCreateConn(pMnode, connInfo.user, CONN_TYPE__QUERY, connInfo.clientIp, connInfo.clientPort,
D
dapan1121 已提交
512
                            pHbReq->app.pid, pHbReq->app.name, 0);
D
dapan1121 已提交
513 514 515 516
      if (pConn == NULL) {
        mError("user:%s, conn:%u is freed and failed to create new since %s", connInfo.user, pBasic->connId, terrstr());
        return -1;
      } else {
D
dapan1121 已提交
517
        mDebug("user:%s, conn:%u is freed, will create a new conn:%u", connInfo.user, pBasic->connId, pConn->id);
D
dapan1121 已提交
518 519
      }
    }
520

D
dapan1121 已提交
521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539
    SQueryHbRspBasic *rspBasic = taosMemoryCalloc(1, sizeof(SQueryHbRspBasic));
    if (rspBasic == NULL) {
      mndReleaseConn(pMnode, pConn);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      mError("user:%s, conn:%u failed to process hb while since %s", pConn->user, pBasic->connId, terrstr());
      return -1;
    }

    mndSaveQueryList(pConn, pBasic);
    if (pConn->killed != 0) {
      rspBasic->killConnection = 1;
    }

    if (pConn->killId != 0) {
      rspBasic->killRid = pConn->killId;
      pConn->killId = 0;
    }

    rspBasic->connId = pConn->id;
D
dapan1121 已提交
540
    rspBasic->totalDnodes = mndGetDnodeSize(pMnode);
D
dapan1121 已提交
541
    mndGetOnlineDnodeNum(pMnode, &rspBasic->onlineDnodes);
D
dapan1121 已提交
542
    mndGetMnodeEpSet(pMnode, &rspBasic->epSet);
D
dapan1121 已提交
543 544

    mndCreateQnodeList(pMnode, &rspBasic->pQnodeList, -1);
545

D
dapan1121 已提交
546 547 548
    mndReleaseConn(pMnode, pConn);

    hbRsp.query = rspBasic;
549 550
  } else {
    mDebug("no query info in hb msg");
D
dapan1121 已提交
551 552 553 554
  }

  int32_t kvNum = taosHashGetSize(pHbReq->info);
  if (NULL == pHbReq->info || kvNum <= 0) {
L
Liu Jicong 已提交
555
    taosArrayPush(pBatchRsp->rsps, &hbRsp);
D
dapan1121 已提交
556 557 558 559 560 561 562
    return TSDB_CODE_SUCCESS;
  }

  hbRsp.info = taosArrayInit(kvNum, sizeof(SKv));
  if (NULL == hbRsp.info) {
    mError("taosArrayInit %d rsp kv failed", kvNum);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
D
dapan1121 已提交
563
    tFreeClientHbRsp(&hbRsp);
D
dapan1121 已提交
564 565 566 567 568 569 570 571
    return -1;
  }

  void *pIter = taosHashIterate(pHbReq->info, NULL);
  while (pIter != NULL) {
    SKv *kv = pIter;

    switch (kv->key) {
D
dapan 已提交
572
      case HEARTBEAT_KEY_USER_AUTHINFO: {
S
Shengliang Guan 已提交
573
        void   *rspMsg = NULL;
D
dapan 已提交
574 575 576 577 578 579 580 581
        int32_t rspLen = 0;
        mndValidateUserAuthInfo(pMnode, kv->value, kv->valueLen / sizeof(SUserAuthVersion), &rspMsg, &rspLen);
        if (rspMsg && rspLen > 0) {
          SKv kv1 = {.key = HEARTBEAT_KEY_USER_AUTHINFO, .valueLen = rspLen, .value = rspMsg};
          taosArrayPush(hbRsp.info, &kv1);
        }
        break;
      }
D
dapan1121 已提交
582
      case HEARTBEAT_KEY_DBINFO: {
S
Shengliang Guan 已提交
583
        void   *rspMsg = NULL;
D
dapan1121 已提交
584 585 586 587 588 589 590 591 592
        int32_t rspLen = 0;
        mndValidateDbInfo(pMnode, kv->value, kv->valueLen / sizeof(SDbVgVersion), &rspMsg, &rspLen);
        if (rspMsg && rspLen > 0) {
          SKv kv1 = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg};
          taosArrayPush(hbRsp.info, &kv1);
        }
        break;
      }
      case HEARTBEAT_KEY_STBINFO: {
S
Shengliang Guan 已提交
593
        void   *rspMsg = NULL;
D
dapan1121 已提交
594
        int32_t rspLen = 0;
D
dapan1121 已提交
595
        mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableVersion), &rspMsg, &rspLen);
D
dapan1121 已提交
596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615
        if (rspMsg && rspLen > 0) {
          SKv kv1 = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg};
          taosArrayPush(hbRsp.info, &kv1);
        }
        break;
      }
      default:
        mError("invalid kv key:%d", kv->key);
        hbRsp.status = TSDB_CODE_MND_APP_ERROR;
        break;
    }

    pIter = taosHashIterate(pHbReq->info, pIter);
  }

  taosArrayPush(pBatchRsp->rsps, &hbRsp);

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
616 617
static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
S
Shengliang Guan 已提交
618

L
Liu Jicong 已提交
619
  SClientHbBatchReq batchReq = {0};
S
Shengliang Guan 已提交
620
  if (tDeserializeSClientHbBatchReq(pReq->pCont, pReq->contLen, &batchReq) != 0) {
D
dapan1121 已提交
621
    taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);
S
Shengliang Guan 已提交
622 623 624 625
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

L
Liu Jicong 已提交
626
  SClientHbBatchRsp batchRsp = {0};
dengyihao's avatar
dengyihao 已提交
627
  batchRsp.svrTimestamp = taosGetTimestampSec();
L
Liu Jicong 已提交
628
  batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp));
L
Liu Jicong 已提交
629

S
Shengliang Guan 已提交
630
  int32_t sz = taosArrayGetSize(batchReq.reqs);
L
Liu Jicong 已提交
631
  for (int i = 0; i < sz; i++) {
S
Shengliang Guan 已提交
632
    SClientHbReq *pHbReq = taosArrayGet(batchReq.reqs, i);
D
dapan1121 已提交
633
    if (pHbReq->connKey.connType == CONN_TYPE__QUERY) {
S
Shengliang Guan 已提交
634
      mndProcessQueryHeartBeat(pMnode, pReq, pHbReq, &batchRsp);
D
dapan1121 已提交
635
    } else if (pHbReq->connKey.connType == CONN_TYPE__TMQ) {
L
Liu Jicong 已提交
636 637 638
      SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq);
      if (pRsp != NULL) {
        taosArrayPush(batchRsp.rsps, pRsp);
wafwerar's avatar
wafwerar 已提交
639
        taosMemoryFree(pRsp);
L
Liu Jicong 已提交
640
      }
L
Liu Jicong 已提交
641 642
    }
  }
S
Shengliang Guan 已提交
643
  taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);
L
Liu Jicong 已提交
644

S
Shengliang Guan 已提交
645 646 647 648
  int32_t tlen = tSerializeSClientHbBatchRsp(NULL, 0, &batchRsp);
  void   *buf = rpcMallocCont(tlen);
  tSerializeSClientHbBatchRsp(buf, tlen, &batchRsp);

D
dapan1121 已提交
649
  tFreeClientHbBatchRsp(&batchRsp);
S
Shengliang Guan 已提交
650 651
  pReq->info.rspLen = tlen;
  pReq->info.rsp = buf;
S
Shengliang Guan 已提交
652 653 654 655

  return 0;
}

S
Shengliang Guan 已提交
656 657
static int32_t mndProcessKillQueryReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
658 659
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
660
  SKillQueryReq killReq = {0};
S
Shengliang Guan 已提交
661
  if (tDeserializeSKillQueryReq(pReq->pCont, pReq->contLen, &killReq) != 0) {
S
Shengliang Guan 已提交
662 663 664 665
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

D
dapan1121 已提交
666
  mInfo("kill query msg is received, queryId:%s", killReq.queryStrId);
667 668 669 670
  if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_KILL_QUERY) != 0) {
    return -1;
  }

S
Shengliang Guan 已提交
671
  int32_t  connId = 0;
D
dapan1121 已提交
672
  uint64_t queryId = 0;
S
Shengliang Guan 已提交
673
  char    *p = strchr(killReq.queryStrId, ':');
D
dapan1121 已提交
674 675 676 677 678 679 680 681
  if (NULL == p) {
    mError("invalid query id %s", killReq.queryStrId);
    terrno = TSDB_CODE_MND_INVALID_QUERY_ID;
    return -1;
  }
  *p = 0;
  connId = taosStr2Int32(killReq.queryStrId, NULL, 16);
  queryId = taosStr2UInt64(p + 1, NULL, 16);
682

D
dapan1121 已提交
683
  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->connCache, &connId, sizeof(int32_t));
684
  if (pConn == NULL) {
D
dapan1121 已提交
685
    mError("connId:%x, failed to kill queryId:%" PRIx64 ", conn not exist", connId, queryId);
686 687 688
    terrno = TSDB_CODE_MND_INVALID_CONN_ID;
    return -1;
  } else {
D
dapan1121 已提交
689
    mInfo("connId:%x, queryId:%" PRIx64 " is killed by user:%s", connId, queryId, pReq->info.conn.user);
D
dapan1121 已提交
690
    pConn->killId = queryId;
D
dapan1121 已提交
691
    taosCacheRelease(pMgmt->connCache, (void **)&pConn, false);
692 693 694 695
    return 0;
  }
}

S
Shengliang Guan 已提交
696 697
static int32_t mndProcessKillConnReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
698 699
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
700
  SKillConnReq killReq = {0};
S
Shengliang Guan 已提交
701
  if (tDeserializeSKillConnReq(pReq->pCont, pReq->contLen, &killReq) != 0) {
S
Shengliang Guan 已提交
702 703 704
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }
S
Shengliang Guan 已提交
705

706 707 708 709
  if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_KILL_CONN) != 0) {
    return -1;
  }

D
dapan1121 已提交
710
  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->connCache, &killReq.connId, sizeof(uint32_t));
711
  if (pConn == NULL) {
D
dapan1121 已提交
712
    mError("connId:%u, failed to kill connection, conn not exist", killReq.connId);
713 714 715
    terrno = TSDB_CODE_MND_INVALID_CONN_ID;
    return -1;
  } else {
D
dapan1121 已提交
716
    mInfo("connId:%u, is killed by user:%s", killReq.connId, pReq->info.conn.user);
717
    pConn->killed = 1;
D
dapan1121 已提交
718
    taosCacheRelease(pMgmt->connCache, (void **)&pConn, false);
719 720 721 722
    return TSDB_CODE_SUCCESS;
  }
}

D
dapan1121 已提交
723
static int32_t mndProcessSvrVerReq(SRpcMsg *pReq) {
724
  int32_t       code = -1;
D
dapan1121 已提交
725 726
  SServerVerRsp rsp = {0};
  strcpy(rsp.ver, version);
727

D
dapan1121 已提交
728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743
  int32_t contLen = tSerializeSServerVerRsp(NULL, 0, &rsp);
  if (contLen < 0) goto _over;
  void *pRsp = rpcMallocCont(contLen);
  if (pRsp == NULL) goto _over;
  tSerializeSServerVerRsp(pRsp, contLen, &rsp);

  pReq->info.rspLen = contLen;
  pReq->info.rsp = pRsp;

  code = 0;

_over:

  return code;
}

744
static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
745 746 747 748 749 750
  SMnode   *pMnode = pReq->info.node;
  SSdb     *pSdb = pMnode->pSdb;
  int32_t   numOfRows = 0;
  int32_t   cols = 0;
  SConnObj *pConn = NULL;

H
Haojun Liao 已提交
751 752
  if (pShow->pIter == NULL) {
    SProfileMgmt *pMgmt = &pMnode->profileMgmt;
D
dapan1121 已提交
753
    pShow->pIter = taosCacheCreateIter(pMgmt->connCache);
H
Haojun Liao 已提交
754 755
  }

S
Shengliang Guan 已提交
756
  while (numOfRows < rows) {
H
Haojun Liao 已提交
757
    pConn = mndGetNextConn(pMnode, pShow->pIter);
758 759 760 761
    if (pConn == NULL) {
      pShow->pIter = NULL;
      break;
    }
S
Shengliang Guan 已提交
762 763

    cols = 0;
764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791

    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pConn->id, false);

    char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_TO_VARSTR(user, pConn->user);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)user, false);

    char app[TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE];
    STR_TO_VARSTR(app, pConn->app);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)app, false);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pConn->pid, false);

    char endpoint[TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
    sprintf(&endpoint[VARSTR_HEADER_SIZE], "%s:%d", taosIpStr(pConn->ip), pConn->port);
    varDataLen(endpoint) = strlen(&endpoint[VARSTR_HEADER_SIZE]);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)endpoint, false);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pConn->loginTimeMs, false);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pConn->lastAccessTimeMs, false);
S
Shengliang Guan 已提交
792 793 794 795

    numOfRows++;
  }

796
  pShow->numOfRows += numOfRows;
S
Shengliang Guan 已提交
797 798 799
  return numOfRows;
}

D
dapan1121 已提交
800
static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
801 802 803 804 805 806
  SMnode   *pMnode = pReq->info.node;
  SSdb     *pSdb = pMnode->pSdb;
  int32_t   numOfRows = 0;
  int32_t   cols = 0;
  SConnObj *pConn = NULL;

H
Haojun Liao 已提交
807 808
  if (pShow->pIter == NULL) {
    SProfileMgmt *pMgmt = &pMnode->profileMgmt;
D
dapan1121 已提交
809
    pShow->pIter = taosCacheCreateIter(pMgmt->connCache);
H
Haojun Liao 已提交
810 811
  }

S
Shengliang Guan 已提交
812
  while (numOfRows < rows) {
H
Haojun Liao 已提交
813
    pConn = mndGetNextConn(pMnode, pShow->pIter);
814 815 816 817
    if (pConn == NULL) {
      pShow->pIter = NULL;
      break;
    }
S
Shengliang Guan 已提交
818

819
    taosRLockLatch(&pConn->queryLock);
D
dapan1121 已提交
820
    if (NULL == pConn->pQueries || taosArrayGetSize(pConn->pQueries) <= 0) {
821
      taosRUnLockLatch(&pConn->queryLock);
D
dapan1121 已提交
822
      continue;
S
Shengliang Guan 已提交
823 824
    }

D
dapan1121 已提交
825 826
    int32_t numOfQueries = taosArrayGetSize(pConn->pQueries);
    for (int32_t i = 0; i < numOfQueries; ++i) {
827
      SQueryDesc *pQuery = taosArrayGet(pConn->pQueries, i);
S
Shengliang Guan 已提交
828 829
      cols = 0;

830 831 832
      char queryId[26 + VARSTR_HEADER_SIZE] = {0};
      sprintf(&queryId[VARSTR_HEADER_SIZE], "%x:%" PRIx64, pConn->id, pQuery->reqRid);
      varDataLen(queryId) = strlen(&queryId[VARSTR_HEADER_SIZE]);
D
dapan1121 已提交
833
      SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
834 835 836
      colDataAppend(pColInfo, numOfRows, (const char *)queryId, false);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
837
      colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->queryId, false);
S
Shengliang Guan 已提交
838

D
dapan1121 已提交
839
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
840
      colDataAppend(pColInfo, numOfRows, (const char *)&pConn->id, false);
S
Shengliang Guan 已提交
841

842 843 844 845 846 847
      char app[TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE];
      STR_TO_VARSTR(app, pConn->app);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)app, false);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
848
      colDataAppend(pColInfo, numOfRows, (const char *)&pConn->pid, false);
849

D
dapan1121 已提交
850
      char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
851
      STR_TO_VARSTR(user, pConn->user);
D
dapan1121 已提交
852
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
853
      colDataAppend(pColInfo, numOfRows, (const char *)user, false);
S
Shengliang Guan 已提交
854

D
dapan1121 已提交
855 856 857
      char endpoint[TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
      sprintf(&endpoint[VARSTR_HEADER_SIZE], "%s:%d", taosIpStr(pConn->ip), pConn->port);
      varDataLen(endpoint) = strlen(&endpoint[VARSTR_HEADER_SIZE]);
D
dapan1121 已提交
858
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
859
      colDataAppend(pColInfo, numOfRows, (const char *)endpoint, false);
S
Shengliang Guan 已提交
860

D
dapan1121 已提交
861
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
862
      colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->stime, false);
S
Shengliang Guan 已提交
863

D
dapan1121 已提交
864
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
865
      colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->useconds, false);
S
Shengliang Guan 已提交
866

867 868
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->stableQuery, false);
S
Shengliang Guan 已提交
869

D
dapan1121 已提交
870
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
871 872
      colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->subPlanNum, false);

873
      char    subStatus[TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE] = {0};
874 875 876 877 878 879
      int32_t strSize = sizeof(subStatus);
      int32_t offset = VARSTR_HEADER_SIZE;
      for (int32_t i = 0; i < pQuery->subPlanNum && offset < strSize; ++i) {
        if (i) {
          offset += snprintf(subStatus + offset, strSize - offset - 1, ",");
        }
880
        SQuerySubDesc *pDesc = taosArrayGet(pQuery->subDesc, i);
881 882 883 884 885 886 887 888 889 890
        offset += snprintf(subStatus + offset, strSize - offset - 1, "%" PRIu64 ":%s", pDesc->tid, pDesc->status);
      }
      varDataLen(subStatus) = strlen(&subStatus[VARSTR_HEADER_SIZE]);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, subStatus, false);

      char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(sql, pQuery->sql);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)sql, false);
S
Shengliang Guan 已提交
891 892 893

      numOfRows++;
    }
894

895
    taosRUnLockLatch(&pConn->queryLock);
S
Shengliang Guan 已提交
896 897
  }

898
  pShow->numOfRows += numOfRows;
S
Shengliang Guan 已提交
899 900 901
  return numOfRows;
}

D
dapan1121 已提交
902
static int32_t mndRetrieveApps(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
S
Shengliang Guan 已提交
903 904 905 906 907 908
  SMnode  *pMnode = pReq->info.node;
  SSdb    *pSdb = pMnode->pSdb;
  int32_t  numOfRows = 0;
  int32_t  cols = 0;
  SAppObj *pApp = NULL;

D
dapan1121 已提交
909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925
  if (pShow->pIter == NULL) {
    SProfileMgmt *pMgmt = &pMnode->profileMgmt;
    pShow->pIter = taosCacheCreateIter(pMgmt->appCache);
  }

  while (numOfRows < rows) {
    pApp = mndGetNextApp(pMnode, pShow->pIter);
    if (pApp == NULL) {
      pShow->pIter = NULL;
      break;
    }

    cols = 0;

    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->appId, false);

D
dapan1121 已提交
926 927 928
    char ip[TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
    sprintf(&ip[VARSTR_HEADER_SIZE], "%s", taosIpStr(pApp->ip));
    varDataLen(ip) = strlen(&ip[VARSTR_HEADER_SIZE]);
D
dapan1121 已提交
929
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
930
    colDataAppend(pColInfo, numOfRows, (const char *)ip, false);
D
dapan1121 已提交
931 932 933 934

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->pid, false);

D
dapan1121 已提交
935 936 937
    char name[TSDB_APP_NAME_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
    sprintf(&name[VARSTR_HEADER_SIZE], "%s", pApp->name);
    varDataLen(name) = strlen(&name[VARSTR_HEADER_SIZE]);
D
dapan1121 已提交
938
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
939
    colDataAppend(pColInfo, numOfRows, (const char *)name, false);
D
dapan1121 已提交
940 941

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
942
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->startTime, false);
D
dapan1121 已提交
943 944

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
945
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->summary.numOfInsertsReq, false);
D
dapan1121 已提交
946 947

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
948
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->summary.numOfInsertRows, false);
D
dapan1121 已提交
949 950

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
951
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->summary.insertElapsedTime, false);
D
dapan1121 已提交
952 953

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
954
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->summary.insertBytes, false);
D
dapan1121 已提交
955 956

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
957
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->summary.fetchBytes, false);
D
dapan1121 已提交
958 959

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
960
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->summary.queryElapsedTime, false);
D
dapan1121 已提交
961 962

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
963
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->summary.numOfSlowQueries, false);
D
dapan1121 已提交
964 965

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
966
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->summary.totalRequests, false);
D
dapan1121 已提交
967 968

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
969
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->summary.currentRequests, false);
D
dapan1121 已提交
970 971

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
972
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->lastAccessTimeMs, false);
D
dapan1121 已提交
973 974 975 976 977 978 979 980

    numOfRows++;
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}

S
Shengliang Guan 已提交
981
static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) {
H
Haojun Liao 已提交
982 983 984
  if (pIter != NULL) {
    taosCacheDestroyIter(pIter);
  }
S
Shengliang Guan 已提交
985
}
S
Shengliang Guan 已提交
986

S
Shengliang Guan 已提交
987 988
int32_t mndGetNumOfConnections(SMnode *pMnode) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
D
dapan1121 已提交
989
  return taosCacheGetNumOfObj(pMgmt->connCache);
D
dapan1121 已提交
990
}