You need to sign in or sign up before continuing.
mndProfile.c 29.8 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17 18 19 20
#include "mndProfile.h"
#include "mndDb.h"
#include "mndMnode.h"
#include "mndShow.h"
S
Shengliang Guan 已提交
21
#include "mndStb.h"
S
Shengliang Guan 已提交
22
#include "mndUser.h"
S
Shengliang Guan 已提交
23
#include "tglobal.h"
S
Shengliang Guan 已提交
24

S
Shengliang Guan 已提交
25 26 27
#define QUERY_ID_SIZE 20
#define QUERY_OBJ_ID_SIZE 18
#define SUBQUERY_INFO_SIZE 6
S
Shengliang Guan 已提交
28
#define QUERY_SAVE_SIZE 20
S
Shengliang Guan 已提交
29 30

typedef struct {
S
Shengliang Guan 已提交
31 32 33 34 35 36 37 38 39 40 41 42 43
  int32_t     id;
  char        user[TSDB_USER_LEN];
  char        app[TSDB_APP_NAME_LEN];  // app name that invokes taosc
  int64_t     appStartTimeMs;          // app start time
  int32_t     pid;                     // pid of app that invokes taosc
  uint32_t    ip;
  uint16_t    port;
  int8_t      killed;
  int64_t     loginTimeMs;
  int64_t     lastAccessTimeMs;
  int32_t     queryId;
  int32_t     numOfQueries;
  SQueryDesc *pQueries;
S
Shengliang Guan 已提交
44 45
} SConnObj;

S
Shengliang Guan 已提交
46
static SConnObj *mndCreateConn(SMnode *pMnode, SRpcConnInfo *pInfo, int32_t pid, const char *app, int64_t startTime);
S
Shengliang Guan 已提交
47
static void      mndFreeConn(SConnObj *pConn);
S
Shengliang Guan 已提交
48
static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId);
S
Shengliang Guan 已提交
49 50 51
static void      mndReleaseConn(SMnode *pMnode, SConnObj *pConn);
static void     *mndGetNextConn(SMnode *pMnode, void *pIter, SConnObj **pConn);
static void      mndCancelGetNextConn(SMnode *pMnode, void *pIter);
S
Shengliang Guan 已提交
52 53 54 55
static int32_t   mndProcessHeartBeatReq(SMnodeMsg *pReq);
static int32_t   mndProcessConnectReq(SMnodeMsg *pReq);
static int32_t   mndProcessKillQueryReq(SMnodeMsg *pReq);
static int32_t   mndProcessKillConnReq(SMnodeMsg *pReq);
S
Shengliang Guan 已提交
56
static int32_t   mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
S
Shengliang Guan 已提交
57
static int32_t   mndRetrieveConns(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
S
Shengliang Guan 已提交
58
static int32_t   mndGetQueryMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
S
Shengliang Guan 已提交
59
static int32_t   mndRetrieveQueries(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
S
Shengliang Guan 已提交
60 61 62 63 64
static void      mndCancelGetNextQuery(SMnode *pMnode, void *pIter);

/*
 * Initialize the profile module of an mnode: create the connection cache and
 * register the message handlers and show handlers implemented in this file.
 *
 * Returns 0 on success; -1 with terrno set to TSDB_CODE_OUT_OF_MEMORY when the
 * cache cannot be created.
 */
int32_t mndInitProfile(SMnode *pMnode) {
  SProfileMgmt *pProfile = &pMnode->profileMgmt;

  // The value handed to taosCacheInit is twice the configured shell activity timer.
  int32_t checkTime = pMnode->cfg.shellActivityTimer * 2;

  pProfile->cache = taosCacheInit(TSDB_DATA_TYPE_INT, checkTime, true, (__cache_free_fn_t)mndFreeConn, "conn");
  if (NULL == pProfile->cache) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to alloc profile cache since %s", terrstr());
    return -1;
  }

  // client requests
  mndSetMsgHandle(pMnode, TDMT_MND_HEARTBEAT, mndProcessHeartBeatReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CONNECT, mndProcessConnectReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq);

  // show handlers for the connection table
  mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndGetConnsMeta);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn);

  // show handlers for the query table
  mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndGetQueryMeta);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery);

  return 0;
}

/* Release the connection cache created by mndInitProfile, if there is one. */
void mndCleanupProfile(SMnode *pMnode) {
  SProfileMgmt *pProfile = &pMnode->profileMgmt;
  if (pProfile->cache == NULL) return;

  taosCacheCleanup(pProfile->cache);
  pProfile->cache = NULL;  // makes a repeated cleanup a no-op
}

S
Shengliang Guan 已提交
96
/*
 * Create a connection object for a client and store it in the profile cache.
 *
 * pInfo     rpc connection info; supplies user, client ip and client port
 * pid       pid of the client application
 * app       name of the client application
 * startTime start time of the client application in ms; 0 means "use the current time"
 *
 * Returns the cached object (callers in this file hand it back through
 * mndReleaseConn), or NULL with terrno set to TSDB_CODE_OUT_OF_MEMORY when the
 * cache insert fails.
 */
