mndProfile.c 29.7 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17 18 19 20
#include "mndProfile.h"
#include "mndDb.h"
#include "mndMnode.h"
#include "mndShow.h"
S
Shengliang Guan 已提交
21
#include "mndStb.h"
S
Shengliang Guan 已提交
22
#include "mndUser.h"
S
Shengliang Guan 已提交
23
#include "tglobal.h"
S
version  
Shengliang Guan 已提交
24
#include "version.h"
S
Shengliang Guan 已提交
25

L
Liu Jicong 已提交
26 27
#define QUERY_ID_SIZE      20
#define QUERY_OBJ_ID_SIZE  18
S
Shengliang Guan 已提交
28
#define SUBQUERY_INFO_SIZE 6
L
Liu Jicong 已提交
29
#define QUERY_SAVE_SIZE    20
S
Shengliang Guan 已提交
30 31

typedef struct {
S
Shengliang Guan 已提交
32
  int32_t     id;
L
Liu Jicong 已提交
33
  int8_t      connType;
S
Shengliang Guan 已提交
34 35 36 37 38 39 40 41 42 43 44 45
  char        user[TSDB_USER_LEN];
  char        app[TSDB_APP_NAME_LEN];  // app name that invokes taosc
  int64_t     appStartTimeMs;          // app start time
  int32_t     pid;                     // pid of app that invokes taosc
  uint32_t    ip;
  uint16_t    port;
  int8_t      killed;
  int64_t     loginTimeMs;
  int64_t     lastAccessTimeMs;
  int32_t     queryId;
  int32_t     numOfQueries;
  SQueryDesc *pQueries;
S
Shengliang Guan 已提交
46 47
} SConnObj;

L
Liu Jicong 已提交
48 49
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
                               int32_t pid, const char *app, int64_t startTime);
S
Shengliang Guan 已提交
50
static void      mndFreeConn(SConnObj *pConn);
S
Shengliang Guan 已提交
51
static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId);
S
Shengliang Guan 已提交
52
static void      mndReleaseConn(SMnode *pMnode, SConnObj *pConn);
H
Haojun Liao 已提交
53
static void     *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter);
S
Shengliang Guan 已提交
54
static void      mndCancelGetNextConn(SMnode *pMnode, void *pIter);
S
Shengliang Guan 已提交
55 56 57 58 59 60 61 62
static int32_t   mndProcessHeartBeatReq(SNodeMsg *pReq);
static int32_t   mndProcessConnectReq(SNodeMsg *pReq);
static int32_t   mndProcessKillQueryReq(SNodeMsg *pReq);
static int32_t   mndProcessKillConnReq(SNodeMsg *pReq);
static int32_t   mndGetConnsMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t   mndRetrieveConns(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static int32_t   mndGetQueryMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t   mndRetrieveQueries(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
S
Shengliang Guan 已提交
63 64 65 66 67
static void      mndCancelGetNextQuery(SMnode *pMnode, void *pIter);

int32_t mndInitProfile(SMnode *pMnode) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
68
  int32_t connCheckTime = tsShellActivityTimer * 2;
S
Shengliang Guan 已提交
69 70 71 72 73 74 75
  pMgmt->cache = taosCacheInit(TSDB_DATA_TYPE_INT, connCheckTime, true, (__cache_free_fn_t)mndFreeConn, "conn");
  if (pMgmt->cache == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to alloc profile cache since %s", terrstr());
    return -1;
  }

S
Shengliang Guan 已提交
76 77 78 79
  mndSetMsgHandle(pMnode, TDMT_MND_HEARTBEAT, mndProcessHeartBeatReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CONNECT, mndProcessConnectReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq);
S
Shengliang Guan 已提交
80 81 82

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn);
S
Shengliang Guan 已提交
83 84
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery);
85

S
Shengliang Guan 已提交
86 87 88 89 90 91 92 93 94 95 96
  return 0;
}

void mndCleanupProfile(SMnode *pMnode) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
  if (pMgmt->cache != NULL) {
    taosCacheCleanup(pMgmt->cache);
    pMgmt->cache = NULL;
  }
}

L
Liu Jicong 已提交
97 98
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
                               int32_t pid, const char *app, int64_t startTime) {
S
Shengliang Guan 已提交
99 100 101 102
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  int32_t connId = atomic_add_fetch_32(&pMgmt->connId, 1);
  if (connId == 0) atomic_add_fetch_32(&pMgmt->connId, 1);
S
Shengliang Guan 已提交
103
  if (startTime == 0) startTime = taosGetTimestampMs();
S
Shengliang Guan 已提交
104

S
Shengliang Guan 已提交
105
  SConnObj connObj = {.id = connId,
L
Liu Jicong 已提交
106
                      .connType = connType,
S
Shengliang Guan 已提交
107 108
                      .appStartTimeMs = startTime,
                      .pid = pid,
S
shm  
Shengliang Guan 已提交
109 110
                      .ip = ip,
                      .port = port,
S
Shengliang Guan 已提交
111
                      .killed = 0,
S
Shengliang Guan 已提交
112 113
                      .loginTimeMs = taosGetTimestampMs(),
                      .lastAccessTimeMs = 0,
S
Shengliang Guan 已提交
114 115 116 117
                      .queryId = 0,
                      .numOfQueries = 0,
                      .pQueries = NULL};

S
Shengliang Guan 已提交
118
  connObj.lastAccessTimeMs = connObj.loginTimeMs;
S
shm  
Shengliang Guan 已提交
119
  tstrncpy(connObj.user, user, TSDB_USER_LEN);
S
Shengliang Guan 已提交
120 121
  tstrncpy(connObj.app, app, TSDB_APP_NAME_LEN);

S
Shengliang Guan 已提交
122
  int32_t   keepTime = tsShellActivityTimer * 3;
S
Shengliang Guan 已提交
123
  SConnObj *pConn = taosCachePut(pMgmt->cache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), keepTime * 1000);
