You need to sign in or sign up before continuing.
mndProfile.c 29.4 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
17
#include "tglobal.h"
S
Shengliang Guan 已提交
18
#include "mndProfile.h"
19
//#include "mndConsumer.h"
S
Shengliang Guan 已提交
20 21 22
#include "mndDb.h"
#include "mndMnode.h"
#include "mndShow.h"
23
//#include "mndTopic.h"
S
Shengliang Guan 已提交
24
#include "mndUser.h"
25
//#include "mndVgroup.h"
S
Shengliang Guan 已提交
26

S
Shengliang Guan 已提交
27 28 29
// Sizing constants for query bookkeeping.
// NOTE(review): the first three are not referenced in the part of the file visible here;
// their names suggest display widths for the "show queries" output - confirm before removing.
#define QUERY_ID_SIZE 20
#define QUERY_OBJ_ID_SIZE 18
#define SUBQUERY_INFO_SIZE 6
// Maximum number of SQueryDesc entries kept per connection (see mndSaveQueryStreamList).
#define QUERY_SAVE_SIZE 20
S
Shengliang Guan 已提交
31 32

typedef struct {
S
Shengliang Guan 已提交
33 34 35 36 37 38 39 40 41 42 43 44 45
  int32_t     id;
  char        user[TSDB_USER_LEN];
  char        app[TSDB_APP_NAME_LEN];  // app name that invokes taosc
  int64_t     appStartTimeMs;          // app start time
  int32_t     pid;                     // pid of app that invokes taosc
  uint32_t    ip;
  uint16_t    port;
  int8_t      killed;
  int64_t     loginTimeMs;
  int64_t     lastAccessTimeMs;
  int32_t     queryId;
  int32_t     numOfQueries;
  SQueryDesc *pQueries;
S
Shengliang Guan 已提交
46 47
} SConnObj;

S
Shengliang Guan 已提交
48
static SConnObj *mndCreateConn(SMnode *pMnode, SRpcConnInfo *pInfo, int32_t pid, const char *app, int64_t startTime);
S
Shengliang Guan 已提交
49
static void      mndFreeConn(SConnObj *pConn);
S
Shengliang Guan 已提交
50
static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId);
S
Shengliang Guan 已提交
51 52 53
static void      mndReleaseConn(SMnode *pMnode, SConnObj *pConn);
static void     *mndGetNextConn(SMnode *pMnode, void *pIter, SConnObj **pConn);
static void      mndCancelGetNextConn(SMnode *pMnode, void *pIter);
S
Shengliang Guan 已提交
54 55 56 57
static int32_t   mndProcessHeartBeatReq(SMnodeMsg *pReq);
static int32_t   mndProcessConnectReq(SMnodeMsg *pReq);
static int32_t   mndProcessKillQueryReq(SMnodeMsg *pReq);
static int32_t   mndProcessKillConnReq(SMnodeMsg *pReq);
S
Shengliang Guan 已提交
58
static int32_t   mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
S
Shengliang Guan 已提交
59
static int32_t   mndRetrieveConns(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
S
Shengliang Guan 已提交
60
static int32_t   mndGetQueryMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
S
Shengliang Guan 已提交
61
static int32_t   mndRetrieveQueries(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
S
Shengliang Guan 已提交
62 63 64 65 66
static void      mndCancelGetNextQuery(SMnode *pMnode, void *pIter);

/**
 * Set up the profile module: create the connection cache and register the
 * message and "show" handlers implemented in this file.
 *
 * @param pMnode mnode instance whose profileMgmt is initialised
 * @return 0 on success; -1 with terrno = TSDB_CODE_OUT_OF_MEMORY if the cache cannot be created
 */
int32_t mndInitProfile(SMnode *pMnode) {
  SProfileMgmt *pProfile = &pMnode->profileMgmt;

  // The cache is created with twice the shell activity timer as its second argument.
  int32_t checkTime = pMnode->cfg.shellActivityTimer * 2;
  pProfile->cache = taosCacheInit(TSDB_DATA_TYPE_INT, checkTime, true, (__cache_free_fn_t)mndFreeConn, "conn");
  if (pProfile->cache == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to alloc profile cache since %s", terrstr());
    return -1;
  }

  // request handlers
  mndSetMsgHandle(pMnode, TDMT_MND_HEARTBEAT, mndProcessHeartBeatReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CONNECT, mndProcessConnectReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq);

  // show connections
  mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndGetConnsMeta);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn);

  // show queries
  mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndGetQueryMeta);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery);

  return 0;
}

/**
 * Destroy the connection cache created by mndInitProfile, if any, and clear
 * the pointer so that a repeated call does nothing.
 */
void mndCleanupProfile(SMnode *pMnode) {
  SProfileMgmt *pProfile = &pMnode->profileMgmt;
  if (pProfile->cache == NULL) return;

  taosCacheCleanup(pProfile->cache);
  pProfile->cache = NULL;
}

S
Shengliang Guan 已提交
98
/**
 * Allocate a new connection id, build a connection object and put it into the
 * profile cache.
 *
 * @param pMnode    mnode instance owning the cache
 * @param pInfo     rpc connection info; user, clientIp and clientPort are copied
 * @param pid       pid reported by the client application
 * @param app       application name reported by the client
 * @param startTime application start time in ms; 0 means "use the current time"
 * @return the cached object (to be released with mndReleaseConn), or NULL with
 *         terrno = TSDB_CODE_OUT_OF_MEMORY if the cache insert failed
 */
