mndProfile.c 29.3 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17 18 19 20
#include "mndProfile.h"
#include "mndDb.h"
#include "mndMnode.h"
#include "mndShow.h"
S
Shengliang Guan 已提交
21
#include "mndStb.h"
S
Shengliang Guan 已提交
22
#include "mndUser.h"
S
Shengliang Guan 已提交
23
#include "tglobal.h"
S
version  
Shengliang Guan 已提交
24
#include "version.h"
S
Shengliang Guan 已提交
25

L
Liu Jicong 已提交
26 27
#define QUERY_ID_SIZE      20
#define QUERY_OBJ_ID_SIZE  18
S
Shengliang Guan 已提交
28
#define SUBQUERY_INFO_SIZE 6
L
Liu Jicong 已提交
29
#define QUERY_SAVE_SIZE    20
S
Shengliang Guan 已提交
30 31

typedef struct {
D
dapan1121 已提交
32
  uint32_t    id;
L
Liu Jicong 已提交
33
  int8_t      connType;
S
Shengliang Guan 已提交
34 35 36 37 38 39 40 41 42
  char        user[TSDB_USER_LEN];
  char        app[TSDB_APP_NAME_LEN];  // app name that invokes taosc
  int64_t     appStartTimeMs;          // app start time
  int32_t     pid;                     // pid of app that invokes taosc
  uint32_t    ip;
  uint16_t    port;
  int8_t      killed;
  int64_t     loginTimeMs;
  int64_t     lastAccessTimeMs;
D
dapan1121 已提交
43
  uint64_t    killId;
S
Shengliang Guan 已提交
44
  int32_t     numOfQueries;
D
dapan1121 已提交
45
  SArray     *pQueries;      //SArray<SQueryDesc>
S
Shengliang Guan 已提交
46 47
} SConnObj;

L
Liu Jicong 已提交
48 49
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
                               int32_t pid, const char *app, int64_t startTime);
S
Shengliang Guan 已提交
50
static void      mndFreeConn(SConnObj *pConn);
D
dapan1121 已提交
51
static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId);
S
Shengliang Guan 已提交
52
static void      mndReleaseConn(SMnode *pMnode, SConnObj *pConn);
H
Haojun Liao 已提交
53
static void     *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter);
S
Shengliang Guan 已提交
54
static void      mndCancelGetNextConn(SMnode *pMnode, void *pIter);
S
Shengliang Guan 已提交
55 56 57 58 59 60 61 62
static int32_t   mndProcessHeartBeatReq(SNodeMsg *pReq);
static int32_t   mndProcessConnectReq(SNodeMsg *pReq);
static int32_t   mndProcessKillQueryReq(SNodeMsg *pReq);
static int32_t   mndProcessKillConnReq(SNodeMsg *pReq);
static int32_t   mndGetConnsMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t   mndRetrieveConns(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static int32_t   mndGetQueryMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t   mndRetrieveQueries(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
S
Shengliang Guan 已提交
63 64 65 66 67
static void      mndCancelGetNextQuery(SMnode *pMnode, void *pIter);

int32_t mndInitProfile(SMnode *pMnode) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
68
  int32_t connCheckTime = tsShellActivityTimer * 2;
S
Shengliang Guan 已提交
69 70 71 72 73 74 75
  pMgmt->cache = taosCacheInit(TSDB_DATA_TYPE_INT, connCheckTime, true, (__cache_free_fn_t)mndFreeConn, "conn");
  if (pMgmt->cache == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to alloc profile cache since %s", terrstr());
    return -1;
  }

S
Shengliang Guan 已提交
76 77 78 79
  mndSetMsgHandle(pMnode, TDMT_MND_HEARTBEAT, mndProcessHeartBeatReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CONNECT, mndProcessConnectReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq);
S
Shengliang Guan 已提交
80

81
//  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
S
Shengliang Guan 已提交
82
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn);
83
//  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
S
Shengliang Guan 已提交
84
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery);
85

S
Shengliang Guan 已提交
86 87 88 89 90 91 92 93 94 95 96
  return 0;
}

void mndCleanupProfile(SMnode *pMnode) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
  if (pMgmt->cache != NULL) {
    taosCacheCleanup(pMgmt->cache);
    pMgmt->cache = NULL;
  }
}

