mndProfile.c 29.5 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17 18 19 20
#include "mndProfile.h"
#include "mndDb.h"
#include "mndMnode.h"
#include "mndShow.h"
S
Shengliang Guan 已提交
21
#include "mndStb.h"
S
Shengliang Guan 已提交
22
#include "mndUser.h"
S
Shengliang Guan 已提交
23
#include "tglobal.h"
S
version  
Shengliang Guan 已提交
24
#include "version.h"
S
Shengliang Guan 已提交
25

S
Shengliang Guan 已提交
26 27 28
#define QUERY_ID_SIZE 20
#define QUERY_OBJ_ID_SIZE 18
#define SUBQUERY_INFO_SIZE 6
S
Shengliang Guan 已提交
29
#define QUERY_SAVE_SIZE 20
S
Shengliang Guan 已提交
30 31

typedef struct {
S
Shengliang Guan 已提交
32 33 34 35 36 37 38 39 40 41 42 43 44
  int32_t     id;
  char        user[TSDB_USER_LEN];
  char        app[TSDB_APP_NAME_LEN];  // app name that invokes taosc
  int64_t     appStartTimeMs;          // app start time
  int32_t     pid;                     // pid of app that invokes taosc
  uint32_t    ip;
  uint16_t    port;
  int8_t      killed;
  int64_t     loginTimeMs;
  int64_t     lastAccessTimeMs;
  int32_t     queryId;
  int32_t     numOfQueries;
  SQueryDesc *pQueries;
S
Shengliang Guan 已提交
45 46
} SConnObj;

S
shm  
Shengliang Guan 已提交
47 48
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, uint32_t ip, uint16_t port, int32_t pid,
                               const char *app, int64_t startTime);
S
Shengliang Guan 已提交
49
static void      mndFreeConn(SConnObj *pConn);
S
Shengliang Guan 已提交
50
static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId);
S
Shengliang Guan 已提交
51
static void      mndReleaseConn(SMnode *pMnode, SConnObj *pConn);
H
Haojun Liao 已提交
52
static void     *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter);
S
Shengliang Guan 已提交
53
static void      mndCancelGetNextConn(SMnode *pMnode, void *pIter);
S
Shengliang Guan 已提交
54 55 56 57 58 59 60 61
static int32_t   mndProcessHeartBeatReq(SNodeMsg *pReq);
static int32_t   mndProcessConnectReq(SNodeMsg *pReq);
static int32_t   mndProcessKillQueryReq(SNodeMsg *pReq);
static int32_t   mndProcessKillConnReq(SNodeMsg *pReq);
static int32_t   mndGetConnsMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t   mndRetrieveConns(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static int32_t   mndGetQueryMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t   mndRetrieveQueries(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
S
Shengliang Guan 已提交
62 63 64 65 66
static void      mndCancelGetNextQuery(SMnode *pMnode, void *pIter);

int32_t mndInitProfile(SMnode *pMnode) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
67
  int32_t connCheckTime = tsShellActivityTimer * 2;
S
Shengliang Guan 已提交
68 69 70 71 72 73 74
  pMgmt->cache = taosCacheInit(TSDB_DATA_TYPE_INT, connCheckTime, true, (__cache_free_fn_t)mndFreeConn, "conn");
  if (pMgmt->cache == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to alloc profile cache since %s", terrstr());
    return -1;
  }

S
Shengliang Guan 已提交
75 76 77 78
  mndSetMsgHandle(pMnode, TDMT_MND_HEARTBEAT, mndProcessHeartBeatReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CONNECT, mndProcessConnectReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq);
S
Shengliang Guan 已提交
79 80 81

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn);
S
Shengliang Guan 已提交
82 83
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery);
84

S
Shengliang Guan 已提交
85 86 87 88 89 90 91 92 93 94 95
  return 0;
}

void mndCleanupProfile(SMnode *pMnode) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
  if (pMgmt->cache != NULL) {
    taosCacheCleanup(pMgmt->cache);
    pMgmt->cache = NULL;
  }
}

S
shm  
Shengliang Guan 已提交
96 97
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, uint32_t ip, uint16_t port, int32_t pid,
                               const char *app, int64_t startTime) {
S
Shengliang Guan 已提交
98 99 100 101
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  int32_t connId = atomic_add_fetch_32(&pMgmt->connId, 1);
  if (connId == 0) atomic_add_fetch_32(&pMgmt->connId, 1);
S
Shengliang Guan 已提交
102
  if (startTime == 0) startTime = taosGetTimestampMs();
S
Shengliang Guan 已提交
103

S
Shengliang Guan 已提交
104 105 106
  SConnObj connObj = {.id = connId,
                      .appStartTimeMs = startTime,
                      .pid = pid,
S
shm  
Shengliang Guan 已提交
107 108
                      .ip = ip,
                      .port = port,
S
Shengliang Guan 已提交
109
                      .killed = 0,
S
Shengliang Guan 已提交
110 111
                      .loginTimeMs = taosGetTimestampMs(),
                      .lastAccessTimeMs = 0,
S
Shengliang Guan 已提交
112 113 114 115
                      .queryId = 0,
                      .numOfQueries = 0,
                      .pQueries = NULL};

S
Shengliang Guan 已提交
116
  connObj.lastAccessTimeMs = connObj.loginTimeMs;
S
shm  
Shengliang Guan 已提交
117
  tstrncpy(connObj.user, user, TSDB_USER_LEN);
S
Shengliang Guan 已提交
118 119
  tstrncpy(connObj.app, app, TSDB_APP_NAME_LEN);

S
Shengliang Guan 已提交
120
  int32_t   keepTime = tsShellActivityTimer * 3;
S
Shengliang Guan 已提交
121
  SConnObj *pConn = taosCachePut(pMgmt->cache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), keepTime * 1000);
