mndProfile.c 33.2 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "mndProfile.h"
S
Shengliang Guan 已提交
18
#include "mndAuth.h"
S
Shengliang Guan 已提交
19
#include "mndDb.h"
S
Shengliang Guan 已提交
20
#include "mndDnode.h"
S
Shengliang Guan 已提交
21
#include "mndMnode.h"
D
dapan1121 已提交
22
#include "mndQnode.h"
S
Shengliang Guan 已提交
23
#include "mndShow.h"
S
Shengliang Guan 已提交
24
#include "mndStb.h"
S
Shengliang Guan 已提交
25
#include "mndUser.h"
S
Shengliang Guan 已提交
26
#include "tglobal.h"
S
version  
Shengliang Guan 已提交
27
#include "version.h"
S
Shengliang Guan 已提交
28

S
Shengliang Guan 已提交
29
typedef struct {
L
Liu Jicong 已提交
30 31 32 33 34 35 36 37 38 39 40 41 42
  uint32_t id;
  int8_t   connType;
  char     user[TSDB_USER_LEN];
  char     app[TSDB_APP_NAME_LEN];  // app name that invokes taosc
  int64_t  appStartTimeMs;          // app start time
  int32_t  pid;                     // pid of app that invokes taosc
  uint32_t ip;
  uint16_t port;
  int8_t   killed;
  int64_t  loginTimeMs;
  int64_t  lastAccessTimeMs;
  uint64_t killId;
  int32_t  numOfQueries;
43
  SRWLatch queryLock;
S
Shengliang Guan 已提交
44
  SArray  *pQueries;  // SArray<SQueryDesc>
S
Shengliang Guan 已提交
45 46
} SConnObj;

D
dapan1121 已提交
47 48 49 50 51 52 53 54 55 56
typedef struct {
  int64_t            appId;
  uint32_t           ip;
  int32_t            pid;
  char               name[TSDB_APP_NAME_LEN];
  int64_t            startTime;
  SAppClusterSummary summary;
  int64_t            lastAccessTimeMs;
} SAppObj;

L
Liu Jicong 已提交
57 58
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
                               int32_t pid, const char *app, int64_t startTime);
S
Shengliang Guan 已提交
59
static void      mndFreeConn(SConnObj *pConn);
D
dapan1121 已提交
60
static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId);
S
Shengliang Guan 已提交
61
static void      mndReleaseConn(SMnode *pMnode, SConnObj *pConn);
S
Shengliang Guan 已提交
62
static void     *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter);
S
Shengliang Guan 已提交
63
static void      mndCancelGetNextConn(SMnode *pMnode, void *pIter);
S
Shengliang Guan 已提交
64 65 66 67
static int32_t   mndProcessHeartBeatReq(SRpcMsg *pReq);
static int32_t   mndProcessConnectReq(SRpcMsg *pReq);
static int32_t   mndProcessKillQueryReq(SRpcMsg *pReq);
static int32_t   mndProcessKillConnReq(SRpcMsg *pReq);
68 69
static int32_t   mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static int32_t   mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
S
Shengliang Guan 已提交
70
static void      mndCancelGetNextQuery(SMnode *pMnode, void *pIter);
D
dapan1121 已提交
71 72 73
static void      mndFreeApp(SAppObj *pApp);
static int32_t   mndRetrieveApps(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void      mndCancelGetNextApp(SMnode *pMnode, void *pIter);
D
dapan1121 已提交
74
static int32_t   mndProcessSvrVerReq(SRpcMsg *pReq);
S
Shengliang Guan 已提交
75 76 77 78

int32_t mndInitProfile(SMnode *pMnode) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

L
Liu Jicong 已提交
79
  // in ms
D
dapan1121 已提交
80
  int32_t checkTime = tsShellActivityTimer * 2 * 1000;
D
dapan1121 已提交
81
  pMgmt->connCache = taosCacheInit(TSDB_DATA_TYPE_UINT, checkTime, true, (__cache_free_fn_t)mndFreeConn, "conn");
D
dapan1121 已提交
82 83 84 85 86 87 88 89
  if (pMgmt->connCache == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to alloc profile cache since %s", terrstr());
    return -1;
  }

  pMgmt->appCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, checkTime, true, (__cache_free_fn_t)mndFreeApp, "app");
  if (pMgmt->appCache == NULL) {
S
Shengliang Guan 已提交
90 91 92 93 94
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to alloc profile cache since %s", terrstr());
    return -1;
  }

S
Shengliang Guan 已提交
95 96 97 98
  mndSetMsgHandle(pMnode, TDMT_MND_HEARTBEAT, mndProcessHeartBeatReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CONNECT, mndProcessConnectReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq);
D
dapan1121 已提交
99
  mndSetMsgHandle(pMnode, TDMT_MND_SERVER_VERSION, mndProcessSvrVerReq);
S
Shengliang Guan 已提交
100

101
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
S
Shengliang Guan 已提交
102
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn);
D
dapan1121 已提交
103
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
S
Shengliang Guan 已提交
104
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery);
D
dapan1121 已提交
105 106
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_APPS, mndRetrieveApps);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_APPS, mndCancelGetNextApp);
107

S
Shengliang Guan 已提交
108 109 110 111 112
  return 0;
}

void mndCleanupProfile(SMnode *pMnode) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
D
dapan1121 已提交
113 114 115 116 117 118 119 120
  if (pMgmt->connCache != NULL) {
    taosCacheCleanup(pMgmt->connCache);
    pMgmt->connCache = NULL;
  }

  if (pMgmt->appCache != NULL) {
    taosCacheCleanup(pMgmt->appCache);
    pMgmt->appCache = NULL;
S
Shengliang Guan 已提交
121 122 123
  }
}

