mndProfile.c 29.8 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
17
#include "tglobal.h"
S
Shengliang Guan 已提交
18
#include "mndProfile.h"
19
//#include "mndConsumer.h"
S
Shengliang Guan 已提交
20
#include "mndDb.h"
H
Haojun Liao 已提交
21
#include "mndStb.h"
S
Shengliang Guan 已提交
22 23
#include "mndMnode.h"
#include "mndShow.h"
24
//#include "mndTopic.h"
S
Shengliang Guan 已提交
25
#include "mndUser.h"
26
//#include "mndVgroup.h"
S
Shengliang Guan 已提交
27

S
Shengliang Guan 已提交
28 29 30
#define QUERY_ID_SIZE 20
#define QUERY_OBJ_ID_SIZE 18
#define SUBQUERY_INFO_SIZE 6
S
Shengliang Guan 已提交
31
#define QUERY_SAVE_SIZE 20
S
Shengliang Guan 已提交
32 33

typedef struct {
S
Shengliang Guan 已提交
34 35 36 37 38 39 40 41 42 43 44 45 46
  int32_t     id;
  char        user[TSDB_USER_LEN];
  char        app[TSDB_APP_NAME_LEN];  // app name that invokes taosc
  int64_t     appStartTimeMs;          // app start time
  int32_t     pid;                     // pid of app that invokes taosc
  uint32_t    ip;
  uint16_t    port;
  int8_t      killed;
  int64_t     loginTimeMs;
  int64_t     lastAccessTimeMs;
  int32_t     queryId;
  int32_t     numOfQueries;
  SQueryDesc *pQueries;
S
Shengliang Guan 已提交
47 48
} SConnObj;

S
Shengliang Guan 已提交
49
static SConnObj *mndCreateConn(SMnode *pMnode, SRpcConnInfo *pInfo, int32_t pid, const char *app, int64_t startTime);
S
Shengliang Guan 已提交
50
static void      mndFreeConn(SConnObj *pConn);
S
Shengliang Guan 已提交
51
static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId);
S
Shengliang Guan 已提交
52 53 54
static void      mndReleaseConn(SMnode *pMnode, SConnObj *pConn);
static void     *mndGetNextConn(SMnode *pMnode, void *pIter, SConnObj **pConn);
static void      mndCancelGetNextConn(SMnode *pMnode, void *pIter);
S
Shengliang Guan 已提交
55 56 57 58
static int32_t   mndProcessHeartBeatReq(SMnodeMsg *pReq);
static int32_t   mndProcessConnectReq(SMnodeMsg *pReq);
static int32_t   mndProcessKillQueryReq(SMnodeMsg *pReq);
static int32_t   mndProcessKillConnReq(SMnodeMsg *pReq);
S
Shengliang Guan 已提交
59
static int32_t   mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
S
Shengliang Guan 已提交
60
static int32_t   mndRetrieveConns(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
S
Shengliang Guan 已提交
61
static int32_t   mndGetQueryMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
S
Shengliang Guan 已提交
62
static int32_t   mndRetrieveQueries(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
S
Shengliang Guan 已提交
63 64 65 66 67
static void      mndCancelGetNextQuery(SMnode *pMnode, void *pIter);

int32_t mndInitProfile(SMnode *pMnode) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
68
  int32_t connCheckTime = pMnode->cfg.shellActivityTimer * 2;
S
Shengliang Guan 已提交
69 70 71 72 73 74 75
  pMgmt->cache = taosCacheInit(TSDB_DATA_TYPE_INT, connCheckTime, true, (__cache_free_fn_t)mndFreeConn, "conn");
  if (pMgmt->cache == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to alloc profile cache since %s", terrstr());
    return -1;
  }

S
Shengliang Guan 已提交
76 77 78 79
  mndSetMsgHandle(pMnode, TDMT_MND_HEARTBEAT, mndProcessHeartBeatReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CONNECT, mndProcessConnectReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq);
S
Shengliang Guan 已提交
80 81 82 83

  mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndGetConnsMeta);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn);
S
Shengliang Guan 已提交
84 85 86
  mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndGetQueryMeta);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery);
87

S
Shengliang Guan 已提交
88 89 90 91 92 93 94 95 96 97 98
  return 0;
}

void mndCleanupProfile(SMnode *pMnode) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
  if (pMgmt->cache != NULL) {
    taosCacheCleanup(pMgmt->cache);
    pMgmt->cache = NULL;
  }
}

