mndProfile.c 32.9 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "mndProfile.h"
18
#include "mndPrivilege.h"
S
Shengliang Guan 已提交
19
#include "mndDb.h"
S
Shengliang Guan 已提交
20
#include "mndDnode.h"
S
Shengliang Guan 已提交
21
#include "mndMnode.h"
D
dapan1121 已提交
22
#include "mndQnode.h"
S
Shengliang Guan 已提交
23
#include "mndShow.h"
S
Shengliang Guan 已提交
24
#include "mndStb.h"
S
Shengliang Guan 已提交
25
#include "mndUser.h"
S
Shengliang Guan 已提交
26
#include "tglobal.h"
S
version  
Shengliang Guan 已提交
27
#include "version.h"
S
Shengliang Guan 已提交
28

S
Shengliang Guan 已提交
29
typedef struct {
L
Liu Jicong 已提交
30 31 32 33 34 35 36 37 38 39 40 41 42
  uint32_t id;
  int8_t   connType;
  char     user[TSDB_USER_LEN];
  char     app[TSDB_APP_NAME_LEN];  // app name that invokes taosc
  int64_t  appStartTimeMs;          // app start time
  int32_t  pid;                     // pid of app that invokes taosc
  uint32_t ip;
  uint16_t port;
  int8_t   killed;
  int64_t  loginTimeMs;
  int64_t  lastAccessTimeMs;
  uint64_t killId;
  int32_t  numOfQueries;
43
  SRWLatch queryLock;
S
Shengliang Guan 已提交
44
  SArray  *pQueries;  // SArray<SQueryDesc>
S
Shengliang Guan 已提交
45 46
} SConnObj;

D
dapan1121 已提交
47 48 49 50 51 52 53 54 55 56
typedef struct {
  int64_t            appId;
  uint32_t           ip;
  int32_t            pid;
  char               name[TSDB_APP_NAME_LEN];
  int64_t            startTime;
  SAppClusterSummary summary;
  int64_t            lastAccessTimeMs;
} SAppObj;

L
Liu Jicong 已提交
57 58
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
                               int32_t pid, const char *app, int64_t startTime);
S
Shengliang Guan 已提交
59
static void      mndFreeConn(SConnObj *pConn);
D
dapan1121 已提交
60
static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId);
S
Shengliang Guan 已提交
61
static void      mndReleaseConn(SMnode *pMnode, SConnObj *pConn);
S
Shengliang Guan 已提交
62
static void     *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter);
S
Shengliang Guan 已提交
63
static void      mndCancelGetNextConn(SMnode *pMnode, void *pIter);
S
Shengliang Guan 已提交
64 65 66 67
static int32_t   mndProcessHeartBeatReq(SRpcMsg *pReq);
static int32_t   mndProcessConnectReq(SRpcMsg *pReq);
static int32_t   mndProcessKillQueryReq(SRpcMsg *pReq);
static int32_t   mndProcessKillConnReq(SRpcMsg *pReq);
68 69
static int32_t   mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static int32_t   mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
S
Shengliang Guan 已提交
70
static void      mndCancelGetNextQuery(SMnode *pMnode, void *pIter);
D
dapan1121 已提交
71 72 73
static void      mndFreeApp(SAppObj *pApp);
static int32_t   mndRetrieveApps(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void      mndCancelGetNextApp(SMnode *pMnode, void *pIter);
D
dapan1121 已提交
74
static int32_t   mndProcessSvrVerReq(SRpcMsg *pReq);
S
Shengliang Guan 已提交
75 76 77 78

int32_t mndInitProfile(SMnode *pMnode) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

L
Liu Jicong 已提交
79
  // in ms
D
dapan1121 已提交
80
  int32_t checkTime = tsShellActivityTimer * 2 * 1000;
D
dapan1121 已提交
81
  pMgmt->connCache = taosCacheInit(TSDB_DATA_TYPE_UINT, checkTime, true, (__cache_free_fn_t)mndFreeConn, "conn");
D
dapan1121 已提交
82 83 84 85 86 87 88 89
  if (pMgmt->connCache == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to alloc profile cache since %s", terrstr());
    return -1;
  }

  pMgmt->appCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, checkTime, true, (__cache_free_fn_t)mndFreeApp, "app");
  if (pMgmt->appCache == NULL) {
S
Shengliang Guan 已提交
90 91 92 93 94
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to alloc profile cache since %s", terrstr());
    return -1;
  }

S
Shengliang Guan 已提交
95 96 97 98
  mndSetMsgHandle(pMnode, TDMT_MND_HEARTBEAT, mndProcessHeartBeatReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CONNECT, mndProcessConnectReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq);
D
dapan1121 已提交
99
  mndSetMsgHandle(pMnode, TDMT_MND_SERVER_VERSION, mndProcessSvrVerReq);
S
Shengliang Guan 已提交
100

101
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
S
Shengliang Guan 已提交
102
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn);
D
dapan1121 已提交
103
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
S
Shengliang Guan 已提交
104
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery);
D
dapan1121 已提交
105 106
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_APPS, mndRetrieveApps);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_APPS, mndCancelGetNextApp);
107

S
Shengliang Guan 已提交
108 109 110 111 112
  return 0;
}

void mndCleanupProfile(SMnode *pMnode) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
D
dapan1121 已提交
113 114 115 116 117 118 119 120
  if (pMgmt->connCache != NULL) {
    taosCacheCleanup(pMgmt->connCache);
    pMgmt->connCache = NULL;
  }

  if (pMgmt->appCache != NULL) {
    taosCacheCleanup(pMgmt->appCache);
    pMgmt->appCache = NULL;
S
Shengliang Guan 已提交
121 122 123
  }
}