static SConnObj *mndCreateConn(SMnode *pMnode, SRpcConnInfo *pInfo, int32_t pid, const char *app, int64_t startTime) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  // Id 0 is skipped. The retry result must be kept, otherwise the connection
  // would still be registered under 0.
  int32_t connId = atomic_add_fetch_32(&pMgmt->connId, 1);
  if (connId == 0) connId = atomic_add_fetch_32(&pMgmt->connId, 1);
  if (startTime == 0) startTime = taosGetTimestampMs();

  SConnObj connObj = {.id = connId,
                      .appStartTimeMs = startTime,
                      .pid = pid,
                      .ip = pInfo->clientIp,
                      .port = pInfo->clientPort,
                      .killed = 0,
                      .loginTimeMs = taosGetTimestampMs(),
                      .lastAccessTimeMs = 0,
                      .queryId = 0,
                      .numOfQueries = 0,
                      .pQueries = NULL};

  connObj.lastAccessTimeMs = connObj.loginTimeMs;
  tstrncpy(connObj.user, pInfo->user, TSDB_USER_LEN);
  tstrncpy(connObj.app, app, TSDB_APP_NAME_LEN);

  // The entry is kept for three shell activity periods (value passed in ms).
  int32_t   keepTime = pMnode->cfg.shellActivityTimer * 3;
  SConnObj *pConn = taosCachePut(pMgmt->cache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), keepTime * 1000);
  if (pConn == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // argument order follows the format string: reason first, then user
    mError("conn:%d, failed to put into cache since %s, user:%s", connId, terrstr(), pInfo->user);
    return NULL;
  }

  mTrace("conn:%d, is created, data:%p user:%s", pConn->id, pConn, pInfo->user);
  return pConn;
}

/**
 * Free callback handed to taosCacheInit for connection objects: releases the
 * saved query list. The object itself is not freed here.
 */
static void mndFreeConn(SConnObj *pConn) {
  tfree(pConn->pQueries);
  mTrace("conn:%d, is destroyed, data:%p", pConn->id, pConn);
}

S
Shengliang Guan 已提交
138
/**
 * Look up a connection by id in the profile cache and refresh its access time.
 *
 * @return the cached object (release with mndReleaseConn), or NULL if the id
 *         is not in the cache
 */
static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId) {
  SProfileMgmt *pProfile = &pMnode->profileMgmt;

  SConnObj *pFound = taosCacheAcquireByKey(pProfile->cache, &connId, sizeof(int32_t));
  if (pFound == NULL) {
    mDebug("conn:%d, already destroyed", connId);
    return NULL;
  }

  // NOTE(review): the stored value is "now + keep time", i.e. a point in the
  // future, and mndRetrieveConns reports it as last_access - confirm intent.
  int32_t keepTime = pMnode->cfg.shellActivityTimer * 3;
  pFound->lastAccessTimeMs = keepTime * 1000 + (uint64_t)taosGetTimestampMs();

  mTrace("conn:%d, acquired from cache, data:%p", pFound->id, pFound);
  return pFound;
}

/**
 * Hand a connection obtained from mndCreateConn/mndAcquireConn back to the
 * cache. A NULL connection is ignored.
 */
static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) {
  if (pConn != NULL) {
    mTrace("conn:%d, released from cache, data:%p", pConn->id, pConn);

    SProfileMgmt *pProfile = &pMnode->profileMgmt;
    taosCacheRelease(pProfile->cache, (void **)&pConn, false);
  }
}

/**
 * Advance an iteration over the hash table behind the connection cache.
 *
 * @param pIter iterator from the previous call, or NULL to start
 * @param pConn out: next connection, or NULL when the iteration has ended
 * @return iterator to pass to the next call, or NULL when finished
 */
static void *mndGetNextConn(SMnode *pMnode, void *pIter, SConnObj **pConn) {
  SHashObj *pHash = pMnode->profileMgmt.cache->pHashTable;

  *pConn = NULL;

  void *pNext = taosHashIterate(pHash, pIter);
  if (pNext == NULL) return NULL;

  // each hash value is a pointer to the cache node holding the object
  SCacheDataNode **ppNode = pNext;
  if (*ppNode == NULL) {
    taosHashCancelIterate(pHash, pNext);
    return NULL;
  }

  *pConn = (SConnObj *)((*ppNode)->data);
  return pNext;
}

/** Abort an iteration started with mndGetNextConn. */
static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) {
  taosHashCancelIterate(pMnode->profileMgmt.cache->pHashTable, pIter);
}

S
Shengliang Guan 已提交
185
/**
 * Handle TDMT_MND_CONNECT: check the user (and optional db), register a new
 * connection and build the SConnectRsp.
 *
 * @return 0 on success with pReq->pCont/contLen set; -1 on failure
 */
