mndProfile.c 29.8 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17 18 19 20
#include "mndProfile.h"
#include "mndDb.h"
#include "mndMnode.h"
#include "mndShow.h"
S
Shengliang Guan 已提交
21
#include "mndStb.h"
S
Shengliang Guan 已提交
22
#include "mndUser.h"
S
Shengliang Guan 已提交
23
#include "tglobal.h"
S
version  
Shengliang Guan 已提交
24
#include "version.h"
S
Shengliang Guan 已提交
25

S
Shengliang Guan 已提交
26 27 28
#define QUERY_ID_SIZE 20
#define QUERY_OBJ_ID_SIZE 18
#define SUBQUERY_INFO_SIZE 6
S
Shengliang Guan 已提交
29
#define QUERY_SAVE_SIZE 20
S
Shengliang Guan 已提交
30 31

typedef struct {
S
Shengliang Guan 已提交
32 33 34 35 36 37 38 39 40 41 42 43 44
  int32_t     id;
  char        user[TSDB_USER_LEN];
  char        app[TSDB_APP_NAME_LEN];  // app name that invokes taosc
  int64_t     appStartTimeMs;          // app start time
  int32_t     pid;                     // pid of app that invokes taosc
  uint32_t    ip;
  uint16_t    port;
  int8_t      killed;
  int64_t     loginTimeMs;
  int64_t     lastAccessTimeMs;
  int32_t     queryId;
  int32_t     numOfQueries;
  SQueryDesc *pQueries;
S
Shengliang Guan 已提交
45 46
} SConnObj;

S
Shengliang Guan 已提交
47
static SConnObj *mndCreateConn(SMnode *pMnode, SRpcConnInfo *pInfo, int32_t pid, const char *app, int64_t startTime);
S
Shengliang Guan 已提交
48
static void      mndFreeConn(SConnObj *pConn);
S
Shengliang Guan 已提交
49
static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId);
S
Shengliang Guan 已提交
50 51 52
static void      mndReleaseConn(SMnode *pMnode, SConnObj *pConn);
static void     *mndGetNextConn(SMnode *pMnode, void *pIter, SConnObj **pConn);
static void      mndCancelGetNextConn(SMnode *pMnode, void *pIter);
S
Shengliang Guan 已提交
53 54 55 56
static int32_t   mndProcessHeartBeatReq(SMnodeMsg *pReq);
static int32_t   mndProcessConnectReq(SMnodeMsg *pReq);
static int32_t   mndProcessKillQueryReq(SMnodeMsg *pReq);
static int32_t   mndProcessKillConnReq(SMnodeMsg *pReq);
S
Shengliang Guan 已提交
57
static int32_t   mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
S
Shengliang Guan 已提交
58
static int32_t   mndRetrieveConns(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
S
Shengliang Guan 已提交
59
static int32_t   mndGetQueryMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
S
Shengliang Guan 已提交
60
static int32_t   mndRetrieveQueries(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
S
Shengliang Guan 已提交
61 62 63 64 65
static void      mndCancelGetNextQuery(SMnode *pMnode, void *pIter);

int32_t mndInitProfile(SMnode *pMnode) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
66
  int32_t connCheckTime = tsShellActivityTimer * 2;
S
Shengliang Guan 已提交
67 68 69 70 71 72 73
  pMgmt->cache = taosCacheInit(TSDB_DATA_TYPE_INT, connCheckTime, true, (__cache_free_fn_t)mndFreeConn, "conn");
  if (pMgmt->cache == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to alloc profile cache since %s", terrstr());
    return -1;
  }

S
Shengliang Guan 已提交
74 75 76 77
  mndSetMsgHandle(pMnode, TDMT_MND_HEARTBEAT, mndProcessHeartBeatReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CONNECT, mndProcessConnectReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq);
S
Shengliang Guan 已提交
78 79 80 81

  mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndGetConnsMeta);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn);
S
Shengliang Guan 已提交
82 83 84
  mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndGetQueryMeta);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery);
85

S
Shengliang Guan 已提交
86 87 88 89 90 91 92 93 94 95 96
  return 0;
}

void mndCleanupProfile(SMnode *pMnode) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
  if (pMgmt->cache != NULL) {
    taosCacheCleanup(pMgmt->cache);
    pMgmt->cache = NULL;
  }
}