L
Liu Jicong 已提交
97 98
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
                               int32_t pid, const char *app, int64_t startTime) {
S
Shengliang Guan 已提交
99 100
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

D
dapan1121 已提交
101 102 103
  char connStr[255] = {0};
  int32_t len = snprintf(connStr, sizeof(connStr), "%s%d%d%d%s", user, ip, port, pid, app);
  int32_t connId = mndGenerateUid(connStr, len);
S
Shengliang Guan 已提交
104
  if (startTime == 0) startTime = taosGetTimestampMs();
S
Shengliang Guan 已提交
105

S
Shengliang Guan 已提交
106
  SConnObj connObj = {.id = connId,
L
Liu Jicong 已提交
107
                      .connType = connType,
S
Shengliang Guan 已提交
108 109
                      .appStartTimeMs = startTime,
                      .pid = pid,
S
shm  
Shengliang Guan 已提交
110 111
                      .ip = ip,
                      .port = port,
S
Shengliang Guan 已提交
112
                      .killed = 0,
S
Shengliang Guan 已提交
113 114
                      .loginTimeMs = taosGetTimestampMs(),
                      .lastAccessTimeMs = 0,
D
dapan1121 已提交
115
                      .killId = 0,
S
Shengliang Guan 已提交
116 117 118
                      .numOfQueries = 0,
                      .pQueries = NULL};

S
Shengliang Guan 已提交
119
  connObj.lastAccessTimeMs = connObj.loginTimeMs;
S
shm  
Shengliang Guan 已提交
120
  tstrncpy(connObj.user, user, TSDB_USER_LEN);
S
Shengliang Guan 已提交
121 122
  tstrncpy(connObj.app, app, TSDB_APP_NAME_LEN);

S
Shengliang Guan 已提交
123
  int32_t   keepTime = tsShellActivityTimer * 3;
S
Shengliang Guan 已提交
124
  SConnObj *pConn = taosCachePut(pMgmt->cache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), keepTime * 1000);
S
Shengliang Guan 已提交
125 126
  if (pConn == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
shm  
Shengliang Guan 已提交
127
    mError("conn:%d, failed to put into cache since %s, user:%s", connId, user, terrstr());
S
Shengliang Guan 已提交
128 129
    return NULL;
  } else {
D
dapan1121 已提交
130
    mTrace("conn:%u, is created, data:%p user:%s", pConn->id, pConn, user);
S
Shengliang Guan 已提交
131 132
    return pConn;
  }
S
Shengliang Guan 已提交
133 134 135
}

static void mndFreeConn(SConnObj *pConn) {
wafwerar's avatar
wafwerar 已提交
136
  taosMemoryFreeClear(pConn->pQueries);
D
dapan1121 已提交
137
  mTrace("conn:%u, is destroyed, data:%p", pConn->id, pConn);
S
Shengliang Guan 已提交
138 139
}

D
dapan1121 已提交
140
static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId) {
S
Shengliang Guan 已提交
141 142
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

D
dapan1121 已提交
143
  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(connId));
S
Shengliang Guan 已提交
144
  if (pConn == NULL) {
D
dapan1121 已提交
145
    mDebug("conn:%u, already destroyed", connId);
S
Shengliang Guan 已提交
146 147 148
    return NULL;
  }

S
Shengliang Guan 已提交
149
  int32_t keepTime = tsShellActivityTimer * 3;
S
Shengliang Guan 已提交
150
  pConn->lastAccessTimeMs = keepTime * 1000 + (uint64_t)taosGetTimestampMs();
S
Shengliang Guan 已提交
151

D
dapan1121 已提交
152
  mTrace("conn:%u, acquired from cache, data:%p", pConn->id, pConn);
S
Shengliang Guan 已提交
153 154 155 156 157
  return pConn;
}

static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) {
  if (pConn == NULL) return;
D
dapan1121 已提交
158
  mTrace("conn:%u, released from cache, data:%p", pConn->id, pConn);
S
Shengliang Guan 已提交
159

S
Shengliang Guan 已提交
160
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
S
Shengliang Guan 已提交
161 162 163
  taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
}

H
Haojun Liao 已提交
164
void *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter) {
L
Liu Jicong 已提交
165 166
  SConnObj *pConn = NULL;
  bool      hasNext = taosCacheIterNext(pIter);
H
Haojun Liao 已提交
167 168 169 170 171
  if (hasNext) {
    size_t dataLen = 0;
    pConn = taosCacheIterGetData(pIter, &dataLen);
  } else {
    taosCacheDestroyIter(pIter);
S
Shengliang Guan 已提交
172 173
  }

H
Haojun Liao 已提交
174
  return pConn;
S
Shengliang Guan 已提交
175 176 177
}

static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) {
H
Haojun Liao 已提交
178 179 180
  if (pIter != NULL) {
    taosCacheDestroyIter(pIter);
  }
S
Shengliang Guan 已提交
181 182
}