S
Shengliang Guan 已提交
124 125
  if (pConn == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
shm  
Shengliang Guan 已提交
126
    mError("conn:%d, failed to put into cache since %s, user:%s", connId, user, terrstr());
S
Shengliang Guan 已提交
127 128
    return NULL;
  } else {
S
shm  
Shengliang Guan 已提交
129
    mTrace("conn:%d, is created, data:%p user:%s", pConn->id, pConn, user);
S
Shengliang Guan 已提交
130 131
    return pConn;
  }
S
Shengliang Guan 已提交
132 133 134
}

static void mndFreeConn(SConnObj *pConn) {
wafwerar's avatar
wafwerar 已提交
135
  taosMemoryFreeClear(pConn->pQueries);
S
Shengliang Guan 已提交
136
  mTrace("conn:%d, is destroyed, data:%p", pConn->id, pConn);
S
Shengliang Guan 已提交
137 138
}

S
Shengliang Guan 已提交
139
static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId) {
S
Shengliang Guan 已提交
140 141 142 143
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t));
  if (pConn == NULL) {
S
Shengliang Guan 已提交
144
    mDebug("conn:%d, already destroyed", connId);
S
Shengliang Guan 已提交
145 146 147
    return NULL;
  }

S
Shengliang Guan 已提交
148
  int32_t keepTime = tsShellActivityTimer * 3;
S
Shengliang Guan 已提交
149
  pConn->lastAccessTimeMs = keepTime * 1000 + (uint64_t)taosGetTimestampMs();
S
Shengliang Guan 已提交
150

S
Shengliang Guan 已提交
151
  mTrace("conn:%d, acquired from cache, data:%p", pConn->id, pConn);
S
Shengliang Guan 已提交
152 153 154 155 156
  return pConn;
}

static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) {
  if (pConn == NULL) return;
S
Shengliang Guan 已提交
157
  mTrace("conn:%d, released from cache, data:%p", pConn->id, pConn);
S
Shengliang Guan 已提交
158

S
Shengliang Guan 已提交
159
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
S
Shengliang Guan 已提交
160 161 162
  taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
}

H
Haojun Liao 已提交
163
void *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter) {
L
Liu Jicong 已提交
164 165
  SConnObj *pConn = NULL;
  bool      hasNext = taosCacheIterNext(pIter);
H
Haojun Liao 已提交
166 167 168 169 170
  if (hasNext) {
    size_t dataLen = 0;
    pConn = taosCacheIterGetData(pIter, &dataLen);
  } else {
    taosCacheDestroyIter(pIter);
S
Shengliang Guan 已提交
171 172
  }

H
Haojun Liao 已提交
173
  return pConn;
S
Shengliang Guan 已提交
174 175 176
}

static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) {
H
Haojun Liao 已提交
177 178 179
  if (pIter != NULL) {
    taosCacheDestroyIter(pIter);
  }
S
Shengliang Guan 已提交
180 181
}