S
Shengliang Guan 已提交
97
static SConnObj *mndCreateConn(SMnode *pMnode, SRpcConnInfo *pInfo, int32_t pid, const char *app, int64_t startTime) {
S
Shengliang Guan 已提交
98 99 100 101
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  int32_t connId = atomic_add_fetch_32(&pMgmt->connId, 1);
  if (connId == 0) atomic_add_fetch_32(&pMgmt->connId, 1);
S
Shengliang Guan 已提交
102
  if (startTime == 0) startTime = taosGetTimestampMs();
S
Shengliang Guan 已提交
103

S
Shengliang Guan 已提交
104 105 106 107 108
  SConnObj connObj = {.id = connId,
                      .appStartTimeMs = startTime,
                      .pid = pid,
                      .ip = pInfo->clientIp,
                      .port = pInfo->clientPort,
S
Shengliang Guan 已提交
109
                      .killed = 0,
S
Shengliang Guan 已提交
110 111
                      .loginTimeMs = taosGetTimestampMs(),
                      .lastAccessTimeMs = 0,
S
Shengliang Guan 已提交
112 113 114 115
                      .queryId = 0,
                      .numOfQueries = 0,
                      .pQueries = NULL};

S
Shengliang Guan 已提交
116 117
  connObj.lastAccessTimeMs = connObj.loginTimeMs;
  tstrncpy(connObj.user, pInfo->user, TSDB_USER_LEN);
S
Shengliang Guan 已提交
118 119
  tstrncpy(connObj.app, app, TSDB_APP_NAME_LEN);

S
Shengliang Guan 已提交
120
  int32_t   keepTime = tsShellActivityTimer * 3;
S
Shengliang Guan 已提交
121
  SConnObj *pConn = taosCachePut(pMgmt->cache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), keepTime * 1000);
S
Shengliang Guan 已提交
122 123
  if (pConn == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
124
    mError("conn:%d, failed to put into cache since %s, user:%s", connId, pInfo->user, terrstr());
S
Shengliang Guan 已提交
125 126
    return NULL;
  } else {
S
Shengliang Guan 已提交
127
    mTrace("conn:%d, is created, data:%p user:%s", pConn->id, pConn, pInfo->user);
S
Shengliang Guan 已提交
128 129
    return pConn;
  }
S
Shengliang Guan 已提交
130 131 132 133
}

static void mndFreeConn(SConnObj *pConn) {
  tfree(pConn->pQueries);
S
Shengliang Guan 已提交
134
  mTrace("conn:%d, is destroyed, data:%p", pConn->id, pConn);
S
Shengliang Guan 已提交
135 136
}

S
Shengliang Guan 已提交
137
static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId) {
S
Shengliang Guan 已提交
138 139 140 141
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t));
  if (pConn == NULL) {
S
Shengliang Guan 已提交
142
    mDebug("conn:%d, already destroyed", connId);
S
Shengliang Guan 已提交
143 144 145
    return NULL;
  }

S
Shengliang Guan 已提交
146
  int32_t keepTime = tsShellActivityTimer * 3;
S
Shengliang Guan 已提交
147
  pConn->lastAccessTimeMs = keepTime * 1000 + (uint64_t)taosGetTimestampMs();
S
Shengliang Guan 已提交
148

S
Shengliang Guan 已提交
149
  mTrace("conn:%d, acquired from cache, data:%p", pConn->id, pConn);
S
Shengliang Guan 已提交
150 151 152 153 154
  return pConn;
}

static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) {
  if (pConn == NULL) return;
S
Shengliang Guan 已提交
155
  mTrace("conn:%d, released from cache, data:%p", pConn->id, pConn);
S
Shengliang Guan 已提交
156

S
Shengliang Guan 已提交
157
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
S
Shengliang Guan 已提交
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
  taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
}

static void *mndGetNextConn(SMnode *pMnode, void *pIter, SConnObj **pConn) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  *pConn = NULL;

  pIter = taosHashIterate(pMgmt->cache->pHashTable, pIter);
  if (pIter == NULL) return NULL;

  SCacheDataNode **pNode = pIter;
  if (pNode == NULL || *pNode == NULL) {
    taosHashCancelIterate(pMgmt->cache->pHashTable, pIter);
    return NULL;
  }

  *pConn = (SConnObj *)((*pNode)->data);
  return pIter;
}

static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
  taosHashCancelIterate(pMgmt->cache->pHashTable, pIter);
}

