mndProfile.c 33.0 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "mndProfile.h"
18
#include "mndPrivilege.h"
S
Shengliang Guan 已提交
19
#include "mndDb.h"
S
Shengliang Guan 已提交
20
#include "mndDnode.h"
S
Shengliang Guan 已提交
21
#include "mndMnode.h"
D
dapan1121 已提交
22
#include "mndQnode.h"
S
Shengliang Guan 已提交
23
#include "mndShow.h"
S
Shengliang Guan 已提交
24
#include "mndStb.h"
S
Shengliang Guan 已提交
25
#include "mndUser.h"
S
Shengliang Guan 已提交
26
#include "tglobal.h"
S
version  
Shengliang Guan 已提交
27
#include "version.h"
S
Shengliang Guan 已提交
28

S
Shengliang Guan 已提交
29
typedef struct {
L
Liu Jicong 已提交
30 31 32 33 34 35 36 37 38 39 40 41 42
  uint32_t id;
  int8_t   connType;
  char     user[TSDB_USER_LEN];
  char     app[TSDB_APP_NAME_LEN];  // app name that invokes taosc
  int64_t  appStartTimeMs;          // app start time
  int32_t  pid;                     // pid of app that invokes taosc
  uint32_t ip;
  uint16_t port;
  int8_t   killed;
  int64_t  loginTimeMs;
  int64_t  lastAccessTimeMs;
  uint64_t killId;
  int32_t  numOfQueries;
43
  SRWLatch queryLock;
S
Shengliang Guan 已提交
44
  SArray  *pQueries;  // SArray<SQueryDesc>
S
Shengliang Guan 已提交
45 46
} SConnObj;

D
dapan1121 已提交
47 48 49 50 51 52 53 54 55 56
typedef struct {
  int64_t            appId;
  uint32_t           ip;
  int32_t            pid;
  char               name[TSDB_APP_NAME_LEN];
  int64_t            startTime;
  SAppClusterSummary summary;
  int64_t            lastAccessTimeMs;
} SAppObj;

L
Liu Jicong 已提交
57 58
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
                               int32_t pid, const char *app, int64_t startTime);
S
Shengliang Guan 已提交
59
static void      mndFreeConn(SConnObj *pConn);
D
dapan1121 已提交
60
static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId);
S
Shengliang Guan 已提交
61
static void      mndReleaseConn(SMnode *pMnode, SConnObj *pConn);
S
Shengliang Guan 已提交
62
static void     *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter);
S
Shengliang Guan 已提交
63
static void      mndCancelGetNextConn(SMnode *pMnode, void *pIter);
S
Shengliang Guan 已提交
64 65 66 67
static int32_t   mndProcessHeartBeatReq(SRpcMsg *pReq);
static int32_t   mndProcessConnectReq(SRpcMsg *pReq);
static int32_t   mndProcessKillQueryReq(SRpcMsg *pReq);
static int32_t   mndProcessKillConnReq(SRpcMsg *pReq);
68 69
static int32_t   mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static int32_t   mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
S
Shengliang Guan 已提交
70
static void      mndCancelGetNextQuery(SMnode *pMnode, void *pIter);
D
dapan1121 已提交
71 72 73
static void      mndFreeApp(SAppObj *pApp);
static int32_t   mndRetrieveApps(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void      mndCancelGetNextApp(SMnode *pMnode, void *pIter);
D
dapan1121 已提交
74
static int32_t   mndProcessSvrVerReq(SRpcMsg *pReq);
S
Shengliang Guan 已提交
75 76 77 78

int32_t mndInitProfile(SMnode *pMnode) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

L
Liu Jicong 已提交
79
  // in ms
D
dapan1121 已提交
80
  int32_t checkTime = tsShellActivityTimer * 2 * 1000;
D
dapan1121 已提交
81
  pMgmt->connCache = taosCacheInit(TSDB_DATA_TYPE_UINT, checkTime, true, (__cache_free_fn_t)mndFreeConn, "conn");
D
dapan1121 已提交
82 83 84 85 86 87 88 89
  if (pMgmt->connCache == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to alloc profile cache since %s", terrstr());
    return -1;
  }

  pMgmt->appCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, checkTime, true, (__cache_free_fn_t)mndFreeApp, "app");
  if (pMgmt->appCache == NULL) {
S
Shengliang Guan 已提交
90 91 92 93 94
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to alloc profile cache since %s", terrstr());
    return -1;
  }

S
Shengliang Guan 已提交
95 96 97 98
  mndSetMsgHandle(pMnode, TDMT_MND_HEARTBEAT, mndProcessHeartBeatReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CONNECT, mndProcessConnectReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq);
D
dapan1121 已提交
99
  mndSetMsgHandle(pMnode, TDMT_MND_SERVER_VERSION, mndProcessSvrVerReq);
S
Shengliang Guan 已提交
100

101
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
S
Shengliang Guan 已提交
102
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn);
D
dapan1121 已提交
103
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
S
Shengliang Guan 已提交
104
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery);
D
dapan1121 已提交
105 106
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_APPS, mndRetrieveApps);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_APPS, mndCancelGetNextApp);
107

S
Shengliang Guan 已提交
108 109 110 111 112
  return 0;
}

void mndCleanupProfile(SMnode *pMnode) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
D
dapan1121 已提交
113 114 115 116 117 118 119 120
  if (pMgmt->connCache != NULL) {
    taosCacheCleanup(pMgmt->connCache);
    pMgmt->connCache = NULL;
  }

  if (pMgmt->appCache != NULL) {
    taosCacheCleanup(pMgmt->appCache);
    pMgmt->appCache = NULL;
S
Shengliang Guan 已提交
121 122 123
  }
}