L
Liu Jicong 已提交
124 125
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
                               int32_t pid, const char *app, int64_t startTime) {
S
Shengliang Guan 已提交
126 127
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
128 129
  char     connStr[255] = {0};
  int32_t  len = snprintf(connStr, sizeof(connStr), "%s%d%d%d%s", user, ip, port, pid, app);
D
dapan1121 已提交
130
  uint32_t connId = mndGenerateUid(connStr, len);
S
Shengliang Guan 已提交
131
  if (startTime == 0) startTime = taosGetTimestampMs();
S
Shengliang Guan 已提交
132

S
Shengliang Guan 已提交
133 134 135 136 137 138 139 140 141 142 143 144 145 146
  SConnObj connObj = {
      .id = connId,
      .connType = connType,
      .appStartTimeMs = startTime,
      .pid = pid,
      .ip = ip,
      .port = port,
      .killed = 0,
      .loginTimeMs = taosGetTimestampMs(),
      .lastAccessTimeMs = 0,
      .killId = 0,
      .numOfQueries = 0,
      .pQueries = NULL,
  };
S
Shengliang Guan 已提交
147

S
Shengliang Guan 已提交
148
  connObj.lastAccessTimeMs = connObj.loginTimeMs;
S
shm  
Shengliang Guan 已提交
149
  tstrncpy(connObj.user, user, TSDB_USER_LEN);
S
Shengliang Guan 已提交
150 151
  tstrncpy(connObj.app, app, TSDB_APP_NAME_LEN);

S
Shengliang Guan 已提交
152
  int32_t   keepTime = tsShellActivityTimer * 3;
S
Shengliang Guan 已提交
153 154
  SConnObj *pConn =
      taosCachePut(pMgmt->connCache, &connId, sizeof(uint32_t), &connObj, sizeof(connObj), keepTime * 1000);
S
Shengliang Guan 已提交
155 156
  if (pConn == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
shm  
Shengliang Guan 已提交
157
    mError("conn:%d, failed to put into cache since %s, user:%s", connId, user, terrstr());
S
Shengliang Guan 已提交
158 159
    return NULL;
  } else {
D
dapan1121 已提交
160
    mTrace("conn:%u, is created, data:%p user:%s", pConn->id, pConn, user);
S
Shengliang Guan 已提交
161 162
    return pConn;
  }
S
Shengliang Guan 已提交
163 164 165
}

static void mndFreeConn(SConnObj *pConn) {
166
  taosWLockLatch(&pConn->queryLock);
D
dapan1121 已提交
167
  taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc);
168
  taosWUnLockLatch(&pConn->queryLock);
169

D
dapan1121 已提交
170
  mTrace("conn:%u, is destroyed, data:%p", pConn->id, pConn);
S
Shengliang Guan 已提交
171 172
}

D
dapan1121 已提交
173
static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId) {
S
Shengliang Guan 已提交
174 175
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

D
dapan1121 已提交
176
  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->connCache, &connId, sizeof(connId));
S
Shengliang Guan 已提交
177
  if (pConn == NULL) {
D
dapan1121 已提交
178
    mDebug("conn:%u, already destroyed", connId);
S
Shengliang Guan 已提交
179 180 181
    return NULL;
  }

D
dapan1121 已提交
182
  pConn->lastAccessTimeMs = taosGetTimestampMs();
D
dapan1121 已提交
183
  mTrace("conn:%u, acquired from cache, data:%p", pConn->id, pConn);
S
Shengliang Guan 已提交
184 185 186 187 188
  return pConn;
}

static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) {
  if (pConn == NULL) return;
D
dapan1121 已提交
189
  mTrace("conn:%u, released from cache, data:%p", pConn->id, pConn);
S
Shengliang Guan 已提交
190

S
Shengliang Guan 已提交
191
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
D
dapan1121 已提交
192
  taosCacheRelease(pMgmt->connCache, (void **)&pConn, false);
S
Shengliang Guan 已提交
193 194
}

H
Haojun Liao 已提交
195
void *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter) {
L
Liu Jicong 已提交
196 197
  SConnObj *pConn = NULL;
  bool      hasNext = taosCacheIterNext(pIter);
H
Haojun Liao 已提交
198 199 200 201 202
  if (hasNext) {
    size_t dataLen = 0;
    pConn = taosCacheIterGetData(pIter, &dataLen);
  } else {
    taosCacheDestroyIter(pIter);
S
Shengliang Guan 已提交
203 204
  }

H
Haojun Liao 已提交
205
  return pConn;
S
Shengliang Guan 已提交
206 207 208
}

static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) {
H
Haojun Liao 已提交
209 210 211
  if (pIter != NULL) {
    taosCacheDestroyIter(pIter);
  }
S
Shengliang Guan 已提交
212 213
}

S
Shengliang Guan 已提交
214
static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
S
Shengliang Guan 已提交
215 216 217 218 219 220
  SMnode         *pMnode = pReq->info.node;
  SUserObj       *pUser = NULL;
  SDbObj         *pDb = NULL;
  SConnObj       *pConn = NULL;
  int32_t         code = -1;
  SConnectReq     connReq = {0};
S
Shengliang Guan 已提交
221
  char            ip[24] = {0};
S
Shengliang Guan 已提交
222
  const STraceId *trace = &pReq->info.traceId;
S
Shengliang Guan 已提交
223

S
Shengliang Guan 已提交
224
  if (tDeserializeSConnectReq(pReq->pCont, pReq->contLen, &connReq) != 0) {
S
Shengliang Guan 已提交
225
    terrno = TSDB_CODE_INVALID_MSG;
S
Shengliang Guan 已提交
226
    goto _OVER;
S
Shengliang Guan 已提交
227
  }
S
Shengliang Guan 已提交
228

229
  taosIp2String(pReq->info.conn.clientIp, ip);
S
Shengliang Guan 已提交
230

231
  pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
232
  if (pUser == NULL) {
S
Shengliang Guan 已提交
233 234
    mGError("user:%s, failed to login from %s while acquire user since %s", pReq->info.conn.user, ip, terrstr());
    goto _OVER;
235
  }