S
Shengliang Guan 已提交
184
static int32_t mndProcessConnectReq(SMnodeMsg *pReq) {
S
Shengliang Guan 已提交
185 186 187 188 189 190 191 192 193 194 195
  SMnode     *pMnode = pReq->pMnode;
  SUserObj   *pUser = NULL;
  SDbObj     *pDb = NULL;
  SConnObj   *pConn = NULL;
  int32_t     code = -1;
  SConnectReq connReq = {0};

  if (tDeserializeSConnectReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &connReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto CONN_OVER;
  }
S
Shengliang Guan 已提交
196 197

  SRpcConnInfo info = {0};
S
Shengliang Guan 已提交
198 199
  if (rpcGetConnInfo(pReq->rpcMsg.handle, &info) != 0) {
    mError("user:%s, failed to login while get connection info since %s", pReq->user, terrstr());
200
    goto CONN_OVER;
S
Shengliang Guan 已提交
201 202 203 204 205
  }

  char ip[30];
  taosIp2String(info.clientIp, ip);

206 207 208 209 210 211
  pUser = mndAcquireUser(pMnode, pReq->user);
  if (pUser == NULL) {
    mError("user:%s, failed to login while acquire user since %s", pReq->user, terrstr());
    goto CONN_OVER;
  }

S
Shengliang Guan 已提交
212 213
  if (connReq.db[0]) {
    snprintf(pReq->db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, connReq.db);
214
    pDb = mndAcquireDb(pMnode, pReq->db);
S
Shengliang Guan 已提交
215 216
    if (pDb == NULL) {
      terrno = TSDB_CODE_MND_INVALID_DB;
S
Shengliang Guan 已提交
217
      mError("user:%s, failed to login from %s while use db:%s since %s", pReq->user, ip, connReq.db, terrstr());
218
      goto CONN_OVER;
S
Shengliang Guan 已提交
219 220 221
    }
  }

S
Shengliang Guan 已提交
222
  pConn = mndCreateConn(pMnode, &info, connReq.pid, connReq.app, connReq.startTime);
S
Shengliang Guan 已提交
223
  if (pConn == NULL) {
S
Shengliang Guan 已提交
224
    mError("user:%s, failed to login from %s while create connection since %s", pReq->user, ip, terrstr());
225
    goto CONN_OVER;
S
Shengliang Guan 已提交
226 227
  }

S
Shengliang Guan 已提交
228 229 230 231 232
  SConnectRsp connectRsp = {0};
  connectRsp.acctId = pUser->acctId;
  connectRsp.superUser = pUser->superUser;
  connectRsp.clusterId = pMnode->clusterId;
  connectRsp.connId = pConn->id;
S
Shengliang Guan 已提交
233

S
Shengliang Guan 已提交
234 235 236
  snprintf(connectRsp.sVersion, sizeof(connectRsp.sVersion), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo,
           gitinfo);
  mndGetMnodeEpSet(pMnode, &connectRsp.epSet);
237

S
Shengliang Guan 已提交
238 239 240 241 242
  int32_t contLen = tSerializeSConnectRsp(NULL, 0, &connectRsp);
  if (contLen < 0) goto CONN_OVER;
  void *pRsp = rpcMallocCont(contLen);
  if (pRsp == NULL) goto CONN_OVER;
  tSerializeSConnectRsp(pRsp, contLen, &connectRsp);
S
Shengliang Guan 已提交
243

S
Shengliang Guan 已提交
244
  pReq->contLen = contLen;
S
Shengliang Guan 已提交
245
  pReq->pCont = pRsp;
246

S
Shengliang Guan 已提交
247
  mDebug("user:%s, login from %s, conn:%d, app:%s", info.user, ip, pConn->id, connReq.app);
248 249 250 251 252 253 254 255 256 257

  code = 0;

CONN_OVER:

  mndReleaseUser(pMnode, pUser);
  mndReleaseDb(pMnode, pDb);
  mndReleaseConn(pMnode, pConn);

  return code;
S
Shengliang Guan 已提交
258 259
}