static int32_t mndProcessConnectReq(SMnodeMsg *pReq) {
  SMnode      *pMnode = pReq->pMnode;
  SConnectReq *pConnReq = pReq->rpcMsg.pCont;
  SUserObj    *pUser = NULL;
  SDbObj      *pDb = NULL;
  SConnObj    *pConn = NULL;
  SConnectRsp *pRsp = NULL;
  SRpcConnInfo info = {0};
  char         ip[30];
  int32_t      code = -1;

  // pid and startTime are byte-swapped in place in the request buffer
  pConnReq->pid = htonl(pConnReq->pid);
  pConnReq->startTime = htobe64(pConnReq->startTime);

  if (rpcGetConnInfo(pReq->rpcMsg.handle, &info) != 0) {
    mError("user:%s, failed to login while get connection info since %s", pReq->user, terrstr());
    goto CONN_OVER;
  }

  taosIp2String(info.clientIp, ip);

  pUser = mndAcquireUser(pMnode, pReq->user);
  if (pUser == NULL) {
    mError("user:%s, failed to login while acquire user since %s", pReq->user, terrstr());
    goto CONN_OVER;
  }

  // a db name in the request is optional; when present it must exist
  if (pConnReq->db[0]) {
    snprintf(pReq->db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, pConnReq->db);
    pDb = mndAcquireDb(pMnode, pReq->db);
    if (pDb == NULL) {
      terrno = TSDB_CODE_MND_INVALID_DB;
      mError("user:%s, failed to login from %s while use db:%s since %s", pReq->user, ip, pConnReq->db, terrstr());
      goto CONN_OVER;
    }
  }

  pConn = mndCreateConn(pMnode, &info, pConnReq->pid, pConnReq->app, pConnReq->startTime);
  if (pConn == NULL) {
    mError("user:%s, failed to login from %s while create connection since %s", pReq->user, ip, terrstr());
    goto CONN_OVER;
  }

  pRsp = rpcMallocCont(sizeof(SConnectRsp));
  if (pRsp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("user:%s, failed to login from %s while create rsp since %s", pReq->user, ip, terrstr());
    goto CONN_OVER;
  }

  pRsp->acctId = htonl(pUser->acctId);
  pRsp->superUser = pUser->superUser;
  pRsp->clusterId = htobe64(pMnode->clusterId);
  pRsp->connId = htonl(pConn->id);

  snprintf(pRsp->sVersion, tListLen(pRsp->sVersion), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo, gitinfo);
  mndGetMnodeEpSet(pMnode, &pRsp->epSet);

  pReq->contLen = sizeof(SConnectRsp);
  pReq->pCont = pRsp;

  mDebug("user:%s, login from %s, conn:%d, app:%s", info.user, ip, pConn->id, pConnReq->app);

  code = 0;

CONN_OVER:
  // reached on success and on failure; every pointer may still be NULL here
  mndReleaseUser(pMnode, pUser);
  mndReleaseDb(pMnode, pDb);
  mndReleaseConn(pMnode, pConn);

  return code;
}

S
Shengliang Guan 已提交
258
/**
 * Copy the query descriptors carried by a heartbeat into the connection
 * object, keeping at most QUERY_SAVE_SIZE entries.
 *
 * pConn->numOfQueries is only set to a non-zero value once pConn->pQueries is
 * known to be allocated, so a reader never sees a count without a buffer.
 *
 * @param pConn connection whose saved query list is replaced
 * @param pReq  heartbeat request; numOfQueries is passed through htonl before use
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the buffer cannot
 *         be allocated (the saved list is then empty)
 */