S
Shengliang Guan 已提交
236 237 238

  if (strncmp(connReq.passwd, pUser->pass, TSDB_PASSWORD_LEN - 1) != 0) {
    mGError("user:%s, failed to login from %s since invalid pass, input:%s", pReq->info.conn.user, ip, connReq.passwd);
dengyihao's avatar
dengyihao 已提交
239
    code = TSDB_CODE_RPC_AUTH_FAILURE;
S
Shengliang Guan 已提交
240 241 242 243 244 245
    goto _OVER;
  }

  if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CONNECT) != 0) {
    mGError("user:%s, failed to login from %s since %s", pReq->info.conn.user, ip, terrstr());
    goto _OVER;
dengyihao's avatar
dengyihao 已提交
246
  }
247

S
Shengliang Guan 已提交
248
  if (connReq.db[0]) {
S
Shengliang Guan 已提交
249
    char db[TSDB_DB_FNAME_LEN] = {0};
S
Shengliang Guan 已提交
250 251
    snprintf(db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, connReq.db);
    pDb = mndAcquireDb(pMnode, db);
S
Shengliang Guan 已提交
252 253
    if (pDb == NULL) {
      terrno = TSDB_CODE_MND_INVALID_DB;
S
Shengliang Guan 已提交
254 255
      mGError("user:%s, failed to login from %s while use db:%s since %s", pReq->info.conn.user, ip, connReq.db,
              terrstr());
S
Shengliang Guan 已提交
256
      goto _OVER;
S
Shengliang Guan 已提交
257 258 259
    }
  }

260 261
  pConn = mndCreateConn(pMnode, pReq->info.conn.user, connReq.connType, pReq->info.conn.clientIp,
                        pReq->info.conn.clientPort, connReq.pid, connReq.app, connReq.startTime);
S
Shengliang Guan 已提交
262
  if (pConn == NULL) {
S
Shengliang Guan 已提交
263
    mGError("user:%s, failed to login from %s while create connection since %s", pReq->info.conn.user, ip, terrstr());
S
Shengliang Guan 已提交
264
    goto _OVER;
S
Shengliang Guan 已提交
265 266
  }

S
Shengliang Guan 已提交
267 268 269 270 271
  SConnectRsp connectRsp = {0};
  connectRsp.acctId = pUser->acctId;
  connectRsp.superUser = pUser->superUser;
  connectRsp.clusterId = pMnode->clusterId;
  connectRsp.connId = pConn->id;
L
Liu Jicong 已提交
272
  connectRsp.connType = connReq.connType;
D
dapan 已提交
273
  connectRsp.dnodeNum = mndGetDnodeSize(pMnode);
D
dapan1121 已提交
274 275 276
  
  strcpy(connectRsp.sVer, version);
  snprintf(connectRsp.sDetailVer, sizeof(connectRsp.sDetailVer), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo,
S
Shengliang Guan 已提交
277 278
           gitinfo);
  mndGetMnodeEpSet(pMnode, &connectRsp.epSet);
279

S
Shengliang Guan 已提交
280
  int32_t contLen = tSerializeSConnectRsp(NULL, 0, &connectRsp);
S
Shengliang Guan 已提交
281
  if (contLen < 0) goto _OVER;
S
Shengliang Guan 已提交
282
  void *pRsp = rpcMallocCont(contLen);
S
Shengliang Guan 已提交
283
  if (pRsp == NULL) goto _OVER;
S
Shengliang Guan 已提交
284
  tSerializeSConnectRsp(pRsp, contLen, &connectRsp);
S
Shengliang Guan 已提交
285

S
Shengliang Guan 已提交
286 287
  pReq->info.rspLen = contLen;
  pReq->info.rsp = pRsp;
288

S
Shengliang Guan 已提交
289
  mGDebug("user:%s, login from %s:%d, conn:%u, app:%s", pReq->info.conn.user, ip, pConn->port, pConn->id, connReq.app);
290 291 292

  code = 0;

S
Shengliang Guan 已提交
293
_OVER:
294 295 296 297 298 299

  mndReleaseUser(pMnode, pUser);
  mndReleaseDb(pMnode, pDb);
  mndReleaseConn(pMnode, pConn);

  return code;
S
Shengliang Guan 已提交
300 301
}

D
dapan1121 已提交
302
static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) {
303 304
  taosWLockLatch(&pConn->queryLock);

D
dapan1121 已提交
305
  taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc);
S
Shengliang Guan 已提交
306

D
dapan1121 已提交
307
  pConn->pQueries = pBasic->queryDesc;
308
  pConn->numOfQueries = pBasic->queryDesc ? taosArrayGetSize(pBasic->queryDesc) : 0;
D
dapan1121 已提交
309
  pBasic->queryDesc = NULL;
L
Liu Jicong 已提交
310

311 312 313
  mDebug("queries updated in conn %d, num:%d", pConn->id, pConn->numOfQueries);

  taosWUnLockLatch(&pConn->queryLock);
S
Shengliang Guan 已提交
314 315 316 317

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
318
static SAppObj *mndCreateApp(SMnode *pMnode, uint32_t clientIp, SAppHbReq *pReq) {
D
dapan1121 已提交
319 320 321 322 323 324 325 326 327 328 329
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  SAppObj app;
  app.appId = pReq->appId;
  app.ip = clientIp;
  app.pid = pReq->pid;
  strcpy(app.name, pReq->name);
  app.startTime = pReq->startTime;
  memcpy(&app.summary, &pReq->summary, sizeof(pReq->summary));
  app.lastAccessTimeMs = taosGetTimestampMs();

S
Shengliang Guan 已提交
330
  const int32_t keepTime = tsShellActivityTimer * 3;
D
dapan1121 已提交
331 332 333 334 335 336
  SAppObj *pApp = taosCachePut(pMgmt->appCache, &pReq->appId, sizeof(pReq->appId), &app, sizeof(app), keepTime * 1000);
  if (pApp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to app %" PRIx64 " into cache since %s", pReq->appId, terrstr());
    return NULL;
  }
S
Shengliang Guan 已提交
337

D
dapan1121 已提交
338 339 340 341
  mTrace("app %" PRIx64 " is put into cache", pReq->appId);
  return pApp;
}