S
Shengliang Guan 已提交
182 183
static int32_t mndProcessConnectReq(SNodeMsg *pReq) {
  SMnode     *pMnode = pReq->pNode;
S
Shengliang Guan 已提交
184 185 186 187 188
  SUserObj   *pUser = NULL;
  SDbObj     *pDb = NULL;
  SConnObj   *pConn = NULL;
  int32_t     code = -1;
  SConnectReq connReq = {0};
S
shm  
Shengliang Guan 已提交
189
  char        ip[30] = {0};
S
Shengliang Guan 已提交
190 191 192 193 194

  if (tDeserializeSConnectReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &connReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto CONN_OVER;
  }
S
Shengliang Guan 已提交
195

S
shm  
Shengliang Guan 已提交
196
  taosIp2String(pReq->clientIp, ip);
S
Shengliang Guan 已提交
197

198 199 200 201 202 203
  pUser = mndAcquireUser(pMnode, pReq->user);
  if (pUser == NULL) {
    mError("user:%s, failed to login while acquire user since %s", pReq->user, terrstr());
    goto CONN_OVER;
  }

S
Shengliang Guan 已提交
204
  if (connReq.db[0]) {
S
Shengliang Guan 已提交
205 206 207
    char db[TSDB_DB_FNAME_LEN];
    snprintf(db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, connReq.db);
    pDb = mndAcquireDb(pMnode, db);
S
Shengliang Guan 已提交
208 209
    if (pDb == NULL) {
      terrno = TSDB_CODE_MND_INVALID_DB;
S
Shengliang Guan 已提交
210
      mError("user:%s, failed to login from %s while use db:%s since %s", pReq->user, ip, connReq.db, terrstr());
211
      goto CONN_OVER;
S
Shengliang Guan 已提交
212 213 214
    }
  }

L
Liu Jicong 已提交
215 216
  pConn = mndCreateConn(pMnode, pReq->user, connReq.connType, pReq->clientIp, pReq->clientPort, connReq.pid,
                        connReq.app, connReq.startTime);
S
Shengliang Guan 已提交
217
  if (pConn == NULL) {
S
Shengliang Guan 已提交
218
    mError("user:%s, failed to login from %s while create connection since %s", pReq->user, ip, terrstr());
219
    goto CONN_OVER;
S
Shengliang Guan 已提交
220 221
  }

S
Shengliang Guan 已提交
222 223 224 225 226
  SConnectRsp connectRsp = {0};
  connectRsp.acctId = pUser->acctId;
  connectRsp.superUser = pUser->superUser;
  connectRsp.clusterId = pMnode->clusterId;
  connectRsp.connId = pConn->id;
L
Liu Jicong 已提交
227
  connectRsp.connType = connReq.connType;
S
Shengliang Guan 已提交
228

S
Shengliang Guan 已提交
229 230 231
  snprintf(connectRsp.sVersion, sizeof(connectRsp.sVersion), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo,
           gitinfo);
  mndGetMnodeEpSet(pMnode, &connectRsp.epSet);
232

S
Shengliang Guan 已提交
233 234 235 236 237
  int32_t contLen = tSerializeSConnectRsp(NULL, 0, &connectRsp);
  if (contLen < 0) goto CONN_OVER;
  void *pRsp = rpcMallocCont(contLen);
  if (pRsp == NULL) goto CONN_OVER;
  tSerializeSConnectRsp(pRsp, contLen, &connectRsp);
S
Shengliang Guan 已提交
238

S
Shengliang Guan 已提交
239 240
  pReq->rspLen = contLen;
  pReq->pRsp = pRsp;
241

S
shm  
Shengliang Guan 已提交
242
  mDebug("user:%s, login from %s, conn:%d, app:%s", pReq->user, ip, pConn->id, connReq.app);
243 244 245 246 247 248 249 250 251 252

  code = 0;

CONN_OVER:

  mndReleaseUser(pMnode, pUser);
  mndReleaseDb(pMnode, pDb);
  mndReleaseConn(pMnode, pConn);

  return code;
S
Shengliang Guan 已提交
253 254
}