static SConnObj *mndCreateConn(SMnode *pMnode, SRpcConnInfo *pInfo, int32_t pid, const char *app, int64_t startTime) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  // Id 0 is skipped: when the counter lands on 0, take the next value and actually use it.
  int32_t connId = atomic_add_fetch_32(&pMgmt->connId, 1);
  if (connId == 0) connId = atomic_add_fetch_32(&pMgmt->connId, 1);

  int64_t nowMs = taosGetTimestampMs();
  if (startTime == 0) startTime = nowMs;

  SConnObj connObj = {.id = connId,
                      .appStartTimeMs = startTime,
                      .pid = pid,
                      .ip = pInfo->clientIp,
                      .port = pInfo->clientPort,
                      .killed = 0,
                      .loginTimeMs = nowMs,
                      .lastAccessTimeMs = nowMs,
                      .queryId = 0,
                      .numOfQueries = 0,
                      .pQueries = NULL};

  tstrncpy(connObj.user, pInfo->user, TSDB_USER_LEN);
  tstrncpy(connObj.app, app, TSDB_APP_NAME_LEN);

  // The cache entry is kept for three shell-activity periods (value scaled by 1000 for taosCachePut).
  int32_t   keepTime = pMnode->cfg.shellActivityTimer * 3;
  SConnObj *pConn = taosCachePut(pMgmt->cache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), keepTime * 1000);
  if (pConn == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // argument order follows the format string: reason first, then user
    mError("conn:%d, failed to put into cache since %s, user:%s", connId, terrstr(), pInfo->user);
    return NULL;
  }

  mTrace("conn:%d, is created, data:%p user:%s", pConn->id, pConn, pInfo->user);
  return pConn;
}

/*
 * Free callback handed to taosCacheInit in mndInitProfile.
 * Releases the query list owned by the connection object.
 */
static void mndFreeConn(SConnObj *pConn) {
  tfree(pConn->pQueries);

  mTrace("conn:%d, is destroyed, data:%p", pConn->id, pConn);
}

S
Shengliang Guan 已提交
136
/*
 * Look up a connection by id in the profile cache and record the access time.
 *
 * Returns the acquired object, which must be handed back with mndReleaseConn,
 * or NULL when no connection with this id is cached.
 */
static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t));
  if (pConn == NULL) {
    mDebug("conn:%d, already destroyed", connId);
    return NULL;
  }

  // lastAccessTimeMs is reported as the "last_access" timestamp column by
  // mndRetrieveConns, so it has to hold the time of this access, not a time in
  // the future (the keep time must not be added here).
  pConn->lastAccessTimeMs = taosGetTimestampMs();

  mTrace("conn:%d, acquired from cache, data:%p", pConn->id, pConn);
  return pConn;
}

/* Hand a connection obtained from the profile cache back to it. A NULL pConn is ignored. */
static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) {
  if (NULL != pConn) {
    mTrace("conn:%d, released from cache, data:%p", pConn->id, pConn);

    // The entry is released without asking the cache to remove it (last argument is false).
    taosCacheRelease(pMnode->profileMgmt.cache, (void **)&pConn, false);
  }
}

/*
 * Advance an iteration over the cached connections.
 *
 * pIter  iterator returned by the previous call, or NULL to start
 * pConn  receives the next connection, or NULL when there is none
 *
 * Returns the iterator to pass to the next call, or NULL when the iteration
 * has ended (in which case *pConn is NULL as well).
 */
static void *mndGetNextConn(SMnode *pMnode, void *pIter, SConnObj **pConn) {
  SProfileMgmt *pProfile = &pMnode->profileMgmt;
  *pConn = NULL;

  void *pNext = taosHashIterate(pProfile->cache->pHashTable, pIter);
  if (pNext == NULL) return NULL;

  // Each hash slot holds a pointer to the cache node that carries the object.
  SCacheDataNode *pCacheNode = *(SCacheDataNode **)pNext;
  if (pCacheNode == NULL) {
    taosHashCancelIterate(pProfile->cache->pHashTable, pNext);
    return NULL;
  }

  *pConn = (SConnObj *)(pCacheNode->data);
  return pNext;
}

/* Abort an iteration started with mndGetNextConn before it reached the end. */
static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) {
  taosHashCancelIterate(pMnode->profileMgmt.cache->pHashTable, pIter);
}

S
Shengliang Guan 已提交
183
/*
 * Handle a client connect request.
 *
 * Decodes the request, resolves the user and (when one is named) the database,
 * creates a connection object and builds the serialized connect response into
 * pReq->pCont / pReq->contLen.
 *
 * Returns 0 on success and -1 on failure. Every acquired object is released on
 * all paths through the common exit label.
 */