L
Liu Jicong 已提交
124 125
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
                               int32_t pid, const char *app, int64_t startTime) {
S
Shengliang Guan 已提交
126 127
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
128 129
  char     connStr[255] = {0};
  int32_t  len = snprintf(connStr, sizeof(connStr), "%s%d%d%d%s", user, ip, port, pid, app);
D
dapan1121 已提交
130
  uint32_t connId = mndGenerateUid(connStr, len);
S
Shengliang Guan 已提交
131
  if (startTime == 0) startTime = taosGetTimestampMs();
S
Shengliang Guan 已提交
132

S
Shengliang Guan 已提交
133 134 135 136 137 138 139 140 141 142 143 144 145 146
  SConnObj connObj = {
      .id = connId,
      .connType = connType,
      .appStartTimeMs = startTime,
      .pid = pid,
      .ip = ip,
      .port = port,
      .killed = 0,
      .loginTimeMs = taosGetTimestampMs(),
      .lastAccessTimeMs = 0,
      .killId = 0,
      .numOfQueries = 0,
      .pQueries = NULL,
  };
S
Shengliang Guan 已提交
147

S
Shengliang Guan 已提交
148
  connObj.lastAccessTimeMs = connObj.loginTimeMs;
S
shm  
Shengliang Guan 已提交
149
  tstrncpy(connObj.user, user, TSDB_USER_LEN);
S
Shengliang Guan 已提交
150 151
  tstrncpy(connObj.app, app, TSDB_APP_NAME_LEN);

S
Shengliang Guan 已提交
152
  int32_t   keepTime = tsShellActivityTimer * 3;
S
Shengliang Guan 已提交
153 154
  SConnObj *pConn =
      taosCachePut(pMgmt->connCache, &connId, sizeof(uint32_t), &connObj, sizeof(connObj), keepTime * 1000);
S
Shengliang Guan 已提交
155 156
  if (pConn == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
shm  
Shengliang Guan 已提交
157
    mError("conn:%d, failed to put into cache since %s, user:%s", connId, user, terrstr());
S
Shengliang Guan 已提交
158 159
    return NULL;
  } else {
D
dapan1121 已提交
160
    mTrace("conn:%u, is created, data:%p user:%s", pConn->id, pConn, user);
S
Shengliang Guan 已提交
161 162
    return pConn;
  }
S
Shengliang Guan 已提交
163 164 165
}

static void mndFreeConn(SConnObj *pConn) {
166
  taosWLockLatch(&pConn->queryLock);
D
dapan1121 已提交
167
  taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc);
168
  taosWUnLockLatch(&pConn->queryLock);
169

D
dapan1121 已提交
170
  mTrace("conn:%u, is destroyed, data:%p", pConn->id, pConn);
S
Shengliang Guan 已提交
171 172
}

D
dapan1121 已提交
173
static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId) {
S
Shengliang Guan 已提交
174 175
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

D
dapan1121 已提交
176
  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->connCache, &connId, sizeof(connId));
S
Shengliang Guan 已提交
177
  if (pConn == NULL) {
D
dapan1121 已提交
178
    mDebug("conn:%u, already destroyed", connId);
S
Shengliang Guan 已提交
179 180 181
    return NULL;
  }

D
dapan1121 已提交
182
  pConn->lastAccessTimeMs = taosGetTimestampMs();
D
dapan1121 已提交
183
  mTrace("conn:%u, acquired from cache, data:%p", pConn->id, pConn);
S
Shengliang Guan 已提交
184 185 186 187 188
  return pConn;
}

static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) {
  if (pConn == NULL) return;
D
dapan1121 已提交
189
  mTrace("conn:%u, released from cache, data:%p", pConn->id, pConn);
S
Shengliang Guan 已提交
190

S
Shengliang Guan 已提交
191
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
D
dapan1121 已提交
192
  taosCacheRelease(pMgmt->connCache, (void **)&pConn, false);
S
Shengliang Guan 已提交
193 194
}

H
Haojun Liao 已提交
195
void *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter) {
L
Liu Jicong 已提交
196 197
  SConnObj *pConn = NULL;
  bool      hasNext = taosCacheIterNext(pIter);
H
Haojun Liao 已提交
198 199 200 201 202
  if (hasNext) {
    size_t dataLen = 0;
    pConn = taosCacheIterGetData(pIter, &dataLen);
  } else {
    taosCacheDestroyIter(pIter);
S
Shengliang Guan 已提交
203 204
  }

H
Haojun Liao 已提交
205
  return pConn;
S
Shengliang Guan 已提交
206 207 208
}

static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) {
H
Haojun Liao 已提交
209 210 211
  if (pIter != NULL) {
    taosCacheDestroyIter(pIter);
  }
S
Shengliang Guan 已提交
212 213
}

S
Shengliang Guan 已提交
214
static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
S
Shengliang Guan 已提交
215 216 217 218 219 220
  SMnode         *pMnode = pReq->info.node;
  SUserObj       *pUser = NULL;
  SDbObj         *pDb = NULL;
  SConnObj       *pConn = NULL;
  int32_t         code = -1;
  SConnectReq     connReq = {0};
S
Shengliang Guan 已提交
221
  char            ip[24] = {0};
S
Shengliang Guan 已提交
222
  const STraceId *trace = &pReq->info.traceId;
S
Shengliang Guan 已提交
223

S
Shengliang Guan 已提交
224
  if (tDeserializeSConnectReq(pReq->pCont, pReq->contLen, &connReq) != 0) {
S
Shengliang Guan 已提交
225
    terrno = TSDB_CODE_INVALID_MSG;
S
Shengliang Guan 已提交
226
    goto _OVER;
S
Shengliang Guan 已提交
227
  }
S
Shengliang Guan 已提交
228

229
  taosIp2String(pReq->info.conn.clientIp, ip);
230 231 232 233
  if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONNECT) != 0) {
    mGError("user:%s, failed to login from %s since %s", pReq->info.conn.user, ip, terrstr());
    goto _OVER;
  }
S
Shengliang Guan 已提交
234

235
  pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