S
Shengliang Guan 已提交
255
static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) {
S
Shengliang Guan 已提交
256
  pConn->numOfQueries = 0;
S
Shengliang Guan 已提交
257
  int32_t numOfQueries = htonl(pReq->numOfQueries);
S
Shengliang Guan 已提交
258 259 260

  if (numOfQueries > 0) {
    if (pConn->pQueries == NULL) {
wafwerar's avatar
wafwerar 已提交
261
      pConn->pQueries = taosMemoryCalloc(sizeof(SQueryDesc), QUERY_SAVE_SIZE);
S
Shengliang Guan 已提交
262 263
    }

dengyihao's avatar
dengyihao 已提交
264
    pConn->numOfQueries = TMIN(QUERY_SAVE_SIZE, numOfQueries);
S
Shengliang Guan 已提交
265 266 267

    int32_t saveSize = pConn->numOfQueries * sizeof(SQueryDesc);
    if (saveSize > 0 && pConn->pQueries != NULL) {
S
Shengliang Guan 已提交
268
      memcpy(pConn->pQueries, pReq->pData, saveSize);
S
Shengliang Guan 已提交
269 270 271 272 273 274
    }
  }

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
275
static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) {
L
Liu Jicong 已提交
276
#if 0
wafwerar's avatar
wafwerar 已提交
277
  SClientHbRsp* pRsp = taosMemoryMalloc(sizeof(SClientHbRsp));
L
Liu Jicong 已提交
278 279 280 281 282 283 284 285 286 287 288 289 290 291 292
  if (pRsp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  pRsp->connKey = pReq->connKey;
  SMqHbBatchRsp batchRsp;
  batchRsp.batchRsps = taosArrayInit(0, sizeof(SMqHbRsp));
  if (batchRsp.batchRsps == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  SClientHbKey connKey = pReq->connKey;
  SHashObj* pObj =  pReq->info;
  SKv* pKv = taosHashGet(pObj, "mq-tmp", strlen("mq-tmp") + 1);
  if (pKv == NULL) {
wafwerar's avatar
wafwerar 已提交
293
    taosMemoryFree(pRsp);
L
Liu Jicong 已提交
294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325
    return NULL;
  }
  SMqHbMsg mqHb;
  taosDecodeSMqMsg(pKv->value, &mqHb);
  /*int64_t clientUid = htonl(pKv->value);*/
  /*if (mqHb.epoch )*/
  int sz = taosArrayGetSize(mqHb.pTopics);
  SMqConsumerObj* pConsumer = mndAcquireConsumer(pMnode, mqHb.consumerId); 
  for (int i = 0; i < sz; i++) {
    SMqHbOneTopicBatchRsp innerBatchRsp;
    innerBatchRsp.rsps = taosArrayInit(sz, sizeof(SMqHbRsp));
    if (innerBatchRsp.rsps == NULL) {
      //TODO
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
    SMqHbTopicInfo* topicInfo = taosArrayGet(mqHb.pTopics, i);
    SMqConsumerTopic* pConsumerTopic = taosHashGet(pConsumer->topicHash, topicInfo->name, strlen(topicInfo->name)+1);
    if (pConsumerTopic->epoch != topicInfo->epoch) {
      //add new vgids into rsp
      int vgSz = taosArrayGetSize(topicInfo->pVgInfo);
      for (int j = 0; j < vgSz; j++) {
        SMqHbRsp innerRsp;
        SMqHbVgInfo* pVgInfo = taosArrayGet(topicInfo->pVgInfo, i);
        SVgObj* pVgObj = mndAcquireVgroup(pMnode, pVgInfo->vgId);
        innerRsp.epSet = mndGetVgroupEpset(pMnode, pVgObj);
        taosArrayPush(innerBatchRsp.rsps, &innerRsp);
      }
    }
    taosArrayPush(batchRsp.batchRsps, &innerBatchRsp);
  }
  int32_t tlen = taosEncodeSMqHbBatchRsp(NULL, &batchRsp);
wafwerar's avatar
wafwerar 已提交
326
  void* buf = taosMemoryMalloc(tlen);
L
Liu Jicong 已提交
327 328 329 330 331 332 333 334 335
  if (buf == NULL) {
    //TODO
    return NULL;
  }
  void* abuf = buf;
  taosEncodeSMqHbBatchRsp(&abuf, &batchRsp);
  pRsp->body = buf;
  pRsp->bodyLen = tlen;
  return pRsp;
L
Liu Jicong 已提交
336 337
#endif
  return NULL;
L
Liu Jicong 已提交
338 339
}

S
Shengliang Guan 已提交
340 341
static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) {
  SMnode *pMnode = pReq->pNode;
S
Shengliang Guan 已提交
342

L
Liu Jicong 已提交
343
  SClientHbBatchReq batchReq = {0};
S
Shengliang Guan 已提交
344 345 346 347 348
  if (tDeserializeSClientHbBatchReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &batchReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

L
Liu Jicong 已提交
349 350
  SClientHbBatchRsp batchRsp = {0};
  batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp));
L
Liu Jicong 已提交
351

S
Shengliang Guan 已提交
352
  int32_t sz = taosArrayGetSize(batchReq.reqs);
L
Liu Jicong 已提交
353
  for (int i = 0; i < sz; i++) {
S
Shengliang Guan 已提交
354
    SClientHbReq *pHbReq = taosArrayGet(batchReq.reqs, i);
L
Liu Jicong 已提交
355
    if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) {
D
dapan1121 已提交
356 357 358 359
      int32_t kvNum = taosHashGetSize(pHbReq->info);
      if (NULL == pHbReq->info || kvNum <= 0) {
        continue;
      }
L
Liu Jicong 已提交
360

D
dapan1121 已提交
361 362 363 364
      SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = taosArrayInit(kvNum, sizeof(SKv))};

      void *pIter = taosHashIterate(pHbReq->info, NULL);
      while (pIter != NULL) {
S
Shengliang Guan 已提交
365 366
        SKv *kv = pIter;

D
dapan1121 已提交
367
        switch (kv->key) {
D
dapan1121 已提交
368
          case HEARTBEAT_KEY_DBINFO: {
S
Shengliang Guan 已提交
369
            void   *rspMsg = NULL;
D
dapan1121 已提交
370
            int32_t rspLen = 0;
S
Shengliang Guan 已提交
371
            mndValidateDbInfo(pMnode, kv->value, kv->valueLen / sizeof(SDbVgVersion), &rspMsg, &rspLen);
D
dapan1121 已提交
372
            if (rspMsg && rspLen > 0) {
H
Haojun Liao 已提交
373 374
              SKv kv1 = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg};
              taosArrayPush(hbRsp.info, &kv1);
D
dapan1121 已提交
375 376
            }
            break;
D
dapan1121 已提交
377
          }
D
dapan 已提交
378
          case HEARTBEAT_KEY_STBINFO: {
S
Shengliang Guan 已提交
379
            void   *rspMsg = NULL;
D
dapan 已提交
380
            int32_t rspLen = 0;
S
Shengliang Guan 已提交
381
            mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableMetaVersion), &rspMsg, &rspLen);
D
dapan 已提交
382
            if (rspMsg && rspLen > 0) {
H
Haojun Liao 已提交
383 384
              SKv kv1 = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg};
              taosArrayPush(hbRsp.info, &kv1);
D
dapan 已提交
385
            }
D
dapan1121 已提交
386
            break;
D
dapan 已提交
387
          }
D
dapan1121 已提交
388 389
          default:
            mError("invalid kv key:%d", kv->key);
D
dapan1121 已提交
390
            hbRsp.status = TSDB_CODE_MND_APP_ERROR;
D
dapan1121 已提交
391 392
            break;
        }
S
Shengliang Guan 已提交
393

D
dapan1121 已提交
394 395
        pIter = taosHashIterate(pHbReq->info, pIter);
      }
D
dapan1121 已提交
396 397

      taosArrayPush(batchRsp.rsps, &hbRsp);
L
Liu Jicong 已提交
398
    } else if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_MQ) {
L
Liu Jicong 已提交
399 400 401
      SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq);
      if (pRsp != NULL) {
        taosArrayPush(batchRsp.rsps, pRsp);
wafwerar's avatar
wafwerar 已提交
402
        taosMemoryFree(pRsp);
L
Liu Jicong 已提交
403
      }