static int32_t mndProcessConnectReq(SMnodeMsg *pReq) {
  SMnode      *pMnode = pReq->pMnode;
  SUserObj    *pUser = NULL;
  SDbObj      *pDb = NULL;
  SConnObj    *pConn = NULL;
  int32_t      code = -1;
  SConnectReq  connReq = {0};
  SRpcConnInfo info = {0};
  SConnectRsp  connectRsp = {0};
  char         ip[30] = {0};

  if (tDeserializeSConnectReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &connReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto CONN_OVER;
  }

  if (rpcGetConnInfo(pReq->rpcMsg.handle, &info) != 0) {
    mError("user:%s, failed to login while get connection info since %s", pReq->user, terrstr());
    goto CONN_OVER;
  }

  taosIp2String(info.clientIp, ip);

  pUser = mndAcquireUser(pMnode, pReq->user);
  if (pUser == NULL) {
    mError("user:%s, failed to login while acquire user since %s", pReq->user, terrstr());
    goto CONN_OVER;
  }

  if (connReq.db[0]) {
    // The full database name is built from the account id, the path delimiter and the requested name.
    snprintf(pReq->db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, connReq.db);
    pDb = mndAcquireDb(pMnode, pReq->db);
    if (pDb == NULL) {
      terrno = TSDB_CODE_MND_INVALID_DB;
      mError("user:%s, failed to login from %s while use db:%s since %s", pReq->user, ip, connReq.db, terrstr());
      goto CONN_OVER;
    }
  }

  pConn = mndCreateConn(pMnode, &info, connReq.pid, connReq.app, connReq.startTime);
  if (pConn == NULL) {
    mError("user:%s, failed to login from %s while create connection since %s", pReq->user, ip, terrstr());
    goto CONN_OVER;
  }

  connectRsp.acctId = pUser->acctId;
  connectRsp.superUser = pUser->superUser;
  connectRsp.clusterId = pMnode->clusterId;
  connectRsp.connId = pConn->id;

  snprintf(connectRsp.sVersion, sizeof(connectRsp.sVersion), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo,
           gitinfo);
  mndGetMnodeEpSet(pMnode, &connectRsp.epSet);

  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeSConnectRsp(NULL, 0, &connectRsp);
  if (contLen < 0) goto CONN_OVER;  // NOTE(review): terrno is left to the serializer here - confirm it sets one

  void *pRsp = rpcMallocCont(contLen);
  if (pRsp == NULL) {
    // Report the allocation failure instead of returning -1 with a stale terrno.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto CONN_OVER;
  }
  tSerializeSConnectRsp(pRsp, contLen, &connectRsp);

  pReq->contLen = contLen;
  pReq->pCont = pRsp;

  mDebug("user:%s, login from %s, conn:%d, app:%s", info.user, ip, pConn->id, connReq.app);

  code = 0;

CONN_OVER:

  mndReleaseUser(pMnode, pUser);
  mndReleaseDb(pMnode, pDb);
  mndReleaseConn(pMnode, pConn);

  return code;
}

S
Shengliang Guan 已提交
259
/*
 * Copy the query descriptors carried by a heartbeat into the connection object.
 *
 * At most QUERY_SAVE_SIZE descriptors are kept; the buffer is allocated on
 * first use and released in mndFreeConn. pConn->numOfQueries always reflects
 * the number of valid entries in pConn->pQueries.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the buffer cannot
 * be allocated (numOfQueries is then left at 0).
 *
 * NOTE(review): pReq->pData is assumed to hold at least the advertised number
 * of descriptors; the message length is not available here to verify that.
 */