static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) {
  pConn->numOfQueries = 0;

  int32_t numOfQueries = htonl(pReq->numOfQueries);
  if (numOfQueries <= 0) return TSDB_CODE_SUCCESS;

  // the buffer is allocated once at full capacity and reused by later heartbeats
  if (pConn->pQueries == NULL) {
    pConn->pQueries = calloc(QUERY_SAVE_SIZE, sizeof(SQueryDesc));
    if (pConn->pQueries == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  int32_t numToSave = TMIN(QUERY_SAVE_SIZE, numOfQueries);

  // NOTE(review): assumes pReq->pData really holds numToSave descriptors; the
  // message length is not checked here.
  memcpy(pConn->pQueries, pReq->pData, numToSave * sizeof(SQueryDesc));
  pConn->numOfQueries = numToSave;

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
278
/**
 * Build the heartbeat response for an MQ-type heartbeat.
 *
 * The implementation is compiled out, so the function currently always returns
 * NULL and the caller pushes nothing for MQ heartbeats.
 *
 * NOTE(review): before re-enabling the draft below, fix at least:
 *   - pRsp is leaked when taosArrayInit() fails, and both pRsp and the arrays
 *     are leaked on the later early returns;
 *   - the inner loop reads pVgInfo with index `i` although it iterates with `j`;
 *   - the results of mndAcquireConsumer(), taosHashGet() and mndAcquireVgroup()
 *     are used without a NULL check.
 */
static SClientHbRsp* mndMqHbBuildRsp(SMnode* pMnode, SClientHbReq* pReq) {
#if 0
  SClientHbRsp* pRsp = malloc(sizeof(SClientHbRsp));
  if (pRsp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  pRsp->connKey = pReq->connKey;
  SMqHbBatchRsp batchRsp;
  batchRsp.batchRsps = taosArrayInit(0, sizeof(SMqHbRsp));
  if (batchRsp.batchRsps == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  SClientHbKey connKey = pReq->connKey;
  SHashObj* pObj =  pReq->info;
  SKv* pKv = taosHashGet(pObj, "mq-tmp", strlen("mq-tmp") + 1);
  if (pKv == NULL) {
    free(pRsp);
    return NULL;
  }
  SMqHbMsg mqHb;
  taosDecodeSMqMsg(pKv->value, &mqHb);
  /*int64_t clientUid = htonl(pKv->value);*/
  /*if (mqHb.epoch )*/
  int sz = taosArrayGetSize(mqHb.pTopics);
  SMqConsumerObj* pConsumer = mndAcquireConsumer(pMnode, mqHb.consumerId);
  for (int i = 0; i < sz; i++) {
    SMqHbOneTopicBatchRsp innerBatchRsp;
    innerBatchRsp.rsps = taosArrayInit(sz, sizeof(SMqHbRsp));
    if (innerBatchRsp.rsps == NULL) {
      //TODO
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
    SMqHbTopicInfo* topicInfo = taosArrayGet(mqHb.pTopics, i);
    SMqConsumerTopic* pConsumerTopic = taosHashGet(pConsumer->topicHash, topicInfo->name, strlen(topicInfo->name)+1);
    if (pConsumerTopic->epoch != topicInfo->epoch) {
      //add new vgids into rsp
      int vgSz = taosArrayGetSize(topicInfo->pVgInfo);
      for (int j = 0; j < vgSz; j++) {
        SMqHbRsp innerRsp;
        SMqHbVgInfo* pVgInfo = taosArrayGet(topicInfo->pVgInfo, i);
        SVgObj* pVgObj = mndAcquireVgroup(pMnode, pVgInfo->vgId);
        innerRsp.epSet = mndGetVgroupEpset(pMnode, pVgObj);
        taosArrayPush(innerBatchRsp.rsps, &innerRsp);
      }
    }
    taosArrayPush(batchRsp.batchRsps, &innerBatchRsp);
  }
  int32_t tlen = taosEncodeSMqHbBatchRsp(NULL, &batchRsp);
  void* buf = malloc(tlen);
  if (buf == NULL) {
    //TODO
    return NULL;
  }
  void* abuf = buf;
  taosEncodeSMqHbBatchRsp(&abuf, &batchRsp);
  pRsp->body = buf;
  pRsp->bodyLen = tlen;
  return pRsp;
#endif
  return NULL;
}

S
Shengliang Guan 已提交
343
/**
 * Handle TDMT_MND_HEARTBEAT: decode the batched client heartbeat, build one
 * response per query-type entry (currently only HEARTBEAT_KEY_DBINFO produces
 * data) and serialize the batch into the rpc response.
 *
 * Compared with the previous version:
 *   - pHbReq->info is checked for NULL before its size is queried;
 *   - allocation failures of the response arrays and the rpc buffer are
 *     reported instead of being dereferenced;
 *   - the response key/value no longer shadows the request `kv`;
 *   - request and response are released on one common exit path.
 *
 * @return 0 on success with pReq->pCont/contLen set; -1 with terrno set on
 *         allocation failure
 */
static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
  SMnode *pMnode = pReq->pMnode;
  char   *batchReqStr = pReq->rpcMsg.pCont;
  int32_t code = -1;
  int32_t tlen = 0;
  void   *buf = NULL;

  SClientHbBatchReq batchReq = {0};
  tDeserializeSClientHbBatchReq(batchReqStr, &batchReq);
  SArray *pArray = batchReq.reqs;
  int     sz = taosArrayGetSize(pArray);

  SClientHbBatchRsp batchRsp = {0};
  batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp));
  if (batchRsp.rsps == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to process hb while alloc rsp array since %s", terrstr());
    goto HB_OVER;
  }

  for (int i = 0; i < sz; i++) {
    SClientHbReq *pHbReq = taosArrayGet(pArray, i);
    if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) {
      // entries without any key/value produce no response
      if (pHbReq->info == NULL) continue;
      int32_t kvNum = taosHashGetSize(pHbReq->info);
      if (kvNum <= 0) continue;

      SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = taosArrayInit(kvNum, sizeof(SKv))};
      if (hbRsp.info == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        mError("failed to process hb while alloc kv array since %s", terrstr());
        goto HB_OVER;
      }

      // The loop always runs to the end of the hash, so no cancel call is needed.
      for (void *pIter = taosHashIterate(pHbReq->info, NULL); pIter != NULL;
           pIter = taosHashIterate(pHbReq->info, pIter)) {
        SKv *kv = pIter;

        switch (kv->key) {
          case HEARTBEAT_KEY_DBINFO: {
            void   *rspMsg = NULL;
            int32_t rspLen = 0;
            mndValidateDBInfo(pMnode, (SDbVgVersion *)kv->value, kv->valueLen / sizeof(SDbVgVersion), &rspMsg, &rspLen);
            if (rspMsg && rspLen > 0) {
              // rspMsg is released on the exit path below, after serialization
              SKv rspKv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg};
              taosArrayPush(hbRsp.info, &rspKv);
            }
            break;
          }
          case HEARTBEAT_KEY_STBINFO:
            break;
          default:
            mError("invalid kv key:%d", kv->key);
            hbRsp.status = TSDB_CODE_MND_APP_ERROR;
            break;
        }
      }

      taosArrayPush(batchRsp.rsps, &hbRsp);
    } else if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_MQ) {
      SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq);
      if (pRsp != NULL) {
        taosArrayPush(batchRsp.rsps, pRsp);
        free(pRsp);
      }
    }
  }

  // first pass computes the encoded length, second pass writes into the rpc buffer
  tlen = tSerializeSClientHbBatchRsp(NULL, &batchRsp);
  buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to process hb while alloc rsp since %s", terrstr());
    goto HB_OVER;
  }

  void *abuf = buf;
  tSerializeSClientHbBatchRsp(&abuf, &batchRsp);

  pReq->contLen = tlen;
  pReq->pCont = buf;
  code = 0;