L
Liu Jicong 已提交
404 405
    }
  }
S
Shengliang Guan 已提交
406
  taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);
L
Liu Jicong 已提交
407

S
Shengliang Guan 已提交
408 409 410 411
  int32_t tlen = tSerializeSClientHbBatchRsp(NULL, 0, &batchRsp);
  void   *buf = rpcMallocCont(tlen);
  tSerializeSClientHbBatchRsp(buf, tlen, &batchRsp);

D
dapan1121 已提交
412 413 414
  int32_t rspNum = (int32_t)taosArrayGetSize(batchRsp.rsps);
  for (int32_t i = 0; i < rspNum; ++i) {
    SClientHbRsp *rsp = taosArrayGet(batchRsp.rsps, i);
S
Shengliang Guan 已提交
415
    int32_t       kvNum = (rsp->info) ? taosArrayGetSize(rsp->info) : 0;
D
dapan1121 已提交
416 417
    for (int32_t n = 0; n < kvNum; ++n) {
      SKv *kv = taosArrayGet(rsp->info, n);
wafwerar's avatar
wafwerar 已提交
418
      taosMemoryFreeClear(kv->value);
D
dapan1121 已提交
419 420 421 422
    }
    taosArrayDestroy(rsp->info);
  }

L
Liu Jicong 已提交
423
  taosArrayDestroy(batchRsp.rsps);
S
Shengliang Guan 已提交
424 425
  pReq->rspLen = tlen;
  pReq->pRsp = buf;
L
Liu Jicong 已提交
426 427
  return 0;

L
Liu Jicong 已提交
428
#if 0
S
Shengliang Guan 已提交
429
  SMnode       *pMnode = pReq->pNode;
S
Shengliang Guan 已提交
430 431
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
432 433 434
  SHeartBeatReq *pHeartbeat = pReq->rpcMsg.pCont;
  pHeartbeat->connId = htonl(pHeartbeat->connId);
  pHeartbeat->pid = htonl(pHeartbeat->pid);
S
Shengliang Guan 已提交
435

S
Shengliang Guan 已提交
436
  SConnObj *pConn = mndAcquireConn(pMnode, pHeartbeat->connId);
S
Shengliang Guan 已提交
437
  if (pConn == NULL) {
S
Shengliang Guan 已提交
438
    pConn = mndCreateConn(pMnode, &info, pHeartbeat->pid, pHeartbeat->app, 0);
S
Shengliang Guan 已提交
439
    if (pConn == NULL) {
S
Shengliang Guan 已提交
440
      mError("user:%s, conn:%d is freed and failed to create new since %s", pReq->user, pHeartbeat->connId, terrstr());
S
Shengliang Guan 已提交
441 442
      return -1;
    } else {
S
Shengliang Guan 已提交
443
      mDebug("user:%s, conn:%d is freed and create a new conn:%d", pReq->user, pHeartbeat->connId, pConn->id);
S
Shengliang Guan 已提交
444
    }
S
Shengliang Guan 已提交
445
  } else if (pConn->killed) {
S
Shengliang Guan 已提交
446
    mError("user:%s, conn:%d is already killed", pReq->user, pConn->id);
S
Shengliang Guan 已提交
447
    terrno = TSDB_CODE_MND_INVALID_CONNECTION;
S
Shengliang Guan 已提交
448 449
    return -1;
  } else {
S
Shengliang Guan 已提交
450 451 452 453 454 455 456 457 458 459 460 461 462
    if (pConn->ip != info.clientIp || pConn->port != info.clientPort /* || strcmp(pConn->user, info.user) != 0 */) {
      char oldIpStr[40];
      char newIpStr[40];
      taosIpPort2String(pConn->ip, pConn->port, oldIpStr);
      taosIpPort2String(info.clientIp, info.clientPort, newIpStr);
      mError("conn:%d, incoming conn user:%s ip:%s, not match exist user:%s ip:%s", pConn->id, info.user, newIpStr,
             pConn->user, oldIpStr);

      if (pMgmt->connId < pConn->id) pMgmt->connId = pConn->id + 1;
      taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
      terrno = TSDB_CODE_MND_INVALID_CONNECTION;
      return -1;
    }
S
Shengliang Guan 已提交
463 464 465 466 467 468
  }

  SHeartBeatRsp *pRsp = rpcMallocCont(sizeof(SHeartBeatRsp));
  if (pRsp == NULL) {
    mndReleaseConn(pMnode, pConn);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
469
    mError("user:%s, conn:%d failed to process hb while since %s", pReq->user, pHeartbeat->connId, terrstr());
S
Shengliang Guan 已提交
470 471 472
    return -1;
  }

S
Shengliang Guan 已提交
473
  mndSaveQueryStreamList(pConn, pHeartbeat);
S
Shengliang Guan 已提交
474 475 476 477 478 479 480 481 482
  if (pConn->killed != 0) {
    pRsp->killConnection = 1;
  }

  if (pConn->queryId != 0) {
    pRsp->queryId = htonl(pConn->queryId);
    pConn->queryId = 0;
  }

S
Shengliang Guan 已提交
483
  pRsp->connId = htonl(pConn->id);
S
Shengliang Guan 已提交
484
  pRsp->totalDnodes = htonl(1);