S
Shengliang Guan 已提交
183 184
static int32_t mndProcessConnectReq(SNodeMsg *pReq) {
  SMnode     *pMnode = pReq->pNode;
S
Shengliang Guan 已提交
185 186 187 188 189
  SUserObj   *pUser = NULL;
  SDbObj     *pDb = NULL;
  SConnObj   *pConn = NULL;
  int32_t     code = -1;
  SConnectReq connReq = {0};
S
shm  
Shengliang Guan 已提交
190
  char        ip[30] = {0};
S
Shengliang Guan 已提交
191 192 193 194 195

  if (tDeserializeSConnectReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &connReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto CONN_OVER;
  }
S
Shengliang Guan 已提交
196

S
shm  
Shengliang Guan 已提交
197
  taosIp2String(pReq->clientIp, ip);
S
Shengliang Guan 已提交
198

199 200 201 202 203 204
  pUser = mndAcquireUser(pMnode, pReq->user);
  if (pUser == NULL) {
    mError("user:%s, failed to login while acquire user since %s", pReq->user, terrstr());
    goto CONN_OVER;
  }

S
Shengliang Guan 已提交
205
  if (connReq.db[0]) {
S
Shengliang Guan 已提交
206 207 208
    char db[TSDB_DB_FNAME_LEN];
    snprintf(db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, connReq.db);
    pDb = mndAcquireDb(pMnode, db);
S
Shengliang Guan 已提交
209 210
    if (pDb == NULL) {
      terrno = TSDB_CODE_MND_INVALID_DB;
S
Shengliang Guan 已提交
211
      mError("user:%s, failed to login from %s while use db:%s since %s", pReq->user, ip, connReq.db, terrstr());
212
      goto CONN_OVER;
S
Shengliang Guan 已提交
213 214 215
    }
  }

L
Liu Jicong 已提交
216 217
  pConn = mndCreateConn(pMnode, pReq->user, connReq.connType, pReq->clientIp, pReq->clientPort, connReq.pid,
                        connReq.app, connReq.startTime);
S
Shengliang Guan 已提交
218
  if (pConn == NULL) {
S
Shengliang Guan 已提交
219
    mError("user:%s, failed to login from %s while create connection since %s", pReq->user, ip, terrstr());
220
    goto CONN_OVER;
S
Shengliang Guan 已提交
221 222
  }

D
dapan1121 已提交
223 224
  mndAcquireConn(pMnode, pConn->id);

S
Shengliang Guan 已提交
225 226 227 228 229
  SConnectRsp connectRsp = {0};
  connectRsp.acctId = pUser->acctId;
  connectRsp.superUser = pUser->superUser;
  connectRsp.clusterId = pMnode->clusterId;
  connectRsp.connId = pConn->id;
L
Liu Jicong 已提交
230
  connectRsp.connType = connReq.connType;
S
Shengliang Guan 已提交
231

S
Shengliang Guan 已提交
232 233 234
  snprintf(connectRsp.sVersion, sizeof(connectRsp.sVersion), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo,
           gitinfo);
  mndGetMnodeEpSet(pMnode, &connectRsp.epSet);
235

S
Shengliang Guan 已提交
236 237 238 239 240
  int32_t contLen = tSerializeSConnectRsp(NULL, 0, &connectRsp);
  if (contLen < 0) goto CONN_OVER;
  void *pRsp = rpcMallocCont(contLen);
  if (pRsp == NULL) goto CONN_OVER;
  tSerializeSConnectRsp(pRsp, contLen, &connectRsp);
S
Shengliang Guan 已提交
241

S
Shengliang Guan 已提交
242 243
  pReq->rspLen = contLen;
  pReq->pRsp = pRsp;
244

D
dapan1121 已提交
245
  mDebug("user:%s, login from %s:%d, conn:%u, app:%s", pReq->user, ip, pConn->port, pConn->id, connReq.app);
246 247 248 249 250 251 252 253 254 255

  code = 0;

CONN_OVER:

  mndReleaseUser(pMnode, pUser);
  mndReleaseDb(pMnode, pDb);
  mndReleaseConn(pMnode, pConn);

  return code;
S
Shengliang Guan 已提交
256 257
}

D
dapan1121 已提交
258 259
static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) {
  taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc);
S
Shengliang Guan 已提交
260

D
dapan1121 已提交
261 262 263 264
  pConn->pQueries = pBasic->queryDesc;
  pBasic->queryDesc = NULL;
  
  pConn->numOfQueries =  pBasic->queryDesc ? taosArrayGetSize(pBasic->queryDesc) : 0;