236
  if (pUser == NULL) {
S
Shengliang Guan 已提交
237 238
    mGError("user:%s, failed to login from %s while acquire user since %s", pReq->info.conn.user, ip, terrstr());
    goto _OVER;
239
  }
S
Shengliang Guan 已提交
240 241 242

  if (strncmp(connReq.passwd, pUser->pass, TSDB_PASSWORD_LEN - 1) != 0) {
    mGError("user:%s, failed to login from %s since invalid pass, input:%s", pReq->info.conn.user, ip, connReq.passwd);
dengyihao's avatar
dengyihao 已提交
243
    code = TSDB_CODE_RPC_AUTH_FAILURE;
S
Shengliang Guan 已提交
244 245 246
    goto _OVER;
  }

S
Shengliang Guan 已提交
247
  if (connReq.db[0]) {
S
Shengliang Guan 已提交
248
    char db[TSDB_DB_FNAME_LEN] = {0};
S
Shengliang Guan 已提交
249 250
    snprintf(db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, connReq.db);
    pDb = mndAcquireDb(pMnode, db);
S
Shengliang Guan 已提交
251 252
    if (pDb == NULL) {
      terrno = TSDB_CODE_MND_INVALID_DB;
S
Shengliang Guan 已提交
253 254
      mGError("user:%s, failed to login from %s while use db:%s since %s", pReq->info.conn.user, ip, connReq.db,
              terrstr());
S
Shengliang Guan 已提交
255
      goto _OVER;
S
Shengliang Guan 已提交
256 257 258
    }
  }

259 260
  pConn = mndCreateConn(pMnode, pReq->info.conn.user, connReq.connType, pReq->info.conn.clientIp,
                        pReq->info.conn.clientPort, connReq.pid, connReq.app, connReq.startTime);
S
Shengliang Guan 已提交
261
  if (pConn == NULL) {
S
Shengliang Guan 已提交
262
    mGError("user:%s, failed to login from %s while create connection since %s", pReq->info.conn.user, ip, terrstr());
S
Shengliang Guan 已提交
263
    goto _OVER;
S
Shengliang Guan 已提交
264 265
  }

S
Shengliang Guan 已提交
266 267 268 269 270
  SConnectRsp connectRsp = {0};
  connectRsp.acctId = pUser->acctId;
  connectRsp.superUser = pUser->superUser;
  connectRsp.clusterId = pMnode->clusterId;
  connectRsp.connId = pConn->id;
L
Liu Jicong 已提交
271
  connectRsp.connType = connReq.connType;
D
dapan 已提交
272
  connectRsp.dnodeNum = mndGetDnodeSize(pMnode);
273

D
dapan1121 已提交
274 275
  strcpy(connectRsp.sVer, version);
  snprintf(connectRsp.sDetailVer, sizeof(connectRsp.sDetailVer), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo,
S
Shengliang Guan 已提交
276 277
           gitinfo);
  mndGetMnodeEpSet(pMnode, &connectRsp.epSet);
278

S
Shengliang Guan 已提交
279
  int32_t contLen = tSerializeSConnectRsp(NULL, 0, &connectRsp);
S
Shengliang Guan 已提交
280
  if (contLen < 0) goto _OVER;
S
Shengliang Guan 已提交
281
  void *pRsp = rpcMallocCont(contLen);
S
Shengliang Guan 已提交
282
  if (pRsp == NULL) goto _OVER;
S
Shengliang Guan 已提交
283
  tSerializeSConnectRsp(pRsp, contLen, &connectRsp);
S
Shengliang Guan 已提交
284

S
Shengliang Guan 已提交
285 286
  pReq->info.rspLen = contLen;
  pReq->info.rsp = pRsp;
287

S
Shengliang Guan 已提交
288
  mGDebug("user:%s, login from %s:%d, conn:%u, app:%s", pReq->info.conn.user, ip, pConn->port, pConn->id, connReq.app);
289 290 291

  code = 0;

S
Shengliang Guan 已提交
292
_OVER:
293 294 295 296 297 298

  mndReleaseUser(pMnode, pUser);
  mndReleaseDb(pMnode, pDb);
  mndReleaseConn(pMnode, pConn);

  return code;
S
Shengliang Guan 已提交
299 300
}

D
dapan1121 已提交
301
static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) {
302 303
  taosWLockLatch(&pConn->queryLock);

D
dapan1121 已提交
304
  taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc);
S
Shengliang Guan 已提交
305

D
dapan1121 已提交
306
  pConn->pQueries = pBasic->queryDesc;
307
  pConn->numOfQueries = pBasic->queryDesc ? taosArrayGetSize(pBasic->queryDesc) : 0;
D
dapan1121 已提交
308
  pBasic->queryDesc = NULL;
L
Liu Jicong 已提交
309

310 311 312
  mDebug("queries updated in conn %d, num:%d", pConn->id, pConn->numOfQueries);

  taosWUnLockLatch(&pConn->queryLock);
S
Shengliang Guan 已提交
313 314 315 316

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
317
static SAppObj *mndCreateApp(SMnode *pMnode, uint32_t clientIp, SAppHbReq *pReq) {
D
dapan1121 已提交
318 319 320 321 322 323 324 325 326 327 328
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  SAppObj app;
  app.appId = pReq->appId;
  app.ip = clientIp;
  app.pid = pReq->pid;
  strcpy(app.name, pReq->name);
  app.startTime = pReq->startTime;
  memcpy(&app.summary, &pReq->summary, sizeof(pReq->summary));
  app.lastAccessTimeMs = taosGetTimestampMs();

S
Shengliang Guan 已提交
329
  const int32_t keepTime = tsShellActivityTimer * 3;
D
dapan1121 已提交
330 331 332 333 334 335
  SAppObj *pApp = taosCachePut(pMgmt->appCache, &pReq->appId, sizeof(pReq->appId), &app, sizeof(app), keepTime * 1000);
  if (pApp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to app %" PRIx64 " into cache since %s", pReq->appId, terrstr());
    return NULL;
  }
S
Shengliang Guan 已提交
336

D
dapan1121 已提交
337 338 339 340
  mTrace("app %" PRIx64 " is put into cache", pReq->appId);
  return pApp;
}