S
Shengliang Guan 已提交
485
  pRsp->onlineDnodes = htonl(1);
S
Shengliang Guan 已提交
486 487 488
  mndGetMnodeEpSet(pMnode, &pRsp->epSet);
  mndReleaseConn(pMnode, pConn);

S
Shengliang Guan 已提交
489
  pReq->contLen = sizeof(SConnectRsp);
S
Shengliang Guan 已提交
490
  pReq->pRsp = pRsp;
S
Shengliang Guan 已提交
491
  return 0;
L
Liu Jicong 已提交
492
#endif
S
Shengliang Guan 已提交
493 494
}

S
Shengliang Guan 已提交
495 496
static int32_t mndProcessKillQueryReq(SNodeMsg *pReq) {
  SMnode       *pMnode = pReq->pNode;
497 498
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
499
  SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
500
  if (pUser == NULL) return 0;
501
  if (!pUser->superUser) {
502 503 504 505 506 507
    mndReleaseUser(pMnode, pUser);
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
  }
  mndReleaseUser(pMnode, pUser);

S
Shengliang Guan 已提交
508
  SKillQueryReq killReq = {0};
S
Shengliang Guan 已提交
509
  if (tDeserializeSKillQueryReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &killReq) != 0) {
S
Shengliang Guan 已提交
510 511 512 513 514
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  mInfo("kill query msg is received, queryId:%d", killReq.queryId);
515

S
Shengliang Guan 已提交
516
  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &killReq.connId, sizeof(int32_t));
517
  if (pConn == NULL) {
S
Shengliang Guan 已提交
518
    mError("connId:%d, failed to kill queryId:%d, conn not exist", killReq.connId, killReq.queryId);
519 520 521
    terrno = TSDB_CODE_MND_INVALID_CONN_ID;
    return -1;
  } else {
S
Shengliang Guan 已提交
522 523
    mInfo("connId:%d, queryId:%d is killed by user:%s", killReq.connId, killReq.queryId, pReq->user);
    pConn->queryId = killReq.queryId;
524 525 526 527 528
    taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
    return 0;
  }
}

S
Shengliang Guan 已提交
529 530
static int32_t mndProcessKillConnReq(SNodeMsg *pReq) {
  SMnode       *pMnode = pReq->pNode;
531 532
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
533
  SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
534
  if (pUser == NULL) return 0;
535
  if (!pUser->superUser) {
536 537 538 539 540 541
    mndReleaseUser(pMnode, pUser);
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
  }
  mndReleaseUser(pMnode, pUser);

S
Shengliang Guan 已提交
542
  SKillConnReq killReq = {0};
S
Shengliang Guan 已提交
543
  if (tDeserializeSKillConnReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &killReq) != 0) {
S
Shengliang Guan 已提交
544 545 546
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }
S
Shengliang Guan 已提交
547

S
Shengliang Guan 已提交
548
  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &killReq.connId, sizeof(int32_t));
549
  if (pConn == NULL) {
S
Shengliang Guan 已提交
550
    mError("connId:%d, failed to kill connection, conn not exist", killReq.connId);
551 552 553
    terrno = TSDB_CODE_MND_INVALID_CONN_ID;
    return -1;
  } else {
S
Shengliang Guan 已提交
554
    mInfo("connId:%d, is killed by user:%s", killReq.connId, pReq->user);
555 556 557 558 559 560
    pConn->killed = 1;
    taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
    return TSDB_CODE_SUCCESS;
  }
}

S
Shengliang Guan 已提交
561 562
static int32_t mndGetConnsMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
  SMnode       *pMnode = pReq->pNode;
S
Shengliang Guan 已提交
563 564
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
565
  SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
S
Shengliang Guan 已提交
566
  if (pUser == NULL) return 0;
567
  if (!pUser->superUser) {
S
Shengliang Guan 已提交
568
    mndReleaseUser(pMnode, pUser);
569 570
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
S
Shengliang Guan 已提交
571 572 573 574
  }
  mndReleaseUser(pMnode, pUser);

  int32_t  cols = 0;
S
Shengliang Guan 已提交
575
  SSchema *pSchema = pMeta->pSchemas;
S
Shengliang Guan 已提交
576 577 578 579

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "connId");
S
Shengliang Guan 已提交
580
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
581 582 583 584 585
  cols++;

  pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "user");
S
Shengliang Guan 已提交
586
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
587 588 589 590 591 592
  cols++;

  // app name
  pShow->bytes[cols] = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "program");
S
Shengliang Guan 已提交
593
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
594 595 596 597 598 599
  cols++;

  // app pid
  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "pid");
S
Shengliang Guan 已提交
600
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
601 602 603 604 605
  cols++;

  pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "ip:port");
S
Shengliang Guan 已提交
606
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
607 608 609 610 611
  cols++;

  pShow->bytes[cols] = 8;
  pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
  strcpy(pSchema[cols].name, "login_time");
S
Shengliang Guan 已提交
612
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
613 614 615 616 617
  cols++;

  pShow->bytes[cols] = 8;
  pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
  strcpy(pSchema[cols].name, "last_access");
S
Shengliang Guan 已提交
618
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
619 620
  cols++;

S
Shengliang Guan 已提交
621
  pMeta->numOfColumns = cols;
