mndProfile.c 27.7 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "mndProfile.h"
L
Liu Jicong 已提交
18
#include "mndConsumer.h"
S
Shengliang Guan 已提交
19 20 21
#include "mndDb.h"
#include "mndMnode.h"
#include "mndShow.h"
L
Liu Jicong 已提交
22
#include "mndTopic.h"
S
Shengliang Guan 已提交
23
#include "mndUser.h"
L
Liu Jicong 已提交
24
#include "mndVgroup.h"
S
Shengliang Guan 已提交
25

S
Shengliang Guan 已提交
26 27 28
// Sizing constants for the query bookkeeping in this file.
#define QUERY_ID_SIZE      20
#define QUERY_OBJ_ID_SIZE  18  // payload width of the "sql_obj_id" show column
#define SUBQUERY_INFO_SIZE 6
#define QUERY_SAVE_SIZE    20  // max SQueryDesc entries kept per connection
S
Shengliang Guan 已提交
30 31

typedef struct {
S
Shengliang Guan 已提交
32 33 34 35 36 37 38 39 40 41 42 43 44
  int32_t     id;
  char        user[TSDB_USER_LEN];
  char        app[TSDB_APP_NAME_LEN];  // app name that invokes taosc
  int64_t     appStartTimeMs;          // app start time
  int32_t     pid;                     // pid of app that invokes taosc
  uint32_t    ip;
  uint16_t    port;
  int8_t      killed;
  int64_t     loginTimeMs;
  int64_t     lastAccessTimeMs;
  int32_t     queryId;
  int32_t     numOfQueries;
  SQueryDesc *pQueries;
S
Shengliang Guan 已提交
45 46
} SConnObj;

S
Shengliang Guan 已提交
47
static SConnObj *mndCreateConn(SMnode *pMnode, SRpcConnInfo *pInfo, int32_t pid, const char *app, int64_t startTime);
S
Shengliang Guan 已提交
48
static void      mndFreeConn(SConnObj *pConn);
S
Shengliang Guan 已提交
49
static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId);
S
Shengliang Guan 已提交
50 51 52
static void      mndReleaseConn(SMnode *pMnode, SConnObj *pConn);
static void     *mndGetNextConn(SMnode *pMnode, void *pIter, SConnObj **pConn);
static void      mndCancelGetNextConn(SMnode *pMnode, void *pIter);
S
Shengliang Guan 已提交
53 54 55 56
static int32_t   mndProcessHeartBeatReq(SMnodeMsg *pReq);
static int32_t   mndProcessConnectReq(SMnodeMsg *pReq);
static int32_t   mndProcessKillQueryReq(SMnodeMsg *pReq);
static int32_t   mndProcessKillConnReq(SMnodeMsg *pReq);
S
Shengliang Guan 已提交
57
static int32_t   mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
S
Shengliang Guan 已提交
58
static int32_t   mndRetrieveConns(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
S
Shengliang Guan 已提交
59
static int32_t   mndGetQueryMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
S
Shengliang Guan 已提交
60
static int32_t   mndRetrieveQueries(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
S
Shengliang Guan 已提交
61 62 63 64 65
static void      mndCancelGetNextQuery(SMnode *pMnode, void *pIter);

/*
 * Initialise the profile manager of a mnode: create the connection cache and
 * register the message handlers and SHOW handlers implemented in this file.
 *
 * Returns 0 on success. If the cache cannot be created, terrno is set to
 * TSDB_CODE_OUT_OF_MEMORY and -1 is returned.
 */
int32_t mndInitProfile(SMnode *pMnode) {
  SProfileMgmt *pProfile = &pMnode->profileMgmt;

  // the interval handed to the cache is twice the shell activity timer
  int32_t checkInterval = pMnode->cfg.shellActivityTimer * 2;

  pProfile->cache = taosCacheInit(TSDB_DATA_TYPE_INT, checkInterval, true, (__cache_free_fn_t)mndFreeConn, "conn");
  if (NULL == pProfile->cache) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to alloc profile cache since %s", terrstr());
    return -1;
  }

  // request handlers
  mndSetMsgHandle(pMnode, TDMT_MND_HEARTBEAT, mndProcessHeartBeatReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CONNECT, mndProcessConnectReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq);

  // "show connections"
  mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndGetConnsMeta);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn);

  // "show queries"
  mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndGetQueryMeta);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery);

  return 0;
}

/* Destroy the connection cache (if one was created) and clear the pointer. */
void mndCleanupProfile(SMnode *pMnode) {
  SProfileMgmt *pProfile = &pMnode->profileMgmt;
  if (pProfile->cache == NULL) return;

  taosCacheCleanup(pProfile->cache);
  pProfile->cache = NULL;
}

S
Shengliang Guan 已提交
97
/*
 * Create a connection object for a client and put it into the profile cache.
 *
 * pInfo      rpc connection info supplying user, client ip and client port
 * pid        pid of the client application
 * app        name of the client application
 * startTime  client application start time in ms; 0 means "use the current time"
 *
 * Returns the cached object (to be handed back with mndReleaseConn), or NULL
 * with terrno = TSDB_CODE_OUT_OF_MEMORY when it cannot be stored in the cache.
 */