S
Shengliang Guan 已提交
342
static void mndFreeApp(SAppObj *pApp) { mTrace("app %" PRIx64 " is destroyed", pApp->appId); }
D
dapan1121 已提交
343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368

static SAppObj *mndAcquireApp(SMnode *pMnode, int64_t appId) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  SAppObj *pApp = taosCacheAcquireByKey(pMgmt->appCache, &appId, sizeof(appId));
  if (pApp == NULL) {
    mDebug("app %" PRIx64 " not in cache", appId);
    return NULL;
  }

  pApp->lastAccessTimeMs = (uint64_t)taosGetTimestampMs();

  mTrace("app %" PRIx64 " acquired from cache", appId);
  return pApp;
}

static void mndReleaseApp(SMnode *pMnode, SAppObj *pApp) {
  if (pApp == NULL) return;
  mTrace("release app %" PRIx64 " to cache", pApp->appId);

  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
  taosCacheRelease(pMgmt->appCache, (void **)&pApp, false);
}

void *mndGetNextApp(SMnode *pMnode, SCacheIter *pIter) {
  SAppObj *pApp = NULL;
S
Shengliang Guan 已提交
369
  bool     hasNext = taosCacheIterNext(pIter);
D
dapan1121 已提交
370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385
  if (hasNext) {
    size_t dataLen = 0;
    pApp = taosCacheIterGetData(pIter, &dataLen);
  } else {
    taosCacheDestroyIter(pIter);
  }

  return pApp;
}

static void mndCancelGetNextApp(SMnode *pMnode, void *pIter) {
  if (pIter != NULL) {
    taosCacheDestroyIter(pIter);
  }
}