L
Liu Jicong 已提交
124 125
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
                               int32_t pid, const char *app, int64_t startTime) {
S
Shengliang Guan 已提交
126 127
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
128 129
  char     connStr[255] = {0};
  int32_t  len = snprintf(connStr, sizeof(connStr), "%s%d%d%d%s", user, ip, port, pid, app);
D
dapan1121 已提交
130
  uint32_t connId = mndGenerateUid(connStr, len);
S
Shengliang Guan 已提交
131
  if (startTime == 0) startTime = taosGetTimestampMs();
S
Shengliang Guan 已提交
132

S
Shengliang Guan 已提交
133 134 135 136 137 138 139 140 141 142 143 144 145 146
  SConnObj connObj = {
      .id = connId,
      .connType = connType,
      .appStartTimeMs = startTime,
      .pid = pid,
      .ip = ip,
      .port = port,
      .killed = 0,
      .loginTimeMs = taosGetTimestampMs(),
      .lastAccessTimeMs = 0,
      .killId = 0,
      .numOfQueries = 0,
      .pQueries = NULL,
  };
S
Shengliang Guan 已提交
147

S
Shengliang Guan 已提交
148
  connObj.lastAccessTimeMs = connObj.loginTimeMs;
S
shm  
Shengliang Guan 已提交
149
  tstrncpy(connObj.user, user, TSDB_USER_LEN);
S
Shengliang Guan 已提交
150 151
  tstrncpy(connObj.app, app, TSDB_APP_NAME_LEN);

S
Shengliang Guan 已提交
152
  int32_t   keepTime = tsShellActivityTimer * 3;
S
Shengliang Guan 已提交
153 154
  SConnObj *pConn =
      taosCachePut(pMgmt->connCache, &connId, sizeof(uint32_t), &connObj, sizeof(connObj), keepTime * 1000);
S
Shengliang Guan 已提交
155 156
  if (pConn == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
shm  
Shengliang Guan 已提交
157
    mError("conn:%d, failed to put into cache since %s, user:%s", connId, user, terrstr());
S
Shengliang Guan 已提交
158 159
    return NULL;
  } else {
D
dapan1121 已提交
160
    mTrace("conn:%u, is created, data:%p user:%s", pConn->id, pConn, user);
S
Shengliang Guan 已提交
161 162
    return pConn;
  }
S
Shengliang Guan 已提交
163 164 165
}

static void mndFreeConn(SConnObj *pConn) {
166
  taosWLockLatch(&pConn->queryLock);
D
dapan1121 已提交
167
  taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc);
168
  taosWUnLockLatch(&pConn->queryLock);
169

D
dapan1121 已提交
170
  mTrace("conn:%u, is destroyed, data:%p", pConn->id, pConn);
S
Shengliang Guan 已提交
171 172
}

D
dapan1121 已提交
173
static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId) {
S
Shengliang Guan 已提交
174 175
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

D
dapan1121 已提交
176
  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->connCache, &connId, sizeof(connId));
S
Shengliang Guan 已提交
177
  if (pConn == NULL) {
D
dapan1121 已提交
178
    mDebug("conn:%u, already destroyed", connId);
S
Shengliang Guan 已提交
179 180 181
    return NULL;
  }

D
dapan1121 已提交
182
  pConn->lastAccessTimeMs = taosGetTimestampMs();
D
dapan1121 已提交
183
  mTrace("conn:%u, acquired from cache, data:%p", pConn->id, pConn);
S
Shengliang Guan 已提交
184 185 186 187 188
  return pConn;
}

static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) {
  if (pConn == NULL) return;
D
dapan1121 已提交
189
  mTrace("conn:%u, released from cache, data:%p", pConn->id, pConn);
S
Shengliang Guan 已提交
190

S
Shengliang Guan 已提交
191
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
D
dapan1121 已提交
192
  taosCacheRelease(pMgmt->connCache, (void **)&pConn, false);
S
Shengliang Guan 已提交
193 194
}

H
Haojun Liao 已提交
195
void *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter) {
L
Liu Jicong 已提交
196 197
  SConnObj *pConn = NULL;
  bool      hasNext = taosCacheIterNext(pIter);
H
Haojun Liao 已提交
198 199 200 201 202
  if (hasNext) {
    size_t dataLen = 0;
    pConn = taosCacheIterGetData(pIter, &dataLen);
  } else {
    taosCacheDestroyIter(pIter);
S
Shengliang Guan 已提交
203 204
  }

H
Haojun Liao 已提交
205
  return pConn;
S
Shengliang Guan 已提交
206 207 208
}

static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) {
H
Haojun Liao 已提交
209 210 211
  if (pIter != NULL) {
    taosCacheDestroyIter(pIter);
  }
S
Shengliang Guan 已提交
212 213
}

S
Shengliang Guan 已提交
214
static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
S
Shengliang Guan 已提交
215 216 217 218 219 220
  SMnode         *pMnode = pReq->info.node;
  SUserObj       *pUser = NULL;
  SDbObj         *pDb = NULL;
  SConnObj       *pConn = NULL;
  int32_t         code = -1;
  SConnectReq     connReq = {0};
S
Shengliang Guan 已提交
221
  char            ip[24] = {0};
S
Shengliang Guan 已提交
222
  const STraceId *trace = &pReq->info.traceId;
S
Shengliang Guan 已提交
223

S
Shengliang Guan 已提交
224
  if (tDeserializeSConnectReq(pReq->pCont, pReq->contLen, &connReq) != 0) {
S
Shengliang Guan 已提交
225
    terrno = TSDB_CODE_INVALID_MSG;
S
Shengliang Guan 已提交
226
    goto _OVER;
S
Shengliang Guan 已提交
227
  }
S
Shengliang Guan 已提交
228

229
  taosIp2String(pReq->info.conn.clientIp, ip);
230 231 232 233
  if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONNECT) != 0) {
    mGError("user:%s, failed to login from %s since %s", pReq->info.conn.user, ip, terrstr());
    goto _OVER;
  }
S
Shengliang Guan 已提交
234

235
  pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