S
Shengliang Guan 已提交
122 123
  if (pConn == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
shm  
Shengliang Guan 已提交
124
    mError("conn:%d, failed to put into cache since %s, user:%s", connId, user, terrstr());
S
Shengliang Guan 已提交
125 126
    return NULL;
  } else {
S
shm  
Shengliang Guan 已提交
127
    mTrace("conn:%d, is created, data:%p user:%s", pConn->id, pConn, user);
S
Shengliang Guan 已提交
128 129
    return pConn;
  }
S
Shengliang Guan 已提交
130 131 132
}

static void mndFreeConn(SConnObj *pConn) {
wafwerar's avatar
wafwerar 已提交
133
  taosMemoryFreeClear(pConn->pQueries);
S
Shengliang Guan 已提交
134
  mTrace("conn:%d, is destroyed, data:%p", pConn->id, pConn);
S
Shengliang Guan 已提交
135 136
}

S
Shengliang Guan 已提交
137
static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId) {
S
Shengliang Guan 已提交
138 139 140 141
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t));
  if (pConn == NULL) {
S
Shengliang Guan 已提交
142
    mDebug("conn:%d, already destroyed", connId);
S
Shengliang Guan 已提交
143 144 145
    return NULL;
  }

S
Shengliang Guan 已提交
146
  int32_t keepTime = tsShellActivityTimer * 3;
S
Shengliang Guan 已提交
147
  pConn->lastAccessTimeMs = keepTime * 1000 + (uint64_t)taosGetTimestampMs();
S
Shengliang Guan 已提交
148

S
Shengliang Guan 已提交
149
  mTrace("conn:%d, acquired from cache, data:%p", pConn->id, pConn);
S
Shengliang Guan 已提交
150 151 152 153 154
  return pConn;
}

static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) {
  if (pConn == NULL) return;
S
Shengliang Guan 已提交
155
  mTrace("conn:%d, released from cache, data:%p", pConn->id, pConn);
S
Shengliang Guan 已提交
156

S
Shengliang Guan 已提交
157
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
S
Shengliang Guan 已提交
158 159 160
  taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
}

H
Haojun Liao 已提交
161 162 163 164 165 166 167 168
void *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter) {
  SConnObj* pConn = NULL;
  bool hasNext = taosCacheIterNext(pIter);
  if (hasNext) {
    size_t dataLen = 0;
    pConn = taosCacheIterGetData(pIter, &dataLen);
  } else {
    taosCacheDestroyIter(pIter);
S
Shengliang Guan 已提交
169 170
  }

H
Haojun Liao 已提交
171
  return pConn;
S
Shengliang Guan 已提交
172 173 174
}

static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) {
H
Haojun Liao 已提交
175 176 177
  if (pIter != NULL) {
    taosCacheDestroyIter(pIter);
  }
S
Shengliang Guan 已提交
178 179
}