S
Shengliang Guan 已提交
99
static SConnObj *mndCreateConn(SMnode *pMnode, SRpcConnInfo *pInfo, int32_t pid, const char *app, int64_t startTime) {
S
Shengliang Guan 已提交
100 101 102 103
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  int32_t connId = atomic_add_fetch_32(&pMgmt->connId, 1);
  if (connId == 0) atomic_add_fetch_32(&pMgmt->connId, 1);
S
Shengliang Guan 已提交
104
  if (startTime == 0) startTime = taosGetTimestampMs();
S
Shengliang Guan 已提交
105

S
Shengliang Guan 已提交
106 107 108 109 110
  SConnObj connObj = {.id = connId,
                      .appStartTimeMs = startTime,
                      .pid = pid,
                      .ip = pInfo->clientIp,
                      .port = pInfo->clientPort,
S
Shengliang Guan 已提交
111
                      .killed = 0,
S
Shengliang Guan 已提交
112 113
                      .loginTimeMs = taosGetTimestampMs(),
                      .lastAccessTimeMs = 0,
S
Shengliang Guan 已提交
114 115 116 117
                      .queryId = 0,
                      .numOfQueries = 0,
                      .pQueries = NULL};

S
Shengliang Guan 已提交
118 119
  connObj.lastAccessTimeMs = connObj.loginTimeMs;
  tstrncpy(connObj.user, pInfo->user, TSDB_USER_LEN);
S
Shengliang Guan 已提交
120 121
  tstrncpy(connObj.app, app, TSDB_APP_NAME_LEN);

S
Shengliang Guan 已提交
122
  int32_t   keepTime = pMnode->cfg.shellActivityTimer * 3;
S
Shengliang Guan 已提交
123
  SConnObj *pConn = taosCachePut(pMgmt->cache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), keepTime * 1000);
S
Shengliang Guan 已提交
124 125
  if (pConn == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
126
    mError("conn:%d, failed to put into cache since %s, user:%s", connId, pInfo->user, terrstr());
S
Shengliang Guan 已提交
127 128
    return NULL;
  } else {
S
Shengliang Guan 已提交
129
    mTrace("conn:%d, is created, data:%p user:%s", pConn->id, pConn, pInfo->user);
S
Shengliang Guan 已提交
130 131
    return pConn;
  }
S
Shengliang Guan 已提交
132 133 134 135
}

static void mndFreeConn(SConnObj *pConn) {
  tfree(pConn->pQueries);
S
Shengliang Guan 已提交
136
  mTrace("conn:%d, is destroyed, data:%p", pConn->id, pConn);
S
Shengliang Guan 已提交
137 138
}

S
Shengliang Guan 已提交
139
static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId) {
S
Shengliang Guan 已提交
140 141 142 143
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t));
  if (pConn == NULL) {
S
Shengliang Guan 已提交
144
    mDebug("conn:%d, already destroyed", connId);
S
Shengliang Guan 已提交
145 146 147
    return NULL;
  }

S
Shengliang Guan 已提交
148
  int32_t keepTime = pMnode->cfg.shellActivityTimer * 3;
S
Shengliang Guan 已提交
149
  pConn->lastAccessTimeMs = keepTime * 1000 + (uint64_t)taosGetTimestampMs();
S
Shengliang Guan 已提交
150

S
Shengliang Guan 已提交
151
  mTrace("conn:%d, acquired from cache, data:%p", pConn->id, pConn);
S
Shengliang Guan 已提交
152 153 154 155 156
  return pConn;
}

static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) {
  if (pConn == NULL) return;
S
Shengliang Guan 已提交
157
  mTrace("conn:%d, released from cache, data:%p", pConn->id, pConn);
S
Shengliang Guan 已提交
158

S
Shengliang Guan 已提交
159
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
S
Shengliang Guan 已提交
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185
  taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
}

static void *mndGetNextConn(SMnode *pMnode, void *pIter, SConnObj **pConn) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  *pConn = NULL;

  pIter = taosHashIterate(pMgmt->cache->pHashTable, pIter);
  if (pIter == NULL) return NULL;

  SCacheDataNode **pNode = pIter;
  if (pNode == NULL || *pNode == NULL) {
    taosHashCancelIterate(pMgmt->cache->pHashTable, pIter);
    return NULL;
  }

  *pConn = (SConnObj *)((*pNode)->data);
  return pIter;
}

static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
  taosHashCancelIterate(pMgmt->cache->pHashTable, pIter);
}