S
Shengliang Guan 已提交
622 623 624 625 626 627 628
  pShow->numOfColumns = cols;

  pShow->offset[0] = 0;
  for (int32_t i = 1; i < cols; ++i) {
    pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
  }

H
Haojun Liao 已提交
629
  pShow->numOfRows = taosCacheGetNumOfObj(pMgmt->cache);
S
Shengliang Guan 已提交
630
  pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
D
dapan1121 已提交
631
  strcpy(pMeta->tbName, mndShowStr(pShow->type));
S
Shengliang Guan 已提交
632 633 634 635

  return 0;
}

S
Shengliang Guan 已提交
636 637
static int32_t mndRetrieveConns(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  SMnode   *pMnode = pReq->pNode;
S
Shengliang Guan 已提交
638
  int32_t   numOfRows = 0;
S
Shengliang Guan 已提交
639
  SConnObj *pConn = NULL;
S
Shengliang Guan 已提交
640 641 642 643
  int32_t   cols = 0;
  char     *pWrite;
  char      ipStr[TSDB_IPv4ADDR_LEN + 6];

H
Haojun Liao 已提交
644 645 646 647 648
  if (pShow->pIter == NULL) {
    SProfileMgmt *pMgmt = &pMnode->profileMgmt;
    pShow->pIter = taosCacheCreateIter(pMgmt->cache);
  }

S
Shengliang Guan 已提交
649
  while (numOfRows < rows) {
H
Haojun Liao 已提交
650
    pConn = mndGetNextConn(pMnode, pShow->pIter);
S
Shengliang Guan 已提交
651
    if (pConn == NULL) break;
S
Shengliang Guan 已提交
652 653 654 655

    cols = 0;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
656
    *(int32_t *)pWrite = pConn->id;
S
Shengliang Guan 已提交
657 658 659
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
660
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->user, pShow->bytes[cols]);
S
Shengliang Guan 已提交
661 662 663 664
    cols++;

    // app name
    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
665
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->app, pShow->bytes[cols]);
S
Shengliang Guan 已提交
666 667 668 669
    cols++;

    // app pid
    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
670
    *(int32_t *)pWrite = pConn->pid;
S
Shengliang Guan 已提交
671 672 673
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
674
    taosIpPort2String(pConn->ip, pConn->port, ipStr);
S
Shengliang Guan 已提交
675 676 677 678
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->bytes[cols]);
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
679
    *(int64_t *)pWrite = pConn->loginTimeMs;
S
Shengliang Guan 已提交
680 681 682
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
683 684
    if (pConn->lastAccessTimeMs < pConn->loginTimeMs) pConn->lastAccessTimeMs = pConn->loginTimeMs;
    *(int64_t *)pWrite = pConn->lastAccessTimeMs;
S
Shengliang Guan 已提交
685 686 687 688 689 690 691 692 693 694
    cols++;

    numOfRows++;
  }

  pShow->numOfReads += numOfRows;

  return numOfRows;
}

S
Shengliang Guan 已提交
695 696
static int32_t mndGetQueryMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
  SMnode       *pMnode = pReq->pNode;
S
Shengliang Guan 已提交
697 698
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
699
  SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
S
Shengliang Guan 已提交
700
  if (pUser == NULL) return 0;
701
  if (!pUser->superUser) {
S
Shengliang Guan 已提交
702
    mndReleaseUser(pMnode, pUser);
703 704
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
S
Shengliang Guan 已提交
705 706 707 708
  }
  mndReleaseUser(pMnode, pUser);

  int32_t  cols = 0;
S
Shengliang Guan 已提交
709
  SSchema *pSchema = pMeta->pSchemas;
S
Shengliang Guan 已提交
710

S
Shengliang Guan 已提交
711 712 713
  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "queryId");
S
Shengliang Guan 已提交
714
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
715 716 717 718 719
  cols++;

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "connId");
S
Shengliang Guan 已提交
720
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
721 722 723 724 725
  cols++;

  pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "user");
S
Shengliang Guan 已提交
726
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
727 728 729 730 731
  cols++;

  pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "ip:port");
S
Shengliang Guan 已提交
732
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
733 734
  cols++;

S
Shengliang Guan 已提交
735
  pShow->bytes[cols] = 22 + VARSTR_HEADER_SIZE;
S
Shengliang Guan 已提交
736 737
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "qid");
S
Shengliang Guan 已提交
738
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
739 740 741 742 743
  cols++;

  pShow->bytes[cols] = 8;
  pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
  strcpy(pSchema[cols].name, "created_time");
S
Shengliang Guan 已提交
744
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
745 746 747 748 749
  cols++;

  pShow->bytes[cols] = 8;
  pSchema[cols].type = TSDB_DATA_TYPE_BIGINT;
  strcpy(pSchema[cols].name, "time");
S
Shengliang Guan 已提交
750
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
751 752 753 754 755
  cols++;

  pShow->bytes[cols] = QUERY_OBJ_ID_SIZE + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "sql_obj_id");
S
Shengliang Guan 已提交
756
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
757 758 759 760 761
  cols++;

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "pid");
S
Shengliang Guan 已提交
762
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
763 764 765 766 767
  cols++;

  pShow->bytes[cols] = TSDB_EP_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "ep");
S
Shengliang Guan 已提交
768
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
769 770 771 772 773
  cols++;

  pShow->bytes[cols] = 1;
  pSchema[cols].type = TSDB_DATA_TYPE_BOOL;
  strcpy(pSchema[cols].name, "stable_query");