HB_OVER:
  taosArrayDestroyEx(pArray, tFreeClientHbReq);

  if (batchRsp.rsps != NULL) {
    int32_t rspNum = (int32_t)taosArrayGetSize(batchRsp.rsps);
    for (int32_t i = 0; i < rspNum; ++i) {
      SClientHbRsp *rsp = taosArrayGet(batchRsp.rsps, i);
      int32_t       kvNum = (rsp->info) ? taosArrayGetSize(rsp->info) : 0;
      for (int32_t n = 0; n < kvNum; ++n) {
        SKv *kv = taosArrayGet(rsp->info, n);
        tfree(kv->value);
      }
      taosArrayDestroy(rsp->info);
    }
    taosArrayDestroy(batchRsp.rsps);
  }

  return code;

  // Legacy single-connection heartbeat, kept for reference only (compiled out).
#if 0
  SMnode       *pMnode = pReq->pMnode;
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  SHeartBeatReq *pHeartbeat = pReq->rpcMsg.pCont;
  pHeartbeat->connId = htonl(pHeartbeat->connId);
  pHeartbeat->pid = htonl(pHeartbeat->pid);

  SRpcConnInfo info = {0};
  if (rpcGetConnInfo(pReq->rpcMsg.handle, &info) != 0) {
    mError("user:%s, connId:%d failed to process hb since %s", pReq->user, pHeartbeat->connId, terrstr());
    return -1;
  }

  SConnObj *pConn = mndAcquireConn(pMnode, pHeartbeat->connId);
  if (pConn == NULL) {
    pConn = mndCreateConn(pMnode, &info, pHeartbeat->pid, pHeartbeat->app, 0);
    if (pConn == NULL) {
      mError("user:%s, conn:%d is freed and failed to create new since %s", pReq->user, pHeartbeat->connId, terrstr());
      return -1;
    } else {
      mDebug("user:%s, conn:%d is freed and create a new conn:%d", pReq->user, pHeartbeat->connId, pConn->id);
    }
  } else if (pConn->killed) {
    mError("user:%s, conn:%d is already killed", pReq->user, pConn->id);
    terrno = TSDB_CODE_MND_INVALID_CONNECTION;
    return -1;
  } else {
    if (pConn->ip != info.clientIp || pConn->port != info.clientPort /* || strcmp(pConn->user, info.user) != 0 */) {
      char oldIpStr[40];
      char newIpStr[40];
      taosIpPort2String(pConn->ip, pConn->port, oldIpStr);
      taosIpPort2String(info.clientIp, info.clientPort, newIpStr);
      mError("conn:%d, incoming conn user:%s ip:%s, not match exist user:%s ip:%s", pConn->id, info.user, newIpStr,
             pConn->user, oldIpStr);

      if (pMgmt->connId < pConn->id) pMgmt->connId = pConn->id + 1;
      taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
      terrno = TSDB_CODE_MND_INVALID_CONNECTION;
      return -1;
    }
  }

  SHeartBeatRsp *pRsp = rpcMallocCont(sizeof(SHeartBeatRsp));
  if (pRsp == NULL) {
    mndReleaseConn(pMnode, pConn);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("user:%s, conn:%d failed to process hb while since %s", pReq->user, pHeartbeat->connId, terrstr());
    return -1;
  }

  mndSaveQueryStreamList(pConn, pHeartbeat);
  if (pConn->killed != 0) {
    pRsp->killConnection = 1;
  }

  if (pConn->queryId != 0) {
    pRsp->queryId = htonl(pConn->queryId);
    pConn->queryId = 0;
  }

  pRsp->connId = htonl(pConn->id);
  pRsp->totalDnodes = htonl(1);
  pRsp->onlineDnodes = htonl(1);
  mndGetMnodeEpSet(pMnode, &pRsp->epSet);
  mndReleaseConn(pMnode, pConn);

  pReq->contLen = sizeof(SConnectRsp);
  pReq->pCont = pRsp;
  return 0;
#endif
}

S
Shengliang Guan 已提交
496 497
/**
 * Handle TDMT_MND_KILL_QUERY: only a super user may record, on the target
 * connection, the id of the query to be killed.
 *
 * @return 0 on success; -1 with terrno set when the requesting user cannot be
 *         acquired, is not a super user (TSDB_CODE_MND_NO_RIGHTS) or the
 *         connection does not exist (TSDB_CODE_MND_INVALID_CONN_ID)
 */