S
Shengliang Guan 已提交
186
static int32_t mndProcessConnectReq(SMnodeMsg *pReq) {
187 188 189 190 191 192
  SMnode   *pMnode = pReq->pMnode;
  SUserObj *pUser = NULL;
  SDbObj   *pDb = NULL;
  SConnObj *pConn = NULL;
  int32_t   code = -1;

S
Shengliang Guan 已提交
193 194 195
  SConnectReq *pConnReq = pReq->rpcMsg.pCont;
  pConnReq->pid = htonl(pConnReq->pid);
  pConnReq->startTime = htobe64(pConnReq->startTime);
S
Shengliang Guan 已提交
196 197

  SRpcConnInfo info = {0};
S
Shengliang Guan 已提交
198 199
  if (rpcGetConnInfo(pReq->rpcMsg.handle, &info) != 0) {
    mError("user:%s, failed to login while get connection info since %s", pReq->user, terrstr());
200
    goto CONN_OVER;
S
Shengliang Guan 已提交
201 202 203 204 205
  }

  char ip[30];
  taosIp2String(info.clientIp, ip);

206 207 208 209 210 211
  pUser = mndAcquireUser(pMnode, pReq->user);
  if (pUser == NULL) {
    mError("user:%s, failed to login while acquire user since %s", pReq->user, terrstr());
    goto CONN_OVER;
  }

S
Shengliang Guan 已提交
212
  if (pConnReq->db[0]) {
213 214
    snprintf(pReq->db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, pConnReq->db);
    pDb = mndAcquireDb(pMnode, pReq->db);
S
Shengliang Guan 已提交
215 216
    if (pDb == NULL) {
      terrno = TSDB_CODE_MND_INVALID_DB;
S
Shengliang Guan 已提交
217
      mError("user:%s, failed to login from %s while use db:%s since %s", pReq->user, ip, pConnReq->db, terrstr());
218
      goto CONN_OVER;
S
Shengliang Guan 已提交
219 220 221
    }
  }

222
  pConn = mndCreateConn(pMnode, &info, pConnReq->pid, pConnReq->app, pConnReq->startTime);
S
Shengliang Guan 已提交
223
  if (pConn == NULL) {
S
Shengliang Guan 已提交
224
    mError("user:%s, failed to login from %s while create connection since %s", pReq->user, ip, terrstr());
225
    goto CONN_OVER;
S
Shengliang Guan 已提交
226 227 228 229 230
  }

  SConnectRsp *pRsp = rpcMallocCont(sizeof(SConnectRsp));
  if (pRsp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
231
    mError("user:%s, failed to login from %s while create rsp since %s", pReq->user, ip, terrstr());
232
    goto CONN_OVER;
S
Shengliang Guan 已提交
233 234
  }

235
  pRsp->acctId    = htonl(pUser->acctId);
236
  pRsp->superUser = pUser->superUser;
237
  pRsp->clusterId = htobe64(pMnode->clusterId);
238 239 240
  pRsp->connId    = htonl(pConn->id);

  snprintf(pRsp->sVersion, tListLen(pRsp->sVersion), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo, gitinfo);
S
Shengliang Guan 已提交
241 242
  mndGetMnodeEpSet(pMnode, &pRsp->epSet);

S
Shengliang Guan 已提交
243 244
  pReq->contLen = sizeof(SConnectRsp);
  pReq->pCont = pRsp;
245

S
Shengliang Guan 已提交
246
  mDebug("user:%s, login from %s, conn:%d, app:%s", info.user, ip, pConn->id, pConnReq->app);
247 248 249 250 251 252 253 254 255 256

  code = 0;

CONN_OVER:

  mndReleaseUser(pMnode, pUser);
  mndReleaseDb(pMnode, pDb);
  mndReleaseConn(pMnode, pConn);

  return code;
S
Shengliang Guan 已提交
257 258
}