S
Shengliang Guan 已提交
386
static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) {
L
Liu Jicong 已提交
387
#if 0
wafwerar's avatar
wafwerar 已提交
388
  SClientHbRsp* pRsp = taosMemoryMalloc(sizeof(SClientHbRsp));
L
Liu Jicong 已提交
389 390 391 392 393 394 395 396 397 398 399 400 401 402 403
  if (pRsp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  pRsp->connKey = pReq->connKey;
  SMqHbBatchRsp batchRsp;
  batchRsp.batchRsps = taosArrayInit(0, sizeof(SMqHbRsp));
  if (batchRsp.batchRsps == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  SClientHbKey connKey = pReq->connKey;
  SHashObj* pObj =  pReq->info;
  SKv* pKv = taosHashGet(pObj, "mq-tmp", strlen("mq-tmp") + 1);
  if (pKv == NULL) {
wafwerar's avatar
wafwerar 已提交
404
    taosMemoryFree(pRsp);
L
Liu Jicong 已提交
405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436
    return NULL;
  }
  SMqHbMsg mqHb;
  taosDecodeSMqMsg(pKv->value, &mqHb);
  /*int64_t clientUid = htonl(pKv->value);*/
  /*if (mqHb.epoch )*/
  int sz = taosArrayGetSize(mqHb.pTopics);
  SMqConsumerObj* pConsumer = mndAcquireConsumer(pMnode, mqHb.consumerId); 
  for (int i = 0; i < sz; i++) {
    SMqHbOneTopicBatchRsp innerBatchRsp;
    innerBatchRsp.rsps = taosArrayInit(sz, sizeof(SMqHbRsp));
    if (innerBatchRsp.rsps == NULL) {
      //TODO
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
    SMqHbTopicInfo* topicInfo = taosArrayGet(mqHb.pTopics, i);
    SMqConsumerTopic* pConsumerTopic = taosHashGet(pConsumer->topicHash, topicInfo->name, strlen(topicInfo->name)+1);
    if (pConsumerTopic->epoch != topicInfo->epoch) {
      //add new vgids into rsp
      int vgSz = taosArrayGetSize(topicInfo->pVgInfo);
      for (int j = 0; j < vgSz; j++) {
        SMqHbRsp innerRsp;
        SMqHbVgInfo* pVgInfo = taosArrayGet(topicInfo->pVgInfo, i);
        SVgObj* pVgObj = mndAcquireVgroup(pMnode, pVgInfo->vgId);
        innerRsp.epSet = mndGetVgroupEpset(pMnode, pVgObj);
        taosArrayPush(innerBatchRsp.rsps, &innerRsp);
      }
    }
    taosArrayPush(batchRsp.batchRsps, &innerBatchRsp);
  }
  int32_t tlen = taosEncodeSMqHbBatchRsp(NULL, &batchRsp);
wafwerar's avatar
wafwerar 已提交
437
  void* buf = taosMemoryMalloc(tlen);
L
Liu Jicong 已提交
438 439 440 441 442 443 444 445 446
  if (buf == NULL) {
    //TODO
    return NULL;
  }
  void* abuf = buf;
  taosEncodeSMqHbBatchRsp(&abuf, &batchRsp);
  pRsp->body = buf;
  pRsp->bodyLen = tlen;
  return pRsp;
L
Liu Jicong 已提交
447 448
#endif
  return NULL;
L
Liu Jicong 已提交
449 450
}

D
dapan1121 已提交
451
static int32_t mndUpdateAppInfo(SMnode *pMnode, SClientHbReq *pHbReq, SRpcConnInfo *connInfo) {
S
Shengliang Guan 已提交
452 453
  SAppHbReq *pReq = &pHbReq->app;
  SAppObj   *pApp = mndAcquireApp(pMnode, pReq->appId);
D
dapan1121 已提交
454 455 456 457 458 459
  if (pApp == NULL) {
    pApp = mndCreateApp(pMnode, connInfo->clientIp, pReq);
    if (pApp == NULL) {
      mError("failed to create new app %" PRIx64 " since %s", pReq->appId, terrstr());
      return -1;
    } else {
S
Shengliang Guan 已提交
460
      mDebug("a new app %" PRIx64 " is created", pReq->appId);
S
Shengliang Guan 已提交
461
      mndReleaseApp(pMnode, pApp);
D
dapan1121 已提交
462 463 464 465 466 467 468 469 470 471 472
      return TSDB_CODE_SUCCESS;
    }
  }

  memcpy(&pApp->summary, &pReq->summary, sizeof(pReq->summary));

  mndReleaseApp(pMnode, pApp);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493
static int32_t mndGetOnlineDnodeNum(SMnode *pMnode, int32_t *num) {
  SSdb      *pSdb = pMnode->pSdb;
  SDnodeObj *pDnode = NULL;
  int64_t    curMs = taosGetTimestampMs();
  void      *pIter = NULL;
  
  while (true) {
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;
    
    bool online = mndIsDnodeOnline(pDnode, curMs);
    if (online) {
      (*num)++;
    }
    
    sdbRelease(pSdb, pDnode);
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
494 495
static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHbReq *pHbReq,
                                        SClientHbBatchRsp *pBatchRsp) {
D
dapan1121 已提交
496
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
L
Liu Jicong 已提交
497
  SClientHbRsp  hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL};
S
Shengliang Guan 已提交
498
  SRpcConnInfo  connInfo = pMsg->info.conn;
D
dapan1121 已提交
499 500

  mndUpdateAppInfo(pMnode, pHbReq, &connInfo);
D
dapan1121 已提交
501 502 503 504 505

  if (pHbReq->query) {
    SQueryHbReqBasic *pBasic = pHbReq->query;

    SConnObj *pConn = mndAcquireConn(pMnode, pBasic->connId);
L
Liu Jicong 已提交
506 507
    if (pConn == NULL) {
      pConn = mndCreateConn(pMnode, connInfo.user, CONN_TYPE__QUERY, connInfo.clientIp, connInfo.clientPort,
D
dapan1121 已提交
508
                            pHbReq->app.pid, pHbReq->app.name, 0);
D
dapan1121 已提交
509 510 511 512
      if (pConn == NULL) {
        mError("user:%s, conn:%u is freed and failed to create new since %s", connInfo.user, pBasic->connId, terrstr());
        return -1;
      } else {
D
dapan1121 已提交
513
        mDebug("user:%s, conn:%u is freed, will create a new conn:%u", connInfo.user, pBasic->connId, pConn->id);
D
dapan1121 已提交
514 515
      }
    }
516

D
dapan1121 已提交
517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535
    SQueryHbRspBasic *rspBasic = taosMemoryCalloc(1, sizeof(SQueryHbRspBasic));
    if (rspBasic == NULL) {
      mndReleaseConn(pMnode, pConn);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      mError("user:%s, conn:%u failed to process hb while since %s", pConn->user, pBasic->connId, terrstr());
      return -1;
    }

    mndSaveQueryList(pConn, pBasic);
    if (pConn->killed != 0) {
      rspBasic->killConnection = 1;
    }

    if (pConn->killId != 0) {
      rspBasic->killRid = pConn->killId;
      pConn->killId = 0;
    }

    rspBasic->connId = pConn->id;
D
dapan1121 已提交
536
    rspBasic->totalDnodes = mndGetDnodeSize(pMnode);
D
dapan1121 已提交
537
    mndGetOnlineDnodeNum(pMnode, &rspBasic->onlineDnodes);
D
dapan1121 已提交
538
    mndGetMnodeEpSet(pMnode, &rspBasic->epSet);
D
dapan1121 已提交
539 540

    mndCreateQnodeList(pMnode, &rspBasic->pQnodeList, -1);
541

D
dapan1121 已提交
542 543 544
    mndReleaseConn(pMnode, pConn);

    hbRsp.query = rspBasic;
545 546
  } else {
    mDebug("no query info in hb msg");
D
dapan1121 已提交
547 548 549 550
  }

  int32_t kvNum = taosHashGetSize(pHbReq->info);
  if (NULL == pHbReq->info || kvNum <= 0) {
L
Liu Jicong 已提交
551
    taosArrayPush(pBatchRsp->rsps, &hbRsp);
D
dapan1121 已提交
552 553 554 555 556 557 558
    return TSDB_CODE_SUCCESS;
  }

  hbRsp.info = taosArrayInit(kvNum, sizeof(SKv));
  if (NULL == hbRsp.info) {
    mError("taosArrayInit %d rsp kv failed", kvNum);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
D
dapan1121 已提交
559
    tFreeClientHbRsp(&hbRsp);
D
dapan1121 已提交
560 561 562 563 564 565 566 567
    return -1;
  }

  void *pIter = taosHashIterate(pHbReq->info, NULL);
  while (pIter != NULL) {
    SKv *kv = pIter;

    switch (kv->key) {
D
dapan 已提交
568
      case HEARTBEAT_KEY_USER_AUTHINFO: {
S
Shengliang Guan 已提交
569
        void   *rspMsg = NULL;
D
dapan 已提交
570 571 572 573 574 575 576 577
        int32_t rspLen = 0;
        mndValidateUserAuthInfo(pMnode, kv->value, kv->valueLen / sizeof(SUserAuthVersion), &rspMsg, &rspLen);
        if (rspMsg && rspLen > 0) {
          SKv kv1 = {.key = HEARTBEAT_KEY_USER_AUTHINFO, .valueLen = rspLen, .value = rspMsg};
          taosArrayPush(hbRsp.info, &kv1);
        }
        break;
      }
D
dapan1121 已提交
578
      case HEARTBEAT_KEY_DBINFO: {
S
Shengliang Guan 已提交
579
        void   *rspMsg = NULL;
D
dapan1121 已提交
580 581 582 583 584 585 586 587 588
        int32_t rspLen = 0;
        mndValidateDbInfo(pMnode, kv->value, kv->valueLen / sizeof(SDbVgVersion), &rspMsg, &rspLen);
        if (rspMsg && rspLen > 0) {
          SKv kv1 = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg};
          taosArrayPush(hbRsp.info, &kv1);
        }
        break;
      }
      case HEARTBEAT_KEY_STBINFO: {
S
Shengliang Guan 已提交
589
        void   *rspMsg = NULL;
D
dapan1121 已提交
590
        int32_t rspLen = 0;
D
dapan1121 已提交
591
        mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableVersion), &rspMsg, &rspLen);
D
dapan1121 已提交
592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611
        if (rspMsg && rspLen > 0) {
          SKv kv1 = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg};
          taosArrayPush(hbRsp.info, &kv1);
        }
        break;
      }
      default:
        mError("invalid kv key:%d", kv->key);
        hbRsp.status = TSDB_CODE_MND_APP_ERROR;
        break;
    }

    pIter = taosHashIterate(pHbReq->info, pIter);
  }

  taosArrayPush(pBatchRsp->rsps, &hbRsp);

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
612 613
static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
S
Shengliang Guan 已提交
614

L
Liu Jicong 已提交
615
  SClientHbBatchReq batchReq = {0};
S
Shengliang Guan 已提交
616
  if (tDeserializeSClientHbBatchReq(pReq->pCont, pReq->contLen, &batchReq) != 0) {
D
dapan1121 已提交
617
    taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);
S
Shengliang Guan 已提交
618 619 620 621
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

L
Liu Jicong 已提交
622 623
  SClientHbBatchRsp batchRsp = {0};
  batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp));