S
Shengliang Guan 已提交
180 181
static int32_t mndProcessConnectReq(SNodeMsg *pReq) {
  SMnode     *pMnode = pReq->pNode;
S
Shengliang Guan 已提交
182 183 184 185 186
  SUserObj   *pUser = NULL;
  SDbObj     *pDb = NULL;
  SConnObj   *pConn = NULL;
  int32_t     code = -1;
  SConnectReq connReq = {0};
S
shm  
Shengliang Guan 已提交
187
  char        ip[30] = {0};
S
Shengliang Guan 已提交
188 189 190 191 192

  if (tDeserializeSConnectReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &connReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto CONN_OVER;
  }
S
Shengliang Guan 已提交
193

S
shm  
Shengliang Guan 已提交
194
  taosIp2String(pReq->clientIp, ip);
S
Shengliang Guan 已提交
195

196 197 198 199 200 201
  pUser = mndAcquireUser(pMnode, pReq->user);
  if (pUser == NULL) {
    mError("user:%s, failed to login while acquire user since %s", pReq->user, terrstr());
    goto CONN_OVER;
  }

S
Shengliang Guan 已提交
202
  if (connReq.db[0]) {
S
Shengliang Guan 已提交
203 204 205
    char db[TSDB_DB_FNAME_LEN];
    snprintf(db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, connReq.db);
    pDb = mndAcquireDb(pMnode, db);
S
Shengliang Guan 已提交
206 207
    if (pDb == NULL) {
      terrno = TSDB_CODE_MND_INVALID_DB;
S
Shengliang Guan 已提交
208
      mError("user:%s, failed to login from %s while use db:%s since %s", pReq->user, ip, connReq.db, terrstr());
209
      goto CONN_OVER;
S
Shengliang Guan 已提交
210 211 212
    }
  }

S
shm  
Shengliang Guan 已提交
213 214
  pConn =
      mndCreateConn(pMnode, pReq->user, pReq->clientIp, pReq->clientPort, connReq.pid, connReq.app, connReq.startTime);
S
Shengliang Guan 已提交
215
  if (pConn == NULL) {
S
Shengliang Guan 已提交
216
    mError("user:%s, failed to login from %s while create connection since %s", pReq->user, ip, terrstr());
217
    goto CONN_OVER;
S
Shengliang Guan 已提交
218 219
  }

S
Shengliang Guan 已提交
220 221 222 223 224
  SConnectRsp connectRsp = {0};
  connectRsp.acctId = pUser->acctId;
  connectRsp.superUser = pUser->superUser;
  connectRsp.clusterId = pMnode->clusterId;
  connectRsp.connId = pConn->id;
S
Shengliang Guan 已提交
225

S
Shengliang Guan 已提交
226 227 228
  snprintf(connectRsp.sVersion, sizeof(connectRsp.sVersion), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo,
           gitinfo);
  mndGetMnodeEpSet(pMnode, &connectRsp.epSet);
229

S
Shengliang Guan 已提交
230 231 232 233 234
  int32_t contLen = tSerializeSConnectRsp(NULL, 0, &connectRsp);
  if (contLen < 0) goto CONN_OVER;
  void *pRsp = rpcMallocCont(contLen);
  if (pRsp == NULL) goto CONN_OVER;
  tSerializeSConnectRsp(pRsp, contLen, &connectRsp);
S
Shengliang Guan 已提交
235

S
Shengliang Guan 已提交
236 237
  pReq->rspLen = contLen;
  pReq->pRsp = pRsp;
238

S
shm  
Shengliang Guan 已提交
239
  mDebug("user:%s, login from %s, conn:%d, app:%s", pReq->user, ip, pConn->id, connReq.app);
240 241 242 243 244 245 246 247 248 249

  code = 0;

CONN_OVER:

  mndReleaseUser(pMnode, pUser);
  mndReleaseDb(pMnode, pDb);
  mndReleaseConn(pMnode, pConn);

  return code;
S
Shengliang Guan 已提交
250 251
}