static SConnObj *mndCreateConn(SMnode *pMnode, SRpcConnInfo *pInfo, int32_t pid, const char *app, int64_t startTime) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  int32_t connId = atomic_add_fetch_32(&pMgmt->connId, 1);
  // Id 0 is skipped when the counter wraps around; the re-drawn value must
  // actually be used, otherwise the connection would still get id 0.
  if (connId == 0) connId = atomic_add_fetch_32(&pMgmt->connId, 1);
  if (startTime == 0) startTime = taosGetTimestampMs();

  SConnObj connObj = {.id = connId,
                      .appStartTimeMs = startTime,
                      .pid = pid,
                      .ip = pInfo->clientIp,
                      .port = pInfo->clientPort,
                      .killed = 0,
                      .loginTimeMs = taosGetTimestampMs(),
                      .lastAccessTimeMs = 0,
                      .queryId = 0,
                      .numOfQueries = 0,
                      .pQueries = NULL};

  connObj.lastAccessTimeMs = connObj.loginTimeMs;
  tstrncpy(connObj.user, pInfo->user, TSDB_USER_LEN);
  tstrncpy(connObj.app, app, TSDB_APP_NAME_LEN);

  // the cache entry is kept for three shell activity timer periods (value passed in ms)
  int32_t   keepTime = pMnode->cfg.shellActivityTimer * 3;
  SConnObj *pConn = taosCachePut(pMgmt->cache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), keepTime * 1000);
  if (pConn == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // argument order follows the format string: reason first, then user
    mError("conn:%d, failed to put into cache since %s, user:%s", connId, terrstr(), pInfo->user);
    return NULL;
  }

  mTrace("conn:%d, is created, data:%p user:%s", pConn->id, pConn, pInfo->user);
  return pConn;
}

/*
 * Free callback registered with the connection cache (see taosCacheInit in
 * mndInitProfile): releases the saved query descriptors of a connection.
 */
static void mndFreeConn(SConnObj *pConn) {
  mTrace("conn:%d, is destroyed, data:%p", pConn->id, pConn);
  tfree(pConn->pQueries);
}

S
Shengliang Guan 已提交
137
/*
 * Look up a connection by id in the profile cache and take a reference on it.
 *
 * Returns the connection (to be handed back with mndReleaseConn), or NULL if
 * no entry with that id is in the cache. On success the connection's
 * lastAccessTimeMs is refreshed to the current time.
 */
static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t));
  if (pConn == NULL) {
    mDebug("conn:%d, already destroyed", connId);
    return NULL;
  }

  // Record the real access time. This field is reported as "last_access" by
  // mndRetrieveConns, so it must not be shifted into the future by the cache
  // keep time.
  pConn->lastAccessTimeMs = taosGetTimestampMs();

  mTrace("conn:%d, acquired from cache, data:%p", pConn->id, pConn);
  return pConn;
}

/* Give a connection reference back to the profile cache; NULL is ignored. */
static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) {
  if (pConn != NULL) {
    mTrace("conn:%d, released from cache, data:%p", pConn->id, pConn);
    taosCacheRelease(pMnode->profileMgmt.cache, (void **)&pConn, false);
  }
}

/*
 * Step the hash iterator over the connection cache.
 *
 * pIter  iterator returned by the previous call, or NULL to start
 * pConn  out: connection stored in the next cache node, NULL when there is none
 *
 * Returns the iterator for the next call, or NULL when iteration is finished
 * (an empty hash slot cancels the iteration before returning).
 */
static void *mndGetNextConn(SMnode *pMnode, void *pIter, SConnObj **pConn) {
  SHashObj *pHash = pMnode->profileMgmt.cache->pHashTable;

  *pConn = NULL;

  void *pNext = taosHashIterate(pHash, pIter);
  if (pNext == NULL) return NULL;

  // each hash entry holds a pointer to the cache node
  SCacheDataNode *pNode = *(SCacheDataNode **)pNext;
  if (pNode == NULL) {
    taosHashCancelIterate(pHash, pNext);
    return NULL;
  }

  *pConn = (SConnObj *)pNode->data;
  return pNext;
}

/* Abort an iteration over the connection cache started with mndGetNextConn. */
static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) {
  taosHashCancelIterate(pMnode->profileMgmt.cache->pHashTable, pIter);
}

S
Shengliang Guan 已提交
184
/*
 * Handle TDMT_MND_CONNECT: validate the user (and the optional db), create a
 * connection object and answer with an SConnectRsp.
 *
 * Returns 0 with pReq->pCont/contLen set on success, -1 otherwise (terrno is
 * set by the failing step). All acquired objects are released before return.
 */