236
  if (pUser == NULL) {
S
Shengliang Guan 已提交
237 238
    mGError("user:%s, failed to login from %s while acquire user since %s", pReq->info.conn.user, ip, terrstr());
    goto _OVER;
239
  }
S
Shengliang Guan 已提交
240 241 242

  if (strncmp(connReq.passwd, pUser->pass, TSDB_PASSWORD_LEN - 1) != 0) {
    mGError("user:%s, failed to login from %s since invalid pass, input:%s", pReq->info.conn.user, ip, connReq.passwd);
dengyihao's avatar
dengyihao 已提交
243
    code = TSDB_CODE_RPC_AUTH_FAILURE;
S
Shengliang Guan 已提交
244 245 246
    goto _OVER;
  }

S
Shengliang Guan 已提交
247
  if (connReq.db[0]) {
S
Shengliang Guan 已提交
248
    char db[TSDB_DB_FNAME_LEN] = {0};
S
Shengliang Guan 已提交
249 250
    snprintf(db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, connReq.db);
    pDb = mndAcquireDb(pMnode, db);
S
Shengliang Guan 已提交
251 252
    if (pDb == NULL) {
      terrno = TSDB_CODE_MND_INVALID_DB;
S
Shengliang Guan 已提交
253 254
      mGError("user:%s, failed to login from %s while use db:%s since %s", pReq->info.conn.user, ip, connReq.db,
              terrstr());
255 256 257 258
      goto _OVER;
    }

    if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_READ_OR_WRITE_DB, pDb) != 0) {
S
Shengliang Guan 已提交
259
      goto _OVER;
S
Shengliang Guan 已提交
260 261 262
    }
  }

263 264
  pConn = mndCreateConn(pMnode, pReq->info.conn.user, connReq.connType, pReq->info.conn.clientIp,
                        pReq->info.conn.clientPort, connReq.pid, connReq.app, connReq.startTime);
S
Shengliang Guan 已提交
265
  if (pConn == NULL) {
S
Shengliang Guan 已提交
266
    mGError("user:%s, failed to login from %s while create connection since %s", pReq->info.conn.user, ip, terrstr());
S
Shengliang Guan 已提交
267
    goto _OVER;
S
Shengliang Guan 已提交
268 269
  }

S
Shengliang Guan 已提交
270 271 272 273 274
  SConnectRsp connectRsp = {0};
  connectRsp.acctId = pUser->acctId;
  connectRsp.superUser = pUser->superUser;
  connectRsp.clusterId = pMnode->clusterId;
  connectRsp.connId = pConn->id;
L
Liu Jicong 已提交
275
  connectRsp.connType = connReq.connType;
D
dapan 已提交
276
  connectRsp.dnodeNum = mndGetDnodeSize(pMnode);
277

D
dapan1121 已提交
278 279
  strcpy(connectRsp.sVer, version);
  snprintf(connectRsp.sDetailVer, sizeof(connectRsp.sDetailVer), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo,
S
Shengliang Guan 已提交
280 281
           gitinfo);
  mndGetMnodeEpSet(pMnode, &connectRsp.epSet);
282

S
Shengliang Guan 已提交
283
  int32_t contLen = tSerializeSConnectRsp(NULL, 0, &connectRsp);
S
Shengliang Guan 已提交
284
  if (contLen < 0) goto _OVER;
S
Shengliang Guan 已提交
285
  void *pRsp = rpcMallocCont(contLen);
S
Shengliang Guan 已提交
286
  if (pRsp == NULL) goto _OVER;
S
Shengliang Guan 已提交
287
  tSerializeSConnectRsp(pRsp, contLen, &connectRsp);
S
Shengliang Guan 已提交
288

S
Shengliang Guan 已提交
289 290
  pReq->info.rspLen = contLen;
  pReq->info.rsp = pRsp;
291

S
Shengliang Guan 已提交
292
  mGDebug("user:%s, login from %s:%d, conn:%u, app:%s", pReq->info.conn.user, ip, pConn->port, pConn->id, connReq.app);
293 294 295

  code = 0;

S
Shengliang Guan 已提交
296
_OVER:
297 298 299 300 301 302

  mndReleaseUser(pMnode, pUser);
  mndReleaseDb(pMnode, pDb);
  mndReleaseConn(pMnode, pConn);

  return code;
S
Shengliang Guan 已提交
303 304
}

D
dapan1121 已提交
305
static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) {
306 307
  taosWLockLatch(&pConn->queryLock);

D
dapan1121 已提交
308
  taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc);
S
Shengliang Guan 已提交
309

D
dapan1121 已提交
310
  pConn->pQueries = pBasic->queryDesc;
311
  pConn->numOfQueries = pBasic->queryDesc ? taosArrayGetSize(pBasic->queryDesc) : 0;
D
dapan1121 已提交
312
  pBasic->queryDesc = NULL;
L
Liu Jicong 已提交
313

314 315 316
  mDebug("queries updated in conn %d, num:%d", pConn->id, pConn->numOfQueries);

  taosWUnLockLatch(&pConn->queryLock);
S
Shengliang Guan 已提交
317 318 319 320

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
321
static SAppObj *mndCreateApp(SMnode *pMnode, uint32_t clientIp, SAppHbReq *pReq) {
D
dapan1121 已提交
322 323 324 325 326 327 328 329 330 331 332
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  SAppObj app;
  app.appId = pReq->appId;
  app.ip = clientIp;
  app.pid = pReq->pid;
  strcpy(app.name, pReq->name);
  app.startTime = pReq->startTime;
  memcpy(&app.summary, &pReq->summary, sizeof(pReq->summary));
  app.lastAccessTimeMs = taosGetTimestampMs();

S
Shengliang Guan 已提交
333
  const int32_t keepTime = tsShellActivityTimer * 3;
D
dapan1121 已提交
334 335 336 337 338 339
  SAppObj *pApp = taosCachePut(pMgmt->appCache, &pReq->appId, sizeof(pReq->appId), &app, sizeof(app), keepTime * 1000);
  if (pApp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to app %" PRIx64 " into cache since %s", pReq->appId, terrstr());
    return NULL;
  }
S
Shengliang Guan 已提交
340

D
dapan1121 已提交
341 342 343 344
  mTrace("app %" PRIx64 " is put into cache", pReq->appId);
  return pApp;
}