S
Shengliang Guan 已提交
259
static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) {
S
Shengliang Guan 已提交
260
  pConn->numOfQueries = 0;
S
Shengliang Guan 已提交
261
  int32_t numOfQueries = htonl(pReq->numOfQueries);
S
Shengliang Guan 已提交
262 263 264

  if (numOfQueries > 0) {
    if (pConn->pQueries == NULL) {
S
Shengliang Guan 已提交
265
      pConn->pQueries = calloc(sizeof(SQueryDesc), QUERY_SAVE_SIZE);
S
Shengliang Guan 已提交
266 267
    }

dengyihao's avatar
dengyihao 已提交
268
    pConn->numOfQueries = TMIN(QUERY_SAVE_SIZE, numOfQueries);
S
Shengliang Guan 已提交
269 270 271

    int32_t saveSize = pConn->numOfQueries * sizeof(SQueryDesc);
    if (saveSize > 0 && pConn->pQueries != NULL) {
S
Shengliang Guan 已提交
272
      memcpy(pConn->pQueries, pReq->pData, saveSize);
S
Shengliang Guan 已提交
273 274 275 276 277 278
    }
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
279
static SClientHbRsp* mndMqHbBuildRsp(SMnode* pMnode, SClientHbReq* pReq) {
L
Liu Jicong 已提交
280
#if 0
L
Liu Jicong 已提交
281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339
  SClientHbRsp* pRsp = malloc(sizeof(SClientHbRsp));
  if (pRsp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  pRsp->connKey = pReq->connKey;
  SMqHbBatchRsp batchRsp;
  batchRsp.batchRsps = taosArrayInit(0, sizeof(SMqHbRsp));
  if (batchRsp.batchRsps == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  SClientHbKey connKey = pReq->connKey;
  SHashObj* pObj =  pReq->info;
  SKv* pKv = taosHashGet(pObj, "mq-tmp", strlen("mq-tmp") + 1);
  if (pKv == NULL) {
    free(pRsp);
    return NULL;
  }
  SMqHbMsg mqHb;
  taosDecodeSMqMsg(pKv->value, &mqHb);
  /*int64_t clientUid = htonl(pKv->value);*/
  /*if (mqHb.epoch )*/
  int sz = taosArrayGetSize(mqHb.pTopics);
  SMqConsumerObj* pConsumer = mndAcquireConsumer(pMnode, mqHb.consumerId); 
  for (int i = 0; i < sz; i++) {
    SMqHbOneTopicBatchRsp innerBatchRsp;
    innerBatchRsp.rsps = taosArrayInit(sz, sizeof(SMqHbRsp));
    if (innerBatchRsp.rsps == NULL) {
      //TODO
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
    SMqHbTopicInfo* topicInfo = taosArrayGet(mqHb.pTopics, i);
    SMqConsumerTopic* pConsumerTopic = taosHashGet(pConsumer->topicHash, topicInfo->name, strlen(topicInfo->name)+1);
    if (pConsumerTopic->epoch != topicInfo->epoch) {
      //add new vgids into rsp
      int vgSz = taosArrayGetSize(topicInfo->pVgInfo);
      for (int j = 0; j < vgSz; j++) {
        SMqHbRsp innerRsp;
        SMqHbVgInfo* pVgInfo = taosArrayGet(topicInfo->pVgInfo, i);
        SVgObj* pVgObj = mndAcquireVgroup(pMnode, pVgInfo->vgId);
        innerRsp.epSet = mndGetVgroupEpset(pMnode, pVgObj);
        taosArrayPush(innerBatchRsp.rsps, &innerRsp);
      }
    }
    taosArrayPush(batchRsp.batchRsps, &innerBatchRsp);
  }
  int32_t tlen = taosEncodeSMqHbBatchRsp(NULL, &batchRsp);
  void* buf = malloc(tlen);
  if (buf == NULL) {
    //TODO
    return NULL;
  }
  void* abuf = buf;
  taosEncodeSMqHbBatchRsp(&abuf, &batchRsp);
  pRsp->body = buf;
  pRsp->bodyLen = tlen;
  return pRsp;
L
Liu Jicong 已提交
340 341
#endif
  return NULL;
L
Liu Jicong 已提交
342 343
}

S
Shengliang Guan 已提交
344
static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
L
Liu Jicong 已提交
345 346 347
  SMnode *pMnode = pReq->pMnode;
  char *batchReqStr = pReq->rpcMsg.pCont;
  SClientHbBatchReq batchReq = {0};
L
Liu Jicong 已提交
348
  tDeserializeSClientHbBatchReq(batchReqStr, &batchReq);
L
Liu Jicong 已提交
349 350 351
  SArray *pArray = batchReq.reqs;
  int sz = taosArrayGetSize(pArray);

L
Liu Jicong 已提交
352 353
  SClientHbBatchRsp batchRsp = {0};
  batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp));
L
Liu Jicong 已提交
354

L
Liu Jicong 已提交
355 356 357
  for (int i = 0; i < sz; i++) {
    SClientHbReq* pHbReq = taosArrayGet(pArray, i);
    if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) {
H
Haojun Liao 已提交
358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397
      int32_t kvNum = taosHashGetSize(pHbReq->info);
      if (NULL == pHbReq->info || kvNum <= 0) {
        continue;
      }

      SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = taosArrayInit(kvNum, sizeof(SKv))};

      void *pIter = taosHashIterate(pHbReq->info, NULL);
      while (pIter != NULL) {
        SKv* kv = pIter;
      
        switch (kv->key) {
          case HEARTBEAT_KEY_DBINFO: {
            void *rspMsg = NULL;
            int32_t rspLen = 0;
            mndValidateDBInfo(pMnode, (SDbVgVersion *)kv->value, kv->valueLen/sizeof(SDbVgVersion), &rspMsg, &rspLen);
            if (rspMsg && rspLen > 0) {
              SKv kv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg};
              taosArrayPush(hbRsp.info, &kv);
            }
            break;
          }
          case HEARTBEAT_KEY_STBINFO: {
            void *rspMsg = NULL;
            int32_t rspLen = 0;
            mndValidateStbInfo(pMnode, (SSTableMetaVersion *)kv->value, kv->valueLen/sizeof(SSTableMetaVersion), &rspMsg, &rspLen);
            if (rspMsg && rspLen > 0) {
              SKv kv = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg};
              taosArrayPush(hbRsp.info, &kv);
            }
            break;
          }
          default:
            mError("invalid kv key:%d", kv->key);
            hbRsp.status = TSDB_CODE_MND_APP_ERROR;
            break;
        }
              
        pIter = taosHashIterate(pHbReq->info, pIter);
      }
L
Liu Jicong 已提交
398

H
Haojun Liao 已提交
399
      taosArrayPush(batchRsp.rsps, &hbRsp);
L
Liu Jicong 已提交
400
    } else if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_MQ) {
L
Liu Jicong 已提交
401 402 403 404 405
      SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq);
      if (pRsp != NULL) {
        taosArrayPush(batchRsp.rsps, pRsp);
        free(pRsp);
      }
L
Liu Jicong 已提交
406 407
    }
  }
L
Liu Jicong 已提交
408 409
  taosArrayDestroyEx(pArray, tFreeClientHbReq);

L
Liu Jicong 已提交
410 411
  int32_t tlen = tSerializeSClientHbBatchRsp(NULL, &batchRsp);
  void* buf = rpcMallocCont(tlen);
L
Liu Jicong 已提交
412 413
  void* abuf = buf;
  tSerializeSClientHbBatchRsp(&abuf, &batchRsp);
H
Haojun Liao 已提交
414 415 416 417 418 419 420 421 422 423 424 425
  
  int32_t rspNum = (int32_t)taosArrayGetSize(batchRsp.rsps);
  for (int32_t i = 0; i < rspNum; ++i) {
    SClientHbRsp *rsp = taosArrayGet(batchRsp.rsps, i);
    int32_t kvNum = (rsp->info) ? taosArrayGetSize(rsp->info): 0;
    for (int32_t n = 0; n < kvNum; ++n) {
      SKv *kv = taosArrayGet(rsp->info, n);
      tfree(kv->value);
    }
    taosArrayDestroy(rsp->info);
  }

L
Liu Jicong 已提交
426
  taosArrayDestroy(batchRsp.rsps);