static int32_t mndProcessConnectReq(SMnodeMsg *pReq) {
  SMnode      *pMnode = pReq->pMnode;
  SConnectReq *pConnReq = pReq->rpcMsg.pCont;
  SUserObj    *pUser = NULL;
  SDbObj      *pDb = NULL;
  SConnObj    *pConn = NULL;
  SConnectRsp *pRsp = NULL;
  SRpcConnInfo info = {0};
  char         ip[30];
  int32_t      code = -1;

  // the request arrives in network byte order
  pConnReq->pid = htonl(pConnReq->pid);
  pConnReq->startTime = htobe64(pConnReq->startTime);

  if (rpcGetConnInfo(pReq->rpcMsg.handle, &info) != 0) {
    mError("user:%s, failed to login while get connection info since %s", pReq->user, terrstr());
    goto CONNECT_DONE;
  }
  taosIp2String(info.clientIp, ip);

  pUser = mndAcquireUser(pMnode, pReq->user);
  if (pUser == NULL) {
    mError("user:%s, failed to login while acquire user since %s", pReq->user, terrstr());
    goto CONNECT_DONE;
  }

  // a db name in the request must refer to an existing db of the user's account
  if (pConnReq->db[0]) {
    snprintf(pReq->db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, pConnReq->db);
    pDb = mndAcquireDb(pMnode, pReq->db);
    if (pDb == NULL) {
      terrno = TSDB_CODE_MND_INVALID_DB;
      mError("user:%s, failed to login from %s while use db:%s since %s", pReq->user, ip, pConnReq->db, terrstr());
      goto CONNECT_DONE;
    }
  }

  pConn = mndCreateConn(pMnode, &info, pConnReq->pid, pConnReq->app, pConnReq->startTime);
  if (pConn == NULL) {
    mError("user:%s, failed to login from %s while create connection since %s", pReq->user, ip, terrstr());
    goto CONNECT_DONE;
  }

  pRsp = rpcMallocCont(sizeof(SConnectRsp));
  if (pRsp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("user:%s, failed to login from %s while create rsp since %s", pReq->user, ip, terrstr());
    goto CONNECT_DONE;
  }

  // fill the response, multi-byte fields in network byte order
  pRsp->acctId = htonl(pUser->acctId);
  pRsp->superUser = pUser->superUser;
  pRsp->clusterId = htobe64(pMnode->clusterId);
  pRsp->connId = htonl(pConn->id);
  mndGetMnodeEpSet(pMnode, &pRsp->epSet);

  pReq->contLen = sizeof(SConnectRsp);
  pReq->pCont = pRsp;

  mDebug("user:%s, login from %s, conn:%d, app:%s", info.user, ip, pConn->id, pConnReq->app);
  code = 0;

CONNECT_DONE:
  mndReleaseUser(pMnode, pUser);
  mndReleaseDb(pMnode, pDb);
  mndReleaseConn(pMnode, pConn);

  return code;
}

S
Shengliang Guan 已提交
255
/*
 * Store the query descriptors carried by a heartbeat request on the connection.
 *
 * At most QUERY_SAVE_SIZE descriptors are copied from pReq->pData into
 * pConn->pQueries; the buffer is allocated on first use. pConn->numOfQueries
 * always reflects how many descriptors are valid, and is 0 when the request
 * carries none or when the buffer could not be allocated.
 *
 * Returns TSDB_CODE_SUCCESS, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY.
 */