S
Shengliang Guan 已提交
265 266 267 268

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
269
static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) {
L
Liu Jicong 已提交
270
#if 0
wafwerar's avatar
wafwerar 已提交
271
  SClientHbRsp* pRsp = taosMemoryMalloc(sizeof(SClientHbRsp));
L
Liu Jicong 已提交
272 273 274 275 276 277 278 279 280 281 282 283 284 285 286
  if (pRsp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  pRsp->connKey = pReq->connKey;
  SMqHbBatchRsp batchRsp;
  batchRsp.batchRsps = taosArrayInit(0, sizeof(SMqHbRsp));
  if (batchRsp.batchRsps == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  SClientHbKey connKey = pReq->connKey;
  SHashObj* pObj =  pReq->info;
  SKv* pKv = taosHashGet(pObj, "mq-tmp", strlen("mq-tmp") + 1);
  if (pKv == NULL) {
wafwerar's avatar
wafwerar 已提交
287
    taosMemoryFree(pRsp);
L
Liu Jicong 已提交
288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319
    return NULL;
  }
  SMqHbMsg mqHb;
  taosDecodeSMqMsg(pKv->value, &mqHb);
  /*int64_t clientUid = htonl(pKv->value);*/
  /*if (mqHb.epoch )*/
  int sz = taosArrayGetSize(mqHb.pTopics);
  SMqConsumerObj* pConsumer = mndAcquireConsumer(pMnode, mqHb.consumerId); 
  for (int i = 0; i < sz; i++) {
    SMqHbOneTopicBatchRsp innerBatchRsp;
    innerBatchRsp.rsps = taosArrayInit(sz, sizeof(SMqHbRsp));
    if (innerBatchRsp.rsps == NULL) {
      //TODO
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
    SMqHbTopicInfo* topicInfo = taosArrayGet(mqHb.pTopics, i);
    SMqConsumerTopic* pConsumerTopic = taosHashGet(pConsumer->topicHash, topicInfo->name, strlen(topicInfo->name)+1);
    if (pConsumerTopic->epoch != topicInfo->epoch) {
      //add new vgids into rsp
      int vgSz = taosArrayGetSize(topicInfo->pVgInfo);
      for (int j = 0; j < vgSz; j++) {
        SMqHbRsp innerRsp;
        SMqHbVgInfo* pVgInfo = taosArrayGet(topicInfo->pVgInfo, i);
        SVgObj* pVgObj = mndAcquireVgroup(pMnode, pVgInfo->vgId);
        innerRsp.epSet = mndGetVgroupEpset(pMnode, pVgObj);
        taosArrayPush(innerBatchRsp.rsps, &innerRsp);
      }
    }
    taosArrayPush(batchRsp.batchRsps, &innerBatchRsp);
  }
  int32_t tlen = taosEncodeSMqHbBatchRsp(NULL, &batchRsp);
wafwerar's avatar
wafwerar 已提交
320
  void* buf = taosMemoryMalloc(tlen);
L
Liu Jicong 已提交
321 322 323 324 325 326 327 328 329
  if (buf == NULL) {
    //TODO
    return NULL;
  }
  void* abuf = buf;
  taosEncodeSMqHbBatchRsp(&abuf, &batchRsp);
  pRsp->body = buf;
  pRsp->bodyLen = tlen;
  return pRsp;
L
Liu Jicong 已提交
330 331
#endif
  return NULL;
L
Liu Jicong 已提交
332 333
}

D
dapan1121 已提交
334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438
static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHbReq *pHbReq, SClientHbBatchRsp *pBatchRsp) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
  SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL};

  if (pHbReq->query) {
    SQueryHbReqBasic *pBasic = pHbReq->query;

    SRpcConnInfo connInfo = {0};
    rpcGetConnInfo(pMsg->handle, &connInfo);

    SConnObj *pConn = mndAcquireConn(pMnode, pBasic->connId);
    if (pConn == NULL) {    
      pConn = mndCreateConn(pMnode, connInfo.user, connInfo.clientIp, connInfo.clientPort, pBasic->pid, pBasic->app, 0);
      if (pConn == NULL) {
        mError("user:%s, conn:%u is freed and failed to create new since %s", connInfo.user, pBasic->connId, terrstr());
        return -1;
      } else {
        mDebug("user:%s, conn:%u is freed and create a new conn:%u", connInfo.user, pBasic->connId, pConn->id);
      }
    } else if (pConn->killed) {
      mError("user:%s, conn:%u is already killed", connInfo.user, pConn->id);
      mndReleaseConn(pMnode, pConn);      
      terrno = TSDB_CODE_MND_INVALID_CONNECTION;
      return -1;
    }

    SQueryHbRspBasic *rspBasic = taosMemoryCalloc(1, sizeof(SQueryHbRspBasic));
    if (rspBasic == NULL) {
      mndReleaseConn(pMnode, pConn);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      mError("user:%s, conn:%u failed to process hb while since %s", pConn->user, pBasic->connId, terrstr());
      return -1;
    }

    mndSaveQueryList(pConn, pBasic);
    if (pConn->killed != 0) {
      rspBasic->killConnection = 1;
    }

    if (pConn->killId != 0) {
      rspBasic->killRid = pConn->killId;
      pConn->killId = 0;
    }

    rspBasic->connId = pConn->id;
    rspBasic->totalDnodes = 1;  //TODO
    rspBasic->onlineDnodes = 1; //TODO
    mndGetMnodeEpSet(pMnode, &rspBasic->epSet);
    mndReleaseConn(pMnode, pConn);

    hbRsp.query = rspBasic;
  }

  int32_t kvNum = taosHashGetSize(pHbReq->info);
  if (NULL == pHbReq->info || kvNum <= 0) {
    taosArrayPush(pBatchRsp->rsps, &hbRsp);  
    return TSDB_CODE_SUCCESS;
  }

  hbRsp.info = taosArrayInit(kvNum, sizeof(SKv));
  if (NULL == hbRsp.info) {
    mError("taosArrayInit %d rsp kv failed", kvNum);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  void *pIter = taosHashIterate(pHbReq->info, NULL);
  while (pIter != NULL) {
    SKv *kv = pIter;

    switch (kv->key) {
      case HEARTBEAT_KEY_DBINFO: {
        void   *rspMsg = NULL;
        int32_t rspLen = 0;
        mndValidateDbInfo(pMnode, kv->value, kv->valueLen / sizeof(SDbVgVersion), &rspMsg, &rspLen);
        if (rspMsg && rspLen > 0) {
          SKv kv1 = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg};
          taosArrayPush(hbRsp.info, &kv1);
        }
        break;
      }
      case HEARTBEAT_KEY_STBINFO: {
        void   *rspMsg = NULL;
        int32_t rspLen = 0;
        mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableMetaVersion), &rspMsg, &rspLen);
        if (rspMsg && rspLen > 0) {
          SKv kv1 = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg};
          taosArrayPush(hbRsp.info, &kv1);
        }
        break;
      }
      default:
        mError("invalid kv key:%d", kv->key);
        hbRsp.status = TSDB_CODE_MND_APP_ERROR;
        break;
    }

    pIter = taosHashIterate(pHbReq->info, pIter);
  }

  taosArrayPush(pBatchRsp->rsps, &hbRsp);

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
439 440
static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) {
  SMnode *pMnode = pReq->pNode;
S
Shengliang Guan 已提交
441

L
Liu Jicong 已提交
442
  SClientHbBatchReq batchReq = {0};
S
Shengliang Guan 已提交
443 444 445 446 447
  if (tDeserializeSClientHbBatchReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &batchReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

L
Liu Jicong 已提交
448 449
  SClientHbBatchRsp batchRsp = {0};
  batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp));