L
Liu Jicong 已提交
427 428
  pReq->contLen = tlen;
  pReq->pCont = buf;
L
Liu Jicong 已提交
429 430
  return 0;

L
Liu Jicong 已提交
431
#if 0
S
Shengliang Guan 已提交
432
  SMnode       *pMnode = pReq->pMnode;
S
Shengliang Guan 已提交
433 434
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
435 436 437
  SHeartBeatReq *pHeartbeat = pReq->rpcMsg.pCont;
  pHeartbeat->connId = htonl(pHeartbeat->connId);
  pHeartbeat->pid = htonl(pHeartbeat->pid);
S
Shengliang Guan 已提交
438 439

  SRpcConnInfo info = {0};
S
Shengliang Guan 已提交
440 441
  if (rpcGetConnInfo(pReq->rpcMsg.handle, &info) != 0) {
    mError("user:%s, connId:%d failed to process hb since %s", pReq->user, pHeartbeat->connId, terrstr());
S
Shengliang Guan 已提交
442 443 444
    return -1;
  }

S
Shengliang Guan 已提交
445
  SConnObj *pConn = mndAcquireConn(pMnode, pHeartbeat->connId);
S
Shengliang Guan 已提交
446
  if (pConn == NULL) {
S
Shengliang Guan 已提交
447
    pConn = mndCreateConn(pMnode, &info, pHeartbeat->pid, pHeartbeat->app, 0);
S
Shengliang Guan 已提交
448
    if (pConn == NULL) {
S
Shengliang Guan 已提交
449
      mError("user:%s, conn:%d is freed and failed to create new since %s", pReq->user, pHeartbeat->connId, terrstr());
S
Shengliang Guan 已提交
450 451
      return -1;
    } else {
S
Shengliang Guan 已提交
452
      mDebug("user:%s, conn:%d is freed and create a new conn:%d", pReq->user, pHeartbeat->connId, pConn->id);
S
Shengliang Guan 已提交
453
    }
S
Shengliang Guan 已提交
454
  } else if (pConn->killed) {
S
Shengliang Guan 已提交
455
    mError("user:%s, conn:%d is already killed", pReq->user, pConn->id);
S
Shengliang Guan 已提交
456
    terrno = TSDB_CODE_MND_INVALID_CONNECTION;
S
Shengliang Guan 已提交
457 458
    return -1;
  } else {
S
Shengliang Guan 已提交
459 460 461 462 463 464 465 466 467 468 469 470 471
    if (pConn->ip != info.clientIp || pConn->port != info.clientPort /* || strcmp(pConn->user, info.user) != 0 */) {
      char oldIpStr[40];
      char newIpStr[40];
      taosIpPort2String(pConn->ip, pConn->port, oldIpStr);
      taosIpPort2String(info.clientIp, info.clientPort, newIpStr);
      mError("conn:%d, incoming conn user:%s ip:%s, not match exist user:%s ip:%s", pConn->id, info.user, newIpStr,
             pConn->user, oldIpStr);

      if (pMgmt->connId < pConn->id) pMgmt->connId = pConn->id + 1;
      taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
      terrno = TSDB_CODE_MND_INVALID_CONNECTION;
      return -1;
    }
S
Shengliang Guan 已提交
472 473 474 475 476 477
  }

  SHeartBeatRsp *pRsp = rpcMallocCont(sizeof(SHeartBeatRsp));
  if (pRsp == NULL) {
    mndReleaseConn(pMnode, pConn);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
478
    mError("user:%s, conn:%d failed to process hb while since %s", pReq->user, pHeartbeat->connId, terrstr());
S
Shengliang Guan 已提交
479 480 481
    return -1;
  }

S
Shengliang Guan 已提交
482
  mndSaveQueryStreamList(pConn, pHeartbeat);
S
Shengliang Guan 已提交
483 484 485 486 487 488 489 490 491
  if (pConn->killed != 0) {
    pRsp->killConnection = 1;
  }

  if (pConn->queryId != 0) {
    pRsp->queryId = htonl(pConn->queryId);
    pConn->queryId = 0;
  }

S
Shengliang Guan 已提交
492
  pRsp->connId = htonl(pConn->id);
S
Shengliang Guan 已提交
493
  pRsp->totalDnodes = htonl(1);
S
Shengliang Guan 已提交
494
  pRsp->onlineDnodes = htonl(1);
S
Shengliang Guan 已提交
495 496 497
  mndGetMnodeEpSet(pMnode, &pRsp->epSet);
  mndReleaseConn(pMnode, pConn);

S
Shengliang Guan 已提交
498 499
  pReq->contLen = sizeof(SConnectRsp);
  pReq->pCont = pRsp;
S
Shengliang Guan 已提交
500
  return 0;
L
Liu Jicong 已提交
501
#endif
S
Shengliang Guan 已提交
502 503
}