L
Liu Jicong 已提交
624

S
Shengliang Guan 已提交
625
  int32_t sz = taosArrayGetSize(batchReq.reqs);
L
Liu Jicong 已提交
626
  for (int i = 0; i < sz; i++) {
S
Shengliang Guan 已提交
627
    SClientHbReq *pHbReq = taosArrayGet(batchReq.reqs, i);
D
dapan1121 已提交
628
    if (pHbReq->connKey.connType == CONN_TYPE__QUERY) {
S
Shengliang Guan 已提交
629
      mndProcessQueryHeartBeat(pMnode, pReq, pHbReq, &batchRsp);
D
dapan1121 已提交
630
    } else if (pHbReq->connKey.connType == CONN_TYPE__TMQ) {
L
Liu Jicong 已提交
631 632 633
      SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq);
      if (pRsp != NULL) {
        taosArrayPush(batchRsp.rsps, pRsp);
wafwerar's avatar
wafwerar 已提交
634
        taosMemoryFree(pRsp);
L
Liu Jicong 已提交
635
      }
L
Liu Jicong 已提交
636 637
    }
  }
S
Shengliang Guan 已提交
638
  taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);
L
Liu Jicong 已提交
639

S
Shengliang Guan 已提交
640 641 642 643
  int32_t tlen = tSerializeSClientHbBatchRsp(NULL, 0, &batchRsp);
  void   *buf = rpcMallocCont(tlen);
  tSerializeSClientHbBatchRsp(buf, tlen, &batchRsp);

D
dapan1121 已提交
644
  tFreeClientHbBatchRsp(&batchRsp);
S
Shengliang Guan 已提交
645 646
  pReq->info.rspLen = tlen;
  pReq->info.rsp = buf;
S
Shengliang Guan 已提交
647 648 649 650

  return 0;
}

S
Shengliang Guan 已提交
651 652
static int32_t mndProcessKillQueryReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
653 654
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

655
  SUserObj *pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
656
  if (pUser == NULL) return 0;
657
  if (!pUser->superUser) {
658 659 660 661 662 663
    mndReleaseUser(pMnode, pUser);
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
  }
  mndReleaseUser(pMnode, pUser);

S
Shengliang Guan 已提交
664
  SKillQueryReq killReq = {0};
S
Shengliang Guan 已提交
665
  if (tDeserializeSKillQueryReq(pReq->pCont, pReq->contLen, &killReq) != 0) {
S
Shengliang Guan 已提交
666 667 668 669
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

D
dapan1121 已提交
670
  mInfo("kill query msg is received, queryId:%s", killReq.queryStrId);
S
Shengliang Guan 已提交
671
  int32_t  connId = 0;
D
dapan1121 已提交
672
  uint64_t queryId = 0;
S
Shengliang Guan 已提交
673
  char    *p = strchr(killReq.queryStrId, ':');
D
dapan1121 已提交
674 675 676 677 678 679 680 681
  if (NULL == p) {
    mError("invalid query id %s", killReq.queryStrId);
    terrno = TSDB_CODE_MND_INVALID_QUERY_ID;
    return -1;
  }
  *p = 0;
  connId = taosStr2Int32(killReq.queryStrId, NULL, 16);
  queryId = taosStr2UInt64(p + 1, NULL, 16);
682

D
dapan1121 已提交
683
  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->connCache, &connId, sizeof(int32_t));
684
  if (pConn == NULL) {
D
dapan1121 已提交
685
    mError("connId:%x, failed to kill queryId:%" PRIx64 ", conn not exist", connId, queryId);
686 687 688
    terrno = TSDB_CODE_MND_INVALID_CONN_ID;
    return -1;
  } else {
D
dapan1121 已提交
689
    mInfo("connId:%x, queryId:%" PRIx64 " is killed by user:%s", connId, queryId, pReq->info.conn.user);
D
dapan1121 已提交
690
    pConn->killId = queryId;
D
dapan1121 已提交
691
    taosCacheRelease(pMgmt->connCache, (void **)&pConn, false);
692 693 694 695
    return 0;
  }
}

S
Shengliang Guan 已提交
696 697
static int32_t mndProcessKillConnReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
698 699
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

700
  SUserObj *pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
701
  if (pUser == NULL) return 0;
702
  if (!pUser->superUser) {
703 704 705 706 707 708
    mndReleaseUser(pMnode, pUser);
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
  }
  mndReleaseUser(pMnode, pUser);