S
Shengliang Guan 已提交
252
static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) {
S
Shengliang Guan 已提交
253
  pConn->numOfQueries = 0;
S
Shengliang Guan 已提交
254
  int32_t numOfQueries = htonl(pReq->numOfQueries);
S
Shengliang Guan 已提交
255 256 257

  if (numOfQueries > 0) {
    if (pConn->pQueries == NULL) {
wafwerar's avatar
wafwerar 已提交
258
      pConn->pQueries = taosMemoryCalloc(sizeof(SQueryDesc), QUERY_SAVE_SIZE);
S
Shengliang Guan 已提交
259 260
    }

dengyihao's avatar
dengyihao 已提交
261
    pConn->numOfQueries = TMIN(QUERY_SAVE_SIZE, numOfQueries);
S
Shengliang Guan 已提交
262 263 264

    int32_t saveSize = pConn->numOfQueries * sizeof(SQueryDesc);
    if (saveSize > 0 && pConn->pQueries != NULL) {
S
Shengliang Guan 已提交
265
      memcpy(pConn->pQueries, pReq->pData, saveSize);
S
Shengliang Guan 已提交
266 267 268 269 270 271
    }
  }

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
272
static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) {
L
Liu Jicong 已提交
273
#if 0
wafwerar's avatar
wafwerar 已提交
274
  SClientHbRsp* pRsp = taosMemoryMalloc(sizeof(SClientHbRsp));
L
Liu Jicong 已提交
275 276 277 278 279 280 281 282 283 284 285 286 287 288 289
  if (pRsp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  pRsp->connKey = pReq->connKey;
  SMqHbBatchRsp batchRsp;
  batchRsp.batchRsps = taosArrayInit(0, sizeof(SMqHbRsp));
  if (batchRsp.batchRsps == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  SClientHbKey connKey = pReq->connKey;
  SHashObj* pObj =  pReq->info;
  SKv* pKv = taosHashGet(pObj, "mq-tmp", strlen("mq-tmp") + 1);
  if (pKv == NULL) {
wafwerar's avatar
wafwerar 已提交
290
    taosMemoryFree(pRsp);
L
Liu Jicong 已提交
291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322
    return NULL;
  }
  SMqHbMsg mqHb;
  taosDecodeSMqMsg(pKv->value, &mqHb);
  /*int64_t clientUid = htonl(pKv->value);*/
  /*if (mqHb.epoch )*/
  int sz = taosArrayGetSize(mqHb.pTopics);
  SMqConsumerObj* pConsumer = mndAcquireConsumer(pMnode, mqHb.consumerId); 
  for (int i = 0; i < sz; i++) {
    SMqHbOneTopicBatchRsp innerBatchRsp;
    innerBatchRsp.rsps = taosArrayInit(sz, sizeof(SMqHbRsp));
    if (innerBatchRsp.rsps == NULL) {
      //TODO
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
    SMqHbTopicInfo* topicInfo = taosArrayGet(mqHb.pTopics, i);
    SMqConsumerTopic* pConsumerTopic = taosHashGet(pConsumer->topicHash, topicInfo->name, strlen(topicInfo->name)+1);
    if (pConsumerTopic->epoch != topicInfo->epoch) {
      //add new vgids into rsp
      int vgSz = taosArrayGetSize(topicInfo->pVgInfo);
      for (int j = 0; j < vgSz; j++) {
        SMqHbRsp innerRsp;
        SMqHbVgInfo* pVgInfo = taosArrayGet(topicInfo->pVgInfo, i);
        SVgObj* pVgObj = mndAcquireVgroup(pMnode, pVgInfo->vgId);
        innerRsp.epSet = mndGetVgroupEpset(pMnode, pVgObj);
        taosArrayPush(innerBatchRsp.rsps, &innerRsp);
      }
    }
    taosArrayPush(batchRsp.batchRsps, &innerBatchRsp);
  }
  int32_t tlen = taosEncodeSMqHbBatchRsp(NULL, &batchRsp);
wafwerar's avatar
wafwerar 已提交
323
  void* buf = taosMemoryMalloc(tlen);
L
Liu Jicong 已提交
324 325 326 327 328 329 330 331 332
  if (buf == NULL) {
    //TODO
    return NULL;
  }
  void* abuf = buf;
  taosEncodeSMqHbBatchRsp(&abuf, &batchRsp);
  pRsp->body = buf;
  pRsp->bodyLen = tlen;
  return pRsp;
L
Liu Jicong 已提交
333 334
#endif
  return NULL;
L
Liu Jicong 已提交
335 336
}

S
Shengliang Guan 已提交
337 338
static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) {
  SMnode *pMnode = pReq->pNode;
S
Shengliang Guan 已提交
339

L
Liu Jicong 已提交
340
  SClientHbBatchReq batchReq = {0};
S
Shengliang Guan 已提交
341 342 343 344 345
  if (tDeserializeSClientHbBatchReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &batchReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

S
Shengliang Guan 已提交
346
  
L
Liu Jicong 已提交
347 348
  SClientHbBatchRsp batchRsp = {0};
  batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp));
L
Liu Jicong 已提交
349

S
Shengliang Guan 已提交
350
  int32_t sz = taosArrayGetSize(batchReq.reqs);
L
Liu Jicong 已提交
351
  for (int i = 0; i < sz; i++) {
S
Shengliang Guan 已提交
352
    SClientHbReq *pHbReq = taosArrayGet(batchReq.reqs, i);
L
Liu Jicong 已提交
353
    if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) {
D
dapan1121 已提交
354 355 356 357
      int32_t kvNum = taosHashGetSize(pHbReq->info);
      if (NULL == pHbReq->info || kvNum <= 0) {
        continue;
      }
L
Liu Jicong 已提交
358

D
dapan1121 已提交
359 360 361 362
      SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = taosArrayInit(kvNum, sizeof(SKv))};

      void *pIter = taosHashIterate(pHbReq->info, NULL);
      while (pIter != NULL) {
S
Shengliang Guan 已提交
363 364
        SKv *kv = pIter;

D
dapan1121 已提交
365
        switch (kv->key) {
D
dapan1121 已提交
366
          case HEARTBEAT_KEY_DBINFO: {
S
Shengliang Guan 已提交
367
            void   *rspMsg = NULL;
D
dapan1121 已提交
368
            int32_t rspLen = 0;
S
Shengliang Guan 已提交
369
            mndValidateDbInfo(pMnode, kv->value, kv->valueLen / sizeof(SDbVgVersion), &rspMsg, &rspLen);
D
dapan1121 已提交
370
            if (rspMsg && rspLen > 0) {
H
Haojun Liao 已提交
371 372
              SKv kv1 = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg};
              taosArrayPush(hbRsp.info, &kv1);
D
dapan1121 已提交
373 374
            }
            break;
D
dapan1121 已提交
375
          }
D
dapan 已提交
376
          case HEARTBEAT_KEY_STBINFO: {
S
Shengliang Guan 已提交
377
            void   *rspMsg = NULL;
D
dapan 已提交
378
            int32_t rspLen = 0;
S
Shengliang Guan 已提交
379
            mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableMetaVersion), &rspMsg, &rspLen);
D
dapan 已提交
380
            if (rspMsg && rspLen > 0) {
H
Haojun Liao 已提交
381 382
              SKv kv1 = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg};
              taosArrayPush(hbRsp.info, &kv1);
D
dapan 已提交
383
            }
D
dapan1121 已提交
384
            break;
D
dapan 已提交
385
          }
D
dapan1121 已提交
386 387
          default:
            mError("invalid kv key:%d", kv->key);
D
dapan1121 已提交
388
            hbRsp.status = TSDB_CODE_MND_APP_ERROR;
D
dapan1121 已提交
389 390
            break;
        }
S
Shengliang Guan 已提交
391

D
dapan1121 已提交
392 393
        pIter = taosHashIterate(pHbReq->info, pIter);
      }
D
dapan1121 已提交
394 395

      taosArrayPush(batchRsp.rsps, &hbRsp);
L
Liu Jicong 已提交
396
    } else if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_MQ) {
L
Liu Jicong 已提交
397 398 399
      SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq);
      if (pRsp != NULL) {
        taosArrayPush(batchRsp.rsps, pRsp);
wafwerar's avatar
wafwerar 已提交
400
        taosMemoryFree(pRsp);
L
Liu Jicong 已提交
401
      }
L
Liu Jicong 已提交
402 403
    }
  }
S
Shengliang Guan 已提交
404
  taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);