S
Shengliang Guan 已提交
260
static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) {
S
Shengliang Guan 已提交
261
  pConn->numOfQueries = 0;
S
Shengliang Guan 已提交
262
  int32_t numOfQueries = htonl(pReq->numOfQueries);
S
Shengliang Guan 已提交
263 264 265

  if (numOfQueries > 0) {
    if (pConn->pQueries == NULL) {
S
Shengliang Guan 已提交
266
      pConn->pQueries = calloc(sizeof(SQueryDesc), QUERY_SAVE_SIZE);
S
Shengliang Guan 已提交
267 268
    }

dengyihao's avatar
dengyihao 已提交
269
    pConn->numOfQueries = TMIN(QUERY_SAVE_SIZE, numOfQueries);
S
Shengliang Guan 已提交
270 271 272

    int32_t saveSize = pConn->numOfQueries * sizeof(SQueryDesc);
    if (saveSize > 0 && pConn->pQueries != NULL) {
S
Shengliang Guan 已提交
273
      memcpy(pConn->pQueries, pReq->pData, saveSize);
S
Shengliang Guan 已提交
274 275 276 277 278 279
    }
  }

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
280
static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) {
L
Liu Jicong 已提交
281
#if 0
L
Liu Jicong 已提交
282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340
  SClientHbRsp* pRsp = malloc(sizeof(SClientHbRsp));
  if (pRsp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  pRsp->connKey = pReq->connKey;
  SMqHbBatchRsp batchRsp;
  batchRsp.batchRsps = taosArrayInit(0, sizeof(SMqHbRsp));
  if (batchRsp.batchRsps == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  SClientHbKey connKey = pReq->connKey;
  SHashObj* pObj =  pReq->info;
  SKv* pKv = taosHashGet(pObj, "mq-tmp", strlen("mq-tmp") + 1);
  if (pKv == NULL) {
    free(pRsp);
    return NULL;
  }
  SMqHbMsg mqHb;
  taosDecodeSMqMsg(pKv->value, &mqHb);
  /*int64_t clientUid = htonl(pKv->value);*/
  /*if (mqHb.epoch )*/
  int sz = taosArrayGetSize(mqHb.pTopics);
  SMqConsumerObj* pConsumer = mndAcquireConsumer(pMnode, mqHb.consumerId); 
  for (int i = 0; i < sz; i++) {
    SMqHbOneTopicBatchRsp innerBatchRsp;
    innerBatchRsp.rsps = taosArrayInit(sz, sizeof(SMqHbRsp));
    if (innerBatchRsp.rsps == NULL) {
      //TODO
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
    SMqHbTopicInfo* topicInfo = taosArrayGet(mqHb.pTopics, i);
    SMqConsumerTopic* pConsumerTopic = taosHashGet(pConsumer->topicHash, topicInfo->name, strlen(topicInfo->name)+1);
    if (pConsumerTopic->epoch != topicInfo->epoch) {
      //add new vgids into rsp
      int vgSz = taosArrayGetSize(topicInfo->pVgInfo);
      for (int j = 0; j < vgSz; j++) {
        SMqHbRsp innerRsp;
        SMqHbVgInfo* pVgInfo = taosArrayGet(topicInfo->pVgInfo, i);
        SVgObj* pVgObj = mndAcquireVgroup(pMnode, pVgInfo->vgId);
        innerRsp.epSet = mndGetVgroupEpset(pMnode, pVgObj);
        taosArrayPush(innerBatchRsp.rsps, &innerRsp);
      }
    }
    taosArrayPush(batchRsp.batchRsps, &innerBatchRsp);
  }
  int32_t tlen = taosEncodeSMqHbBatchRsp(NULL, &batchRsp);
  void* buf = malloc(tlen);
  if (buf == NULL) {
    //TODO
    return NULL;
  }
  void* abuf = buf;
  taosEncodeSMqHbBatchRsp(&abuf, &batchRsp);
  pRsp->body = buf;
  pRsp->bodyLen = tlen;
  return pRsp;
L
Liu Jicong 已提交
341 342
#endif
  return NULL;
L
Liu Jicong 已提交
343 344
}

S
Shengliang Guan 已提交
345
static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
L
Liu Jicong 已提交
346
  SMnode *pMnode = pReq->pMnode;
S
Shengliang Guan 已提交
347

L
Liu Jicong 已提交
348
  SClientHbBatchReq batchReq = {0};
S
Shengliang Guan 已提交
349 350 351 352 353
  if (tDeserializeSClientHbBatchReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &batchReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

S
Shengliang Guan 已提交
354
  
L
Liu Jicong 已提交
355 356
  SClientHbBatchRsp batchRsp = {0};
  batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp));
L
Liu Jicong 已提交
357

S
Shengliang Guan 已提交
358
  int32_t sz = taosArrayGetSize(batchReq.reqs);
L
Liu Jicong 已提交
359
  for (int i = 0; i < sz; i++) {
S
Shengliang Guan 已提交
360
    SClientHbReq *pHbReq = taosArrayGet(batchReq.reqs, i);
L
Liu Jicong 已提交
361
    if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) {
D
dapan1121 已提交
362 363 364 365
      int32_t kvNum = taosHashGetSize(pHbReq->info);
      if (NULL == pHbReq->info || kvNum <= 0) {
        continue;
      }
L
Liu Jicong 已提交
366

D
dapan1121 已提交
367 368 369 370
      SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = taosArrayInit(kvNum, sizeof(SKv))};

      void *pIter = taosHashIterate(pHbReq->info, NULL);
      while (pIter != NULL) {
S
Shengliang Guan 已提交
371 372
        SKv *kv = pIter;

D
dapan1121 已提交
373
        switch (kv->key) {
D
dapan1121 已提交
374
          case HEARTBEAT_KEY_DBINFO: {
S
Shengliang Guan 已提交
375
            void   *rspMsg = NULL;
D
dapan1121 已提交
376
            int32_t rspLen = 0;
S
Shengliang Guan 已提交
377
            mndValidateDbInfo(pMnode, kv->value, kv->valueLen / sizeof(SDbVgVersion), &rspMsg, &rspLen);
D
dapan1121 已提交
378 379
            if (rspMsg && rspLen > 0) {
              SKv kv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg};
D
dapan1121 已提交
380
              taosArrayPush(hbRsp.info, &kv);
D
dapan1121 已提交
381 382
            }
            break;
D
dapan1121 已提交
383
          }
D
dapan 已提交
384
          case HEARTBEAT_KEY_STBINFO: {
S
Shengliang Guan 已提交
385
            void   *rspMsg = NULL;
D
dapan 已提交
386
            int32_t rspLen = 0;
S
Shengliang Guan 已提交
387
            mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableMetaVersion), &rspMsg, &rspLen);
D
dapan 已提交
388 389 390 391
            if (rspMsg && rspLen > 0) {
              SKv kv = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg};
              taosArrayPush(hbRsp.info, &kv);
            }
D
dapan1121 已提交
392
            break;
D
dapan 已提交
393
          }
D
dapan1121 已提交
394 395
          default:
            mError("invalid kv key:%d", kv->key);
D
dapan1121 已提交
396
            hbRsp.status = TSDB_CODE_MND_APP_ERROR;
D
dapan1121 已提交
397 398
            break;
        }
S
Shengliang Guan 已提交
399

D
dapan1121 已提交
400 401
        pIter = taosHashIterate(pHbReq->info, pIter);
      }
D
dapan1121 已提交
402 403

      taosArrayPush(batchRsp.rsps, &hbRsp);
L
Liu Jicong 已提交
404
    } else if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_MQ) {
L
Liu Jicong 已提交
405 406 407 408 409
      SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq);
      if (pRsp != NULL) {
        taosArrayPush(batchRsp.rsps, pRsp);
        free(pRsp);
      }