S
Shengliang Guan 已提交
504 505
static int32_t mndProcessKillQueryReq(SMnodeMsg *pReq) {
  SMnode       *pMnode = pReq->pMnode;
506 507
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
508
  SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
509
  if (pUser == NULL) return 0;
510
  if (!pUser->superUser) {
511 512 513 514 515 516
    mndReleaseUser(pMnode, pUser);
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
  }
  mndReleaseUser(pMnode, pUser);

S
Shengliang Guan 已提交
517
  SKillQueryReq *pKill = pReq->rpcMsg.pCont;
S
Shengliang Guan 已提交
518 519
  int32_t        connId = htonl(pKill->connId);
  int32_t        queryId = htonl(pKill->queryId);
S
Shengliang Guan 已提交
520
  mInfo("kill query msg is received, queryId:%d", pKill->queryId);
521 522 523

  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t));
  if (pConn == NULL) {
S
Shengliang Guan 已提交
524
    mError("connId:%d, failed to kill queryId:%d, conn not exist", connId, queryId);
525 526 527
    terrno = TSDB_CODE_MND_INVALID_CONN_ID;
    return -1;
  } else {
S
Shengliang Guan 已提交
528
    mInfo("connId:%d, queryId:%d is killed by user:%s", connId, queryId, pReq->user);
529 530 531 532 533 534
    pConn->queryId = queryId;
    taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
    return 0;
  }
}

S
Shengliang Guan 已提交
535 536
static int32_t mndProcessKillConnReq(SMnodeMsg *pReq) {
  SMnode       *pMnode = pReq->pMnode;
537 538
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
539
  SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
540
  if (pUser == NULL) return 0;
541
  if (!pUser->superUser) {
542 543 544 545 546 547
    mndReleaseUser(pMnode, pUser);
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
  }
  mndReleaseUser(pMnode, pUser);

S
Shengliang Guan 已提交
548
  SKillConnReq *pKill = pReq->rpcMsg.pCont;
S
Shengliang Guan 已提交
549 550 551
  int32_t       connId = htonl(pKill->connId);

  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t));
552
  if (pConn == NULL) {
S
Shengliang Guan 已提交
553
    mError("connId:%d, failed to kill connection, conn not exist", connId);
554 555 556
    terrno = TSDB_CODE_MND_INVALID_CONN_ID;
    return -1;
  } else {
S
Shengliang Guan 已提交
557
    mInfo("connId:%d, is killed by user:%s", connId, pReq->user);
558 559 560 561 562 563
    pConn->killed = 1;
    taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
    return TSDB_CODE_SUCCESS;
  }
}

S
Shengliang Guan 已提交
564
static int32_t mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
S
Shengliang Guan 已提交
565
  SMnode       *pMnode = pReq->pMnode;
S
Shengliang Guan 已提交
566 567
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
568
  SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
S
Shengliang Guan 已提交
569
  if (pUser == NULL) return 0;
570
  if (!pUser->superUser) {
S
Shengliang Guan 已提交
571
    mndReleaseUser(pMnode, pUser);
572 573
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
S
Shengliang Guan 已提交
574 575 576 577
  }
  mndReleaseUser(pMnode, pUser);

  int32_t  cols = 0;
S
Shengliang Guan 已提交
578
  SSchema *pSchema = pMeta->pSchema;
S
Shengliang Guan 已提交
579 580 581 582

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "connId");
H
Haojun Liao 已提交
583
  pSchema[cols].bytes = htonl(pShow->bytes[cols]);
S
Shengliang Guan 已提交
584 585 586 587 588
  cols++;

  pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "user");
H
Haojun Liao 已提交
589
  pSchema[cols].bytes = htonl(pShow->bytes[cols]);
S
Shengliang Guan 已提交
590 591 592 593 594 595
  cols++;

  // app name
  pShow->bytes[cols] = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "program");
H
Haojun Liao 已提交
596
  pSchema[cols].bytes = htonl(pShow->bytes[cols]);
S
Shengliang Guan 已提交
597 598 599 600 601 602
  cols++;

  // app pid
  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "pid");
H
Haojun Liao 已提交
603
  pSchema[cols].bytes = htonl(pShow->bytes[cols]);
S
Shengliang Guan 已提交
604 605 606 607 608
  cols++;

  pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "ip:port");
H
Haojun Liao 已提交
609
  pSchema[cols].bytes = htonl(pShow->bytes[cols]);
S
Shengliang Guan 已提交
610 611 612 613 614
  cols++;

  pShow->bytes[cols] = 8;
  pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
  strcpy(pSchema[cols].name, "login_time");
H
Haojun Liao 已提交
615
  pSchema[cols].bytes = htonl(pShow->bytes[cols]);
S
Shengliang Guan 已提交
616 617 618 619 620
  cols++;

  pShow->bytes[cols] = 8;
  pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
  strcpy(pSchema[cols].name, "last_access");
H
Haojun Liao 已提交
621
  pSchema[cols].bytes = htonl(pShow->bytes[cols]);
S
Shengliang Guan 已提交
622 623
  cols++;

H
Haojun Liao 已提交
624
  pMeta->numOfColumns = htonl(cols);
S
Shengliang Guan 已提交
625 626 627 628 629 630 631 632 633
  pShow->numOfColumns = cols;

  pShow->offset[0] = 0;
  for (int32_t i = 1; i < cols; ++i) {
    pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
  }

  pShow->numOfRows = taosHashGetSize(pMgmt->cache->pHashTable);
  pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
H
Haojun Liao 已提交
634
  strcpy(pMeta->tbName, mndShowStr(pShow->type));
S
Shengliang Guan 已提交
635 636 637 638

  return 0;
}