S
Shengliang Guan 已提交
341
static void mndFreeApp(SAppObj *pApp) { mTrace("app %" PRIx64 " is destroyed", pApp->appId); }
D
dapan1121 已提交
342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367

static SAppObj *mndAcquireApp(SMnode *pMnode, int64_t appId) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  SAppObj *pApp = taosCacheAcquireByKey(pMgmt->appCache, &appId, sizeof(appId));
  if (pApp == NULL) {
    mDebug("app %" PRIx64 " not in cache", appId);
    return NULL;
  }

  pApp->lastAccessTimeMs = (uint64_t)taosGetTimestampMs();

  mTrace("app %" PRIx64 " acquired from cache", appId);
  return pApp;
}

static void mndReleaseApp(SMnode *pMnode, SAppObj *pApp) {
  if (pApp == NULL) return;
  mTrace("release app %" PRIx64 " to cache", pApp->appId);

  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
  taosCacheRelease(pMgmt->appCache, (void **)&pApp, false);
}

void *mndGetNextApp(SMnode *pMnode, SCacheIter *pIter) {
  SAppObj *pApp = NULL;
S
Shengliang Guan 已提交
368
  bool     hasNext = taosCacheIterNext(pIter);
D
dapan1121 已提交
369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384
  if (hasNext) {
    size_t dataLen = 0;
    pApp = taosCacheIterGetData(pIter, &dataLen);
  } else {
    taosCacheDestroyIter(pIter);
  }

  return pApp;
}

static void mndCancelGetNextApp(SMnode *pMnode, void *pIter) {
  if (pIter != NULL) {
    taosCacheDestroyIter(pIter);
  }
}