L
Liu Jicong 已提交
405

S
Shengliang Guan 已提交
406 407 408 409
  int32_t tlen = tSerializeSClientHbBatchRsp(NULL, 0, &batchRsp);
  void   *buf = rpcMallocCont(tlen);
  tSerializeSClientHbBatchRsp(buf, tlen, &batchRsp);

D
dapan1121 已提交
410 411 412
  int32_t rspNum = (int32_t)taosArrayGetSize(batchRsp.rsps);
  for (int32_t i = 0; i < rspNum; ++i) {
    SClientHbRsp *rsp = taosArrayGet(batchRsp.rsps, i);
S
Shengliang Guan 已提交
413
    int32_t       kvNum = (rsp->info) ? taosArrayGetSize(rsp->info) : 0;
D
dapan1121 已提交
414 415
    for (int32_t n = 0; n < kvNum; ++n) {
      SKv *kv = taosArrayGet(rsp->info, n);
wafwerar's avatar
wafwerar 已提交
416
      taosMemoryFreeClear(kv->value);
D
dapan1121 已提交
417 418 419 420
    }
    taosArrayDestroy(rsp->info);
  }

L
Liu Jicong 已提交
421
  taosArrayDestroy(batchRsp.rsps);
S
Shengliang Guan 已提交
422 423
  pReq->rspLen = tlen;
  pReq->pRsp = buf;
L
Liu Jicong 已提交
424 425
  return 0;

L
Liu Jicong 已提交
426
#if 0
S
Shengliang Guan 已提交
427
  SMnode       *pMnode = pReq->pNode;
S
Shengliang Guan 已提交
428 429
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
430 431 432
  SHeartBeatReq *pHeartbeat = pReq->rpcMsg.pCont;
  pHeartbeat->connId = htonl(pHeartbeat->connId);
  pHeartbeat->pid = htonl(pHeartbeat->pid);
S
Shengliang Guan 已提交
433

S
Shengliang Guan 已提交
434
  SConnObj *pConn = mndAcquireConn(pMnode, pHeartbeat->connId);
S
Shengliang Guan 已提交
435
  if (pConn == NULL) {
S
Shengliang Guan 已提交
436
    pConn = mndCreateConn(pMnode, &info, pHeartbeat->pid, pHeartbeat->app, 0);
S
Shengliang Guan 已提交
437
    if (pConn == NULL) {
S
Shengliang Guan 已提交
438
      mError("user:%s, conn:%d is freed and failed to create new since %s", pReq->user, pHeartbeat->connId, terrstr());
S
Shengliang Guan 已提交
439 440
      return -1;
    } else {
S
Shengliang Guan 已提交
441
      mDebug("user:%s, conn:%d is freed and create a new conn:%d", pReq->user, pHeartbeat->connId, pConn->id);
S
Shengliang Guan 已提交
442
    }
S
Shengliang Guan 已提交
443
  } else if (pConn->killed) {
S
Shengliang Guan 已提交
444
    mError("user:%s, conn:%d is already killed", pReq->user, pConn->id);
S
Shengliang Guan 已提交
445
    terrno = TSDB_CODE_MND_INVALID_CONNECTION;
S
Shengliang Guan 已提交
446 447
    return -1;
  } else {
S
Shengliang Guan 已提交
448 449 450 451 452 453 454 455 456 457 458 459 460
    if (pConn->ip != info.clientIp || pConn->port != info.clientPort /* || strcmp(pConn->user, info.user) != 0 */) {
      char oldIpStr[40];
      char newIpStr[40];
      taosIpPort2String(pConn->ip, pConn->port, oldIpStr);
      taosIpPort2String(info.clientIp, info.clientPort, newIpStr);
      mError("conn:%d, incoming conn user:%s ip:%s, not match exist user:%s ip:%s", pConn->id, info.user, newIpStr,
             pConn->user, oldIpStr);

      if (pMgmt->connId < pConn->id) pMgmt->connId = pConn->id + 1;
      taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
      terrno = TSDB_CODE_MND_INVALID_CONNECTION;
      return -1;
    }
S
Shengliang Guan 已提交
461 462 463 464 465 466
  }

  SHeartBeatRsp *pRsp = rpcMallocCont(sizeof(SHeartBeatRsp));
  if (pRsp == NULL) {
    mndReleaseConn(pMnode, pConn);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
467
    mError("user:%s, conn:%d failed to process hb while since %s", pReq->user, pHeartbeat->connId, terrstr());
S
Shengliang Guan 已提交
468 469 470
    return -1;
  }

S
Shengliang Guan 已提交
471
  mndSaveQueryStreamList(pConn, pHeartbeat);
S
Shengliang Guan 已提交
472 473 474 475 476 477 478 479 480
  if (pConn->killed != 0) {
    pRsp->killConnection = 1;
  }

  if (pConn->queryId != 0) {
    pRsp->queryId = htonl(pConn->queryId);
    pConn->queryId = 0;
  }

S
Shengliang Guan 已提交
481
  pRsp->connId = htonl(pConn->id);
S
Shengliang Guan 已提交
482
  pRsp->totalDnodes = htonl(1);
S
Shengliang Guan 已提交
483
  pRsp->onlineDnodes = htonl(1);