S
Shengliang Guan 已提交
345
static void mndFreeApp(SAppObj *pApp) { mTrace("app %" PRIx64 " is destroyed", pApp->appId); }
D
dapan1121 已提交
346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371

static SAppObj *mndAcquireApp(SMnode *pMnode, int64_t appId) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  SAppObj *pApp = taosCacheAcquireByKey(pMgmt->appCache, &appId, sizeof(appId));
  if (pApp == NULL) {
    mDebug("app %" PRIx64 " not in cache", appId);
    return NULL;
  }

  pApp->lastAccessTimeMs = (uint64_t)taosGetTimestampMs();

  mTrace("app %" PRIx64 " acquired from cache", appId);
  return pApp;
}

static void mndReleaseApp(SMnode *pMnode, SAppObj *pApp) {
  if (pApp == NULL) return;
  mTrace("release app %" PRIx64 " to cache", pApp->appId);

  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
  taosCacheRelease(pMgmt->appCache, (void **)&pApp, false);
}

void *mndGetNextApp(SMnode *pMnode, SCacheIter *pIter) {
  SAppObj *pApp = NULL;
S
Shengliang Guan 已提交
372
  bool     hasNext = taosCacheIterNext(pIter);
D
dapan1121 已提交
373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388
  if (hasNext) {
    size_t dataLen = 0;
    pApp = taosCacheIterGetData(pIter, &dataLen);
  } else {
    taosCacheDestroyIter(pIter);
  }

  return pApp;
}

static void mndCancelGetNextApp(SMnode *pMnode, void *pIter) {
  if (pIter != NULL) {
    taosCacheDestroyIter(pIter);
  }
}