static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) {
  pConn->numOfQueries = 0;

  // The count is byte-swapped from the wire representation before use.
  int32_t numOfQueries = htonl(pReq->numOfQueries);
  if (numOfQueries <= 0) return TSDB_CODE_SUCCESS;

  if (pConn->pQueries == NULL) {
    pConn->pQueries = calloc(QUERY_SAVE_SIZE, sizeof(SQueryDesc));
    if (pConn->pQueries == NULL) {
      // Do not advertise queries that could not be stored.
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  pConn->numOfQueries = TMIN(QUERY_SAVE_SIZE, numOfQueries);
  memcpy(pConn->pQueries, pReq->pData, pConn->numOfQueries * sizeof(SQueryDesc));

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
279
/*
 * Build the response for an MQ heartbeat.
 *
 * MQ heartbeats are not answered yet: the function always returns NULL, which
 * the heartbeat handler treats as "no response entry for this request".
 * When a response is produced in the future it must be heap allocated, since
 * the caller copies it into the batch and then frees the returned pointer.
 */
static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) {
  (void)pMnode;
  (void)pReq;
  return NULL;
}

S
Shengliang Guan 已提交
344
static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
L
Liu Jicong 已提交
345
  SMnode *pMnode = pReq->pMnode;
S
Shengliang Guan 已提交
346

L
Liu Jicong 已提交
347
  SClientHbBatchReq batchReq = {0};
S
Shengliang Guan 已提交
348 349 350 351 352
  if (tDeserializeSClientHbBatchReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &batchReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

S
Shengliang Guan 已提交
353
  
L
Liu Jicong 已提交
354 355
  SClientHbBatchRsp batchRsp = {0};
  batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp));
L
Liu Jicong 已提交
356

S
Shengliang Guan 已提交
357
  int32_t sz = taosArrayGetSize(batchReq.reqs);
L
Liu Jicong 已提交
358
  for (int i = 0; i < sz; i++) {
S
Shengliang Guan 已提交
359
    SClientHbReq *pHbReq = taosArrayGet(batchReq.reqs, i);
L
Liu Jicong 已提交
360
    if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) {
D
dapan1121 已提交
361 362 363 364
      int32_t kvNum = taosHashGetSize(pHbReq->info);
      if (NULL == pHbReq->info || kvNum <= 0) {
        continue;
      }
L
Liu Jicong 已提交
365

D
dapan1121 已提交
366 367 368 369
      SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = taosArrayInit(kvNum, sizeof(SKv))};

      void *pIter = taosHashIterate(pHbReq->info, NULL);
      while (pIter != NULL) {
S
Shengliang Guan 已提交
370 371
        SKv *kv = pIter;

D
dapan1121 已提交
372
        switch (kv->key) {
D
dapan1121 已提交
373
          case HEARTBEAT_KEY_DBINFO: {
S
Shengliang Guan 已提交
374
            void   *rspMsg = NULL;
D
dapan1121 已提交
375
            int32_t rspLen = 0;
S
Shengliang Guan 已提交
376
            mndValidateDbInfo(pMnode, kv->value, kv->valueLen / sizeof(SDbVgVersion), &rspMsg, &rspLen);
D
dapan1121 已提交
377 378
            if (rspMsg && rspLen > 0) {
              SKv kv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg};
D
dapan1121 已提交
379
              taosArrayPush(hbRsp.info, &kv);
D
dapan1121 已提交
380 381
            }
            break;
D
dapan1121 已提交
382
          }
D
dapan 已提交
383
          case HEARTBEAT_KEY_STBINFO: {
S
Shengliang Guan 已提交
384
            void   *rspMsg = NULL;
D
dapan 已提交
385
            int32_t rspLen = 0;
S
Shengliang Guan 已提交
386
            mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableMetaVersion), &rspMsg, &rspLen);
D
dapan 已提交
387 388 389 390
            if (rspMsg && rspLen > 0) {
              SKv kv = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg};
              taosArrayPush(hbRsp.info, &kv);
            }
D
dapan1121 已提交
391
            break;
D
dapan 已提交
392
          }
D
dapan1121 已提交
393 394
          default:
            mError("invalid kv key:%d", kv->key);
D
dapan1121 已提交
395
            hbRsp.status = TSDB_CODE_MND_APP_ERROR;
D
dapan1121 已提交
396 397
            break;
        }
S
Shengliang Guan 已提交
398

D
dapan1121 已提交
399 400
        pIter = taosHashIterate(pHbReq->info, pIter);
      }
D
dapan1121 已提交
401 402

      taosArrayPush(batchRsp.rsps, &hbRsp);
L
Liu Jicong 已提交
403
    } else if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_MQ) {
L
Liu Jicong 已提交
404 405 406 407 408
      SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq);
      if (pRsp != NULL) {
        taosArrayPush(batchRsp.rsps, pRsp);
        free(pRsp);
      }
L
Liu Jicong 已提交
409 410
    }
  }
S
Shengliang Guan 已提交
411
  taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);
L
Liu Jicong 已提交
412

S
Shengliang Guan 已提交
413 414 415 416
  int32_t tlen = tSerializeSClientHbBatchRsp(NULL, 0, &batchRsp);
  void   *buf = rpcMallocCont(tlen);
  tSerializeSClientHbBatchRsp(buf, tlen, &batchRsp);

D
dapan1121 已提交
417 418 419
  int32_t rspNum = (int32_t)taosArrayGetSize(batchRsp.rsps);
  for (int32_t i = 0; i < rspNum; ++i) {
    SClientHbRsp *rsp = taosArrayGet(batchRsp.rsps, i);
S
Shengliang Guan 已提交
420
    int32_t       kvNum = (rsp->info) ? taosArrayGetSize(rsp->info) : 0;
D
dapan1121 已提交
421 422 423 424 425 426 427
    for (int32_t n = 0; n < kvNum; ++n) {
      SKv *kv = taosArrayGet(rsp->info, n);
      tfree(kv->value);
    }
    taosArrayDestroy(rsp->info);
  }

L
Liu Jicong 已提交
428
  taosArrayDestroy(batchRsp.rsps);
L
Liu Jicong 已提交
429 430
  pReq->contLen = tlen;
  pReq->pCont = buf;
L
Liu Jicong 已提交
431 432
  return 0;

L
Liu Jicong 已提交
433
#if 0
S
Shengliang Guan 已提交
434
  SMnode       *pMnode = pReq->pMnode;
S
Shengliang Guan 已提交
435 436
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
437 438 439
  SHeartBeatReq *pHeartbeat = pReq->rpcMsg.pCont;
  pHeartbeat->connId = htonl(pHeartbeat->connId);
  pHeartbeat->pid = htonl(pHeartbeat->pid);
S
Shengliang Guan 已提交
440 441

  SRpcConnInfo info = {0};