S
Shengliang Guan 已提交
639 640
static int32_t mndRetrieveConns(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  SMnode   *pMnode = pReq->pMnode;
S
Shengliang Guan 已提交
641
  int32_t   numOfRows = 0;
S
Shengliang Guan 已提交
642
  SConnObj *pConn = NULL;
S
Shengliang Guan 已提交
643 644 645 646 647
  int32_t   cols = 0;
  char     *pWrite;
  char      ipStr[TSDB_IPv4ADDR_LEN + 6];

  while (numOfRows < rows) {
S
Shengliang Guan 已提交
648 649
    pShow->pIter = mndGetNextConn(pMnode, pShow->pIter, &pConn);
    if (pConn == NULL) break;
S
Shengliang Guan 已提交
650 651 652 653

    cols = 0;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
654
    *(int32_t *)pWrite = pConn->id;
S
Shengliang Guan 已提交
655 656 657
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
658
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->user, pShow->bytes[cols]);
S
Shengliang Guan 已提交
659 660 661 662
    cols++;

    // app name
    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
663
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->app, pShow->bytes[cols]);
S
Shengliang Guan 已提交
664 665 666 667
    cols++;

    // app pid
    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
668
    *(int32_t *)pWrite = pConn->pid;
S
Shengliang Guan 已提交
669 670 671
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
672
    taosIpPort2String(pConn->ip, pConn->port, ipStr);
S
Shengliang Guan 已提交
673 674 675 676
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->bytes[cols]);
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
677
    *(int64_t *)pWrite = pConn->loginTimeMs;
S
Shengliang Guan 已提交
678 679 680
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
681 682
    if (pConn->lastAccessTimeMs < pConn->loginTimeMs) pConn->lastAccessTimeMs = pConn->loginTimeMs;
    *(int64_t *)pWrite = pConn->lastAccessTimeMs;
S
Shengliang Guan 已提交
683 684 685 686 687 688 689 690 691 692
    cols++;

    numOfRows++;
  }

  pShow->numOfReads += numOfRows;

  return numOfRows;
}

S
Shengliang Guan 已提交
693
static int32_t mndGetQueryMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
S
Shengliang Guan 已提交
694
  SMnode       *pMnode = pReq->pMnode;
S
Shengliang Guan 已提交
695 696
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
697
  SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
S
Shengliang Guan 已提交
698
  if (pUser == NULL) return 0;
699
  if (!pUser->superUser) {
S
Shengliang Guan 已提交
700
    mndReleaseUser(pMnode, pUser);
701 702
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
S
Shengliang Guan 已提交
703 704 705 706
  }
  mndReleaseUser(pMnode, pUser);

  int32_t  cols = 0;
S
Shengliang Guan 已提交
707
  SSchema *pSchema = pMeta->pSchema;
S
Shengliang Guan 已提交
708

S
Shengliang Guan 已提交
709 710 711
  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "queryId");
H
Haojun Liao 已提交
712
  pSchema[cols].bytes = htonl(pShow->bytes[cols]);
S
Shengliang Guan 已提交
713 714 715 716 717
  cols++;

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "connId");
H
Haojun Liao 已提交
718
  pSchema[cols].bytes = htonl(pShow->bytes[cols]);
S
Shengliang Guan 已提交
719 720 721 722 723
  cols++;

  pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "user");
H
Haojun Liao 已提交
724
  pSchema[cols].bytes = htonl(pShow->bytes[cols]);
S
Shengliang Guan 已提交
725 726 727 728 729
  cols++;

  pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "ip:port");
H
Haojun Liao 已提交
730
  pSchema[cols].bytes = htonl(pShow->bytes[cols]);
S
Shengliang Guan 已提交
731 732
  cols++;

S
Shengliang Guan 已提交
733
  pShow->bytes[cols] = 22 + VARSTR_HEADER_SIZE;
S
Shengliang Guan 已提交
734 735
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "qid");
H
Haojun Liao 已提交
736
  pSchema[cols].bytes = htonl(pShow->bytes[cols]);
S
Shengliang Guan 已提交
737 738 739 740 741
  cols++;

  pShow->bytes[cols] = 8;
  pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
  strcpy(pSchema[cols].name, "created_time");
H
Haojun Liao 已提交
742
  pSchema[cols].bytes = htonl(pShow->bytes[cols]);
S
Shengliang Guan 已提交
743 744 745 746 747
  cols++;

  pShow->bytes[cols] = 8;
  pSchema[cols].type = TSDB_DATA_TYPE_BIGINT;
  strcpy(pSchema[cols].name, "time");
H
Haojun Liao 已提交
748
  pSchema[cols].bytes = htonl(pShow->bytes[cols]);
S
Shengliang Guan 已提交
749 750 751 752 753
  cols++;

  pShow->bytes[cols] = QUERY_OBJ_ID_SIZE + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "sql_obj_id");
H
Haojun Liao 已提交
754
  pSchema[cols].bytes = htonl(pShow->bytes[cols]);
S
Shengliang Guan 已提交
755 756 757 758 759
  cols++;

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "pid");
H
Haojun Liao 已提交
760
  pSchema[cols].bytes = htonl(pShow->bytes[cols]);
S
Shengliang Guan 已提交
761 762 763 764 765
  cols++;

  pShow->bytes[cols] = TSDB_EP_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "ep");