L
Liu Jicong 已提交
410 411
    }
  }
S
Shengliang Guan 已提交
412
  taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);
L
Liu Jicong 已提交
413

S
Shengliang Guan 已提交
414 415 416 417
  int32_t tlen = tSerializeSClientHbBatchRsp(NULL, 0, &batchRsp);
  void   *buf = rpcMallocCont(tlen);
  tSerializeSClientHbBatchRsp(buf, tlen, &batchRsp);

D
dapan1121 已提交
418 419 420
  int32_t rspNum = (int32_t)taosArrayGetSize(batchRsp.rsps);
  for (int32_t i = 0; i < rspNum; ++i) {
    SClientHbRsp *rsp = taosArrayGet(batchRsp.rsps, i);
S
Shengliang Guan 已提交
421
    int32_t       kvNum = (rsp->info) ? taosArrayGetSize(rsp->info) : 0;
D
dapan1121 已提交
422 423 424 425 426 427 428
    for (int32_t n = 0; n < kvNum; ++n) {
      SKv *kv = taosArrayGet(rsp->info, n);
      tfree(kv->value);
    }
    taosArrayDestroy(rsp->info);
  }

L
Liu Jicong 已提交
429
  taosArrayDestroy(batchRsp.rsps);
L
Liu Jicong 已提交
430 431
  pReq->contLen = tlen;
  pReq->pCont = buf;
L
Liu Jicong 已提交
432 433
  return 0;

L
Liu Jicong 已提交
434
#if 0
S
Shengliang Guan 已提交
435
  SMnode       *pMnode = pReq->pMnode;
S
Shengliang Guan 已提交
436 437
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
438 439 440
  SHeartBeatReq *pHeartbeat = pReq->rpcMsg.pCont;
  pHeartbeat->connId = htonl(pHeartbeat->connId);
  pHeartbeat->pid = htonl(pHeartbeat->pid);
S
Shengliang Guan 已提交
441 442

  SRpcConnInfo info = {0};
S
Shengliang Guan 已提交
443 444
  if (rpcGetConnInfo(pReq->rpcMsg.handle, &info) != 0) {
    mError("user:%s, connId:%d failed to process hb since %s", pReq->user, pHeartbeat->connId, terrstr());
S
Shengliang Guan 已提交
445 446 447
    return -1;
  }

S
Shengliang Guan 已提交
448
  SConnObj *pConn = mndAcquireConn(pMnode, pHeartbeat->connId);
S
Shengliang Guan 已提交
449
  if (pConn == NULL) {
S
Shengliang Guan 已提交
450
    pConn = mndCreateConn(pMnode, &info, pHeartbeat->pid, pHeartbeat->app, 0);
S
Shengliang Guan 已提交
451
    if (pConn == NULL) {
S
Shengliang Guan 已提交
452
      mError("user:%s, conn:%d is freed and failed to create new since %s", pReq->user, pHeartbeat->connId, terrstr());
S
Shengliang Guan 已提交
453 454
      return -1;
    } else {
S
Shengliang Guan 已提交
455
      mDebug("user:%s, conn:%d is freed and create a new conn:%d", pReq->user, pHeartbeat->connId, pConn->id);
S
Shengliang Guan 已提交
456
    }
S
Shengliang Guan 已提交
457
  } else if (pConn->killed) {
S
Shengliang Guan 已提交
458
    mError("user:%s, conn:%d is already killed", pReq->user, pConn->id);
S
Shengliang Guan 已提交
459
    terrno = TSDB_CODE_MND_INVALID_CONNECTION;
S
Shengliang Guan 已提交
460 461
    return -1;
  } else {
S
Shengliang Guan 已提交
462 463 464 465 466 467 468 469 470 471 472 473 474
    if (pConn->ip != info.clientIp || pConn->port != info.clientPort /* || strcmp(pConn->user, info.user) != 0 */) {
      char oldIpStr[40];
      char newIpStr[40];
      taosIpPort2String(pConn->ip, pConn->port, oldIpStr);
      taosIpPort2String(info.clientIp, info.clientPort, newIpStr);
      mError("conn:%d, incoming conn user:%s ip:%s, not match exist user:%s ip:%s", pConn->id, info.user, newIpStr,
             pConn->user, oldIpStr);

      if (pMgmt->connId < pConn->id) pMgmt->connId = pConn->id + 1;
      taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
      terrno = TSDB_CODE_MND_INVALID_CONNECTION;
      return -1;
    }
S
Shengliang Guan 已提交
475 476 477 478 479 480
  }

  SHeartBeatRsp *pRsp = rpcMallocCont(sizeof(SHeartBeatRsp));
  if (pRsp == NULL) {
    mndReleaseConn(pMnode, pConn);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
481
    mError("user:%s, conn:%d failed to process hb while since %s", pReq->user, pHeartbeat->connId, terrstr());
S
Shengliang Guan 已提交
482 483 484
    return -1;
  }

S
Shengliang Guan 已提交
485
  mndSaveQueryStreamList(pConn, pHeartbeat);