S
Shengliang Guan 已提交
484 485 486
  mndGetMnodeEpSet(pMnode, &pRsp->epSet);
  mndReleaseConn(pMnode, pConn);

S
Shengliang Guan 已提交
487
  pReq->contLen = sizeof(SConnectRsp);
S
Shengliang Guan 已提交
488
  pReq->pRsp = pRsp;
S
Shengliang Guan 已提交
489
  return 0;
L
Liu Jicong 已提交
490
#endif
S
Shengliang Guan 已提交
491 492
}

S
Shengliang Guan 已提交
493 494
static int32_t mndProcessKillQueryReq(SNodeMsg *pReq) {
  SMnode       *pMnode = pReq->pNode;
495 496
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
497
  SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
498
  if (pUser == NULL) return 0;
499
  if (!pUser->superUser) {
500 501 502 503 504 505
    mndReleaseUser(pMnode, pUser);
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
  }
  mndReleaseUser(pMnode, pUser);

S
Shengliang Guan 已提交
506
  SKillQueryReq killReq = {0};
S
Shengliang Guan 已提交
507
  if (tDeserializeSKillQueryReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &killReq) != 0) {
S
Shengliang Guan 已提交
508 509 510 511 512
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  mInfo("kill query msg is received, queryId:%d", killReq.queryId);
513

S
Shengliang Guan 已提交
514
  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &killReq.connId, sizeof(int32_t));
515
  if (pConn == NULL) {
S
Shengliang Guan 已提交
516
    mError("connId:%d, failed to kill queryId:%d, conn not exist", killReq.connId, killReq.queryId);
517 518 519
    terrno = TSDB_CODE_MND_INVALID_CONN_ID;
    return -1;
  } else {
S
Shengliang Guan 已提交
520 521
    mInfo("connId:%d, queryId:%d is killed by user:%s", killReq.connId, killReq.queryId, pReq->user);
    pConn->queryId = killReq.queryId;
522 523 524 525 526
    taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
    return 0;
  }
}

S
Shengliang Guan 已提交
527 528
static int32_t mndProcessKillConnReq(SNodeMsg *pReq) {
  SMnode       *pMnode = pReq->pNode;
529 530
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
531
  SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
532
  if (pUser == NULL) return 0;
533
  if (!pUser->superUser) {
534 535 536 537 538 539
    mndReleaseUser(pMnode, pUser);
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
  }
  mndReleaseUser(pMnode, pUser);

S
Shengliang Guan 已提交
540
  SKillConnReq killReq = {0};
S
Shengliang Guan 已提交
541
  if (tDeserializeSKillConnReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &killReq) != 0) {
S
Shengliang Guan 已提交
542 543 544
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }
S
Shengliang Guan 已提交
545

S
Shengliang Guan 已提交
546
  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &killReq.connId, sizeof(int32_t));
547
  if (pConn == NULL) {
S
Shengliang Guan 已提交
548
    mError("connId:%d, failed to kill connection, conn not exist", killReq.connId);
549 550 551
    terrno = TSDB_CODE_MND_INVALID_CONN_ID;
    return -1;
  } else {
S
Shengliang Guan 已提交
552
    mInfo("connId:%d, is killed by user:%s", killReq.connId, pReq->user);
553 554 555 556 557 558
    pConn->killed = 1;
    taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
    return TSDB_CODE_SUCCESS;
  }
}

S
Shengliang Guan 已提交
559 560
static int32_t mndGetConnsMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
  SMnode       *pMnode = pReq->pNode;
S
Shengliang Guan 已提交
561 562
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
563
  SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
S
Shengliang Guan 已提交
564
  if (pUser == NULL) return 0;
565
  if (!pUser->superUser) {
S
Shengliang Guan 已提交
566
    mndReleaseUser(pMnode, pUser);
567 568
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
S
Shengliang Guan 已提交
569 570 571 572
  }
  mndReleaseUser(pMnode, pUser);

  int32_t  cols = 0;
S
Shengliang Guan 已提交
573
  SSchema *pSchema = pMeta->pSchemas;
S
Shengliang Guan 已提交
574 575 576 577

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "connId");
S
Shengliang Guan 已提交
578
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
579 580 581 582 583
  cols++;

  pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "user");
S
Shengliang Guan 已提交
584
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
585 586 587 588 589 590
  cols++;

  // app name
  pShow->bytes[cols] = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "program");
S
Shengliang Guan 已提交
591
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
592 593 594 595 596 597
  cols++;

  // app pid
  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "pid");
S
Shengliang Guan 已提交
598
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
599 600 601 602 603
  cols++;

  pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "ip:port");
S
Shengliang Guan 已提交
604
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
605 606 607 608 609
  cols++;

  pShow->bytes[cols] = 8;
  pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
  strcpy(pSchema[cols].name, "login_time");
S
Shengliang Guan 已提交
610
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
611 612 613 614 615
  cols++;

  pShow->bytes[cols] = 8;
  pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
  strcpy(pSchema[cols].name, "last_access");
S
Shengliang Guan 已提交
616
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
617 618
  cols++;

S
Shengliang Guan 已提交
619
  pMeta->numOfColumns = cols;
S
Shengliang Guan 已提交
620 621 622 623 624 625 626
  pShow->numOfColumns = cols;

  pShow->offset[0] = 0;
  for (int32_t i = 1; i < cols; ++i) {
    pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
  }