static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) {
  pConn->numOfQueries = 0;

  // the count arrives in network byte order
  int32_t numOfQueries = (int32_t)ntohl(pReq->numOfQueries);
  if (numOfQueries <= 0) return TSDB_CODE_SUCCESS;

  if (pConn->pQueries == NULL) {
    pConn->pQueries = calloc(QUERY_SAVE_SIZE, sizeof(SQueryDesc));
    if (pConn->pQueries == NULL) {
      // keep numOfQueries at 0 so no reader walks a NULL array
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }

  int32_t numToSave = MIN(QUERY_SAVE_SIZE, numOfQueries);
  memcpy(pConn->pQueries, pReq->pData, (size_t)numToSave * sizeof(SQueryDesc));
  pConn->numOfQueries = numToSave;

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336
static SClientHbRsp* mndMqHbBuildRsp(SMnode* pMnode, SClientHbReq* pReq) {
  SClientHbRsp* pRsp = malloc(sizeof(SClientHbRsp));
  if (pRsp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  pRsp->connKey = pReq->connKey;
  SMqHbBatchRsp batchRsp;
  batchRsp.batchRsps = taosArrayInit(0, sizeof(SMqHbRsp));
  if (batchRsp.batchRsps == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  SClientHbKey connKey = pReq->connKey;
  SHashObj* pObj =  pReq->info;
  SKv* pKv = taosHashGet(pObj, "mq-tmp", strlen("mq-tmp") + 1);
  if (pKv == NULL) {
    free(pRsp);
    return NULL;
  }
  SMqHbMsg mqHb;
  taosDecodeSMqMsg(pKv->value, &mqHb);
  /*int64_t clientUid = htonl(pKv->value);*/
  /*if (mqHb.epoch )*/
  int sz = taosArrayGetSize(mqHb.pTopics);
  SMqConsumerObj* pConsumer = mndAcquireConsumer(pMnode, mqHb.consumerId); 
  for (int i = 0; i < sz; i++) {
    SMqHbOneTopicBatchRsp innerBatchRsp;
    innerBatchRsp.rsps = taosArrayInit(sz, sizeof(SMqHbRsp));
    if (innerBatchRsp.rsps == NULL) {
      //TODO
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
    SMqHbTopicInfo* topicInfo = taosArrayGet(mqHb.pTopics, i);
    SMqConsumerTopic* pConsumerTopic = taosHashGet(pConsumer->topicHash, topicInfo->name, strlen(topicInfo->name)+1);
    if (pConsumerTopic->epoch != topicInfo->epoch) {
      //add new vgids into rsp
      int vgSz = taosArrayGetSize(topicInfo->pVgInfo);
      for (int j = 0; j < vgSz; j++) {
        SMqHbRsp innerRsp;
        SMqHbVgInfo* pVgInfo = taosArrayGet(topicInfo->pVgInfo, i);
        SVgObj* pVgObj = mndAcquireVgroup(pMnode, pVgInfo->vgId);
        innerRsp.epSet = mndGetVgroupEpset(pMnode, pVgObj);
        taosArrayPush(innerBatchRsp.rsps, &innerRsp);
      }
    }
    taosArrayPush(batchRsp.batchRsps, &innerBatchRsp);
  }
  int32_t tlen = taosEncodeSMqHbBatchRsp(NULL, &batchRsp);
  void* buf = malloc(tlen);
  if (buf == NULL) {
    //TODO
    return NULL;
  }
  void* abuf = buf;
  taosEncodeSMqHbBatchRsp(&abuf, &batchRsp);
  pRsp->body = buf;
  pRsp->bodyLen = tlen;
  return pRsp;
}

S
Shengliang Guan 已提交
337
/*
 * Handle TDMT_MND_HEARTBEAT: decode the batched client heartbeat, build one
 * response per mq heartbeat and serialize all of them into the rpc reply.
 * Heartbeats of type HEARTBEAT_TYPE_QUERY are currently accepted but produce
 * no response entry.
 *
 * Returns 0 with pReq->pCont/contLen set, or -1 with
 * terrno = TSDB_CODE_OUT_OF_MEMORY when an allocation fails.
 */
static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
  SMnode           *pMnode = pReq->pMnode;
  char             *batchReqStr = pReq->rpcMsg.pCont;
  SClientHbBatchReq batchReq = {0};
  SClientHbBatchRsp batchRsp = {0};
  int32_t           tlen = 0;
  void             *buf = NULL;
  void             *abuf = NULL;
  int32_t           code = -1;

  tDeserializeSClientHbBatchReq(batchReqStr, &batchReq);
  SArray *pArray = batchReq.reqs;
  int     sz = taosArrayGetSize(pArray);

  batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp));
  if (batchRsp.rsps == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto HB_OVER;
  }

  for (int i = 0; i < sz; i++) {
    SClientHbReq *pHbReq = taosArrayGet(pArray, i);
    if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) {
      // nothing to do for query heartbeats yet
    } else if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_MQ) {
      SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq);
      if (pRsp != NULL) {
        // The array keeps a shallow copy, so the body stays owned by the array
        // entry unless the push failed.
        void *pushed = taosArrayPush(batchRsp.rsps, pRsp);
        if (pushed == NULL) free(pRsp->body);
        free(pRsp);
        if (pushed == NULL) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          goto HB_OVER;
        }
      }
    }
  }

  // first pass computes the serialized length, second pass writes into the buffer
  tlen = tSerializeSClientHbBatchRsp(NULL, &batchRsp);
  buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto HB_OVER;
  }
  abuf = buf;
  tSerializeSClientHbBatchRsp(&abuf, &batchRsp);

  pReq->contLen = tlen;
  pReq->pCont = buf;
  code = 0;

HB_OVER:
  taosArrayDestroyEx(pArray, tFreeClientHbReq);

  if (batchRsp.rsps != NULL) {
    // The bodies were malloc'ed by mndMqHbBuildRsp and are no longer needed
    // once serialized.
    int32_t numOfRsps = (int32_t)taosArrayGetSize(batchRsp.rsps);
    for (int32_t i = 0; i < numOfRsps; ++i) {
      SClientHbRsp *pOne = taosArrayGet(batchRsp.rsps, i);
      free(pOne->body);
    }
    taosArrayDestroy(batchRsp.rsps);
  }
  return code;

  // Disabled legacy per-connection heartbeat handling, kept for reference.