S
Shengliang Guan 已提交
442 443
  if (rpcGetConnInfo(pReq->rpcMsg.handle, &info) != 0) {
    mError("user:%s, connId:%d failed to process hb since %s", pReq->user, pHeartbeat->connId, terrstr());
S
Shengliang Guan 已提交
444 445 446
    return -1;
  }

S
Shengliang Guan 已提交
447
  SConnObj *pConn = mndAcquireConn(pMnode, pHeartbeat->connId);
S
Shengliang Guan 已提交
448
  if (pConn == NULL) {
S
Shengliang Guan 已提交
449
    pConn = mndCreateConn(pMnode, &info, pHeartbeat->pid, pHeartbeat->app, 0);
S
Shengliang Guan 已提交
450
    if (pConn == NULL) {
S
Shengliang Guan 已提交
451
      mError("user:%s, conn:%d is freed and failed to create new since %s", pReq->user, pHeartbeat->connId, terrstr());
S
Shengliang Guan 已提交
452 453
      return -1;
    } else {
S
Shengliang Guan 已提交
454
      mDebug("user:%s, conn:%d is freed and create a new conn:%d", pReq->user, pHeartbeat->connId, pConn->id);
S
Shengliang Guan 已提交
455
    }
S
Shengliang Guan 已提交
456
  } else if (pConn->killed) {
S
Shengliang Guan 已提交
457
    mError("user:%s, conn:%d is already killed", pReq->user, pConn->id);
S
Shengliang Guan 已提交
458
    terrno = TSDB_CODE_MND_INVALID_CONNECTION;
S
Shengliang Guan 已提交
459 460
    return -1;
  } else {
S
Shengliang Guan 已提交
461 462 463 464 465 466 467 468 469 470 471 472 473
    if (pConn->ip != info.clientIp || pConn->port != info.clientPort /* || strcmp(pConn->user, info.user) != 0 */) {
      char oldIpStr[40];
      char newIpStr[40];
      taosIpPort2String(pConn->ip, pConn->port, oldIpStr);
      taosIpPort2String(info.clientIp, info.clientPort, newIpStr);
      mError("conn:%d, incoming conn user:%s ip:%s, not match exist user:%s ip:%s", pConn->id, info.user, newIpStr,
             pConn->user, oldIpStr);

      if (pMgmt->connId < pConn->id) pMgmt->connId = pConn->id + 1;
      taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
      terrno = TSDB_CODE_MND_INVALID_CONNECTION;
      return -1;
    }
S
Shengliang Guan 已提交
474 475 476 477 478 479
  }

  SHeartBeatRsp *pRsp = rpcMallocCont(sizeof(SHeartBeatRsp));
  if (pRsp == NULL) {
    mndReleaseConn(pMnode, pConn);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
480
    mError("user:%s, conn:%d failed to process hb while since %s", pReq->user, pHeartbeat->connId, terrstr());
S
Shengliang Guan 已提交
481 482 483
    return -1;
  }

S
Shengliang Guan 已提交
484
  mndSaveQueryStreamList(pConn, pHeartbeat);
S
Shengliang Guan 已提交
485 486 487 488 489 490 491 492 493
  if (pConn->killed != 0) {
    pRsp->killConnection = 1;
  }

  if (pConn->queryId != 0) {
    pRsp->queryId = htonl(pConn->queryId);
    pConn->queryId = 0;
  }

S
Shengliang Guan 已提交
494
  pRsp->connId = htonl(pConn->id);
S
Shengliang Guan 已提交
495
  pRsp->totalDnodes = htonl(1);
S
Shengliang Guan 已提交
496
  pRsp->onlineDnodes = htonl(1);
S
Shengliang Guan 已提交
497 498 499
  mndGetMnodeEpSet(pMnode, &pRsp->epSet);
  mndReleaseConn(pMnode, pConn);

S
Shengliang Guan 已提交
500 501
  pReq->contLen = sizeof(SConnectRsp);
  pReq->pCont = pRsp;
S
Shengliang Guan 已提交
502
  return 0;
L
Liu Jicong 已提交
503
#endif
S
Shengliang Guan 已提交
504 505
}

S
Shengliang Guan 已提交
506 507
/*
 * Handle a "kill query" request.
 *
 * Only super users may kill queries. The query id is stored on the target
 * connection object (pConn->queryId); nothing else is done here.
 *
 * Returns 0 on success; -1 with terrno set when the user cannot be resolved,
 * lacks the right, the message is malformed or the connection does not exist.
 */
static int32_t mndProcessKillQueryReq(SMnodeMsg *pReq) {
  SMnode       *pMnode = pReq->pMnode;
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
  if (pUser == NULL) {
    // An unknown requester must not be answered with success; the connect
    // handler relies on terrno being set after the same failure.
    mError("user:%s, failed to kill query while acquire user since %s", pReq->user, terrstr());
    return -1;
  }

  int8_t isSuperUser = pUser->superUser ? 1 : 0;
  mndReleaseUser(pMnode, pUser);
  if (!isSuperUser) {
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
  }

  SKillQueryReq killReq = {0};
  if (tDeserializeSKillQueryReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &killReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  mInfo("kill query msg is received, queryId:%d", killReq.queryId);

  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &killReq.connId, sizeof(int32_t));
  if (pConn == NULL) {
    mError("connId:%d, failed to kill queryId:%d, conn not exist", killReq.connId, killReq.queryId);
    terrno = TSDB_CODE_MND_INVALID_CONN_ID;
    return -1;
  }

  mInfo("connId:%d, queryId:%d is killed by user:%s", killReq.connId, killReq.queryId, pReq->user);
  pConn->queryId = killReq.queryId;
  taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
  return 0;
}