L
Liu Jicong 已提交
450

S
Shengliang Guan 已提交
451
  int32_t sz = taosArrayGetSize(batchReq.reqs);
L
Liu Jicong 已提交
452
  for (int i = 0; i < sz; i++) {
S
Shengliang Guan 已提交
453
    SClientHbReq *pHbReq = taosArrayGet(batchReq.reqs, i);
L
Liu Jicong 已提交
454
    if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) {
D
dapan1121 已提交
455
      mndProcessQueryHeartBeat(pMnode, &pReq->rpcMsg, pHbReq, &batchRsp);
L
Liu Jicong 已提交
456
    } else if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_MQ) {
L
Liu Jicong 已提交
457 458 459
      SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq);
      if (pRsp != NULL) {
        taosArrayPush(batchRsp.rsps, pRsp);
wafwerar's avatar
wafwerar 已提交
460
        taosMemoryFree(pRsp);
L
Liu Jicong 已提交
461
      }
L
Liu Jicong 已提交
462 463
    }
  }
S
Shengliang Guan 已提交
464
  taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);
L
Liu Jicong 已提交
465

S
Shengliang Guan 已提交
466 467 468 469
  int32_t tlen = tSerializeSClientHbBatchRsp(NULL, 0, &batchRsp);
  void   *buf = rpcMallocCont(tlen);
  tSerializeSClientHbBatchRsp(buf, tlen, &batchRsp);

D
dapan1121 已提交
470 471 472
  int32_t rspNum = (int32_t)taosArrayGetSize(batchRsp.rsps);
  for (int32_t i = 0; i < rspNum; ++i) {
    SClientHbRsp *rsp = taosArrayGet(batchRsp.rsps, i);
S
Shengliang Guan 已提交
473
    int32_t       kvNum = (rsp->info) ? taosArrayGetSize(rsp->info) : 0;
D
dapan1121 已提交
474 475
    for (int32_t n = 0; n < kvNum; ++n) {
      SKv *kv = taosArrayGet(rsp->info, n);
wafwerar's avatar
wafwerar 已提交
476
      taosMemoryFreeClear(kv->value);
D
dapan1121 已提交
477 478 479 480
    }
    taosArrayDestroy(rsp->info);
  }

L
Liu Jicong 已提交
481
  taosArrayDestroy(batchRsp.rsps);
S
Shengliang Guan 已提交
482 483
  pReq->rspLen = tlen;
  pReq->pRsp = buf;
S
Shengliang Guan 已提交
484 485 486 487

  return 0;
}

S
Shengliang Guan 已提交
488 489
static int32_t mndProcessKillQueryReq(SNodeMsg *pReq) {
  SMnode       *pMnode = pReq->pNode;
490 491
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
492
  SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
493
  if (pUser == NULL) return 0;
494
  if (!pUser->superUser) {
495 496 497 498 499 500
    mndReleaseUser(pMnode, pUser);
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
  }
  mndReleaseUser(pMnode, pUser);

S
Shengliang Guan 已提交
501
  SKillQueryReq killReq = {0};
S
Shengliang Guan 已提交
502
  if (tDeserializeSKillQueryReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &killReq) != 0) {
S
Shengliang Guan 已提交
503 504 505 506 507
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  mInfo("kill query msg is received, queryId:%d", killReq.queryId);
508

S
Shengliang Guan 已提交
509
  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &killReq.connId, sizeof(int32_t));