#if 0
  SMnode       *pMnode = pReq->pMnode;
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  SHeartBeatReq *pHeartbeat = pReq->rpcMsg.pCont;
  pHeartbeat->connId = htonl(pHeartbeat->connId);
  pHeartbeat->pid = htonl(pHeartbeat->pid);

  SRpcConnInfo info = {0};
  if (rpcGetConnInfo(pReq->rpcMsg.handle, &info) != 0) {
    mError("user:%s, connId:%d failed to process hb since %s", pReq->user, pHeartbeat->connId, terrstr());
    return -1;
  }

  SConnObj *pConn = mndAcquireConn(pMnode, pHeartbeat->connId);
  if (pConn == NULL) {
    pConn = mndCreateConn(pMnode, &info, pHeartbeat->pid, pHeartbeat->app, 0);
    if (pConn == NULL) {
      mError("user:%s, conn:%d is freed and failed to create new since %s", pReq->user, pHeartbeat->connId, terrstr());
      return -1;
    } else {
      mDebug("user:%s, conn:%d is freed and create a new conn:%d", pReq->user, pHeartbeat->connId, pConn->id);
    }
  } else if (pConn->killed) {
    mError("user:%s, conn:%d is already killed", pReq->user, pConn->id);
    terrno = TSDB_CODE_MND_INVALID_CONNECTION;
    return -1;
  } else {
    if (pConn->ip != info.clientIp || pConn->port != info.clientPort /* || strcmp(pConn->user, info.user) != 0 */) {
      char oldIpStr[40];
      char newIpStr[40];
      taosIpPort2String(pConn->ip, pConn->port, oldIpStr);
      taosIpPort2String(info.clientIp, info.clientPort, newIpStr);
      mError("conn:%d, incoming conn user:%s ip:%s, not match exist user:%s ip:%s", pConn->id, info.user, newIpStr,
             pConn->user, oldIpStr);

      if (pMgmt->connId < pConn->id) pMgmt->connId = pConn->id + 1;
      taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
      terrno = TSDB_CODE_MND_INVALID_CONNECTION;
      return -1;
    }
  }

  SHeartBeatRsp *pRsp = rpcMallocCont(sizeof(SHeartBeatRsp));
  if (pRsp == NULL) {
    mndReleaseConn(pMnode, pConn);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("user:%s, conn:%d failed to process hb while since %s", pReq->user, pHeartbeat->connId, terrstr());
    return -1;
  }

  mndSaveQueryStreamList(pConn, pHeartbeat);
  if (pConn->killed != 0) {
    pRsp->killConnection = 1;
  }

  if (pConn->queryId != 0) {
    pRsp->queryId = htonl(pConn->queryId);
    pConn->queryId = 0;
  }

  pRsp->connId = htonl(pConn->id);
  pRsp->totalDnodes = htonl(1);
  pRsp->onlineDnodes = htonl(1);
  mndGetMnodeEpSet(pMnode, &pRsp->epSet);
  mndReleaseConn(pMnode, pConn);

  pReq->contLen = sizeof(SConnectRsp);
  pReq->pCont = pRsp;
  return 0;
#endif
}

S
Shengliang Guan 已提交
444 445
/*
 * Handle TDMT_MND_KILL_QUERY: only a super user may kill a query. The query id
 * is recorded on the target connection (pConn->queryId).
 *
 * Returns 0 on success, -1 with terrno = TSDB_CODE_MND_NO_RIGHTS or
 * TSDB_CODE_MND_INVALID_CONN_ID on failure.
 */
static int32_t mndProcessKillQueryReq(SMnodeMsg *pReq) {
  SMnode       *pMnode = pReq->pMnode;
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
  // NOTE(review): an unknown user is reported as success without doing anything — confirm this is intended.
  if (pUser == NULL) return 0;
  if (!pUser->superUser) {
    mndReleaseUser(pMnode, pUser);
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
  }
  mndReleaseUser(pMnode, pUser);

  // ids arrive in network byte order
  SKillQueryReq *pKill = pReq->rpcMsg.pCont;
  int32_t        connId = (int32_t)ntohl(pKill->connId);
  int32_t        queryId = (int32_t)ntohl(pKill->queryId);
  // log the converted id, not the raw wire value
  mInfo("kill query msg is received, queryId:%d", queryId);

  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t));
  if (pConn == NULL) {
    mError("connId:%d, failed to kill queryId:%d, conn not exist", connId, queryId);
    terrno = TSDB_CODE_MND_INVALID_CONN_ID;
    return -1;
  }

  mInfo("connId:%d, queryId:%d is killed by user:%s", connId, queryId, pReq->user);
  pConn->queryId = queryId;
  taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
  return 0;
}

S
Shengliang Guan 已提交
475 476
/*
 * Handle TDMT_MND_KILL_CONN: only a super user may kill a connection. The
 * target connection is flagged with killed = 1.
 *
 * Returns TSDB_CODE_SUCCESS on success, -1 with terrno = TSDB_CODE_MND_NO_RIGHTS
 * or TSDB_CODE_MND_INVALID_CONN_ID on failure.
 */