S
Shengliang Guan 已提交
385
static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) {
L
Liu Jicong 已提交
386
#if 0
wafwerar's avatar
wafwerar 已提交
387
  SClientHbRsp* pRsp = taosMemoryMalloc(sizeof(SClientHbRsp));
L
Liu Jicong 已提交
388 389 390 391 392 393 394 395 396 397 398 399 400 401 402
  if (pRsp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  pRsp->connKey = pReq->connKey;
  SMqHbBatchRsp batchRsp;
  batchRsp.batchRsps = taosArrayInit(0, sizeof(SMqHbRsp));
  if (batchRsp.batchRsps == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  SClientHbKey connKey = pReq->connKey;
  SHashObj* pObj =  pReq->info;
  SKv* pKv = taosHashGet(pObj, "mq-tmp", strlen("mq-tmp") + 1);
  if (pKv == NULL) {
wafwerar's avatar
wafwerar 已提交
403
    taosMemoryFree(pRsp);
L
Liu Jicong 已提交
404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435
    return NULL;
  }
  SMqHbMsg mqHb;
  taosDecodeSMqMsg(pKv->value, &mqHb);
  /*int64_t clientUid = htonl(pKv->value);*/
  /*if (mqHb.epoch )*/
  int sz = taosArrayGetSize(mqHb.pTopics);
  SMqConsumerObj* pConsumer = mndAcquireConsumer(pMnode, mqHb.consumerId); 
  for (int i = 0; i < sz; i++) {
    SMqHbOneTopicBatchRsp innerBatchRsp;
    innerBatchRsp.rsps = taosArrayInit(sz, sizeof(SMqHbRsp));
    if (innerBatchRsp.rsps == NULL) {
      //TODO
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
    SMqHbTopicInfo* topicInfo = taosArrayGet(mqHb.pTopics, i);
    SMqConsumerTopic* pConsumerTopic = taosHashGet(pConsumer->topicHash, topicInfo->name, strlen(topicInfo->name)+1);
    if (pConsumerTopic->epoch != topicInfo->epoch) {
      //add new vgids into rsp
      int vgSz = taosArrayGetSize(topicInfo->pVgInfo);
      for (int j = 0; j < vgSz; j++) {
        SMqHbRsp innerRsp;
        SMqHbVgInfo* pVgInfo = taosArrayGet(topicInfo->pVgInfo, i);
        SVgObj* pVgObj = mndAcquireVgroup(pMnode, pVgInfo->vgId);
        innerRsp.epSet = mndGetVgroupEpset(pMnode, pVgObj);
        taosArrayPush(innerBatchRsp.rsps, &innerRsp);
      }
    }
    taosArrayPush(batchRsp.batchRsps, &innerBatchRsp);
  }
  int32_t tlen = taosEncodeSMqHbBatchRsp(NULL, &batchRsp);
wafwerar's avatar
wafwerar 已提交
436
  void* buf = taosMemoryMalloc(tlen);
L
Liu Jicong 已提交
437 438 439 440 441 442 443 444 445
  if (buf == NULL) {
    //TODO
    return NULL;
  }
  void* abuf = buf;
  taosEncodeSMqHbBatchRsp(&abuf, &batchRsp);
  pRsp->body = buf;
  pRsp->bodyLen = tlen;
  return pRsp;
L
Liu Jicong 已提交
446 447
#endif
  return NULL;
L
Liu Jicong 已提交
448 449
}

D
dapan1121 已提交
450
static int32_t mndUpdateAppInfo(SMnode *pMnode, SClientHbReq *pHbReq, SRpcConnInfo *connInfo) {
S
Shengliang Guan 已提交
451 452
  SAppHbReq *pReq = &pHbReq->app;
  SAppObj   *pApp = mndAcquireApp(pMnode, pReq->appId);
D
dapan1121 已提交
453 454 455 456 457 458
  if (pApp == NULL) {
    pApp = mndCreateApp(pMnode, connInfo->clientIp, pReq);
    if (pApp == NULL) {
      mError("failed to create new app %" PRIx64 " since %s", pReq->appId, terrstr());
      return -1;
    } else {
S
Shengliang Guan 已提交
459
      mDebug("a new app %" PRIx64 " is created", pReq->appId);
S
Shengliang Guan 已提交
460
      mndReleaseApp(pMnode, pApp);
D
dapan1121 已提交
461 462 463 464 465 466 467 468 469 470 471
      return TSDB_CODE_SUCCESS;
    }
  }

  memcpy(&pApp->summary, &pReq->summary, sizeof(pReq->summary));

  mndReleaseApp(pMnode, pApp);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
472 473 474 475 476
static int32_t mndGetOnlineDnodeNum(SMnode *pMnode, int32_t *num) {
  SSdb      *pSdb = pMnode->pSdb;
  SDnodeObj *pDnode = NULL;
  int64_t    curMs = taosGetTimestampMs();
  void      *pIter = NULL;
477

D
dapan1121 已提交
478 479 480
  while (true) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;
481

D
dapan1121 已提交
482 483 484 485
    bool online = mndIsDnodeOnline(pDnode, curMs);
    if (online) {
      (*num)++;
    }
486

D
dapan1121 已提交
487 488 489 490 491 492
    sdbRelease(pSdb, pDnode);
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
493 494
static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHbReq *pHbReq,
                                        SClientHbBatchRsp *pBatchRsp) {
D
dapan1121 已提交
495
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
L
Liu Jicong 已提交
496
  SClientHbRsp  hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL};
S
Shengliang Guan 已提交
497
  SRpcConnInfo  connInfo = pMsg->info.conn;
D
dapan1121 已提交
498 499

  mndUpdateAppInfo(pMnode, pHbReq, &connInfo);
D
dapan1121 已提交
500 501 502 503 504

  if (pHbReq->query) {
    SQueryHbReqBasic *pBasic = pHbReq->query;

    SConnObj *pConn = mndAcquireConn(pMnode, pBasic->connId);
L
Liu Jicong 已提交
505 506
    if (pConn == NULL) {
      pConn = mndCreateConn(pMnode, connInfo.user, CONN_TYPE__QUERY, connInfo.clientIp, connInfo.clientPort,
D
dapan1121 已提交
507
                            pHbReq->app.pid, pHbReq->app.name, 0);
D
dapan1121 已提交
508 509 510 511
      if (pConn == NULL) {
        mError("user:%s, conn:%u is freed and failed to create new since %s", connInfo.user, pBasic->connId, terrstr());
        return -1;
      } else {
D
dapan1121 已提交
512
        mDebug("user:%s, conn:%u is freed, will create a new conn:%u", connInfo.user, pBasic->connId, pConn->id);
D
dapan1121 已提交
513 514
      }
    }
515

D
dapan1121 已提交
516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534
    SQueryHbRspBasic *rspBasic = taosMemoryCalloc(1, sizeof(SQueryHbRspBasic));
    if (rspBasic == NULL) {
      mndReleaseConn(pMnode, pConn);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      mError("user:%s, conn:%u failed to process hb while since %s", pConn->user, pBasic->connId, terrstr());
      return -1;
    }

    mndSaveQueryList(pConn, pBasic);
    if (pConn->killed != 0) {
      rspBasic->killConnection = 1;
    }

    if (pConn->killId != 0) {
      rspBasic->killRid = pConn->killId;
      pConn->killId = 0;
    }

    rspBasic->connId = pConn->id;
D
dapan1121 已提交
535
    rspBasic->totalDnodes = mndGetDnodeSize(pMnode);
D
dapan1121 已提交
536
    mndGetOnlineDnodeNum(pMnode, &rspBasic->onlineDnodes);
D
dapan1121 已提交
537
    mndGetMnodeEpSet(pMnode, &rspBasic->epSet);
D
dapan1121 已提交
538 539

    mndCreateQnodeList(pMnode, &rspBasic->pQnodeList, -1);
540

D
dapan1121 已提交
541 542 543
    mndReleaseConn(pMnode, pConn);

    hbRsp.query = rspBasic;
544 545
  } else {
    mDebug("no query info in hb msg");
D
dapan1121 已提交
546 547 548 549
  }

  int32_t kvNum = taosHashGetSize(pHbReq->info);
  if (NULL == pHbReq->info || kvNum <= 0) {
L
Liu Jicong 已提交
550
    taosArrayPush(pBatchRsp->rsps, &hbRsp);
D
dapan1121 已提交
551 552 553 554 555 556 557
    return TSDB_CODE_SUCCESS;
  }

  hbRsp.info = taosArrayInit(kvNum, sizeof(SKv));
  if (NULL == hbRsp.info) {
    mError("taosArrayInit %d rsp kv failed", kvNum);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
D
dapan1121 已提交
558
    tFreeClientHbRsp(&hbRsp);
D
dapan1121 已提交
559 560 561 562 563 564 565 566
    return -1;
  }

  void *pIter = taosHashIterate(pHbReq->info, NULL);
  while (pIter != NULL) {
    SKv *kv = pIter;

    switch (kv->key) {
D
dapan 已提交
567
      case HEARTBEAT_KEY_USER_AUTHINFO: {
S
Shengliang Guan 已提交
568
        void   *rspMsg = NULL;
D
dapan 已提交
569 570 571 572 573 574 575 576
        int32_t rspLen = 0;
        mndValidateUserAuthInfo(pMnode, kv->value, kv->valueLen / sizeof(SUserAuthVersion), &rspMsg, &rspLen);
        if (rspMsg && rspLen > 0) {
          SKv kv1 = {.key = HEARTBEAT_KEY_USER_AUTHINFO, .valueLen = rspLen, .value = rspMsg};
          taosArrayPush(hbRsp.info, &kv1);
        }
        break;
      }
D
dapan1121 已提交
577
      case HEARTBEAT_KEY_DBINFO: {
S
Shengliang Guan 已提交
578
        void   *rspMsg = NULL;
D
dapan1121 已提交
579 580 581 582 583 584 585 586 587
        int32_t rspLen = 0;
        mndValidateDbInfo(pMnode, kv->value, kv->valueLen / sizeof(SDbVgVersion), &rspMsg, &rspLen);
        if (rspMsg && rspLen > 0) {
          SKv kv1 = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg};
          taosArrayPush(hbRsp.info, &kv1);
        }
        break;
      }
      case HEARTBEAT_KEY_STBINFO: {
S
Shengliang Guan 已提交
588
        void   *rspMsg = NULL;
D
dapan1121 已提交
589
        int32_t rspLen = 0;
D
dapan1121 已提交
590
        mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableVersion), &rspMsg, &rspLen);
D
dapan1121 已提交
591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610
        if (rspMsg && rspLen > 0) {
          SKv kv1 = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg};
          taosArrayPush(hbRsp.info, &kv1);
        }
        break;
      }
      default:
        mError("invalid kv key:%d", kv->key);
        hbRsp.status = TSDB_CODE_MND_APP_ERROR;
        break;
    }

    pIter = taosHashIterate(pHbReq->info, pIter);
  }

  taosArrayPush(pBatchRsp->rsps, &hbRsp);

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
611 612
static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
S
Shengliang Guan 已提交
613