static int32_t mndProcessKillQueryReq(SMnodeMsg *pReq) {
  SMnode       *pMnode = pReq->pMnode;
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  // An unknown user is a failure, as in mndProcessConnectReq; returning 0 here
  // would report success although nothing was killed.
  SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
  if (pUser == NULL) return -1;
  if (!pUser->superUser) {
    mndReleaseUser(pMnode, pUser);
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
  }
  mndReleaseUser(pMnode, pUser);

  SKillQueryReq *pKill = pReq->rpcMsg.pCont;
  int32_t        connId = htonl(pKill->connId);
  int32_t        queryId = htonl(pKill->queryId);
  // log the converted id, not the raw wire value
  mInfo("kill query msg is received, queryId:%d", queryId);

  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t));
  if (pConn == NULL) {
    mError("connId:%d, failed to kill queryId:%d, conn not exist", connId, queryId);
    terrno = TSDB_CODE_MND_INVALID_CONN_ID;
    return -1;
  }

  mInfo("connId:%d, queryId:%d is killed by user:%s", connId, queryId, pReq->user);
  pConn->queryId = queryId;
  taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
  return 0;
}

S
Shengliang Guan 已提交
527 528
/**
 * Handle TDMT_MND_KILL_CONN: only a super user may mark the target connection
 * as killed.
 *
 * @return TSDB_CODE_SUCCESS on success; -1 with terrno set when the requesting
 *         user cannot be acquired, is not a super user (TSDB_CODE_MND_NO_RIGHTS)
 *         or the connection does not exist (TSDB_CODE_MND_INVALID_CONN_ID)
 */
static int32_t mndProcessKillConnReq(SMnodeMsg *pReq) {
  SMnode       *pMnode = pReq->pMnode;
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  // An unknown user is a failure, as in mndProcessConnectReq; returning 0 here
  // would report success although nothing was killed.
  SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
  if (pUser == NULL) return -1;
  if (!pUser->superUser) {
    mndReleaseUser(pMnode, pUser);
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
  }
  mndReleaseUser(pMnode, pUser);

  SKillConnReq *pKill = pReq->rpcMsg.pCont;
  int32_t       connId = htonl(pKill->connId);

  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t));
  if (pConn == NULL) {
    mError("connId:%d, failed to kill connection, conn not exist", connId);
    terrno = TSDB_CODE_MND_INVALID_CONN_ID;
    return -1;
  }

  mInfo("connId:%d, is killed by user:%s", connId, pReq->user);
  pConn->killed = 1;
  taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
556
/**
 * Fill the table meta for "show connections": seven columns (connId, user,
 * program, pid, ip:port, login_time, last_access), plus the per-column offsets,
 * row size and current row count in pShow. Super users only.
 *
 * @return 0 on success; -1 with terrno set when the requesting user cannot be
 *         acquired or is not a super user (TSDB_CODE_MND_NO_RIGHTS)
 */
static int32_t mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
  SMnode       *pMnode = pReq->pMnode;
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  // An unknown user is a failure; returning 0 here would hand back a meta
  // response whose schema was never filled in.
  SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
  if (pUser == NULL) return -1;
  if (!pUser->superUser) {
    mndReleaseUser(pMnode, pUser);
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
  }
  mndReleaseUser(pMnode, pUser);

  // Column layout; the order must match the write order in mndRetrieveConns.
  const struct {
    const char *name;
    int32_t     type;
    int32_t     bytes;
  } columns[] = {
      {"connId", TSDB_DATA_TYPE_INT, 4},
      {"user", TSDB_DATA_TYPE_BINARY, TSDB_USER_LEN + VARSTR_HEADER_SIZE},
      {"program", TSDB_DATA_TYPE_BINARY, TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE},  // app name
      {"pid", TSDB_DATA_TYPE_INT, 4},                                              // app pid
      {"ip:port", TSDB_DATA_TYPE_BINARY, TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE},
      {"login_time", TSDB_DATA_TYPE_TIMESTAMP, 8},
      {"last_access", TSDB_DATA_TYPE_TIMESTAMP, 8},
  };
  const int32_t cols = (int32_t)(sizeof(columns) / sizeof(columns[0]));

  SSchema *pSchema = pMeta->pSchema;
  for (int32_t i = 0; i < cols; ++i) {
    pShow->bytes[i] = columns[i].bytes;
    pSchema[i].type = columns[i].type;
    strcpy(pSchema[i].name, columns[i].name);
    pSchema[i].bytes = htonl(pShow->bytes[i]);
  }

  pMeta->numOfColumns = htonl(cols);
  pShow->numOfColumns = cols;

  // each column starts where the previous one ends
  pShow->offset[0] = 0;
  for (int32_t i = 1; i < cols; ++i) {
    pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
  }

  pShow->numOfRows = taosHashGetSize(pMgmt->cache->pHashTable);
  pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
  strcpy(pMeta->tbName, mndShowStr(pShow->type));

  return 0;
}