S
Shengliang Guan 已提交
486 487 488 489 490 491 492 493 494
  if (pConn->killed != 0) {
    pRsp->killConnection = 1;
  }

  if (pConn->queryId != 0) {
    pRsp->queryId = htonl(pConn->queryId);
    pConn->queryId = 0;
  }

S
Shengliang Guan 已提交
495
  pRsp->connId = htonl(pConn->id);
S
Shengliang Guan 已提交
496
  pRsp->totalDnodes = htonl(1);
S
Shengliang Guan 已提交
497
  pRsp->onlineDnodes = htonl(1);
S
Shengliang Guan 已提交
498 499 500
  mndGetMnodeEpSet(pMnode, &pRsp->epSet);
  mndReleaseConn(pMnode, pConn);

S
Shengliang Guan 已提交
501 502
  pReq->contLen = sizeof(SConnectRsp);
  pReq->pCont = pRsp;
S
Shengliang Guan 已提交
503
  return 0;
L
Liu Jicong 已提交
504
#endif
S
Shengliang Guan 已提交
505 506
}

S
Shengliang Guan 已提交
507 508
static int32_t mndProcessKillQueryReq(SMnodeMsg *pReq) {
  SMnode       *pMnode = pReq->pMnode;
509 510
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
511
  SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
512
  if (pUser == NULL) return 0;
513
  if (!pUser->superUser) {
514 515 516 517 518 519
    mndReleaseUser(pMnode, pUser);
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
  }
  mndReleaseUser(pMnode, pUser);

S
Shengliang Guan 已提交
520
  SKillQueryReq killReq = {0};
S
Shengliang Guan 已提交
521
  if (tDeserializeSKillQueryReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &killReq) != 0) {
S
Shengliang Guan 已提交
522 523 524 525 526
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  mInfo("kill query msg is received, queryId:%d", killReq.queryId);
527

S
Shengliang Guan 已提交
528
  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &killReq.connId, sizeof(int32_t));
529
  if (pConn == NULL) {
S
Shengliang Guan 已提交
530
    mError("connId:%d, failed to kill queryId:%d, conn not exist", killReq.connId, killReq.queryId);
531 532 533
    terrno = TSDB_CODE_MND_INVALID_CONN_ID;
    return -1;
  } else {
S
Shengliang Guan 已提交
534 535
    mInfo("connId:%d, queryId:%d is killed by user:%s", killReq.connId, killReq.queryId, pReq->user);
    pConn->queryId = killReq.queryId;
536 537 538 539 540
    taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
    return 0;
  }
}

S
Shengliang Guan 已提交
541 542
static int32_t mndProcessKillConnReq(SMnodeMsg *pReq) {
  SMnode       *pMnode = pReq->pMnode;
543 544
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
545
  SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
546
  if (pUser == NULL) return 0;
547
  if (!pUser->superUser) {
548 549 550 551 552 553
    mndReleaseUser(pMnode, pUser);
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
  }
  mndReleaseUser(pMnode, pUser);

S
Shengliang Guan 已提交
554
  SKillConnReq killReq = {0};
S
Shengliang Guan 已提交
555
  if (tDeserializeSKillConnReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &killReq) != 0) {
S
Shengliang Guan 已提交
556 557 558
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }
S
Shengliang Guan 已提交
559

S
Shengliang Guan 已提交
560
  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &killReq.connId, sizeof(int32_t));
561
  if (pConn == NULL) {
S
Shengliang Guan 已提交
562
    mError("connId:%d, failed to kill connection, conn not exist", killReq.connId);
563 564 565
    terrno = TSDB_CODE_MND_INVALID_CONN_ID;
    return -1;
  } else {
S
Shengliang Guan 已提交
566
    mInfo("connId:%d, is killed by user:%s", killReq.connId, pReq->user);
567 568 569 570 571 572
    pConn->killed = 1;
    taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
    return TSDB_CODE_SUCCESS;
  }
}

S
Shengliang Guan 已提交
573
static int32_t mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
S
Shengliang Guan 已提交
574
  SMnode       *pMnode = pReq->pMnode;
S
Shengliang Guan 已提交
575 576
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
577
  SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
S
Shengliang Guan 已提交
578
  if (pUser == NULL) return 0;
579
  if (!pUser->superUser) {
S
Shengliang Guan 已提交
580
    mndReleaseUser(pMnode, pUser);
581 582
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
S
Shengliang Guan 已提交
583 584 585 586
  }
  mndReleaseUser(pMnode, pUser);

  int32_t  cols = 0;
S
Shengliang Guan 已提交
587
  SSchema *pSchema = pMeta->pSchemas;
S
Shengliang Guan 已提交
588 589 590 591

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "connId");
S
Shengliang Guan 已提交
592
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
593 594 595 596 597
  cols++;

  pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "user");
S
Shengliang Guan 已提交
598
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
599 600 601 602 603 604
  cols++;

  // app name
  pShow->bytes[cols] = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "program");
S
Shengliang Guan 已提交
605
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
606 607 608 609 610 611
  cols++;

  // app pid
  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "pid");