L
Liu Jicong 已提交
614
  SClientHbBatchReq batchReq = {0};
S
Shengliang Guan 已提交
615
  if (tDeserializeSClientHbBatchReq(pReq->pCont, pReq->contLen, &batchReq) != 0) {
D
dapan1121 已提交
616
    taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);
S
Shengliang Guan 已提交
617 618 619 620
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

L
Liu Jicong 已提交
621 622
  SClientHbBatchRsp batchRsp = {0};
  batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp));
L
Liu Jicong 已提交
623

S
Shengliang Guan 已提交
624
  int32_t sz = taosArrayGetSize(batchReq.reqs);
L
Liu Jicong 已提交
625
  for (int i = 0; i < sz; i++) {
S
Shengliang Guan 已提交
626
    SClientHbReq *pHbReq = taosArrayGet(batchReq.reqs, i);
D
dapan1121 已提交
627
    if (pHbReq->connKey.connType == CONN_TYPE__QUERY) {
S
Shengliang Guan 已提交
628
      mndProcessQueryHeartBeat(pMnode, pReq, pHbReq, &batchRsp);
D
dapan1121 已提交
629
    } else if (pHbReq->connKey.connType == CONN_TYPE__TMQ) {
L
Liu Jicong 已提交
630 631 632
      SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq);
      if (pRsp != NULL) {
        taosArrayPush(batchRsp.rsps, pRsp);
wafwerar's avatar
wafwerar 已提交
633
        taosMemoryFree(pRsp);
L
Liu Jicong 已提交
634
      }
L
Liu Jicong 已提交
635 636
    }
  }
S
Shengliang Guan 已提交
637
  taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);
L
Liu Jicong 已提交
638

S
Shengliang Guan 已提交
639 640 641 642
  int32_t tlen = tSerializeSClientHbBatchRsp(NULL, 0, &batchRsp);
  void   *buf = rpcMallocCont(tlen);
  tSerializeSClientHbBatchRsp(buf, tlen, &batchRsp);

D
dapan1121 已提交
643
  tFreeClientHbBatchRsp(&batchRsp);
S
Shengliang Guan 已提交
644 645
  pReq->info.rspLen = tlen;
  pReq->info.rsp = buf;
S
Shengliang Guan 已提交
646 647 648 649

  return 0;
}

S
Shengliang Guan 已提交
650 651
static int32_t mndProcessKillQueryReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
652 653
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
654
  SKillQueryReq killReq = {0};
S
Shengliang Guan 已提交
655
  if (tDeserializeSKillQueryReq(pReq->pCont, pReq->contLen, &killReq) != 0) {
S
Shengliang Guan 已提交
656 657 658 659
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

D
dapan1121 已提交
660
  mInfo("kill query msg is received, queryId:%s", killReq.queryStrId);
661 662 663 664
  if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_KILL_QUERY) != 0) {
    return -1;
  }

S
Shengliang Guan 已提交
665
  int32_t  connId = 0;
D
dapan1121 已提交
666
  uint64_t queryId = 0;
S
Shengliang Guan 已提交
667
  char    *p = strchr(killReq.queryStrId, ':');
D
dapan1121 已提交
668 669 670 671 672 673 674 675
  if (NULL == p) {
    mError("invalid query id %s", killReq.queryStrId);
    terrno = TSDB_CODE_MND_INVALID_QUERY_ID;
    return -1;
  }
  *p = 0;
  connId = taosStr2Int32(killReq.queryStrId, NULL, 16);
  queryId = taosStr2UInt64(p + 1, NULL, 16);
676

D
dapan1121 已提交
677
  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->connCache, &connId, sizeof(int32_t));
678
  if (pConn == NULL) {
D
dapan1121 已提交
679
    mError("connId:%x, failed to kill queryId:%" PRIx64 ", conn not exist", connId, queryId);
680 681 682
    terrno = TSDB_CODE_MND_INVALID_CONN_ID;
    return -1;
  } else {
D
dapan1121 已提交
683
    mInfo("connId:%x, queryId:%" PRIx64 " is killed by user:%s", connId, queryId, pReq->info.conn.user);
D
dapan1121 已提交
684
    pConn->killId = queryId;
D
dapan1121 已提交
685
    taosCacheRelease(pMgmt->connCache, (void **)&pConn, false);
686 687 688 689
    return 0;
  }
}

S
Shengliang Guan 已提交
690 691
static int32_t mndProcessKillConnReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
692 693
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
694
  SKillConnReq killReq = {0};