S
Shengliang Guan 已提交
709
  SKillConnReq killReq = {0};
S
Shengliang Guan 已提交
710
  if (tDeserializeSKillConnReq(pReq->pCont, pReq->contLen, &killReq) != 0) {
S
Shengliang Guan 已提交
711 712 713
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }
S
Shengliang Guan 已提交
714

D
dapan1121 已提交
715
  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->connCache, &killReq.connId, sizeof(uint32_t));
716
  if (pConn == NULL) {
D
dapan1121 已提交
717
    mError("connId:%u, failed to kill connection, conn not exist", killReq.connId);
718 719 720
    terrno = TSDB_CODE_MND_INVALID_CONN_ID;
    return -1;
  } else {
D
dapan1121 已提交
721
    mInfo("connId:%u, is killed by user:%s", killReq.connId, pReq->info.conn.user);
722
    pConn->killed = 1;
D
dapan1121 已提交
723
    taosCacheRelease(pMgmt->connCache, (void **)&pConn, false);
724 725 726 727
    return TSDB_CODE_SUCCESS;
  }
}

D
dapan1121 已提交
728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749
static int32_t mndProcessSvrVerReq(SRpcMsg *pReq) {
  int32_t code = -1;
  SServerVerRsp rsp = {0};
  strcpy(rsp.ver, version);
  
  int32_t contLen = tSerializeSServerVerRsp(NULL, 0, &rsp);
  if (contLen < 0) goto _over;
  void *pRsp = rpcMallocCont(contLen);
  if (pRsp == NULL) goto _over;
  tSerializeSServerVerRsp(pRsp, contLen, &rsp);

  pReq->info.rspLen = contLen;
  pReq->info.rsp = pRsp;

  code = 0;

_over:

  return code;
}


750
static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
751 752 753 754 755 756
  SMnode   *pMnode = pReq->info.node;
  SSdb     *pSdb = pMnode->pSdb;
  int32_t   numOfRows = 0;
  int32_t   cols = 0;
  SConnObj *pConn = NULL;

H
Haojun Liao 已提交
757 758
  if (pShow->pIter == NULL) {
    SProfileMgmt *pMgmt = &pMnode->profileMgmt;
D
dapan1121 已提交
759
    pShow->pIter = taosCacheCreateIter(pMgmt->connCache);
H
Haojun Liao 已提交
760 761
  }

S
Shengliang Guan 已提交
762
  while (numOfRows < rows) {
H
Haojun Liao 已提交
763
    pConn = mndGetNextConn(pMnode, pShow->pIter);
764 765 766 767
    if (pConn == NULL) {
      pShow->pIter = NULL;
      break;
    }
S
Shengliang Guan 已提交
768 769

    cols = 0;
770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797

    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pConn->id, false);

    char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_TO_VARSTR(user, pConn->user);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)user, false);

    char app[TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE];
    STR_TO_VARSTR(app, pConn->app);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)app, false);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pConn->pid, false);

    char endpoint[TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
    sprintf(&endpoint[VARSTR_HEADER_SIZE], "%s:%d", taosIpStr(pConn->ip), pConn->port);
    varDataLen(endpoint) = strlen(&endpoint[VARSTR_HEADER_SIZE]);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)endpoint, false);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pConn->loginTimeMs, false);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pConn->lastAccessTimeMs, false);
S
Shengliang Guan 已提交
798 799 800 801

    numOfRows++;
  }

802
  pShow->numOfRows += numOfRows;
S
Shengliang Guan 已提交
803 804 805
  return numOfRows;
}

D
dapan1121 已提交
806
static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
807 808 809 810 811 812
  SMnode   *pMnode = pReq->info.node;
  SSdb     *pSdb = pMnode->pSdb;
  int32_t   numOfRows = 0;
  int32_t   cols = 0;
  SConnObj *pConn = NULL;

H
Haojun Liao 已提交
813 814
  if (pShow->pIter == NULL) {
    SProfileMgmt *pMgmt = &pMnode->profileMgmt;
D
dapan1121 已提交
815
    pShow->pIter = taosCacheCreateIter(pMgmt->connCache);
H
Haojun Liao 已提交
816 817
  }

S
Shengliang Guan 已提交
818
  while (numOfRows < rows) {
H
Haojun Liao 已提交
819
    pConn = mndGetNextConn(pMnode, pShow->pIter);
820 821 822 823
    if (pConn == NULL) {
      pShow->pIter = NULL;
      break;
    }
S
Shengliang Guan 已提交
824

825
    taosRLockLatch(&pConn->queryLock);
D
dapan1121 已提交
826
    if (NULL == pConn->pQueries || taosArrayGetSize(pConn->pQueries) <= 0) {
827
      taosRUnLockLatch(&pConn->queryLock);
D
dapan1121 已提交
828
      continue;
S
Shengliang Guan 已提交
829 830
    }

D
dapan1121 已提交
831 832
    int32_t numOfQueries = taosArrayGetSize(pConn->pQueries);
    for (int32_t i = 0; i < numOfQueries; ++i) {
833
      SQueryDesc *pQuery = taosArrayGet(pConn->pQueries, i);
S
Shengliang Guan 已提交
834 835
      cols = 0;

836 837 838
      char queryId[26 + VARSTR_HEADER_SIZE] = {0};
      sprintf(&queryId[VARSTR_HEADER_SIZE], "%x:%" PRIx64, pConn->id, pQuery->reqRid);
      varDataLen(queryId) = strlen(&queryId[VARSTR_HEADER_SIZE]);
D
dapan1121 已提交
839
      SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
840 841 842
      colDataAppend(pColInfo, numOfRows, (const char *)queryId, false);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
843
      colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->queryId, false);
S
Shengliang Guan 已提交
844

D
dapan1121 已提交
845
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
846
      colDataAppend(pColInfo, numOfRows, (const char *)&pConn->id, false);
S
Shengliang Guan 已提交
847

848 849 850 851 852 853
      char app[TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE];
      STR_TO_VARSTR(app, pConn->app);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)app, false);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