S
Shengliang Guan 已提交
540 541
/*
 * Handle a "kill connection" request.
 *
 * Only super users may kill connections. The target connection object is
 * flagged with killed = 1; nothing else is done here.
 *
 * Returns TSDB_CODE_SUCCESS on success; -1 with terrno set when the user
 * cannot be resolved, lacks the right, the message is malformed or the
 * connection does not exist.
 */
static int32_t mndProcessKillConnReq(SMnodeMsg *pReq) {
  SMnode       *pMnode = pReq->pMnode;
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
  if (pUser == NULL) {
    // An unknown requester must not be answered with success; the connect
    // handler relies on terrno being set after the same failure.
    mError("user:%s, failed to kill connection while acquire user since %s", pReq->user, terrstr());
    return -1;
  }

  int8_t isSuperUser = pUser->superUser ? 1 : 0;
  mndReleaseUser(pMnode, pUser);
  if (!isSuperUser) {
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
  }

  SKillConnReq killReq = {0};
  if (tDeserializeSKillConnReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &killReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &killReq.connId, sizeof(int32_t));
  if (pConn == NULL) {
    mError("connId:%d, failed to kill connection, conn not exist", killReq.connId);
    terrno = TSDB_CODE_MND_INVALID_CONN_ID;
    return -1;
  }

  mInfo("connId:%d, is killed by user:%s", killReq.connId, pReq->user);
  pConn->killed = 1;
  taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
572
static int32_t mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
S
Shengliang Guan 已提交
573
  SMnode       *pMnode = pReq->pMnode;
S
Shengliang Guan 已提交
574 575
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
576
  SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
S
Shengliang Guan 已提交
577
  if (pUser == NULL) return 0;
578
  if (!pUser->superUser) {
S
Shengliang Guan 已提交
579
    mndReleaseUser(pMnode, pUser);
580 581
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
S
Shengliang Guan 已提交
582 583 584 585
  }
  mndReleaseUser(pMnode, pUser);

  int32_t  cols = 0;
S
Shengliang Guan 已提交
586
  SSchema *pSchema = pMeta->pSchemas;
S
Shengliang Guan 已提交
587 588 589 590

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "connId");
S
Shengliang Guan 已提交
591
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
592 593 594 595 596
  cols++;

  pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "user");
S
Shengliang Guan 已提交
597
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
598 599 600 601 602 603
  cols++;

  // app name
  pShow->bytes[cols] = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "program");
S
Shengliang Guan 已提交
604
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
605 606 607 608 609 610
  cols++;

  // app pid
  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "pid");
S
Shengliang Guan 已提交
611
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
612 613 614 615 616
  cols++;

  pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "ip:port");
S
Shengliang Guan 已提交
617
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
618 619 620 621 622
  cols++;

  pShow->bytes[cols] = 8;
  pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
  strcpy(pSchema[cols].name, "login_time");
S
Shengliang Guan 已提交
623
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
624 625 626 627 628
  cols++;

  pShow->bytes[cols] = 8;
  pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
  strcpy(pSchema[cols].name, "last_access");
S
Shengliang Guan 已提交
629
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
630 631
  cols++;

S
Shengliang Guan 已提交
632
  pMeta->numOfColumns = cols;
S
Shengliang Guan 已提交
633 634 635 636 637 638 639 640 641
  pShow->numOfColumns = cols;

  pShow->offset[0] = 0;
  for (int32_t i = 1; i < cols; ++i) {
    pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
  }

  pShow->numOfRows = taosHashGetSize(pMgmt->cache->pHashTable);
  pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
D
dapan1121 已提交
642
  strcpy(pMeta->tbName, mndShowStr(pShow->type));
S
Shengliang Guan 已提交
643 644 645 646

  return 0;
}

S
Shengliang Guan 已提交
647 648
/*
 * Address of the cell for column `col` of row `row` in the output buffer.
 * The buffer is laid out column by column, each column block sized for `rows` rows.
 */
static char *mndConnCell(SShowObj *pShow, char *data, int32_t rows, int32_t col, int32_t row) {
  return data + pShow->offset[col] * rows + pShow->bytes[col] * row;
}

/*
 * Write up to `rows` connections into `data`, continuing from the iterator
 * kept in pShow->pIter. Column order matches mndGetConnsMeta.
 *
 * Returns the number of rows written; pShow->numOfReads is advanced by it.
 */