S
Shengliang Guan 已提交
695
  if (tDeserializeSKillConnReq(pReq->pCont, pReq->contLen, &killReq) != 0) {
S
Shengliang Guan 已提交
696 697 698
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }
S
Shengliang Guan 已提交
699

700 701 702 703
  if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_KILL_CONN) != 0) {
    return -1;
  }

D
dapan1121 已提交
704
  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->connCache, &killReq.connId, sizeof(uint32_t));
705
  if (pConn == NULL) {
D
dapan1121 已提交
706
    mError("connId:%u, failed to kill connection, conn not exist", killReq.connId);
707 708 709
    terrno = TSDB_CODE_MND_INVALID_CONN_ID;
    return -1;
  } else {
D
dapan1121 已提交
710
    mInfo("connId:%u, is killed by user:%s", killReq.connId, pReq->info.conn.user);
711
    pConn->killed = 1;
D
dapan1121 已提交
712
    taosCacheRelease(pMgmt->connCache, (void **)&pConn, false);
713 714 715 716
    return TSDB_CODE_SUCCESS;
  }
}

D
dapan1121 已提交
717
static int32_t mndProcessSvrVerReq(SRpcMsg *pReq) {
718
  int32_t       code = -1;
D
dapan1121 已提交
719 720
  SServerVerRsp rsp = {0};
  strcpy(rsp.ver, version);
721

D
dapan1121 已提交
722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737
  int32_t contLen = tSerializeSServerVerRsp(NULL, 0, &rsp);
  if (contLen < 0) goto _over;
  void *pRsp = rpcMallocCont(contLen);
  if (pRsp == NULL) goto _over;
  tSerializeSServerVerRsp(pRsp, contLen, &rsp);

  pReq->info.rspLen = contLen;
  pReq->info.rsp = pRsp;

  code = 0;

_over:

  return code;
}

738
static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
739 740 741 742 743 744
  SMnode   *pMnode = pReq->info.node;
  SSdb     *pSdb = pMnode->pSdb;
  int32_t   numOfRows = 0;
  int32_t   cols = 0;
  SConnObj *pConn = NULL;

H
Haojun Liao 已提交
745 746
  if (pShow->pIter == NULL) {
    SProfileMgmt *pMgmt = &pMnode->profileMgmt;
D
dapan1121 已提交
747
    pShow->pIter = taosCacheCreateIter(pMgmt->connCache);
H
Haojun Liao 已提交
748 749
  }

S
Shengliang Guan 已提交
750
  while (numOfRows < rows) {
H
Haojun Liao 已提交
751
    pConn = mndGetNextConn(pMnode, pShow->pIter);
752 753 754 755
    if (pConn == NULL) {
      pShow->pIter = NULL;
      break;
    }
S
Shengliang Guan 已提交
756 757

    cols = 0;
758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785

    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pConn->id, false);

    char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_TO_VARSTR(user, pConn->user);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)user, false);

    char app[TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE];
    STR_TO_VARSTR(app, pConn->app);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)app, false);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pConn->pid, false);

    char endpoint[TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
    sprintf(&endpoint[VARSTR_HEADER_SIZE], "%s:%d", taosIpStr(pConn->ip), pConn->port);
    varDataLen(endpoint) = strlen(&endpoint[VARSTR_HEADER_SIZE]);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)endpoint, false);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pConn->loginTimeMs, false);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pConn->lastAccessTimeMs, false);
S
Shengliang Guan 已提交
786 787 788 789

    numOfRows++;
  }

790
  pShow->numOfRows += numOfRows;
S
Shengliang Guan 已提交
791 792 793
  return numOfRows;
}

D
dapan1121 已提交
794
static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
795 796 797 798 799 800
  SMnode   *pMnode = pReq->info.node;
  SSdb     *pSdb = pMnode->pSdb;
  int32_t   numOfRows = 0;
  int32_t   cols = 0;
  SConnObj *pConn = NULL;

H
Haojun Liao 已提交
801 802
  if (pShow->pIter == NULL) {
    SProfileMgmt *pMgmt = &pMnode->profileMgmt;
D
dapan1121 已提交
803
    pShow->pIter = taosCacheCreateIter(pMgmt->connCache);
H
Haojun Liao 已提交
804 805
  }

S
Shengliang Guan 已提交
806
  while (numOfRows < rows) {
H
Haojun Liao 已提交
807
    pConn = mndGetNextConn(pMnode, pShow->pIter);
808 809 810 811
    if (pConn == NULL) {
      pShow->pIter = NULL;
      break;
    }
S
Shengliang Guan 已提交
812

813
    taosRLockLatch(&pConn->queryLock);
D
dapan1121 已提交
814
    if (NULL == pConn->pQueries || taosArrayGetSize(pConn->pQueries) <= 0) {
815
      taosRUnLockLatch(&pConn->queryLock);
D
dapan1121 已提交
816
      continue;
S
Shengliang Guan 已提交
817 818
    }

D
dapan1121 已提交
819 820
    int32_t numOfQueries = taosArrayGetSize(pConn->pQueries);
    for (int32_t i = 0; i < numOfQueries; ++i) {
821
      SQueryDesc *pQuery = taosArrayGet(pConn->pQueries, i);
S
Shengliang Guan 已提交
822 823
      cols = 0;

824 825 826
      char queryId[26 + VARSTR_HEADER_SIZE] = {0};
      sprintf(&queryId[VARSTR_HEADER_SIZE], "%x:%" PRIx64, pConn->id, pQuery->reqRid);
      varDataLen(queryId) = strlen(&queryId[VARSTR_HEADER_SIZE]);
D
dapan1121 已提交
827
      SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
828 829 830
      colDataAppend(pColInfo, numOfRows, (const char *)queryId, false);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
831
      colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->queryId, false);
S
Shengliang Guan 已提交
832

D
dapan1121 已提交
833
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
834
      colDataAppend(pColInfo, numOfRows, (const char *)&pConn->id, false);
S
Shengliang Guan 已提交
835

836 837 838 839 840 841
      char app[TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE];
      STR_TO_VARSTR(app, pConn->app);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)app, false);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