S
Shengliang Guan 已提交
631 632
/**
 * Write up to `rows` connection entries into `data` for "show connections".
 *
 * Column c of row r is written at data + offset[c] * rows + bytes[c] * r, in
 * the column order set up by mndGetConnsMeta. The iteration position is kept
 * in pShow->pIter between calls.
 *
 * @return number of rows written (also added to pShow->numOfReads)
 */
static int32_t mndRetrieveConns(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  SMnode   *pMnode = pReq->pMnode;
  SConnObj *pConn = NULL;
  int32_t   numOfRows = 0;
  char      ipStr[TSDB_IPv4ADDR_LEN + 6];

  for (; numOfRows < rows; ++numOfRows) {
    pShow->pIter = mndGetNextConn(pMnode, pShow->pIter, &pConn);
    if (pConn == NULL) break;

    int32_t col = 0;
    char   *pCell = NULL;

    // connId
    pCell = data + pShow->offset[col] * rows + pShow->bytes[col] * numOfRows;
    *(int32_t *)pCell = pConn->id;
    col++;

    // user
    pCell = data + pShow->offset[col] * rows + pShow->bytes[col] * numOfRows;
    STR_WITH_MAXSIZE_TO_VARSTR(pCell, pConn->user, pShow->bytes[col]);
    col++;

    // app name
    pCell = data + pShow->offset[col] * rows + pShow->bytes[col] * numOfRows;
    STR_WITH_MAXSIZE_TO_VARSTR(pCell, pConn->app, pShow->bytes[col]);
    col++;

    // app pid
    pCell = data + pShow->offset[col] * rows + pShow->bytes[col] * numOfRows;
    *(int32_t *)pCell = pConn->pid;
    col++;

    // ip:port
    pCell = data + pShow->offset[col] * rows + pShow->bytes[col] * numOfRows;
    taosIpPort2String(pConn->ip, pConn->port, ipStr);
    STR_WITH_MAXSIZE_TO_VARSTR(pCell, ipStr, pShow->bytes[col]);
    col++;

    // login time
    pCell = data + pShow->offset[col] * rows + pShow->bytes[col] * numOfRows;
    *(int64_t *)pCell = pConn->loginTimeMs;
    col++;

    // last access; the stored value is raised to the login time if it is older
    pCell = data + pShow->offset[col] * rows + pShow->bytes[col] * numOfRows;
    if (pConn->lastAccessTimeMs < pConn->loginTimeMs) {
      pConn->lastAccessTimeMs = pConn->loginTimeMs;
    }
    *(int64_t *)pCell = pConn->lastAccessTimeMs;
    col++;
  }

  pShow->numOfReads += numOfRows;

  return numOfRows;
}

S
Shengliang Guan 已提交
685
/*
 * Describes the result set of the "show queries" command.
 *
 * Fills the column schema of pMeta (type, name, network-order byte width) and
 * the matching per-column widths and offsets of pShow, then records the column
 * count, row size and table name.
 *
 * Only a super user may list queries: any other user gets -1 with terrno set
 * to TSDB_CODE_MND_NO_RIGHTS. Returns 0 otherwise.
 *
 * NOTE(review): when mndAcquireUser() returns NULL this function returns 0
 * without filling pMeta/pShow -- confirm that reporting success is intended.
 */
static int32_t mndGetQueryMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
  // Columns of the result set, in output order.
  const struct {
    const char *name;
    int32_t     type;
    int32_t     bytes;
  } columns[] = {
      {"queryId", TSDB_DATA_TYPE_INT, 4},
      {"connId", TSDB_DATA_TYPE_INT, 4},
      {"user", TSDB_DATA_TYPE_BINARY, TSDB_USER_LEN + VARSTR_HEADER_SIZE},
      {"ip:port", TSDB_DATA_TYPE_BINARY, TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE},
      {"qid", TSDB_DATA_TYPE_BINARY, 22 + VARSTR_HEADER_SIZE},
      {"created_time", TSDB_DATA_TYPE_TIMESTAMP, 8},
      {"time", TSDB_DATA_TYPE_BIGINT, 8},
      {"sql_obj_id", TSDB_DATA_TYPE_BINARY, QUERY_OBJ_ID_SIZE + VARSTR_HEADER_SIZE},
      {"pid", TSDB_DATA_TYPE_INT, 4},
      {"ep", TSDB_DATA_TYPE_BINARY, TSDB_EP_LEN + VARSTR_HEADER_SIZE},
      {"stable_query", TSDB_DATA_TYPE_BOOL, 1},
      {"sub_queries", TSDB_DATA_TYPE_INT, 4},
      {"sub_query_info", TSDB_DATA_TYPE_BINARY, TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE},
      {"sql", TSDB_DATA_TYPE_BINARY, TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE},
  };
  const int32_t numOfCols = (int32_t)(sizeof(columns) / sizeof(columns[0]));

  SMnode *pMnode = pReq->pMnode;

  // Permission check: the user object is only needed to read the superUser flag.
  SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
  if (pUser == NULL) return 0;

  const int32_t denied = !pUser->superUser;
  mndReleaseUser(pMnode, pUser);
  if (denied) {
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
  }

  SSchema *pSchema = pMeta->pSchema;
  for (int32_t i = 0; i < numOfCols; ++i) {
    pShow->bytes[i] = columns[i].bytes;
    pSchema[i].type = columns[i].type;
    strcpy(pSchema[i].name, columns[i].name);
    // The schema carries the width in network byte order; pShow keeps it in host order.
    pSchema[i].bytes = htonl(pShow->bytes[i]);

    // Each column starts where the previous one ends.
    if (i == 0) {
      pShow->offset[i] = 0;
    } else {
      pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
    }
  }

  pMeta->numOfColumns = htonl(numOfCols);
  pShow->numOfColumns = numOfCols;

  pShow->numOfRows = 1000000;
  pShow->rowSize = pShow->offset[numOfCols - 1] + pShow->bytes[numOfCols - 1];
  strcpy(pMeta->tbName, mndShowStr(pShow->type));

  return 0;
}