S
Shengliang Guan 已提交
389
static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) {
L
Liu Jicong 已提交
390
#if 0
wafwerar's avatar
wafwerar 已提交
391
  SClientHbRsp* pRsp = taosMemoryMalloc(sizeof(SClientHbRsp));
L
Liu Jicong 已提交
392 393 394 395 396 397 398 399 400 401 402 403 404 405 406
  if (pRsp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  pRsp->connKey = pReq->connKey;
  SMqHbBatchRsp batchRsp;
  batchRsp.batchRsps = taosArrayInit(0, sizeof(SMqHbRsp));
  if (batchRsp.batchRsps == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  SClientHbKey connKey = pReq->connKey;
  SHashObj* pObj =  pReq->info;
  SKv* pKv = taosHashGet(pObj, "mq-tmp", strlen("mq-tmp") + 1);
  if (pKv == NULL) {
wafwerar's avatar
wafwerar 已提交
407
    taosMemoryFree(pRsp);
L
Liu Jicong 已提交
408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439
    return NULL;
  }
  SMqHbMsg mqHb;
  taosDecodeSMqMsg(pKv->value, &mqHb);
  /*int64_t clientUid = htonl(pKv->value);*/
  /*if (mqHb.epoch )*/
  int sz = taosArrayGetSize(mqHb.pTopics);
  SMqConsumerObj* pConsumer = mndAcquireConsumer(pMnode, mqHb.consumerId); 
  for (int i = 0; i < sz; i++) {
    SMqHbOneTopicBatchRsp innerBatchRsp;
    innerBatchRsp.rsps = taosArrayInit(sz, sizeof(SMqHbRsp));
    if (innerBatchRsp.rsps == NULL) {
      //TODO
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
    SMqHbTopicInfo* topicInfo = taosArrayGet(mqHb.pTopics, i);
    SMqConsumerTopic* pConsumerTopic = taosHashGet(pConsumer->topicHash, topicInfo->name, strlen(topicInfo->name)+1);
    if (pConsumerTopic->epoch != topicInfo->epoch) {
      //add new vgids into rsp
      int vgSz = taosArrayGetSize(topicInfo->pVgInfo);
      for (int j = 0; j < vgSz; j++) {
        SMqHbRsp innerRsp;
        SMqHbVgInfo* pVgInfo = taosArrayGet(topicInfo->pVgInfo, i);
        SVgObj* pVgObj = mndAcquireVgroup(pMnode, pVgInfo->vgId);
        innerRsp.epSet = mndGetVgroupEpset(pMnode, pVgObj);
        taosArrayPush(innerBatchRsp.rsps, &innerRsp);
      }
    }
    taosArrayPush(batchRsp.batchRsps, &innerBatchRsp);
  }
  int32_t tlen = taosEncodeSMqHbBatchRsp(NULL, &batchRsp);
wafwerar's avatar
wafwerar 已提交
440
  void* buf = taosMemoryMalloc(tlen);
L
Liu Jicong 已提交
441 442 443 444 445 446 447 448 449
  if (buf == NULL) {
    //TODO
    return NULL;
  }
  void* abuf = buf;
  taosEncodeSMqHbBatchRsp(&abuf, &batchRsp);
  pRsp->body = buf;
  pRsp->bodyLen = tlen;
  return pRsp;
L
Liu Jicong 已提交
450 451
#endif
  return NULL;
L
Liu Jicong 已提交
452 453
}

D
dapan1121 已提交
454
static int32_t mndUpdateAppInfo(SMnode *pMnode, SClientHbReq *pHbReq, SRpcConnInfo *connInfo) {
S
Shengliang Guan 已提交
455 456
  SAppHbReq *pReq = &pHbReq->app;
  SAppObj   *pApp = mndAcquireApp(pMnode, pReq->appId);
D
dapan1121 已提交
457 458 459 460 461 462
  if (pApp == NULL) {
    pApp = mndCreateApp(pMnode, connInfo->clientIp, pReq);
    if (pApp == NULL) {
      mError("failed to create new app %" PRIx64 " since %s", pReq->appId, terrstr());
      return -1;
    } else {
S
Shengliang Guan 已提交
463
      mDebug("a new app %" PRIx64 " is created", pReq->appId);
S
Shengliang Guan 已提交
464
      mndReleaseApp(pMnode, pApp);
D
dapan1121 已提交
465 466 467 468 469 470 471 472 473 474 475
      return TSDB_CODE_SUCCESS;
    }
  }

  memcpy(&pApp->summary, &pReq->summary, sizeof(pReq->summary));

  mndReleaseApp(pMnode, pApp);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
476 477 478 479 480
static int32_t mndGetOnlineDnodeNum(SMnode *pMnode, int32_t *num) {
  SSdb      *pSdb = pMnode->pSdb;
  SDnodeObj *pDnode = NULL;
  int64_t    curMs = taosGetTimestampMs();
  void      *pIter = NULL;
481

D
dapan1121 已提交
482 483 484
  while (true) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;
485

D
dapan1121 已提交
486 487 488 489
    bool online = mndIsDnodeOnline(pDnode, curMs);
    if (online) {
      (*num)++;
    }
490

D
dapan1121 已提交
491 492 493 494 495 496
    sdbRelease(pSdb, pDnode);
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
497 498
static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHbReq *pHbReq,
                                        SClientHbBatchRsp *pBatchRsp) {
D
dapan1121 已提交
499
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
L
Liu Jicong 已提交
500
  SClientHbRsp  hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL};
S
Shengliang Guan 已提交
501
  SRpcConnInfo  connInfo = pMsg->info.conn;
D
dapan1121 已提交
502 503

  mndUpdateAppInfo(pMnode, pHbReq, &connInfo);
D
dapan1121 已提交
504 505 506 507 508

  if (pHbReq->query) {
    SQueryHbReqBasic *pBasic = pHbReq->query;

    SConnObj *pConn = mndAcquireConn(pMnode, pBasic->connId);
L
Liu Jicong 已提交
509 510
    if (pConn == NULL) {
      pConn = mndCreateConn(pMnode, connInfo.user, CONN_TYPE__QUERY, connInfo.clientIp, connInfo.clientPort,
D
dapan1121 已提交
511
                            pHbReq->app.pid, pHbReq->app.name, 0);
D
dapan1121 已提交
512 513 514 515
      if (pConn == NULL) {
        mError("user:%s, conn:%u is freed and failed to create new since %s", connInfo.user, pBasic->connId, terrstr());
        return -1;
      } else {
D
dapan1121 已提交
516
        mDebug("user:%s, conn:%u is freed, will create a new conn:%u", connInfo.user, pBasic->connId, pConn->id);
D
dapan1121 已提交
517 518
      }
    }
519

D
dapan1121 已提交
520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538
    SQueryHbRspBasic *rspBasic = taosMemoryCalloc(1, sizeof(SQueryHbRspBasic));
    if (rspBasic == NULL) {
      mndReleaseConn(pMnode, pConn);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      mError("user:%s, conn:%u failed to process hb while since %s", pConn->user, pBasic->connId, terrstr());
      return -1;
    }

    mndSaveQueryList(pConn, pBasic);
    if (pConn->killed != 0) {
      rspBasic->killConnection = 1;
    }

    if (pConn->killId != 0) {
      rspBasic->killRid = pConn->killId;
      pConn->killId = 0;
    }

    rspBasic->connId = pConn->id;
D
dapan1121 已提交
539
    rspBasic->totalDnodes = mndGetDnodeSize(pMnode);
D
dapan1121 已提交
540
    mndGetOnlineDnodeNum(pMnode, &rspBasic->onlineDnodes);
D
dapan1121 已提交
541
    mndGetMnodeEpSet(pMnode, &rspBasic->epSet);
D
dapan1121 已提交
542 543

    mndCreateQnodeList(pMnode, &rspBasic->pQnodeList, -1);
544

D
dapan1121 已提交
545 546 547
    mndReleaseConn(pMnode, pConn);

    hbRsp.query = rspBasic;
548 549
  } else {
    mDebug("no query info in hb msg");
D
dapan1121 已提交
550 551 552 553
  }

  int32_t kvNum = taosHashGetSize(pHbReq->info);
  if (NULL == pHbReq->info || kvNum <= 0) {
L
Liu Jicong 已提交
554
    taosArrayPush(pBatchRsp->rsps, &hbRsp);
D
dapan1121 已提交
555 556 557 558 559 560 561
    return TSDB_CODE_SUCCESS;
  }

  hbRsp.info = taosArrayInit(kvNum, sizeof(SKv));
  if (NULL == hbRsp.info) {
    mError("taosArrayInit %d rsp kv failed", kvNum);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
D
dapan1121 已提交
562
    tFreeClientHbRsp(&hbRsp);
D
dapan1121 已提交
563 564 565 566 567 568 569 570
    return -1;
  }

  void *pIter = taosHashIterate(pHbReq->info, NULL);
  while (pIter != NULL) {
    SKv *kv = pIter;

    switch (kv->key) {
D
dapan 已提交
571
      case HEARTBEAT_KEY_USER_AUTHINFO: {
S
Shengliang Guan 已提交
572
        void   *rspMsg = NULL;
D
dapan 已提交
573 574 575 576 577 578 579 580
        int32_t rspLen = 0;
        mndValidateUserAuthInfo(pMnode, kv->value, kv->valueLen / sizeof(SUserAuthVersion), &rspMsg, &rspLen);
        if (rspMsg && rspLen > 0) {
          SKv kv1 = {.key = HEARTBEAT_KEY_USER_AUTHINFO, .valueLen = rspLen, .value = rspMsg};
          taosArrayPush(hbRsp.info, &kv1);
        }
        break;
      }
D
dapan1121 已提交
581
      case HEARTBEAT_KEY_DBINFO: {
S
Shengliang Guan 已提交
582
        void   *rspMsg = NULL;
D
dapan1121 已提交
583 584 585 586 587 588 589 590 591
        int32_t rspLen = 0;
        mndValidateDbInfo(pMnode, kv->value, kv->valueLen / sizeof(SDbVgVersion), &rspMsg, &rspLen);
        if (rspMsg && rspLen > 0) {
          SKv kv1 = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg};
          taosArrayPush(hbRsp.info, &kv1);
        }
        break;
      }
      case HEARTBEAT_KEY_STBINFO: {
S
Shengliang Guan 已提交
592
        void   *rspMsg = NULL;
D
dapan1121 已提交
593
        int32_t rspLen = 0;
D
dapan1121 已提交
594
        mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableVersion), &rspMsg, &rspLen);
D
dapan1121 已提交
595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614
        if (rspMsg && rspLen > 0) {
          SKv kv1 = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg};
          taosArrayPush(hbRsp.info, &kv1);
        }
        break;
      }
      default:
        mError("invalid kv key:%d", kv->key);
        hbRsp.status = TSDB_CODE_MND_APP_ERROR;
        break;
    }

    pIter = taosHashIterate(pHbReq->info, pIter);
  }

  taosArrayPush(pBatchRsp->rsps, &hbRsp);

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
615 616
static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
S
Shengliang Guan 已提交
617