854
      colDataAppend(pColInfo, numOfRows, (const char *)&pConn->pid, false);
855

D
dapan1121 已提交
856
      char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
857
      STR_TO_VARSTR(user, pConn->user);
D
dapan1121 已提交
858
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
859
      colDataAppend(pColInfo, numOfRows, (const char *)user, false);
S
Shengliang Guan 已提交
860

D
dapan1121 已提交
861 862 863
      char endpoint[TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
      sprintf(&endpoint[VARSTR_HEADER_SIZE], "%s:%d", taosIpStr(pConn->ip), pConn->port);
      varDataLen(endpoint) = strlen(&endpoint[VARSTR_HEADER_SIZE]);
D
dapan1121 已提交
864
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
865
      colDataAppend(pColInfo, numOfRows, (const char *)endpoint, false);
S
Shengliang Guan 已提交
866

D
dapan1121 已提交
867
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
868
      colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->stime, false);
S
Shengliang Guan 已提交
869

D
dapan1121 已提交
870
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
871
      colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->useconds, false);
S
Shengliang Guan 已提交
872

873 874
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->stableQuery, false);
S
Shengliang Guan 已提交
875

D
dapan1121 已提交
876
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
877 878
      colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->subPlanNum, false);

879
      char    subStatus[TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE] = {0};
880 881 882 883 884 885
      int32_t strSize = sizeof(subStatus);
      int32_t offset = VARSTR_HEADER_SIZE;
      for (int32_t i = 0; i < pQuery->subPlanNum && offset < strSize; ++i) {
        if (i) {
          offset += snprintf(subStatus + offset, strSize - offset - 1, ",");
        }
886
        SQuerySubDesc *pDesc = taosArrayGet(pQuery->subDesc, i);
887 888 889 890 891 892 893 894 895 896
        offset += snprintf(subStatus + offset, strSize - offset - 1, "%" PRIu64 ":%s", pDesc->tid, pDesc->status);
      }
      varDataLen(subStatus) = strlen(&subStatus[VARSTR_HEADER_SIZE]);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, subStatus, false);

      char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(sql, pQuery->sql);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)sql, false);
S
Shengliang Guan 已提交
897 898 899

      numOfRows++;
    }
900

901
    taosRUnLockLatch(&pConn->queryLock);
S
Shengliang Guan 已提交
902 903
  }

904
  pShow->numOfRows += numOfRows;
S
Shengliang Guan 已提交
905 906 907
  return numOfRows;
}

D
dapan1121 已提交
908
static int32_t mndRetrieveApps(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
S
Shengliang Guan 已提交
909 910 911 912 913 914
  SMnode  *pMnode = pReq->info.node;
  SSdb    *pSdb = pMnode->pSdb;
  int32_t  numOfRows = 0;
  int32_t  cols = 0;
  SAppObj *pApp = NULL;

D
dapan1121 已提交
915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931
  if (pShow->pIter == NULL) {
    SProfileMgmt *pMgmt = &pMnode->profileMgmt;
    pShow->pIter = taosCacheCreateIter(pMgmt->appCache);
  }

  while (numOfRows < rows) {
    pApp = mndGetNextApp(pMnode, pShow->pIter);
    if (pApp == NULL) {
      pShow->pIter = NULL;
      break;
    }

    cols = 0;

    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->appId, false);

D
dapan1121 已提交
932 933 934
    char ip[TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
    sprintf(&ip[VARSTR_HEADER_SIZE], "%s", taosIpStr(pApp->ip));
    varDataLen(ip) = strlen(&ip[VARSTR_HEADER_SIZE]);
D
dapan1121 已提交
935
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
936
    colDataAppend(pColInfo, numOfRows, (const char *)ip, false);
D
dapan1121 已提交
937 938 939 940

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->pid, false);

D
dapan1121 已提交
941 942 943
    char name[TSDB_APP_NAME_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
    sprintf(&name[VARSTR_HEADER_SIZE], "%s", pApp->name);
    varDataLen(name) = strlen(&name[VARSTR_HEADER_SIZE]);
D
dapan1121 已提交
944
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
945
    colDataAppend(pColInfo, numOfRows, (const char *)name, false);
D
dapan1121 已提交
946 947

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
948
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->startTime, false);
D
dapan1121 已提交
949 950

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
951
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->summary.numOfInsertsReq, false);
D
dapan1121 已提交
952 953

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
954
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->summary.numOfInsertRows, false);
D
dapan1121 已提交
955 956

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
957
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->summary.insertElapsedTime, false);
D
dapan1121 已提交
958 959

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
960
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->summary.insertBytes, false);
D
dapan1121 已提交
961 962

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
963
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->summary.fetchBytes, false);
D
dapan1121 已提交
964 965

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
966
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->summary.queryElapsedTime, false);
D
dapan1121 已提交
967 968

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
969
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->summary.numOfSlowQueries, false);
D
dapan1121 已提交
970 971

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
972
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->summary.totalRequests, false);
D
dapan1121 已提交
973 974

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
975
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->summary.currentRequests, false);
D
dapan1121 已提交
976 977

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
978
    colDataAppend(pColInfo, numOfRows, (const char *)&pApp->lastAccessTimeMs, false);
D
dapan1121 已提交
979 980 981 982 983 984 985 986

    numOfRows++;
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}

S
Shengliang Guan 已提交
987
static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) {
H
Haojun Liao 已提交
988 989 990
  if (pIter != NULL) {
    taosCacheDestroyIter(pIter);
  }
S
Shengliang Guan 已提交
991
}
S
Shengliang Guan 已提交
992

S
Shengliang Guan 已提交
993 994
int32_t mndGetNumOfConnections(SMnode *pMnode) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
D
dapan1121 已提交
995
  return taosCacheGetNumOfObj(pMgmt->connCache);
D
dapan1121 已提交
996
}