510
  if (pConn == NULL) {
S
Shengliang Guan 已提交
511
    mError("connId:%d, failed to kill queryId:%d, conn not exist", killReq.connId, killReq.queryId);
512 513 514
    terrno = TSDB_CODE_MND_INVALID_CONN_ID;
    return -1;
  } else {
S
Shengliang Guan 已提交
515
    mInfo("connId:%d, queryId:%d is killed by user:%s", killReq.connId, killReq.queryId, pReq->user);
D
dapan1121 已提交
516
    pConn->killId = killReq.queryId;
517 518 519 520 521
    taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
    return 0;
  }
}

S
Shengliang Guan 已提交
522 523
static int32_t mndProcessKillConnReq(SNodeMsg *pReq) {
  SMnode       *pMnode = pReq->pNode;
524 525
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
526
  SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
527
  if (pUser == NULL) return 0;
528
  if (!pUser->superUser) {
529 530 531 532 533 534
    mndReleaseUser(pMnode, pUser);
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
  }
  mndReleaseUser(pMnode, pUser);

S
Shengliang Guan 已提交
535
  SKillConnReq killReq = {0};
S
Shengliang Guan 已提交
536
  if (tDeserializeSKillConnReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &killReq) != 0) {
S
Shengliang Guan 已提交
537 538 539
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }
S
Shengliang Guan 已提交
540

S
Shengliang Guan 已提交
541
  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &killReq.connId, sizeof(int32_t));
542
  if (pConn == NULL) {
S
Shengliang Guan 已提交
543
    mError("connId:%d, failed to kill connection, conn not exist", killReq.connId);
544 545 546
    terrno = TSDB_CODE_MND_INVALID_CONN_ID;
    return -1;
  } else {
S
Shengliang Guan 已提交
547
    mInfo("connId:%d, is killed by user:%s", killReq.connId, pReq->user);
548 549 550 551 552 553
    pConn->killed = 1;
    taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
    return TSDB_CODE_SUCCESS;
  }
}

S
Shengliang Guan 已提交
554 555
static int32_t mndGetConnsMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
  SMnode       *pMnode = pReq->pNode;
S
Shengliang Guan 已提交
556 557
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
558
  SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
S
Shengliang Guan 已提交
559
  if (pUser == NULL) return 0;
560
  if (!pUser->superUser) {
S
Shengliang Guan 已提交
561
    mndReleaseUser(pMnode, pUser);
562 563
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
S
Shengliang Guan 已提交
564 565 566 567
  }
  mndReleaseUser(pMnode, pUser);

  int32_t  cols = 0;
S
Shengliang Guan 已提交
568
  SSchema *pSchema = pMeta->pSchemas;
S
Shengliang Guan 已提交
569 570 571 572

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "connId");
S
Shengliang Guan 已提交
573
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
574 575 576 577 578
  cols++;

  pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "user");
S
Shengliang Guan 已提交
579
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
580 581 582 583 584 585
  cols++;

  // app name
  pShow->bytes[cols] = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "program");
S
Shengliang Guan 已提交
586
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
587 588 589 590 591 592
  cols++;

  // app pid
  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "pid");
S
Shengliang Guan 已提交
593
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
594 595 596 597 598
  cols++;

  pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "ip:port");
S
Shengliang Guan 已提交
599
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
600 601 602 603 604
  cols++;

  pShow->bytes[cols] = 8;
  pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
  strcpy(pSchema[cols].name, "login_time");
S
Shengliang Guan 已提交
605
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
606 607 608 609 610
  cols++;

  pShow->bytes[cols] = 8;
  pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
  strcpy(pSchema[cols].name, "last_access");
S
Shengliang Guan 已提交
611
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
612 613
  cols++;

S
Shengliang Guan 已提交
614
  pMeta->numOfColumns = cols;
S
Shengliang Guan 已提交
615 616 617 618 619 620 621
  pShow->numOfColumns = cols;

  pShow->offset[0] = 0;
  for (int32_t i = 1; i < cols; ++i) {
    pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
  }

H
Haojun Liao 已提交
622
  pShow->numOfRows = taosCacheGetNumOfObj(pMgmt->cache);
S
Shengliang Guan 已提交
623
  pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
D
dapan1121 已提交
624
  strcpy(pMeta->tbName, mndShowStr(pShow->type));
S
Shengliang Guan 已提交
625 626 627 628

  return 0;
}