L
Liu Jicong 已提交
618
  SClientHbBatchReq batchReq = {0};
S
Shengliang Guan 已提交
619
  if (tDeserializeSClientHbBatchReq(pReq->pCont, pReq->contLen, &batchReq) != 0) {
D
dapan1121 已提交
620
    taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);
S
Shengliang Guan 已提交
621 622 623 624
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

L
Liu Jicong 已提交
625 626
  SClientHbBatchRsp batchRsp = {0};
  batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp));
L
Liu Jicong 已提交
627

S
Shengliang Guan 已提交
628
  int32_t sz = taosArrayGetSize(batchReq.reqs);
L
Liu Jicong 已提交
629
  for (int i = 0; i < sz; i++) {
S
Shengliang Guan 已提交
630
    SClientHbReq *pHbReq = taosArrayGet(batchReq.reqs, i);
D
dapan1121 已提交
631
    if (pHbReq->connKey.connType == CONN_TYPE__QUERY) {
S
Shengliang Guan 已提交
632
      mndProcessQueryHeartBeat(pMnode, pReq, pHbReq, &batchRsp);
D
dapan1121 已提交
633
    } else if (pHbReq->connKey.connType == CONN_TYPE__TMQ) {
L
Liu Jicong 已提交
634 635 636
      SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq);
      if (pRsp != NULL) {
        taosArrayPush(batchRsp.rsps, pRsp);
wafwerar's avatar
wafwerar 已提交
637
        taosMemoryFree(pRsp);
L
Liu Jicong 已提交
638
      }
L
Liu Jicong 已提交
639 640
    }
  }
S
Shengliang Guan 已提交
641
  taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);
L
Liu Jicong 已提交
642

S
Shengliang Guan 已提交
643 644 645 646
  int32_t tlen = tSerializeSClientHbBatchRsp(NULL, 0, &batchRsp);
  void   *buf = rpcMallocCont(tlen);
  tSerializeSClientHbBatchRsp(buf, tlen, &batchRsp);

D
dapan1121 已提交
647
  tFreeClientHbBatchRsp(&batchRsp);
S
Shengliang Guan 已提交
648 649
  pReq->info.rspLen = tlen;
  pReq->info.rsp = buf;
S
Shengliang Guan 已提交
650 651 652 653

  return 0;
}

S
Shengliang Guan 已提交
654 655
static int32_t mndProcessKillQueryReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
656 657
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
658
  SKillQueryReq killReq = {0};
S
Shengliang Guan 已提交
659
  if (tDeserializeSKillQueryReq(pReq->pCont, pReq->contLen, &killReq) != 0) {
S
Shengliang Guan 已提交
660 661 662 663
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

D
dapan1121 已提交
664
  mInfo("kill query msg is received, queryId:%s", killReq.queryStrId);
665 666 667 668
  if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_KILL_QUERY) != 0) {
    return -1;
  }

S
Shengliang Guan 已提交
669
  int32_t  connId = 0;
D
dapan1121 已提交
670
  uint64_t queryId = 0;
S
Shengliang Guan 已提交
671
  char    *p = strchr(killReq.queryStrId, ':');
D
dapan1121 已提交
672 673 674 675 676 677 678 679
  if (NULL == p) {
    mError("invalid query id %s", killReq.queryStrId);
    terrno = TSDB_CODE_MND_INVALID_QUERY_ID;
    return -1;
  }
  *p = 0;
  connId = taosStr2Int32(killReq.queryStrId, NULL, 16);
  queryId = taosStr2UInt64(p + 1, NULL, 16);
680

D
dapan1121 已提交
681
  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->connCache, &connId, sizeof(int32_t));
682
  if (pConn == NULL) {
D
dapan1121 已提交
683
    mError("connId:%x, failed to kill queryId:%" PRIx64 ", conn not exist", connId, queryId);
684 685 686
    terrno = TSDB_CODE_MND_INVALID_CONN_ID;
    return -1;
  } else {
D
dapan1121 已提交
687
    mInfo("connId:%x, queryId:%" PRIx64 " is killed by user:%s", connId, queryId, pReq->info.conn.user);
D
dapan1121 已提交
688
    pConn->killId = queryId;
D
dapan1121 已提交
689
    taosCacheRelease(pMgmt->connCache, (void **)&pConn, false);
690 691 692 693
    return 0;
  }
}

S
Shengliang Guan 已提交
694 695
static int32_t mndProcessKillConnReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
696 697
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
698
  SKillConnReq killReq = {0};