S
Shengliang Guan 已提交
800 801
/*
 * Fills `data` with rows of the "show queries" result: one row per query
 * descriptor (pConn->pQueries[i]) of each connection returned by
 * mndGetNextConn().
 *
 * The buffer is columnar: the cell of column c in row r is located at
 *   data + pShow->offset[c] * rows + pShow->bytes[c] * r
 * so every store must be exactly as wide as the column declared in
 * mndGetQueryMeta(); a wider store spills into the neighbouring row's cell.
 *
 * pReq  - request; only pReq->pMnode is used
 * pShow - show context holding the iterator position and column layout
 * data  - output buffer laid out for `rows` rows
 * rows  - capacity of `data` in rows
 *
 * Returns the number of rows written, which is also added to pShow->numOfReads.
 */
static int32_t mndRetrieveQueries(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  SMnode   *pMnode = pReq->pMnode;
  int32_t   numOfRows = 0;
  SConnObj *pConn = NULL;
  int32_t   cols = 0;
  char     *pWrite;
  void     *pIter;
  char      str[TSDB_IPv4ADDR_LEN + 6] = {0};

  while (numOfRows < rows) {
    pIter = mndGetNextConn(pMnode, pShow->pIter, &pConn);
    if (pConn == NULL) {
      pShow->pIter = pIter;
      break;
    }

    // Stop before a connection whose queries would not all fit in this batch;
    // the iteration step is cancelled and pShow->pIter keeps its previous value.
    if (numOfRows + pConn->numOfQueries >= rows) {
      mndCancelGetNextConn(pMnode, pIter);
      break;
    }

    pShow->pIter = pIter;
    for (int32_t i = 0; i < pConn->numOfQueries; ++i) {
      SQueryDesc *pDesc = pConn->pQueries + i;
      cols = 0;

      // queryId: declared as a 4-byte INT column, so store exactly 4 bytes.
      // Byte-swapped with htonl like the other 32-bit descriptor fields below
      // (pid, numOfSub).
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int32_t *)pWrite = htonl(pDesc->queryId);
      cols++;

      // connId: also a 4-byte INT column. Connection fields are used without
      // byte swapping elsewhere in this file (the connections listing stores
      // pConn->id directly), so the id is written as-is.
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int32_t *)pWrite = pConn->id;
      cols++;

      // user
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->user, pShow->bytes[cols]);
      cols++;

      // ip:port
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      snprintf(str, tListLen(str), "%s:%u", taosIpStr(pConn->ip), pConn->port);
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, str, pShow->bytes[cols]);
      cols++;

      // qid, rendered as an unsigned decimal string
      char handleBuf[24] = {0};
      snprintf(handleBuf, tListLen(handleBuf), "%" PRIu64, htobe64(pDesc->qId));
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, handleBuf, pShow->bytes[cols]);
      cols++;

      // created_time
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int64_t *)pWrite = htobe64(pDesc->stime);
      cols++;

      // time
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int64_t *)pWrite = htobe64(pDesc->useconds);
      cols++;

      // sql_obj_id, rendered as a hexadecimal string
      snprintf(str, tListLen(str), "0x%" PRIx64, htobe64(pDesc->sqlObjId));
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, str, pShow->bytes[cols]);
      cols++;

      // pid
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int32_t *)pWrite = htonl(pDesc->pid);
      cols++;

      // ep: fqdn of the query descriptor combined with the connection's port
      char epBuf[TSDB_EP_LEN + 1] = {0};
      snprintf(epBuf, tListLen(epBuf), "%s:%u", pDesc->fqdn, pConn->port);
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, epBuf, pShow->bytes[cols]);
      cols++;

      // stable_query
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(bool *)pWrite = pDesc->stableQuery;
      cols++;

      // sub_queries
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int32_t *)pWrite = htonl(pDesc->numOfSub);
      cols++;

      // sub_query_info
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDesc->subSqlInfo, pShow->bytes[cols]);
      cols++;

      // sql
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDesc->sql, pShow->bytes[cols]);
      cols++;

      numOfRows++;
    }
  }

  // NOTE(review): presumably compacts the columns, since the buffer was laid
  // out for `rows` rows but only numOfRows were filled -- confirm against
  // mndVacuumResult().
  mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
  pShow->numOfReads += numOfRows;
  return numOfRows;
}

/*
 * Cancels an in-progress iteration over the profile cache's hash table.
 * pIter is the iterator handle to cancel.
 */
static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) {
  taosHashCancelIterate(pMnode->profileMgmt.cache->pHashTable, pIter);
}