static int32_t mndProcessKillConnReq(SMnodeMsg *pReq) {
  SMnode       *pMnode = pReq->pMnode;
  SProfileMgmt *pProfile = &pMnode->profileMgmt;

  SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
  // NOTE(review): an unknown user is reported as success without doing anything — confirm this is intended.
  if (pUser == NULL) return 0;

  int32_t denied = !pUser->superUser;
  mndReleaseUser(pMnode, pUser);
  if (denied) {
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
  }

  // the connection id arrives in network byte order
  SKillConnReq *pKill = pReq->rpcMsg.pCont;
  int32_t       connId = htonl(pKill->connId);

  SConnObj *pConn = taosCacheAcquireByKey(pProfile->cache, &connId, sizeof(int32_t));
  if (pConn == NULL) {
    mError("connId:%d, failed to kill connection, conn not exist", connId);
    terrno = TSDB_CODE_MND_INVALID_CONN_ID;
    return -1;
  }

  mInfo("connId:%d, is killed by user:%s", connId, pReq->user);
  pConn->killed = 1;
  taosCacheRelease(pProfile->cache, (void **)&pConn, false);
  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
504
/*
 * Describe the "show connections" result table: fills the column schema in
 * pMeta (multi-byte fields in network byte order) and the column widths,
 * offsets, row size and row count in pShow. Only super users are allowed.
 *
 * Returns 0 on success, -1 with terrno = TSDB_CODE_MND_NO_RIGHTS otherwise.
 */
static int32_t mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
  SMnode       *pMnode = pReq->pMnode;
  SProfileMgmt *pProfile = &pMnode->profileMgmt;

  SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
  // NOTE(review): an unknown user yields success with pMeta/pShow left untouched — confirm this is intended.
  if (pUser == NULL) return 0;

  int32_t denied = !pUser->superUser;
  mndReleaseUser(pMnode, pUser);
  if (denied) {
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
  }

  // columns in display order
  const struct {
    const char *name;
    int32_t     type;
    int32_t     bytes;
  } columns[] = {
      {"connId", TSDB_DATA_TYPE_INT, 4},
      {"user", TSDB_DATA_TYPE_BINARY, TSDB_USER_LEN + VARSTR_HEADER_SIZE},
      {"program", TSDB_DATA_TYPE_BINARY, TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE},  // app name
      {"pid", TSDB_DATA_TYPE_INT, 4},                                              // app pid
      {"ip:port", TSDB_DATA_TYPE_BINARY, TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE},
      {"login_time", TSDB_DATA_TYPE_TIMESTAMP, 8},
      {"last_access", TSDB_DATA_TYPE_TIMESTAMP, 8},
  };
  const int32_t cols = (int32_t)(sizeof(columns) / sizeof(columns[0]));

  SSchema *pSchema = pMeta->pSchema;
  for (int32_t i = 0; i < cols; ++i) {
    pShow->bytes[i] = columns[i].bytes;
    pSchema[i].type = columns[i].type;
    strcpy(pSchema[i].name, columns[i].name);
    pSchema[i].bytes = htonl(pShow->bytes[i]);
  }

  pMeta->numOfColumns = htonl(cols);
  pShow->numOfColumns = cols;

  // each column starts where the previous one ends
  pShow->offset[0] = 0;
  for (int32_t i = 1; i < cols; ++i) {
    pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
  }

  pShow->numOfRows = taosHashGetSize(pProfile->cache->pHashTable);
  pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
  strcpy(pMeta->tbFname, mndShowStr(pShow->type));

  return 0;
}

S
Shengliang Guan 已提交
579 580
/* Address of the cell for column `col` of row `row` in a column-major block sized for `rows` rows. */
static inline char *mndConnCell(SShowObj *pShow, char *data, int32_t col, int32_t rows, int32_t row) {
  return data + pShow->offset[col] * rows + pShow->bytes[col] * row;
}

/*
 * Fill up to `rows` rows of the "show connections" result into `data`, one row
 * per cached connection, continuing from pShow->pIter.
 *
 * Returns the number of rows written; pShow->numOfReads is advanced by it.
 */
static int32_t mndRetrieveConns(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  SMnode   *pMnode = pReq->pMnode;
  SConnObj *pConn = NULL;
  int32_t   numOfRows = 0;
  char     *pWrite;
  char      ipStr[TSDB_IPv4ADDR_LEN + 6];

  for (; numOfRows < rows; numOfRows++) {
    pShow->pIter = mndGetNextConn(pMnode, pShow->pIter, &pConn);
    if (pConn == NULL) break;

    int32_t col = 0;

    // connId
    pWrite = mndConnCell(pShow, data, col, rows, numOfRows);
    *(int32_t *)pWrite = pConn->id;
    col++;

    // user
    pWrite = mndConnCell(pShow, data, col, rows, numOfRows);
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->user, pShow->bytes[col]);
    col++;

    // app name
    pWrite = mndConnCell(pShow, data, col, rows, numOfRows);
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->app, pShow->bytes[col]);
    col++;

    // app pid
    pWrite = mndConnCell(pShow, data, col, rows, numOfRows);
    *(int32_t *)pWrite = pConn->pid;
    col++;

    // ip:port
    pWrite = mndConnCell(pShow, data, col, rows, numOfRows);
    taosIpPort2String(pConn->ip, pConn->port, ipStr);
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->bytes[col]);
    col++;

    // login time
    pWrite = mndConnCell(pShow, data, col, rows, numOfRows);
    *(int64_t *)pWrite = pConn->loginTimeMs;
    col++;

    // last access, never reported earlier than the login time
    pWrite = mndConnCell(pShow, data, col, rows, numOfRows);
    if (pConn->lastAccessTimeMs < pConn->loginTimeMs) pConn->lastAccessTimeMs = pConn->loginTimeMs;
    *(int64_t *)pWrite = pConn->lastAccessTimeMs;
    col++;
  }

  pShow->numOfReads += numOfRows;

  return numOfRows;
}