S
Shengliang Guan 已提交
612
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
613 614 615 616 617
  cols++;

  pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "ip:port");
S
Shengliang Guan 已提交
618
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
619 620 621 622 623
  cols++;

  pShow->bytes[cols] = 8;
  pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
  strcpy(pSchema[cols].name, "login_time");
S
Shengliang Guan 已提交
624
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
625 626 627 628 629
  cols++;

  pShow->bytes[cols] = 8;
  pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
  strcpy(pSchema[cols].name, "last_access");
S
Shengliang Guan 已提交
630
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
631 632
  cols++;

S
Shengliang Guan 已提交
633
  pMeta->numOfColumns = cols;
S
Shengliang Guan 已提交
634 635 636 637 638 639 640 641 642
  pShow->numOfColumns = cols;

  pShow->offset[0] = 0;
  for (int32_t i = 1; i < cols; ++i) {
    pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
  }

  pShow->numOfRows = taosHashGetSize(pMgmt->cache->pHashTable);
  pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
D
dapan1121 已提交
643
  strcpy(pMeta->tbName, mndShowStr(pShow->type));
S
Shengliang Guan 已提交
644 645 646 647

  return 0;
}

S
Shengliang Guan 已提交
648 649
static int32_t mndRetrieveConns(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  SMnode   *pMnode = pReq->pMnode;
S
Shengliang Guan 已提交
650
  int32_t   numOfRows = 0;
S
Shengliang Guan 已提交
651
  SConnObj *pConn = NULL;
S
Shengliang Guan 已提交
652 653 654 655 656
  int32_t   cols = 0;
  char     *pWrite;
  char      ipStr[TSDB_IPv4ADDR_LEN + 6];

  while (numOfRows < rows) {
S
Shengliang Guan 已提交
657 658
    pShow->pIter = mndGetNextConn(pMnode, pShow->pIter, &pConn);
    if (pConn == NULL) break;
S
Shengliang Guan 已提交
659 660 661 662

    cols = 0;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
663
    *(int32_t *)pWrite = pConn->id;
S
Shengliang Guan 已提交
664 665 666
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
667
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->user, pShow->bytes[cols]);
S
Shengliang Guan 已提交
668 669 670 671
    cols++;

    // app name
    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
672
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->app, pShow->bytes[cols]);
S
Shengliang Guan 已提交
673 674 675 676
    cols++;

    // app pid
    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
677
    *(int32_t *)pWrite = pConn->pid;
S
Shengliang Guan 已提交
678 679 680
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
681
    taosIpPort2String(pConn->ip, pConn->port, ipStr);
S
Shengliang Guan 已提交
682 683 684 685
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->bytes[cols]);
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
686
    *(int64_t *)pWrite = pConn->loginTimeMs;
S
Shengliang Guan 已提交
687 688 689
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
690 691
    if (pConn->lastAccessTimeMs < pConn->loginTimeMs) pConn->lastAccessTimeMs = pConn->loginTimeMs;
    *(int64_t *)pWrite = pConn->lastAccessTimeMs;
S
Shengliang Guan 已提交
692 693 694 695 696 697 698 699 700 701
    cols++;

    numOfRows++;
  }

  pShow->numOfReads += numOfRows;

  return numOfRows;
}

S
Shengliang Guan 已提交
702
static int32_t mndGetQueryMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
S
Shengliang Guan 已提交
703
  SMnode       *pMnode = pReq->pMnode;
S
Shengliang Guan 已提交
704 705
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
706
  SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
S
Shengliang Guan 已提交
707
  if (pUser == NULL) return 0;
708
  if (!pUser->superUser) {
S
Shengliang Guan 已提交
709
    mndReleaseUser(pMnode, pUser);
710 711
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
S
Shengliang Guan 已提交
712 713 714 715
  }
  mndReleaseUser(pMnode, pUser);

  int32_t  cols = 0;
S
Shengliang Guan 已提交
716
  SSchema *pSchema = pMeta->pSchemas;
S
Shengliang Guan 已提交
717

S
Shengliang Guan 已提交
718 719 720
  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "queryId");
S
Shengliang Guan 已提交
721
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
722 723 724 725 726
  cols++;

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "connId");
S
Shengliang Guan 已提交
727
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
728 729 730 731 732
  cols++;

  pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "user");
S
Shengliang Guan 已提交
733
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
734 735 736 737 738
  cols++;

  pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "ip:port");
S
Shengliang Guan 已提交
739
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
740 741
  cols++;

S
Shengliang Guan 已提交
742
  pShow->bytes[cols] = 22 + VARSTR_HEADER_SIZE;
S
Shengliang Guan 已提交
743 744
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "qid");
S
Shengliang Guan 已提交
745
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
746 747 748 749 750
  cols++;

  pShow->bytes[cols] = 8;
  pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
  strcpy(pSchema[cols].name, "created_time");
S
Shengliang Guan 已提交
751
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
752 753 754 755 756
  cols++;

  pShow->bytes[cols] = 8;
  pSchema[cols].type = TSDB_DATA_TYPE_BIGINT;
  strcpy(pSchema[cols].name, "time");