S
Shengliang Guan 已提交
629 630
static int32_t mndRetrieveConns(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  SMnode   *pMnode = pReq->pNode;
S
Shengliang Guan 已提交
631
  int32_t   numOfRows = 0;
S
Shengliang Guan 已提交
632
  SConnObj *pConn = NULL;
S
Shengliang Guan 已提交
633 634 635 636
  int32_t   cols = 0;
  char     *pWrite;
  char      ipStr[TSDB_IPv4ADDR_LEN + 6];

H
Haojun Liao 已提交
637 638 639 640 641
  if (pShow->pIter == NULL) {
    SProfileMgmt *pMgmt = &pMnode->profileMgmt;
    pShow->pIter = taosCacheCreateIter(pMgmt->cache);
  }

S
Shengliang Guan 已提交
642
  while (numOfRows < rows) {
H
Haojun Liao 已提交
643
    pConn = mndGetNextConn(pMnode, pShow->pIter);
S
Shengliang Guan 已提交
644
    if (pConn == NULL) break;
S
Shengliang Guan 已提交
645 646 647 648

    cols = 0;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
D
dapan1121 已提交
649
    *(uint32_t *)pWrite = pConn->id;
S
Shengliang Guan 已提交
650 651 652
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
653
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->user, pShow->bytes[cols]);
S
Shengliang Guan 已提交
654 655 656 657
    cols++;

    // app name
    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
658
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->app, pShow->bytes[cols]);
S
Shengliang Guan 已提交
659 660 661 662
    cols++;

    // app pid
    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
663
    *(int32_t *)pWrite = pConn->pid;
S
Shengliang Guan 已提交
664 665 666
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
667
    taosIpPort2String(pConn->ip, pConn->port, ipStr);
S
Shengliang Guan 已提交
668 669 670 671
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->bytes[cols]);
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
672
    *(int64_t *)pWrite = pConn->loginTimeMs;
S
Shengliang Guan 已提交
673 674 675
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
676 677
    if (pConn->lastAccessTimeMs < pConn->loginTimeMs) pConn->lastAccessTimeMs = pConn->loginTimeMs;
    *(int64_t *)pWrite = pConn->lastAccessTimeMs;
S
Shengliang Guan 已提交
678 679 680 681 682
    cols++;

    numOfRows++;
  }

683
  pShow->numOfRows += numOfRows;
S
Shengliang Guan 已提交
684 685 686 687

  return numOfRows;
}

S
Shengliang Guan 已提交
688 689
static int32_t mndGetQueryMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
  SMnode       *pMnode = pReq->pNode;
S
Shengliang Guan 已提交
690 691
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
692
  SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
S
Shengliang Guan 已提交
693
  if (pUser == NULL) return 0;
694
  if (!pUser->superUser) {
S
Shengliang Guan 已提交
695
    mndReleaseUser(pMnode, pUser);
696 697
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
S
Shengliang Guan 已提交
698 699 700 701
  }
  mndReleaseUser(pMnode, pUser);

  int32_t  cols = 0;
S
Shengliang Guan 已提交
702
  SSchema *pSchema = pMeta->pSchemas;
S
Shengliang Guan 已提交
703

S
Shengliang Guan 已提交
704 705 706
  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "queryId");
S
Shengliang Guan 已提交
707
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
708 709 710 711 712
  cols++;

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "connId");
S
Shengliang Guan 已提交
713
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
714 715 716 717 718
  cols++;

  pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "user");
S
Shengliang Guan 已提交
719
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
720 721 722 723 724
  cols++;

  pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "ip:port");
S
Shengliang Guan 已提交
725
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
726 727
  cols++;

S
Shengliang Guan 已提交
728
  pShow->bytes[cols] = 22 + VARSTR_HEADER_SIZE;
S
Shengliang Guan 已提交
729 730
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "qid");
S
Shengliang Guan 已提交
731
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
732 733 734 735 736
  cols++;

  pShow->bytes[cols] = 8;
  pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
  strcpy(pSchema[cols].name, "created_time");
S
Shengliang Guan 已提交
737
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
738 739 740 741 742
  cols++;

  pShow->bytes[cols] = 8;
  pSchema[cols].type = TSDB_DATA_TYPE_BIGINT;
  strcpy(pSchema[cols].name, "time");
S
Shengliang Guan 已提交
743
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
744 745 746 747 748
  cols++;

  pShow->bytes[cols] = QUERY_OBJ_ID_SIZE + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "sql_obj_id");
S
Shengliang Guan 已提交
749
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
750 751 752 753 754
  cols++;

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "pid");
S
Shengliang Guan 已提交
755
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
756 757 758 759 760
  cols++;

  pShow->bytes[cols] = TSDB_EP_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "ep");
S
Shengliang Guan 已提交
761
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
762 763 764 765 766
  cols++;

  pShow->bytes[cols] = 1;
  pSchema[cols].type = TSDB_DATA_TYPE_BOOL;
  strcpy(pSchema[cols].name, "stable_query");
S
Shengliang Guan 已提交
767
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
768 769 770 771 772
  cols++;

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "sub_queries");
S
Shengliang Guan 已提交
773
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
774 775 776 777 778
  cols++;

  pShow->bytes[cols] = TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "sub_query_info");