S
Shengliang Guan 已提交
633
/*
 * Describe the "show queries" result table: fills the column schema in pMeta
 * (multi-byte fields in network byte order) and the column widths, offsets and
 * row size in pShow. The row count is set to a fixed upper bound of 1000000.
 * Only super users are allowed.
 *
 * Returns 0 on success, -1 with terrno = TSDB_CODE_MND_NO_RIGHTS otherwise.
 */
static int32_t mndGetQueryMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
  SMnode *pMnode = pReq->pMnode;

  SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
  // NOTE(review): an unknown user yields success with pMeta/pShow left untouched — confirm this is intended.
  if (pUser == NULL) return 0;

  int32_t denied = !pUser->superUser;
  mndReleaseUser(pMnode, pUser);
  if (denied) {
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
  }

  // columns in display order
  const struct {
    const char *name;
    int32_t     type;
    int32_t     bytes;
  } columns[] = {
      {"queryId", TSDB_DATA_TYPE_INT, 4},
      {"connId", TSDB_DATA_TYPE_INT, 4},
      {"user", TSDB_DATA_TYPE_BINARY, TSDB_USER_LEN + VARSTR_HEADER_SIZE},
      {"ip:port", TSDB_DATA_TYPE_BINARY, TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE},
      {"qid", TSDB_DATA_TYPE_BINARY, 22 + VARSTR_HEADER_SIZE},
      {"created_time", TSDB_DATA_TYPE_TIMESTAMP, 8},
      {"time", TSDB_DATA_TYPE_BIGINT, 8},
      {"sql_obj_id", TSDB_DATA_TYPE_BINARY, QUERY_OBJ_ID_SIZE + VARSTR_HEADER_SIZE},
      {"pid", TSDB_DATA_TYPE_INT, 4},
      {"ep", TSDB_DATA_TYPE_BINARY, TSDB_EP_LEN + VARSTR_HEADER_SIZE},
      {"stable_query", TSDB_DATA_TYPE_BOOL, 1},
      {"sub_queries", TSDB_DATA_TYPE_INT, 4},
      {"sub_query_info", TSDB_DATA_TYPE_BINARY, TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE},
      {"sql", TSDB_DATA_TYPE_BINARY, TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE},
  };
  const int32_t cols = (int32_t)(sizeof(columns) / sizeof(columns[0]));

  SSchema *pSchema = pMeta->pSchema;
  for (int32_t i = 0; i < cols; ++i) {
    pShow->bytes[i] = columns[i].bytes;
    pSchema[i].type = columns[i].type;
    strcpy(pSchema[i].name, columns[i].name);
    pSchema[i].bytes = htonl(pShow->bytes[i]);
  }

  pMeta->numOfColumns = htonl(cols);
  pShow->numOfColumns = cols;

  // each column starts where the previous one ends
  pShow->offset[0] = 0;
  for (int32_t i = 1; i < cols; ++i) {
    pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
  }

  pShow->numOfRows = 1000000;
  pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
  strcpy(pMeta->tbFname, mndShowStr(pShow->type));

  return 0;
}

S
Shengliang Guan 已提交
748 749
/*
 * Fill one batch of rows for the "queries" show request.
 *
 * Connections are walked with mndGetNextConn(), resuming from pShow->pIter,
 * and one row is emitted per SQueryDesc attached to each connection.
 *
 * The output buffer is column-major: the cell for column c of row r lives at
 *   data + pShow->offset[c] * rows + pShow->bytes[c] * r
 *
 * pReq   request carrying the mnode handle (pReq->pMnode)
 * pShow  show context; pIter and numOfReads are updated here
 * data   output buffer laid out as described above
 * rows   row capacity of `data`
 *
 * Returns the number of rows written into `data`.
 *
 * NOTE(review): queryId and connId are stored as 8-byte values, while
 * SConnObj.id is an int32_t and the widths of those two columns are set
 * outside this function -- confirm they are declared as 8 bytes wide.
 * NOTE(review): a connection whose numOfQueries is >= rows can never pass
 * the fit check below, even when the batch is still empty.
 */