H
Haojun Liao 已提交
627
  pShow->numOfRows = taosCacheGetNumOfObj(pMgmt->cache);
S
Shengliang Guan 已提交
628
  pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
D
dapan1121 已提交
629
  strcpy(pMeta->tbName, mndShowStr(pShow->type));
S
Shengliang Guan 已提交
630 631 632 633

  return 0;
}

S
Shengliang Guan 已提交
634 635
static int32_t mndRetrieveConns(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  SMnode   *pMnode = pReq->pNode;
S
Shengliang Guan 已提交
636
  int32_t   numOfRows = 0;
S
Shengliang Guan 已提交
637
  SConnObj *pConn = NULL;
S
Shengliang Guan 已提交
638 639 640 641
  int32_t   cols = 0;
  char     *pWrite;
  char      ipStr[TSDB_IPv4ADDR_LEN + 6];

H
Haojun Liao 已提交
642 643 644 645 646
  if (pShow->pIter == NULL) {
    SProfileMgmt *pMgmt = &pMnode->profileMgmt;
    pShow->pIter = taosCacheCreateIter(pMgmt->cache);
  }

S
Shengliang Guan 已提交
647
  while (numOfRows < rows) {
H
Haojun Liao 已提交
648
    pConn = mndGetNextConn(pMnode, pShow->pIter);
S
Shengliang Guan 已提交
649
    if (pConn == NULL) break;
S
Shengliang Guan 已提交
650 651 652 653

    cols = 0;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
654
    *(int32_t *)pWrite = pConn->id;
S
Shengliang Guan 已提交
655 656 657
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
658
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->user, pShow->bytes[cols]);
S
Shengliang Guan 已提交
659 660 661 662
    cols++;

    // app name
    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
663
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->app, pShow->bytes[cols]);
S
Shengliang Guan 已提交
664 665 666 667
    cols++;

    // app pid
    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
668
    *(int32_t *)pWrite = pConn->pid;
S
Shengliang Guan 已提交
669 670 671
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
672
    taosIpPort2String(pConn->ip, pConn->port, ipStr);
S
Shengliang Guan 已提交
673 674 675 676
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->bytes[cols]);
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
677
    *(int64_t *)pWrite = pConn->loginTimeMs;
S
Shengliang Guan 已提交
678 679 680
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
681 682
    if (pConn->lastAccessTimeMs < pConn->loginTimeMs) pConn->lastAccessTimeMs = pConn->loginTimeMs;
    *(int64_t *)pWrite = pConn->lastAccessTimeMs;
S
Shengliang Guan 已提交
683 684 685 686 687 688 689 690 691 692
    cols++;

    numOfRows++;
  }

  pShow->numOfReads += numOfRows;

  return numOfRows;
}

S
Shengliang Guan 已提交
693 694
static int32_t mndGetQueryMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
  SMnode       *pMnode = pReq->pNode;
S
Shengliang Guan 已提交
695 696
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
697
  SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
S
Shengliang Guan 已提交
698
  if (pUser == NULL) return 0;
699
  if (!pUser->superUser) {
S
Shengliang Guan 已提交
700
    mndReleaseUser(pMnode, pUser);
701 702
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
S
Shengliang Guan 已提交
703 704 705 706
  }
  mndReleaseUser(pMnode, pUser);

  int32_t  cols = 0;
S
Shengliang Guan 已提交
707
  SSchema *pSchema = pMeta->pSchemas;
S
Shengliang Guan 已提交
708

S
Shengliang Guan 已提交
709 710 711
  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "queryId");
S
Shengliang Guan 已提交
712
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
713 714 715 716 717
  cols++;

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "connId");
S
Shengliang Guan 已提交
718
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
719 720 721 722 723
  cols++;

  pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "user");
S
Shengliang Guan 已提交
724
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
725 726 727 728 729
  cols++;

  pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "ip:port");
S
Shengliang Guan 已提交
730
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
731 732
  cols++;

S
Shengliang Guan 已提交
733
  pShow->bytes[cols] = 22 + VARSTR_HEADER_SIZE;
S
Shengliang Guan 已提交
734 735
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "qid");
S
Shengliang Guan 已提交
736
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
737 738 739 740 741
  cols++;

  pShow->bytes[cols] = 8;
  pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
  strcpy(pSchema[cols].name, "created_time");
S
Shengliang Guan 已提交
742
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
743 744 745 746 747
  cols++;

  pShow->bytes[cols] = 8;
  pSchema[cols].type = TSDB_DATA_TYPE_BIGINT;
  strcpy(pSchema[cols].name, "time");
S
Shengliang Guan 已提交
748
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
749 750 751 752 753
  cols++;

  pShow->bytes[cols] = QUERY_OBJ_ID_SIZE + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "sql_obj_id");
S
Shengliang Guan 已提交
754
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
755 756 757 758 759
  cols++;

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "pid");
S
Shengliang Guan 已提交
760
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
761 762 763 764 765
  cols++;

  pShow->bytes[cols] = TSDB_EP_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "ep");
S
Shengliang Guan 已提交
766
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
767 768 769 770 771
  cols++;

  pShow->bytes[cols] = 1;
  pSchema[cols].type = TSDB_DATA_TYPE_BOOL;
  strcpy(pSchema[cols].name, "stable_query");