static int32_t mndRetrieveConns(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  SMnode   *pMnode = pReq->pMnode;
  SConnObj *pConn = NULL;
  char      ipStr[TSDB_IPv4ADDR_LEN + 6];
  int32_t   numOfRows = 0;

  for (; numOfRows < rows; ++numOfRows) {
    pShow->pIter = mndGetNextConn(pMnode, pShow->pIter, &pConn);
    if (pConn == NULL) break;

    int32_t col = 0;
    char   *pCell = NULL;

    // connId
    pCell = mndConnCell(pShow, data, rows, col, numOfRows);
    *(int32_t *)pCell = pConn->id;
    col++;

    // user
    pCell = mndConnCell(pShow, data, rows, col, numOfRows);
    STR_WITH_MAXSIZE_TO_VARSTR(pCell, pConn->user, pShow->bytes[col]);
    col++;

    // app name
    pCell = mndConnCell(pShow, data, rows, col, numOfRows);
    STR_WITH_MAXSIZE_TO_VARSTR(pCell, pConn->app, pShow->bytes[col]);
    col++;

    // app pid
    pCell = mndConnCell(pShow, data, rows, col, numOfRows);
    *(int32_t *)pCell = pConn->pid;
    col++;

    // ip:port
    pCell = mndConnCell(pShow, data, rows, col, numOfRows);
    taosIpPort2String(pConn->ip, pConn->port, ipStr);
    STR_WITH_MAXSIZE_TO_VARSTR(pCell, ipStr, pShow->bytes[col]);
    col++;

    // login time
    pCell = mndConnCell(pShow, data, rows, col, numOfRows);
    *(int64_t *)pCell = pConn->loginTimeMs;
    col++;

    // last access; never reported as earlier than the login time (the stored value is clamped too)
    pCell = mndConnCell(pShow, data, rows, col, numOfRows);
    if (pConn->lastAccessTimeMs < pConn->loginTimeMs) {
      pConn->lastAccessTimeMs = pConn->loginTimeMs;
    }
    *(int64_t *)pCell = pConn->lastAccessTimeMs;
  }

  pShow->numOfReads += numOfRows;

  return numOfRows;
}

S
Shengliang Guan 已提交
701
/*
 * Describe the result layout of the query listing.
 *
 * Fills pMeta->pSchemas and the parallel pShow->bytes / pShow->offset arrays
 * with one entry per column, then records the column count, row size and
 * table name.
 *
 * pReq  - request; supplies the mnode handle and the requesting user name.
 * pShow - show object whose bytes/offset/numOfColumns/numOfRows/rowSize
 *         fields are written.
 * pMeta - table meta response whose pSchemas, numOfColumns and tbName are
 *         written.
 *
 * Returns 0 on success, or -1 with terrno = TSDB_CODE_MND_NO_RIGHTS when the
 * requesting user is not a super user.
 *
 * NOTE(review): when the user cannot be acquired this returns 0 without
 * touching pShow/pMeta - confirm that callers expect success in that case.
 */
static int32_t mndGetQueryMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
  // Column definitions, in output order.
  const struct {
    const char *name;
    int32_t     type;
    int32_t     bytes;
  } columns[] = {
      {"queryId", TSDB_DATA_TYPE_INT, 4},
      {"connId", TSDB_DATA_TYPE_INT, 4},
      {"user", TSDB_DATA_TYPE_BINARY, TSDB_USER_LEN + VARSTR_HEADER_SIZE},
      {"ip:port", TSDB_DATA_TYPE_BINARY, TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE},
      {"qid", TSDB_DATA_TYPE_BINARY, 22 + VARSTR_HEADER_SIZE},
      {"created_time", TSDB_DATA_TYPE_TIMESTAMP, 8},
      {"time", TSDB_DATA_TYPE_BIGINT, 8},
      {"sql_obj_id", TSDB_DATA_TYPE_BINARY, QUERY_OBJ_ID_SIZE + VARSTR_HEADER_SIZE},
      {"pid", TSDB_DATA_TYPE_INT, 4},
      {"ep", TSDB_DATA_TYPE_BINARY, TSDB_EP_LEN + VARSTR_HEADER_SIZE},
      {"stable_query", TSDB_DATA_TYPE_BOOL, 1},
      {"sub_queries", TSDB_DATA_TYPE_INT, 4},
      {"sub_query_info", TSDB_DATA_TYPE_BINARY, TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE},
      {"sql", TSDB_DATA_TYPE_BINARY, TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE},
  };
  const int32_t numOfCols = (int32_t)(sizeof(columns) / sizeof(columns[0]));

  SMnode *pMnode = pReq->pMnode;

  // Only super users may list queries; the user object is released before
  // the verdict is acted upon.
  SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
  if (pUser == NULL) return 0;

  int32_t denied = !pUser->superUser;
  mndReleaseUser(pMnode, pUser);
  if (denied) {
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
  }

  SSchema *pSchema = pMeta->pSchemas;
  for (int32_t i = 0; i < numOfCols; ++i) {
    pShow->bytes[i] = columns[i].bytes;
    pSchema[i].type = columns[i].type;
    strcpy(pSchema[i].name, columns[i].name);
    pSchema[i].bytes = pShow->bytes[i];

    // Each column starts where the previous one ends.
    if (i == 0) {
      pShow->offset[i] = 0;
    } else {
      pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
    }
  }

  pMeta->numOfColumns = numOfCols;
  pShow->numOfColumns = numOfCols;

  pShow->numOfRows = 1000000;
  pShow->rowSize = pShow->offset[numOfCols - 1] + pShow->bytes[numOfCols - 1];
  strcpy(pMeta->tbName, mndShowStr(pShow->type));

  return 0;
}