S
Shengliang Guan 已提交
757
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
758 759 760 761 762
  cols++;

  pShow->bytes[cols] = QUERY_OBJ_ID_SIZE + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "sql_obj_id");
S
Shengliang Guan 已提交
763
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
764 765 766 767 768
  cols++;

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "pid");
S
Shengliang Guan 已提交
769
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
770 771 772 773 774
  cols++;

  pShow->bytes[cols] = TSDB_EP_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "ep");
S
Shengliang Guan 已提交
775
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
776 777 778 779 780
  cols++;

  pShow->bytes[cols] = 1;
  pSchema[cols].type = TSDB_DATA_TYPE_BOOL;
  strcpy(pSchema[cols].name, "stable_query");
S
Shengliang Guan 已提交
781
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
782 783 784 785 786
  cols++;

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "sub_queries");
S
Shengliang Guan 已提交
787
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
788 789 790 791 792
  cols++;

  pShow->bytes[cols] = TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "sub_query_info");
S
Shengliang Guan 已提交
793
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
794 795 796 797 798
  cols++;

  pShow->bytes[cols] = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "sql");
S
Shengliang Guan 已提交
799
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
800 801
  cols++;

S
Shengliang Guan 已提交
802
  pMeta->numOfColumns = cols;
S
Shengliang Guan 已提交
803 804 805 806 807 808 809 810 811
  pShow->numOfColumns = cols;

  pShow->offset[0] = 0;
  for (int32_t i = 1; i < cols; ++i) {
    pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
  }

  pShow->numOfRows = 1000000;
  pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
D
dapan1121 已提交
812
  strcpy(pMeta->tbName, mndShowStr(pShow->type));
S
Shengliang Guan 已提交
813 814 815 816

  return 0;
}

S
Shengliang Guan 已提交
817 818
static int32_t mndRetrieveQueries(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  SMnode   *pMnode = pReq->pMnode;
S
Shengliang Guan 已提交
819
  int32_t   numOfRows = 0;
S
Shengliang Guan 已提交
820
  SConnObj *pConn = NULL;
S
Shengliang Guan 已提交
821
  int32_t   cols = 0;
822 823
  char     *pWrite;
  void     *pIter;
S
Shengliang Guan 已提交
824 825 826
  char      str[TSDB_IPv4ADDR_LEN + 6] = {0};

  while (numOfRows < rows) {
S
Shengliang Guan 已提交
827 828
    pIter = mndGetNextConn(pMnode, pShow->pIter, &pConn);
    if (pConn == NULL) {
S
Shengliang Guan 已提交
829 830 831 832
      pShow->pIter = pIter;
      break;
    }

S
Shengliang Guan 已提交
833
    if (numOfRows + pConn->numOfQueries >= rows) {
S
Shengliang Guan 已提交
834 835 836 837 838
      mndCancelGetNextConn(pMnode, pIter);
      break;
    }

    pShow->pIter = pIter;
S
Shengliang Guan 已提交
839 840
    for (int32_t i = 0; i < pConn->numOfQueries; ++i) {
      SQueryDesc *pDesc = pConn->pQueries + i;
S
Shengliang Guan 已提交
841 842 843
      cols = 0;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
844 845 846 847 848
      *(int64_t *)pWrite = htobe64(pDesc->queryId);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int64_t *)pWrite = htobe64(pConn->id);
S
Shengliang Guan 已提交
849 850 851
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
852
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->user, pShow->bytes[cols]);
S
Shengliang Guan 已提交
853 854 855
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
856
      snprintf(str, tListLen(str), "%s:%u", taosIpStr(pConn->ip), pConn->port);
S
Shengliang Guan 已提交
857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, str, pShow->bytes[cols]);
      cols++;

      char handleBuf[24] = {0};
      snprintf(handleBuf, tListLen(handleBuf), "%" PRIu64, htobe64(pDesc->qId));
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;

      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, handleBuf, pShow->bytes[cols]);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int64_t *)pWrite = htobe64(pDesc->stime);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int64_t *)pWrite = htobe64(pDesc->useconds);
      cols++;

      snprintf(str, tListLen(str), "0x%" PRIx64, htobe64(pDesc->sqlObjId));
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, str, pShow->bytes[cols]);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int32_t *)pWrite = htonl(pDesc->pid);
      cols++;

      char epBuf[TSDB_EP_LEN + 1] = {0};
S
Shengliang Guan 已提交
885
      snprintf(epBuf, tListLen(epBuf), "%s:%u", pDesc->fqdn, pConn->port);
S
Shengliang Guan 已提交
886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, epBuf, pShow->bytes[cols]);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(bool *)pWrite = pDesc->stableQuery;
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int32_t *)pWrite = htonl(pDesc->numOfSub);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDesc->subSqlInfo, pShow->bytes[cols]);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDesc->sql, pShow->bytes[cols]);
      cols++;

      numOfRows++;
    }
  }

S
Shengliang Guan 已提交
910
  mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
S
Shengliang Guan 已提交
911 912 913 914 915 916 917 918
  pShow->numOfReads += numOfRows;
  return numOfRows;
}

static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
  taosHashCancelIterate(pMgmt->cache->pHashTable, pIter);
}