static int32_t mndRetrieveQueries(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  // Column positions, listed in the order the cells are written below.
  enum {
    QRY_COL_QUERY_ID = 0,
    QRY_COL_CONN_ID,
    QRY_COL_USER,
    QRY_COL_IP_PORT,
    QRY_COL_QID,
    QRY_COL_STIME,
    QRY_COL_USECONDS,
    QRY_COL_SQL_OBJ_ID,
    QRY_COL_PID,
    QRY_COL_EP,
    QRY_COL_STABLE_QUERY,
    QRY_COL_NUM_OF_SUB,
    QRY_COL_SUB_SQL_INFO,
    QRY_COL_SQL
  };

// Address of the cell for column `c` in the row currently being filled.
#define QRY_CELL(c) (data + pShow->offset[(c)] * rows + pShow->bytes[(c)] * filled)

  SMnode   *pMnode = pReq->pMnode;
  SConnObj *pConn = NULL;
  int32_t   filled = 0;
  char      text[TSDB_IPv4ADDR_LEN + 6] = {0};  // scratch for "ip:port" and the hex object id

  while (filled < rows) {
    void *cursor = mndGetNextConn(pMnode, pShow->pIter, &pConn);

    // No further connection: remember the returned iterator and stop.
    if (pConn == NULL) {
      pShow->pIter = cursor;
      break;
    }

    // Not enough room for all queries of this connection: cancel the new
    // iterator and leave pShow->pIter untouched.
    if (filled + pConn->numOfQueries >= rows) {
      mndCancelGetNextConn(pMnode, cursor);
      break;
    }

    pShow->pIter = cursor;

    for (int32_t q = 0; q < pConn->numOfQueries; ++q) {
      SQueryDesc *pQuery = pConn->pQueries + q;
      char       *cell = NULL;

      cell = QRY_CELL(QRY_COL_QUERY_ID);
      *(int64_t *)cell = htobe64(pQuery->queryId);

      cell = QRY_CELL(QRY_COL_CONN_ID);
      *(int64_t *)cell = htobe64(pConn->id);

      cell = QRY_CELL(QRY_COL_USER);
      STR_WITH_MAXSIZE_TO_VARSTR(cell, pConn->user, pShow->bytes[QRY_COL_USER]);

      // Client address rendered as "ip:port".
      snprintf(text, tListLen(text), "%s:%u", taosIpStr(pConn->ip), pConn->port);
      cell = QRY_CELL(QRY_COL_IP_PORT);
      STR_WITH_MAXSIZE_TO_VARSTR(cell, text, pShow->bytes[QRY_COL_IP_PORT]);

      // qId is emitted as decimal text.
      char qidText[24] = {0};
      snprintf(qidText, tListLen(qidText), "%" PRIu64, htobe64(pQuery->qId));
      cell = QRY_CELL(QRY_COL_QID);
      STR_WITH_MAXSIZE_TO_VARSTR(cell, qidText, pShow->bytes[QRY_COL_QID]);

      cell = QRY_CELL(QRY_COL_STIME);
      *(int64_t *)cell = htobe64(pQuery->stime);

      cell = QRY_CELL(QRY_COL_USECONDS);
      *(int64_t *)cell = htobe64(pQuery->useconds);

      // sqlObjId is emitted as "0x"-prefixed hex text.
      snprintf(text, tListLen(text), "0x%" PRIx64, htobe64(pQuery->sqlObjId));
      cell = QRY_CELL(QRY_COL_SQL_OBJ_ID);
      STR_WITH_MAXSIZE_TO_VARSTR(cell, text, pShow->bytes[QRY_COL_SQL_OBJ_ID]);

      cell = QRY_CELL(QRY_COL_PID);
      *(int32_t *)cell = htonl(pQuery->pid);

      // Endpoint text combines the query's fqdn with the connection's port.
      char endpoint[TSDB_EP_LEN + 1] = {0};
      snprintf(endpoint, tListLen(endpoint), "%s:%u", pQuery->fqdn, pConn->port);
      cell = QRY_CELL(QRY_COL_EP);
      STR_WITH_MAXSIZE_TO_VARSTR(cell, endpoint, pShow->bytes[QRY_COL_EP]);

      cell = QRY_CELL(QRY_COL_STABLE_QUERY);
      *(bool *)cell = pQuery->stableQuery;

      cell = QRY_CELL(QRY_COL_NUM_OF_SUB);
      *(int32_t *)cell = htonl(pQuery->numOfSub);

      cell = QRY_CELL(QRY_COL_SUB_SQL_INFO);
      STR_WITH_MAXSIZE_TO_VARSTR(cell, pQuery->subSqlInfo, pShow->bytes[QRY_COL_SUB_SQL_INFO]);

      cell = QRY_CELL(QRY_COL_SQL);
      STR_WITH_MAXSIZE_TO_VARSTR(cell, pQuery->sql, pShow->bytes[QRY_COL_SQL]);

      filled++;
    }
  }

#undef QRY_CELL

  // NOTE(review): presumably compacts the buffer from `rows` capacity down to
  // the rows actually filled -- confirm against mndVacuumResult.
  mndVacuumResult(data, pShow->numOfColumns, filled, rows, pShow);
  pShow->numOfReads += filled;
  return filled;
}

/*
 * Cancel an in-progress iteration over the profile cache's hash table.
 *
 * pMnode  mnode whose profileMgmt cache is being iterated
 * pIter   iterator handed to taosHashCancelIterate()
 */
static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) {
  taosHashCancelIterate(pMnode->profileMgmt.cache->pHashTable, pIter);
}