842
      colDataAppend(pColInfo, numOfRows, (const char *)&pConn->pid, false);
843

D
dapan1121 已提交
844
      char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
845
      STR_TO_VARSTR(user, pConn->user);
D
dapan1121 已提交
846
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
847
      colDataAppend(pColInfo, numOfRows, (const char *)user, false);
S
Shengliang Guan 已提交
848

D
dapan1121 已提交
849 850 851
      char endpoint[TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
      sprintf(&endpoint[VARSTR_HEADER_SIZE], "%s:%d", taosIpStr(pConn->ip), pConn->port);
      varDataLen(endpoint) = strlen(&endpoint[VARSTR_HEADER_SIZE]);
D
dapan1121 已提交
852
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
853
      colDataAppend(pColInfo, numOfRows, (const char *)endpoint, false);
S
Shengliang Guan 已提交
854

D
dapan1121 已提交
855
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
856
      colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->stime, false);
S
Shengliang Guan 已提交
857

D
dapan1121 已提交
858
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
859
      colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->useconds, false);
S
Shengliang Guan 已提交
860

861 862
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->stableQuery, false);
S
Shengliang Guan 已提交
863

D
dapan1121 已提交
864
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
865 866
      colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->subPlanNum, false);

867
      char    subStatus[TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE] = {0};
868 869 870 871 872 873
      int32_t strSize = sizeof(subStatus);
      int32_t offset = VARSTR_HEADER_SIZE;
      for (int32_t i = 0; i < pQuery->subPlanNum && offset < strSize; ++i) {
        if (i) {
          offset += snprintf(subStatus + offset, strSize - offset - 1, ",");
        }
874
        SQuerySubDesc *pDesc = taosArrayGet(pQuery->subDesc, i);
875 876 877 878 879 880 881 882 883 884
        offset += snprintf(subStatus + offset, strSize - offset - 1, "%" PRIu64 ":%s", pDesc->tid, pDesc->status);
      }
      varDataLen(subStatus) = strlen(&subStatus[VARSTR_HEADER_SIZE]);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, subStatus, false);

      char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(sql, pQuery->sql);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)sql, false);
S
Shengliang Guan 已提交
885 886 887

      numOfRows++;
    }
888

889
    taosRUnLockLatch(&pConn->queryLock);
S
Shengliang Guan 已提交
890 891
  }

892
  pShow->numOfRows += numOfRows;
S
Shengliang Guan 已提交
893 894 895
  return numOfRows;
}

D
dapan1121 已提交
896
static int32_t mndRetrieveApps(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
S
Shengliang Guan 已提交
897 898 899 900 901 902
  SMnode  *pMnode = pReq->info.node;
  SSdb    *pSdb = pMnode->pSdb;
  int32_t  numOfRows = 0;
  int32_t  cols = 0;
  SAppObj *pApp = NULL;

D
dapan1121 已提交
903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919
  if (pShow->pIter == NULL) {
    SProfileMgmt *pMgmt = &pMnode->profileMgmt;
    pShow->pIter = taosCacheCreateIter(pMgmt->appCache);
  }

  while (numOfRows < rows) {
    pApp = mndGetNextApp(pMnode, pShow->pIter);
    if (pApp == NULL) {
      pShow->pIter = NULL;
      break;
    }

    cols = 0;

    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->appId, false);

D
dapan1121 已提交
920 921 922
    char ip[TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
    sprintf(&ip[VARSTR_HEADER_SIZE], "%s", taosIpStr(pApp->ip));
    varDataLen(ip) = strlen(&ip[VARSTR_HEADER_SIZE]);
D
dapan1121 已提交
923
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
924
    colDataAppend(pColInfo, numOfRows, (const char *)ip, false);
D
dapan1121 已提交
925 926 927 928

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->pid, false);

D
dapan1121 已提交
929 930 931
    char name[TSDB_APP_NAME_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
    sprintf(&name[VARSTR_HEADER_SIZE], "%s", pApp->name);
    varDataLen(name) = strlen(&name[VARSTR_HEADER_SIZE]);
D
dapan1121 已提交
932
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
933
    colDataAppend(pColInfo, numOfRows, (const char *)name, false);
D
dapan1121 已提交
934 935

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
936
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->startTime, false);
D
dapan1121 已提交
937 938

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
939
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->summary.numOfInsertsReq, false);
D
dapan1121 已提交
940 941

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
942
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->summary.numOfInsertRows, false);
D
dapan1121 已提交
943 944

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
945
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->summary.insertElapsedTime, false);
D
dapan1121 已提交
946 947

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
948
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->summary.insertBytes, false);
D
dapan1121 已提交
949 950

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
951
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->summary.fetchBytes, false);
D
dapan1121 已提交
952 953

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
954
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->summary.queryElapsedTime, false);
D
dapan1121 已提交
955 956

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
957
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->summary.numOfSlowQueries, false);
D
dapan1121 已提交
958 959

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
960
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->summary.totalRequests, false);
D
dapan1121 已提交
961 962

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
963
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->summary.currentRequests, false);
D
dapan1121 已提交
964 965

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
966
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->lastAccessTimeMs, false);
D
dapan1121 已提交
967 968 969 970 971 972 973 974

    numOfRows++;
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}

S
Shengliang Guan 已提交
975
static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) {
H
Haojun Liao 已提交
976 977 978
  if (pIter != NULL) {
    taosCacheDestroyIter(pIter);
  }
S
Shengliang Guan 已提交
979
}
S
Shengliang Guan 已提交
980

S
Shengliang Guan 已提交
981 982
int32_t mndGetNumOfConnections(SMnode *pMnode) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
D
dapan1121 已提交
983
  return taosCacheGetNumOfObj(pMgmt->connCache);
D
dapan1121 已提交
984
}