S
Shengliang Guan 已提交
774
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
775 776 777 778 779
  cols++;

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "sub_queries");
S
Shengliang Guan 已提交
780
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
781 782 783 784 785
  cols++;

  pShow->bytes[cols] = TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "sub_query_info");
S
Shengliang Guan 已提交
786
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
787 788 789 790 791
  cols++;

  pShow->bytes[cols] = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "sql");
S
Shengliang Guan 已提交
792
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
793 794
  cols++;

S
Shengliang Guan 已提交
795
  pMeta->numOfColumns = cols;
S
Shengliang Guan 已提交
796 797 798 799 800 801 802 803 804
  pShow->numOfColumns = cols;

  pShow->offset[0] = 0;
  for (int32_t i = 1; i < cols; ++i) {
    pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
  }

  pShow->numOfRows = 1000000;
  pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
D
dapan1121 已提交
805
  strcpy(pMeta->tbName, mndShowStr(pShow->type));
S
Shengliang Guan 已提交
806 807 808 809

  return 0;
}

S
Shengliang Guan 已提交
810 811
static int32_t mndRetrieveQueries(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  SMnode   *pMnode = pReq->pNode;
S
Shengliang Guan 已提交
812
  int32_t   numOfRows = 0;
S
Shengliang Guan 已提交
813
  SConnObj *pConn = NULL;
S
Shengliang Guan 已提交
814
  int32_t   cols = 0;
815 816
  char     *pWrite;
  void     *pIter;
S
Shengliang Guan 已提交
817 818
  char      str[TSDB_IPv4ADDR_LEN + 6] = {0};

H
Haojun Liao 已提交
819 820 821 822 823
  if (pShow->pIter == NULL) {
    SProfileMgmt *pMgmt = &pMnode->profileMgmt;
    pShow->pIter = taosCacheCreateIter(pMgmt->cache);
  }

S
Shengliang Guan 已提交
824
  while (numOfRows < rows) {
H
Haojun Liao 已提交
825
    pConn = mndGetNextConn(pMnode, pShow->pIter);
S
Shengliang Guan 已提交
826
    if (pConn == NULL) {
H
Haojun Liao 已提交
827
      pShow->pIter = NULL;
S
Shengliang Guan 已提交
828 829 830
      break;
    }

S
Shengliang Guan 已提交
831
    if (numOfRows + pConn->numOfQueries >= rows) {
H
Haojun Liao 已提交
832 833
      taosCacheDestroyIter(pShow->pIter);
      pShow->pIter = NULL;
S
Shengliang Guan 已提交
834 835 836
      break;
    }

S
Shengliang Guan 已提交
837 838
    for (int32_t i = 0; i < pConn->numOfQueries; ++i) {
      SQueryDesc *pDesc = pConn->pQueries + i;
S
Shengliang Guan 已提交
839 840 841
      cols = 0;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
842 843 844 845 846
      *(int64_t *)pWrite = htobe64(pDesc->queryId);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int64_t *)pWrite = htobe64(pConn->id);
S
Shengliang Guan 已提交
847 848 849
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
850
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->user, pShow->bytes[cols]);
S
Shengliang Guan 已提交
851 852 853
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
854
      snprintf(str, tListLen(str), "%s:%u", taosIpStr(pConn->ip), pConn->port);
S
Shengliang Guan 已提交
855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, str, pShow->bytes[cols]);
      cols++;

      char handleBuf[24] = {0};
      snprintf(handleBuf, tListLen(handleBuf), "%" PRIu64, htobe64(pDesc->qId));
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;

      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, handleBuf, pShow->bytes[cols]);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int64_t *)pWrite = htobe64(pDesc->stime);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int64_t *)pWrite = htobe64(pDesc->useconds);
      cols++;

      snprintf(str, tListLen(str), "0x%" PRIx64, htobe64(pDesc->sqlObjId));
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, str, pShow->bytes[cols]);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int32_t *)pWrite = htonl(pDesc->pid);
      cols++;

      char epBuf[TSDB_EP_LEN + 1] = {0};
S
Shengliang Guan 已提交
883
      snprintf(epBuf, tListLen(epBuf), "%s:%u", pDesc->fqdn, pConn->port);
S
Shengliang Guan 已提交
884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, epBuf, pShow->bytes[cols]);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(bool *)pWrite = pDesc->stableQuery;
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int32_t *)pWrite = htonl(pDesc->numOfSub);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDesc->subSqlInfo, pShow->bytes[cols]);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDesc->sql, pShow->bytes[cols]);
      cols++;

      numOfRows++;
    }
  }

S
Shengliang Guan 已提交
908
  mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
S
Shengliang Guan 已提交
909 910 911 912 913
  pShow->numOfReads += numOfRows;
  return numOfRows;
}

static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) {
H
Haojun Liao 已提交
914 915 916
  if (pIter != NULL) {
    taosCacheDestroyIter(pIter);
  }
S
Shengliang Guan 已提交
917
}
S
Shengliang Guan 已提交
918

S
Shengliang Guan 已提交
919 920 921
int32_t mndGetNumOfConnections(SMnode *pMnode) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
  return taosCacheGetNumOfObj(pMgmt->cache);
L
Liu Jicong 已提交
922
}