S
Shengliang Guan 已提交
699
  if (tDeserializeSKillConnReq(pReq->pCont, pReq->contLen, &killReq) != 0) {
S
Shengliang Guan 已提交
700 701 702
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }
S
Shengliang Guan 已提交
703

704 705 706 707
  if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_KILL_CONN) != 0) {
    return -1;
  }

D
dapan1121 已提交
708
  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->connCache, &killReq.connId, sizeof(uint32_t));
709
  if (pConn == NULL) {
D
dapan1121 已提交
710
    mError("connId:%u, failed to kill connection, conn not exist", killReq.connId);
711 712 713
    terrno = TSDB_CODE_MND_INVALID_CONN_ID;
    return -1;
  } else {
D
dapan1121 已提交
714
    mInfo("connId:%u, is killed by user:%s", killReq.connId, pReq->info.conn.user);
715
    pConn->killed = 1;
D
dapan1121 已提交
716
    taosCacheRelease(pMgmt->connCache, (void **)&pConn, false);
717 718 719 720
    return TSDB_CODE_SUCCESS;
  }
}

D
dapan1121 已提交
721
static int32_t mndProcessSvrVerReq(SRpcMsg *pReq) {
722
  int32_t       code = -1;
D
dapan1121 已提交
723 724
  SServerVerRsp rsp = {0};
  strcpy(rsp.ver, version);
725

D
dapan1121 已提交
726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741
  int32_t contLen = tSerializeSServerVerRsp(NULL, 0, &rsp);
  if (contLen < 0) goto _over;
  void *pRsp = rpcMallocCont(contLen);
  if (pRsp == NULL) goto _over;
  tSerializeSServerVerRsp(pRsp, contLen, &rsp);

  pReq->info.rspLen = contLen;
  pReq->info.rsp = pRsp;

  code = 0;

_over:

  return code;
}

742
static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
743 744 745 746 747 748
  SMnode   *pMnode = pReq->info.node;
  SSdb     *pSdb = pMnode->pSdb;
  int32_t   numOfRows = 0;
  int32_t   cols = 0;
  SConnObj *pConn = NULL;

H
Haojun Liao 已提交
749 750
  if (pShow->pIter == NULL) {
    SProfileMgmt *pMgmt = &pMnode->profileMgmt;
D
dapan1121 已提交
751
    pShow->pIter = taosCacheCreateIter(pMgmt->connCache);
H
Haojun Liao 已提交
752 753
  }

S
Shengliang Guan 已提交
754
  while (numOfRows < rows) {
H
Haojun Liao 已提交
755
    pConn = mndGetNextConn(pMnode, pShow->pIter);
756 757 758 759
    if (pConn == NULL) {
      pShow->pIter = NULL;
      break;
    }
S
Shengliang Guan 已提交
760 761

    cols = 0;
762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789

    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pConn->id, false);

    char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_TO_VARSTR(user, pConn->user);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)user, false);

    char app[TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE];
    STR_TO_VARSTR(app, pConn->app);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)app, false);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pConn->pid, false);

    char endpoint[TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
    sprintf(&endpoint[VARSTR_HEADER_SIZE], "%s:%d", taosIpStr(pConn->ip), pConn->port);
    varDataLen(endpoint) = strlen(&endpoint[VARSTR_HEADER_SIZE]);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)endpoint, false);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pConn->loginTimeMs, false);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pConn->lastAccessTimeMs, false);
S
Shengliang Guan 已提交
790 791 792 793

    numOfRows++;
  }

794
  pShow->numOfRows += numOfRows;
S
Shengliang Guan 已提交
795 796 797
  return numOfRows;
}

D
dapan1121 已提交
798
static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
799 800 801 802 803 804
  SMnode   *pMnode = pReq->info.node;
  SSdb     *pSdb = pMnode->pSdb;
  int32_t   numOfRows = 0;
  int32_t   cols = 0;
  SConnObj *pConn = NULL;

H
Haojun Liao 已提交
805 806
  if (pShow->pIter == NULL) {
    SProfileMgmt *pMgmt = &pMnode->profileMgmt;
D
dapan1121 已提交
807
    pShow->pIter = taosCacheCreateIter(pMgmt->connCache);
H
Haojun Liao 已提交
808 809
  }

S
Shengliang Guan 已提交
810
  while (numOfRows < rows) {
H
Haojun Liao 已提交
811
    pConn = mndGetNextConn(pMnode, pShow->pIter);
812 813 814 815
    if (pConn == NULL) {
      pShow->pIter = NULL;
      break;
    }
S
Shengliang Guan 已提交
816

817
    taosRLockLatch(&pConn->queryLock);
D
dapan1121 已提交
818
    if (NULL == pConn->pQueries || taosArrayGetSize(pConn->pQueries) <= 0) {
819
      taosRUnLockLatch(&pConn->queryLock);
D
dapan1121 已提交
820
      continue;
S
Shengliang Guan 已提交
821 822
    }

D
dapan1121 已提交
823 824
    int32_t numOfQueries = taosArrayGetSize(pConn->pQueries);
    for (int32_t i = 0; i < numOfQueries; ++i) {
825
      SQueryDesc *pQuery = taosArrayGet(pConn->pQueries, i);
S
Shengliang Guan 已提交
826 827
      cols = 0;

828 829 830
      char queryId[26 + VARSTR_HEADER_SIZE] = {0};
      sprintf(&queryId[VARSTR_HEADER_SIZE], "%x:%" PRIx64, pConn->id, pQuery->reqRid);
      varDataLen(queryId) = strlen(&queryId[VARSTR_HEADER_SIZE]);
D
dapan1121 已提交
831
      SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
832 833 834
      colDataAppend(pColInfo, numOfRows, (const char *)queryId, false);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
835
      colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->queryId, false);
S
Shengliang Guan 已提交
836

D
dapan1121 已提交
837
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
838
      colDataAppend(pColInfo, numOfRows, (const char *)&pConn->id, false);
S
Shengliang Guan 已提交
839

840 841 842 843 844 845
      char app[TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE];
      STR_TO_VARSTR(app, pConn->app);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)app, false);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