S
Shengliang Guan 已提交
772
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
773 774 775 776 777
  cols++;

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "sub_queries");
S
Shengliang Guan 已提交
778
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
779 780 781 782 783
  cols++;

  pShow->bytes[cols] = TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "sub_query_info");
S
Shengliang Guan 已提交
784
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
785 786 787 788 789
  cols++;

  pShow->bytes[cols] = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "sql");
S
Shengliang Guan 已提交
790
  pSchema[cols].bytes = pShow->bytes[cols];
S
Shengliang Guan 已提交
791 792
  cols++;

S
Shengliang Guan 已提交
793
  pMeta->numOfColumns = cols;
S
Shengliang Guan 已提交
794 795 796 797 798 799 800 801 802
  pShow->numOfColumns = cols;

  pShow->offset[0] = 0;
  for (int32_t i = 1; i < cols; ++i) {
    pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
  }

  pShow->numOfRows = 1000000;
  pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
D
dapan1121 已提交
803
  strcpy(pMeta->tbName, mndShowStr(pShow->type));
S
Shengliang Guan 已提交
804 805 806 807

  return 0;
}

S
Shengliang Guan 已提交
808 809
static int32_t mndRetrieveQueries(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  SMnode   *pMnode = pReq->pNode;
S
Shengliang Guan 已提交
810
  int32_t   numOfRows = 0;
S
Shengliang Guan 已提交
811
  SConnObj *pConn = NULL;
S
Shengliang Guan 已提交
812
  int32_t   cols = 0;
813 814
  char     *pWrite;
  void     *pIter;
S
Shengliang Guan 已提交
815 816
  char      str[TSDB_IPv4ADDR_LEN + 6] = {0};

H
Haojun Liao 已提交
817 818 819 820 821
  if (pShow->pIter == NULL) {
    SProfileMgmt *pMgmt = &pMnode->profileMgmt;
    pShow->pIter = taosCacheCreateIter(pMgmt->cache);
  }

S
Shengliang Guan 已提交
822
  while (numOfRows < rows) {
H
Haojun Liao 已提交
823
    pConn = mndGetNextConn(pMnode, pShow->pIter);
S
Shengliang Guan 已提交
824
    if (pConn == NULL) {
H
Haojun Liao 已提交
825
      pShow->pIter = NULL;
S
Shengliang Guan 已提交
826 827 828
      break;
    }

S
Shengliang Guan 已提交
829
    if (numOfRows + pConn->numOfQueries >= rows) {
H
Haojun Liao 已提交
830 831
      taosCacheDestroyIter(pShow->pIter);
      pShow->pIter = NULL;
S
Shengliang Guan 已提交
832 833 834
      break;
    }

S
Shengliang Guan 已提交
835 836
    for (int32_t i = 0; i < pConn->numOfQueries; ++i) {
      SQueryDesc *pDesc = pConn->pQueries + i;
S
Shengliang Guan 已提交
837 838 839
      cols = 0;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
840 841 842 843 844
      *(int64_t *)pWrite = htobe64(pDesc->queryId);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int64_t *)pWrite = htobe64(pConn->id);
S
Shengliang Guan 已提交
845 846 847
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
848
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->user, pShow->bytes[cols]);
S
Shengliang Guan 已提交
849 850 851
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
S
Shengliang Guan 已提交
852
      snprintf(str, tListLen(str), "%s:%u", taosIpStr(pConn->ip), pConn->port);
S
Shengliang Guan 已提交
853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, str, pShow->bytes[cols]);
      cols++;

      char handleBuf[24] = {0};
      snprintf(handleBuf, tListLen(handleBuf), "%" PRIu64, htobe64(pDesc->qId));
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;

      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, handleBuf, pShow->bytes[cols]);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int64_t *)pWrite = htobe64(pDesc->stime);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int64_t *)pWrite = htobe64(pDesc->useconds);
      cols++;

      snprintf(str, tListLen(str), "0x%" PRIx64, htobe64(pDesc->sqlObjId));
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, str, pShow->bytes[cols]);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int32_t *)pWrite = htonl(pDesc->pid);
      cols++;

      char epBuf[TSDB_EP_LEN + 1] = {0};
S
Shengliang Guan 已提交
881
      snprintf(epBuf, tListLen(epBuf), "%s:%u", pDesc->fqdn, pConn->port);
S
Shengliang Guan 已提交
882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, epBuf, pShow->bytes[cols]);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(bool *)pWrite = pDesc->stableQuery;
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int32_t *)pWrite = htonl(pDesc->numOfSub);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDesc->subSqlInfo, pShow->bytes[cols]);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDesc->sql, pShow->bytes[cols]);
      cols++;

      numOfRows++;
    }
  }

S
Shengliang Guan 已提交
906
  mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
S
Shengliang Guan 已提交
907 908 909 910 911
  pShow->numOfReads += numOfRows;
  return numOfRows;
}

static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) {
H
Haojun Liao 已提交
912 913 914
  if (pIter != NULL) {
    taosCacheDestroyIter(pIter);
  }
S
Shengliang Guan 已提交
915
}
S
Shengliang Guan 已提交
916

S
Shengliang Guan 已提交
917 918 919 920
int32_t mndGetNumOfConnections(SMnode *pMnode) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
  return taosCacheGetNumOfObj(pMgmt->cache);
}