H
Haojun Liao 已提交
766
  pSchema[cols].bytes = htonl(pShow->bytes[cols]);
S
Shengliang Guan 已提交
767 768 769 770 771
  cols++;

  pShow->bytes[cols] = 1;
  pSchema[cols].type = TSDB_DATA_TYPE_BOOL;
  strcpy(pSchema[cols].name, "stable_query");
H
Haojun Liao 已提交
772
  pSchema[cols].bytes = htonl(pShow->bytes[cols]);
S
Shengliang Guan 已提交
773 774 775 776 777
  cols++;

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "sub_queries");
H
Haojun Liao 已提交
778
  pSchema[cols].bytes = htonl(pShow->bytes[cols]);
S
Shengliang Guan 已提交
779 780 781 782 783
  cols++;

  pShow->bytes[cols] = TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "sub_query_info");
H
Haojun Liao 已提交
784
  pSchema[cols].bytes = htonl(pShow->bytes[cols]);
S
Shengliang Guan 已提交
785 786 787 788 789
  cols++;

  pShow->bytes[cols] = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "sql");
H
Haojun Liao 已提交
790
  pSchema[cols].bytes = htonl(pShow->bytes[cols]);
S
Shengliang Guan 已提交
791 792
  cols++;

H
Haojun Liao 已提交
793
  pMeta->numOfColumns = htonl(cols);
S
Shengliang Guan 已提交
794 795 796 797 798 799 800 801 802
  pShow->numOfColumns = cols;

  pShow->offset[0] = 0;
  for (int32_t i = 1; i < cols; ++i) {
    pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
  }

  pShow->numOfRows = 1000000;
  pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
H
Haojun Liao 已提交
803
  strcpy(pMeta->tbName, mndShowStr(pShow->type));
S
Shengliang Guan 已提交
804 805 806 807

  return 0;
}

S
Shengliang Guan 已提交
808 809
static int32_t mndRetrieveQueries(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  SMnode   *pMnode = pReq->pMnode;
S
Shengliang Guan 已提交
810
  int32_t   numOfRows = 0;
S
Shengliang Guan 已提交
811
  SConnObj *pConn = NULL;
S
Shengliang Guan 已提交
812
  int32_t   cols = 0;
813 814
  char     *pWrite;
  void     *pIter;
S
Shengliang Guan 已提交
815 816 817
  char      str[TSDB_IPv4ADDR_LEN + 6] = {0};

  while (numOfRows < rows) {
S
Shengliang Guan 已提交
818 819
    pIter = mndGetNextConn(pMnode, pShow->pIter, &pConn);
    if (pConn == NULL) {
S
Shengliang Guan 已提交
820 821 822 823
      pShow->pIter = pIter;
      break;
    }

S
Shengliang Guan 已提交
824
    if (numOfRows + pConn->numOfQueries >= rows) {
S
Shengliang Guan 已提交
825 826 827 828 829
      mndCancelGetNextConn(pMnode, pIter);
      break;
    }

    pShow->pIter = pIter;
S
Shengliang Guan 已提交
830 831
    for (int32_t i = 0; i < pConn->numOfQueries; ++i) {
      SQueryDesc *pDesc = pConn->pQueries + i;
S
Shengliang Guan 已提交
832 833 834
      cols = 0;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
835 836 837 838 839
      *(int64_t *)pWrite = htobe64(pDesc->queryId);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int64_t *)pWrite = htobe64(pConn->id);
S
Shengliang Guan 已提交
840 841 842
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
843
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->user, pShow->bytes[cols]);
S
Shengliang Guan 已提交
844 845 846
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
847
      snprintf(str, tListLen(str), "%s:%u", taosIpStr(pConn->ip), pConn->port);
S
Shengliang Guan 已提交
848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, str, pShow->bytes[cols]);
      cols++;

      char handleBuf[24] = {0};
      snprintf(handleBuf, tListLen(handleBuf), "%" PRIu64, htobe64(pDesc->qId));
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;

      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, handleBuf, pShow->bytes[cols]);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int64_t *)pWrite = htobe64(pDesc->stime);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int64_t *)pWrite = htobe64(pDesc->useconds);
      cols++;

      snprintf(str, tListLen(str), "0x%" PRIx64, htobe64(pDesc->sqlObjId));
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, str, pShow->bytes[cols]);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int32_t *)pWrite = htonl(pDesc->pid);
      cols++;

      char epBuf[TSDB_EP_LEN + 1] = {0};
S
Shengliang Guan 已提交
876
      snprintf(epBuf, tListLen(epBuf), "%s:%u", pDesc->fqdn, pConn->port);
S
Shengliang Guan 已提交
877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, epBuf, pShow->bytes[cols]);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(bool *)pWrite = pDesc->stableQuery;
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int32_t *)pWrite = htonl(pDesc->numOfSub);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDesc->subSqlInfo, pShow->bytes[cols]);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDesc->sql, pShow->bytes[cols]);
      cols++;

      numOfRows++;
    }
  }

S
Shengliang Guan 已提交
901
  mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
S
Shengliang Guan 已提交
902 903 904 905 906 907 908 909
  pShow->numOfReads += numOfRows;
  return numOfRows;
}

static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
  taosHashCancelIterate(pMgmt->cache->pHashTable, pIter);
}