846
      colDataAppend(pColInfo, numOfRows, (const char *)&pConn->pid, false);
847

D
dapan1121 已提交
848
      char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
849
      STR_TO_VARSTR(user, pConn->user);
D
dapan1121 已提交
850
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
851
      colDataAppend(pColInfo, numOfRows, (const char *)user, false);
S
Shengliang Guan 已提交
852

D
dapan1121 已提交
853 854 855
      char endpoint[TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
      sprintf(&endpoint[VARSTR_HEADER_SIZE], "%s:%d", taosIpStr(pConn->ip), pConn->port);
      varDataLen(endpoint) = strlen(&endpoint[VARSTR_HEADER_SIZE]);
D
dapan1121 已提交
856
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
857
      colDataAppend(pColInfo, numOfRows, (const char *)endpoint, false);
S
Shengliang Guan 已提交
858

D
dapan1121 已提交
859
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
860
      colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->stime, false);
S
Shengliang Guan 已提交
861

D
dapan1121 已提交
862
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
863
      colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->useconds, false);
S
Shengliang Guan 已提交
864

865 866
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->stableQuery, false);
S
Shengliang Guan 已提交
867

D
dapan1121 已提交
868
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
869 870
      colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->subPlanNum, false);

871
      char    subStatus[TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE] = {0};
872 873 874 875 876 877
      int32_t strSize = sizeof(subStatus);
      int32_t offset = VARSTR_HEADER_SIZE;
      for (int32_t i = 0; i < pQuery->subPlanNum && offset < strSize; ++i) {
        if (i) {
          offset += snprintf(subStatus + offset, strSize - offset - 1, ",");
        }
878
        SQuerySubDesc *pDesc = taosArrayGet(pQuery->subDesc, i);
879 880 881 882 883 884 885 886 887 888
        offset += snprintf(subStatus + offset, strSize - offset - 1, "%" PRIu64 ":%s", pDesc->tid, pDesc->status);
      }
      varDataLen(subStatus) = strlen(&subStatus[VARSTR_HEADER_SIZE]);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, subStatus, false);

      char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(sql, pQuery->sql);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)sql, false);
S
Shengliang Guan 已提交
889 890 891

      numOfRows++;
    }
892

893
    taosRUnLockLatch(&pConn->queryLock);
S
Shengliang Guan 已提交
894 895
  }

896
  pShow->numOfRows += numOfRows;
S
Shengliang Guan 已提交
897 898 899
  return numOfRows;
}

D
dapan1121 已提交
900
static int32_t mndRetrieveApps(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
S
Shengliang Guan 已提交
901 902 903 904 905 906
  SMnode  *pMnode = pReq->info.node;
  SSdb    *pSdb = pMnode->pSdb;
  int32_t  numOfRows = 0;
  int32_t  cols = 0;
  SAppObj *pApp = NULL;

D
dapan1121 已提交
907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923
  if (pShow->pIter == NULL) {
    SProfileMgmt *pMgmt = &pMnode->profileMgmt;
    pShow->pIter = taosCacheCreateIter(pMgmt->appCache);
  }

  while (numOfRows < rows) {
    pApp = mndGetNextApp(pMnode, pShow->pIter);
    if (pApp == NULL) {
      pShow->pIter = NULL;
      break;
    }

    cols = 0;

    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->appId, false);

D
dapan1121 已提交
924 925 926
    char ip[TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
    sprintf(&ip[VARSTR_HEADER_SIZE], "%s", taosIpStr(pApp->ip));
    varDataLen(ip) = strlen(&ip[VARSTR_HEADER_SIZE]);
D
dapan1121 已提交
927
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
928
    colDataAppend(pColInfo, numOfRows, (const char *)ip, false);
D
dapan1121 已提交
929 930 931 932

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->pid, false);

D
dapan1121 已提交
933 934 935
    char name[TSDB_APP_NAME_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
    sprintf(&name[VARSTR_HEADER_SIZE], "%s", pApp->name);
    varDataLen(name) = strlen(&name[VARSTR_HEADER_SIZE]);
D
dapan1121 已提交
936
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
937
    colDataAppend(pColInfo, numOfRows, (const char *)name, false);
D
dapan1121 已提交
938 939

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
940
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->startTime, false);
D
dapan1121 已提交
941 942

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
943
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->summary.numOfInsertsReq, false);
D
dapan1121 已提交
944 945

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
946
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->summary.numOfInsertRows, false);
D
dapan1121 已提交
947 948

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
949
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->summary.insertElapsedTime, false);
D
dapan1121 已提交
950 951

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
952
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->summary.insertBytes, false);
D
dapan1121 已提交
953 954

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
955
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->summary.fetchBytes, false);
D
dapan1121 已提交
956 957

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
958
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->summary.queryElapsedTime, false);
D
dapan1121 已提交
959 960

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
961
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->summary.numOfSlowQueries, false);
D
dapan1121 已提交
962 963

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
964
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->summary.totalRequests, false);
D
dapan1121 已提交
965 966

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
967
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->summary.currentRequests, false);
D
dapan1121 已提交
968 969

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
970
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->lastAccessTimeMs, false);
D
dapan1121 已提交
971 972 973 974 975 976 977 978

    numOfRows++;
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}

S
Shengliang Guan 已提交
979
static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) {
H
Haojun Liao 已提交
980 981 982
  if (pIter != NULL) {
    taosCacheDestroyIter(pIter);
  }
S
Shengliang Guan 已提交
983
}
S
Shengliang Guan 已提交
984

S
Shengliang Guan 已提交
985 986
int32_t mndGetNumOfConnections(SMnode *pMnode) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
D
dapan1121 已提交
987
  return taosCacheGetNumOfObj(pMgmt->connCache);
D
dapan1121 已提交
988
}