S
Shengliang Guan 已提交
779
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
780 781 782 783 784
  cols++;

  pShow->bytes[cols] = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "sql");
S
Shengliang Guan 已提交
785
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
786 787
  cols++;

S
Shengliang Guan 已提交
788
  pMeta->numOfColumns = cols;
S
Shengliang Guan 已提交
789 790 791 792 793 794 795 796 797
  pShow->numOfColumns = cols;

  pShow->offset[0] = 0;
  for (int32_t i = 1; i < cols; ++i) {
    pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
  }

  pShow->numOfRows = 1000000;
  pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
D
dapan1121 已提交
798
  strcpy(pMeta->tbName, mndShowStr(pShow->type));
S
Shengliang Guan 已提交
799 800 801 802

  return 0;
}

S
Shengliang Guan 已提交
803 804
static int32_t mndRetrieveQueries(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  SMnode   *pMnode = pReq->pNode;
S
Shengliang Guan 已提交
805
  int32_t   numOfRows = 0;
D
dapan1121 已提交
806
#if 0
S
Shengliang Guan 已提交
807
  SConnObj *pConn = NULL;
S
Shengliang Guan 已提交
808
  int32_t   cols = 0;
809 810
  char     *pWrite;
  void     *pIter;
S
Shengliang Guan 已提交
811 812
  char      str[TSDB_IPv4ADDR_LEN + 6] = {0};

H
Haojun Liao 已提交
813 814 815 816 817
  if (pShow->pIter == NULL) {
    SProfileMgmt *pMgmt = &pMnode->profileMgmt;
    pShow->pIter = taosCacheCreateIter(pMgmt->cache);
  }

S
Shengliang Guan 已提交
818
  while (numOfRows < rows) {
H
Haojun Liao 已提交
819
    pConn = mndGetNextConn(pMnode, pShow->pIter);
S
Shengliang Guan 已提交
820
    if (pConn == NULL) {
H
Haojun Liao 已提交
821
      pShow->pIter = NULL;
S
Shengliang Guan 已提交
822 823 824
      break;
    }

S
Shengliang Guan 已提交
825
    if (numOfRows + pConn->numOfQueries >= rows) {
H
Haojun Liao 已提交
826 827
      taosCacheDestroyIter(pShow->pIter);
      pShow->pIter = NULL;
S
Shengliang Guan 已提交
828 829 830
      break;
    }

S
Shengliang Guan 已提交
831 832
    for (int32_t i = 0; i < pConn->numOfQueries; ++i) {
      SQueryDesc *pDesc = pConn->pQueries + i;
S
Shengliang Guan 已提交
833 834 835
      cols = 0;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
836 837 838 839 840
      *(int64_t *)pWrite = htobe64(pDesc->queryId);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int64_t *)pWrite = htobe64(pConn->id);
S
Shengliang Guan 已提交
841 842 843
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
844
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->user, pShow->bytes[cols]);
S
Shengliang Guan 已提交
845 846 847
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
848
      snprintf(str, tListLen(str), "%s:%u", taosIpStr(pConn->ip), pConn->port);
S
Shengliang Guan 已提交
849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, str, pShow->bytes[cols]);
      cols++;

      char handleBuf[24] = {0};
      snprintf(handleBuf, tListLen(handleBuf), "%" PRIu64, htobe64(pDesc->qId));
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;

      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, handleBuf, pShow->bytes[cols]);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int64_t *)pWrite = htobe64(pDesc->stime);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int64_t *)pWrite = htobe64(pDesc->useconds);
      cols++;

      snprintf(str, tListLen(str), "0x%" PRIx64, htobe64(pDesc->sqlObjId));
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, str, pShow->bytes[cols]);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int32_t *)pWrite = htonl(pDesc->pid);
      cols++;

      char epBuf[TSDB_EP_LEN + 1] = {0};
S
Shengliang Guan 已提交
877
      snprintf(epBuf, tListLen(epBuf), "%s:%u", pDesc->fqdn, pConn->port);
S
Shengliang Guan 已提交
878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, epBuf, pShow->bytes[cols]);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(bool *)pWrite = pDesc->stableQuery;
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int32_t *)pWrite = htonl(pDesc->numOfSub);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDesc->subSqlInfo, pShow->bytes[cols]);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDesc->sql, pShow->bytes[cols]);
      cols++;

      numOfRows++;
    }
  }

902
  pShow->numOfRows += numOfRows;
D
dapan1121 已提交
903
#endif  
S
Shengliang Guan 已提交
904 905 906 907
  return numOfRows;
}

static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) {
H
Haojun Liao 已提交
908 909 910
  if (pIter != NULL) {
    taosCacheDestroyIter(pIter);
  }
S
Shengliang Guan 已提交
911
}
S
Shengliang Guan 已提交
912

S
Shengliang Guan 已提交
913 914 915
int32_t mndGetNumOfConnections(SMnode *pMnode) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
  return taosCacheGetNumOfObj(pMgmt->cache);
D
dapan1121 已提交
916
}