S
Shengliang Guan 已提交
816 817
/*
 * Fill `data` with one row per query of every cached connection.
 *
 * The buffer is laid out column by column: the cell for column c of row r is
 * at data + pShow->offset[c] * rows + pShow->bytes[c] * r, so every write
 * must be exactly pShow->bytes[c] wide (widths are set by mndGetQueryMeta).
 *
 * pReq  - request; supplies the mnode handle.
 * pShow - show object; provides the column layout and the connection
 *         iterator (pShow->pIter), and accumulates numOfReads.
 * data  - output buffer sized for `rows` rows.
 * rows  - capacity of `data` in rows.
 *
 * Returns the number of rows written.
 */
static int32_t mndRetrieveQueries(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  SMnode   *pMnode = pReq->pMnode;
  int32_t   numOfRows = 0;
  SConnObj *pConn = NULL;
  int32_t   cols = 0;
  char     *pWrite;
  void     *pIter;
  char      str[TSDB_IPv4ADDR_LEN + 6] = {0};

  while (numOfRows < rows) {
    pIter = mndGetNextConn(pMnode, pShow->pIter, &pConn);
    if (pConn == NULL) {
      pShow->pIter = pIter;
      break;
    }

    // Stop before a connection whose queries would not all fit; the iterator
    // step is cancelled and pShow->pIter keeps its previous value.
    if (numOfRows + pConn->numOfQueries >= rows) {
      mndCancelGetNextConn(pMnode, pIter);
      break;
    }

    pShow->pIter = pIter;
    for (int32_t i = 0; i < pConn->numOfQueries; ++i) {
      SQueryDesc *pDesc = pConn->pQueries + i;
      cols = 0;

      // queryId: the column is a 4-byte INT, so store exactly 4 bytes (an
      // 8-byte store would spill into the neighbouring cell). Converted with
      // htonl like the other 32-bit SQueryDesc fields below.
      // NOTE(review): assumes pDesc->queryId is a 32-bit field - confirm
      // against the SQueryDesc definition.
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int32_t *)pWrite = htonl(pDesc->queryId);
      cols++;

      // connId: pConn->id is an int32_t owned by this module; it is stored
      // unconverted in its 4-byte cell, the same way the connection listing
      // earlier in this file writes its id column.
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int32_t *)pWrite = pConn->id;
      cols++;

      // user
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->user, pShow->bytes[cols]);
      cols++;

      // ip:port
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      snprintf(str, tListLen(str), "%s:%u", taosIpStr(pConn->ip), pConn->port);
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, str, pShow->bytes[cols]);
      cols++;

      // qid, rendered as a decimal string
      char handleBuf[24] = {0};
      snprintf(handleBuf, tListLen(handleBuf), "%" PRIu64, htobe64(pDesc->qId));
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;

      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, handleBuf, pShow->bytes[cols]);
      cols++;

      // created_time
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int64_t *)pWrite = htobe64(pDesc->stime);
      cols++;

      // time
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int64_t *)pWrite = htobe64(pDesc->useconds);
      cols++;

      // sql_obj_id, rendered as a hexadecimal string
      snprintf(str, tListLen(str), "0x%" PRIx64, htobe64(pDesc->sqlObjId));
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, str, pShow->bytes[cols]);
      cols++;

      // pid
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int32_t *)pWrite = htonl(pDesc->pid);
      cols++;

      // ep: query fqdn combined with the connection port
      char epBuf[TSDB_EP_LEN + 1] = {0};
      snprintf(epBuf, tListLen(epBuf), "%s:%u", pDesc->fqdn, pConn->port);
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, epBuf, pShow->bytes[cols]);
      cols++;

      // stable_query
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(bool *)pWrite = pDesc->stableQuery;
      cols++;

      // sub_queries
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int32_t *)pWrite = htonl(pDesc->numOfSub);
      cols++;

      // sub_query_info
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDesc->subSqlInfo, pShow->bytes[cols]);
      cols++;

      // sql
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDesc->sql, pShow->bytes[cols]);
      cols++;

      numOfRows++;
    }
  }

  // NOTE(review): presumably repacks the column regions from `rows` slots
  // down to the `numOfRows` actually written - confirm in mndVacuumResult.
  mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
  pShow->numOfReads += numOfRows;
  return numOfRows;
}

/*
 * Cancel an in-progress iteration over the profile cache's hash table.
 *
 * pMnode - mnode whose profileMgmt cache is being iterated.
 * pIter  - iterator handle to cancel.
 */
static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) {
  taosHashCancelIterate(pMnode->profileMgmt.cache->pHashTable, pIter);
}