/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "mndProfile.h"
#include "mndDb.h"
#include "mndMnode.h"
#include "mndShow.h"
#include "mndStb.h"
#include "mndUser.h"
#include "tglobal.h"
#include "version.h"

typedef struct {
L
Liu Jicong 已提交
27 28 29 30 31 32 33 34 35 36 37 38 39
  uint32_t id;
  int8_t   connType;
  char     user[TSDB_USER_LEN];
  char     app[TSDB_APP_NAME_LEN];  // app name that invokes taosc
  int64_t  appStartTimeMs;          // app start time
  int32_t  pid;                     // pid of app that invokes taosc
  uint32_t ip;
  uint16_t port;
  int8_t   killed;
  int64_t  loginTimeMs;
  int64_t  lastAccessTimeMs;
  uint64_t killId;
  int32_t  numOfQueries;
dengyihao's avatar
dengyihao 已提交
40
  SArray * pQueries;  // SArray<SQueryDesc>
S
Shengliang Guan 已提交
41 42
} SConnObj;

L
Liu Jicong 已提交
43 44
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
                               int32_t pid, const char *app, int64_t startTime);
S
Shengliang Guan 已提交
45
static void      mndFreeConn(SConnObj *pConn);
D
dapan1121 已提交
46
static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId);
S
Shengliang Guan 已提交
47
static void      mndReleaseConn(SMnode *pMnode, SConnObj *pConn);
dengyihao's avatar
dengyihao 已提交
48
static void *    mndGetNextConn(SMnode *pMnode, SCacheIter *pIter);
S
Shengliang Guan 已提交
49
static void      mndCancelGetNextConn(SMnode *pMnode, void *pIter);
S
Shengliang Guan 已提交
50 51 52 53 54 55
static int32_t   mndProcessHeartBeatReq(SNodeMsg *pReq);
static int32_t   mndProcessConnectReq(SNodeMsg *pReq);
static int32_t   mndProcessKillQueryReq(SNodeMsg *pReq);
static int32_t   mndProcessKillConnReq(SNodeMsg *pReq);
static int32_t   mndRetrieveConns(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static int32_t   mndRetrieveQueries(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
S
Shengliang Guan 已提交
56 57 58 59 60
static void      mndCancelGetNextQuery(SMnode *pMnode, void *pIter);

/*
 * Sets up the profile module: creates the connection cache and registers the
 * heartbeat / connect / kill message handlers plus the show free-iterator
 * handlers.
 *
 * Returns 0 on success. If the cache cannot be created, sets terrno to
 * TSDB_CODE_OUT_OF_MEMORY and returns -1.
 */
int32_t mndInitProfile(SMnode *pMnode) {
  SProfileMgmt *pProfile = &pMnode->profileMgmt;

  // Cache check interval, in milliseconds.
  const int32_t checkIntervalMs = tsShellActivityTimer * 2 * 1000;

  pProfile->cache =
      taosCacheInit(TSDB_DATA_TYPE_INT, checkIntervalMs, true, (__cache_free_fn_t)mndFreeConn, "conn");
  if (NULL == pProfile->cache) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to alloc profile cache since %s", terrstr());
    return -1;
  }

  // Request handlers.
  mndSetMsgHandle(pMnode, TDMT_MND_HEARTBEAT, mndProcessHeartBeatReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CONNECT, mndProcessConnectReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq);

  // Only the iterator-free handlers are registered; the retrieve handlers are
  // currently disabled:
  //  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
  //  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery);

  return 0;
}

/*
 * Releases the connection cache created by mndInitProfile. Does nothing when
 * the cache pointer is already NULL.
 */
void mndCleanupProfile(SMnode *pMnode) {
  SProfileMgmt *pProfile = &pMnode->profileMgmt;
  if (NULL == pProfile->cache) {
    return;
  }

  taosCacheCleanup(pProfile->cache);
  pProfile->cache = NULL;
}

/*
 * Creates a connection record and stores it in the profile cache.
 *
 * The connection id is generated by mndGenerateUid from the concatenation of
 * user, ip, port, pid and app. When startTime is 0 the current timestamp is
 * used as the app start time.
 *
 * Returns the cached object, or NULL (terrno = TSDB_CODE_OUT_OF_MEMORY) when
 * the cache insert fails.
 */
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
                               int32_t pid, const char *app, int64_t startTime) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  char    connStr[255] = {0};
  int32_t len = snprintf(connStr, sizeof(connStr), "%s%d%d%d%s", user, ip, port, pid, app);
  // snprintf reports the length it WOULD have written; clamp it to what is
  // actually in the buffer so the uid generator never reads past connStr.
  if (len < 0) {
    len = 0;
  } else if (len >= (int32_t)sizeof(connStr)) {
    len = (int32_t)sizeof(connStr) - 1;
  }
  int32_t connId = mndGenerateUid(connStr, len);
  if (startTime == 0) startTime = taosGetTimestampMs();

  SConnObj connObj = {.id = connId,
                      .connType = connType,
                      .appStartTimeMs = startTime,
                      .pid = pid,
                      .ip = ip,
                      .port = port,
                      .killed = 0,
                      .loginTimeMs = taosGetTimestampMs(),
                      .lastAccessTimeMs = 0,
                      .killId = 0,
                      .numOfQueries = 0,
                      .pQueries = NULL};

  connObj.lastAccessTimeMs = connObj.loginTimeMs;
  tstrncpy(connObj.user, user, TSDB_USER_LEN);
  tstrncpy(connObj.app, app, TSDB_APP_NAME_LEN);

  int32_t   keepTime = tsShellActivityTimer * 3;
  SConnObj *pConn = taosCachePut(pMgmt->cache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), keepTime * 1000);
  if (pConn == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // Argument order matches the format: reason first, then user.
    mError("conn:%d, failed to put into cache since %s, user:%s", connId, terrstr(), user);
    return NULL;
  }

  mTrace("conn:%u, is created, data:%p user:%s", pConn->id, pConn, user);
  return pConn;
}

/*
 * Cache free callback for a connection record (registered in mndInitProfile).
 *
 * pQueries is an SArray whose elements are released with
 * tFreeClientHbQueryDesc elsewhere in this file, so it is destroyed the same
 * way here; a plain free of the array handle would leak its contents.
 * pQueries is NULL for connections that never reported queries.
 */
static void mndFreeConn(SConnObj *pConn) {
  taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc);
  pConn->pQueries = NULL;
  mTrace("conn:%u, is destroyed, data:%p", pConn->id, pConn);
}

/*
 * Looks up a connection by id in the profile cache and refreshes its
 * lastAccessTimeMs.
 *
 * Returns the cached object, or NULL when no entry with that id exists.
 * A non-NULL result should be handed back with mndReleaseConn.
 */
static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId) {
  SProfileMgmt *pProfile = &pMnode->profileMgmt;
  SConnObj     *pFound = taosCacheAcquireByKey(pProfile->cache, &connId, sizeof(connId));

  if (NULL == pFound) {
    mDebug("conn:%u, already destroyed", connId);
    return NULL;
  }

  // NOTE(review): this stores "now + keep window" rather than the plain
  // current time - confirm that is the intended meaning of lastAccessTimeMs.
  int32_t keepTime = tsShellActivityTimer * 3;
  pFound->lastAccessTimeMs = keepTime * 1000 + (uint64_t)taosGetTimestampMs();

  mTrace("conn:%u, acquired from cache, data:%p", pFound->id, pFound);
  return pFound;
}

/*
 * Hands a connection obtained from the profile cache back to it.
 * A NULL pConn is accepted and ignored.
 */
static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) {
  if (pConn != NULL) {
    mTrace("conn:%u, released from cache, data:%p", pConn->id, pConn);

    SProfileMgmt *pProfile = &pMnode->profileMgmt;
    taosCacheRelease(pProfile->cache, (void **)&pConn, false);
  }
}

/*
 * Advances the cache iterator and returns the next connection object.
 *
 * Returns NULL when the iteration is exhausted. In that case the iterator has
 * already been destroyed here, so the caller must drop its pointer to pIter
 * and must not use or destroy it again.
 *
 * Declared `static` to match the forward declaration at the top of the file.
 */
static void *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter) {
  (void)pMnode;

  if (!taosCacheIterNext(pIter)) {
    taosCacheDestroyIter(pIter);
    return NULL;
  }

  size_t dataLen = 0;
  return taosCacheIterGetData(pIter, &dataLen);
}

/*
 * Show free-iterator handler for the connections table: destroys a cache
 * iterator that is still alive. A NULL iterator is ignored.
 */
static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) {
  if (NULL == pIter) {
    return;
  }
  taosCacheDestroyIter(pIter);
}

/*
 * Handles TDMT_MND_CONNECT: authenticates the user, optionally validates the
 * requested database, creates a connection record and builds the SConnectRsp
 * that is attached to pReq (pRsp / rspLen).
 *
 * Returns 0 on success, TSDB_CODE_RPC_AUTH_FAILURE on a password mismatch and
 * -1 on any other failure. User, db and connection references acquired here
 * are released before returning.
 */
static int32_t mndProcessConnectReq(SNodeMsg *pReq) {
  SMnode     *pMnode = pReq->pNode;
  SUserObj   *pUser = NULL;
  SDbObj     *pDb = NULL;
  SConnObj   *pConn = NULL;
  int32_t     code = -1;
  SConnectReq connReq = {0};
  char        ip[30] = {0};

  if (tDeserializeSConnectReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &connReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto CONN_OVER;
  }

  taosIp2String(pReq->clientIp, ip);

  pUser = mndAcquireUser(pMnode, pReq->user);
  if (pUser == NULL) {
    mError("user:%s, failed to login while acquire user since %s", pReq->user, terrstr());
    goto CONN_OVER;
  }
  if (0 != strncmp(connReq.passwd, pUser->pass, TSDB_PASSWORD_LEN - 1)) {
    // Never write the supplied or the stored password to the log.
    mError("user:%s, failed to auth while acquire user, password mismatch", pReq->user);
    code = TSDB_CODE_RPC_AUTH_FAILURE;
    goto CONN_OVER;
  }

  if (connReq.db[0]) {
    // Build the full db name from the account id and the requested db.
    char db[TSDB_DB_FNAME_LEN];
    snprintf(db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, connReq.db);
    pDb = mndAcquireDb(pMnode, db);
    if (pDb == NULL) {
      terrno = TSDB_CODE_MND_INVALID_DB;
      mError("user:%s, failed to login from %s while use db:%s since %s", pReq->user, ip, connReq.db, terrstr());
      goto CONN_OVER;
    }
  }

  pConn = mndCreateConn(pMnode, pReq->user, connReq.connType, pReq->clientIp, pReq->clientPort, connReq.pid,
                        connReq.app, connReq.startTime);
  if (pConn == NULL) {
    mError("user:%s, failed to login from %s while create connection since %s", pReq->user, ip, terrstr());
    goto CONN_OVER;
  }

  // NOTE(review): this acquires the entry again in addition to the pointer
  // returned by mndCreateConn, while only one mndReleaseConn follows below -
  // confirm the extra acquire is intentional.
  mndAcquireConn(pMnode, pConn->id);

  SConnectRsp connectRsp = {0};
  connectRsp.acctId = pUser->acctId;
  connectRsp.superUser = pUser->superUser;
  connectRsp.clusterId = pMnode->clusterId;
  connectRsp.connId = pConn->id;
  connectRsp.connType = connReq.connType;

  snprintf(connectRsp.sVersion, sizeof(connectRsp.sVersion), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo,
           gitinfo);
  mndGetMnodeEpSet(pMnode, &connectRsp.epSet);

  // First pass computes the encoded size, second pass fills the buffer.
  int32_t contLen = tSerializeSConnectRsp(NULL, 0, &connectRsp);
  if (contLen < 0) goto CONN_OVER;
  void *pRsp = rpcMallocCont(contLen);
  if (pRsp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto CONN_OVER;
  }
  tSerializeSConnectRsp(pRsp, contLen, &connectRsp);

  pReq->rspLen = contLen;
  pReq->pRsp = pRsp;

  mDebug("user:%s, login from %s:%d, conn:%u, app:%s", pReq->user, ip, pConn->port, pConn->id, connReq.app);

  code = 0;

CONN_OVER:

  mndReleaseUser(pMnode, pUser);
  mndReleaseDb(pMnode, pDb);
  mndReleaseConn(pMnode, pConn);

  return code;
}

/*
 * Replaces the connection's stored query list with the one carried by the
 * heartbeat request.
 *
 * Ownership of pBasic->queryDesc moves to pConn (the request field is set to
 * NULL); the previously stored list is destroyed. numOfQueries is derived
 * from the list now owned by the connection - the request field is already
 * NULL at that point and must not be used for the count.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) {
  taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc);

  pConn->pQueries = pBasic->queryDesc;
  pBasic->queryDesc = NULL;

  pConn->numOfQueries = pConn->pQueries ? taosArrayGetSize(pConn->pQueries) : 0;

  return TSDB_CODE_SUCCESS;
}

/*
 * Builds the heartbeat response for a TMQ connection.
 *
 * Not implemented: always returns NULL, so the caller emits no response entry
 * for TMQ heartbeats. The previously disabled (`#if 0`) draft was removed; it
 * indexed the vgroup list with the outer loop variable, leaked its
 * allocations on error paths and never released the consumer / vgroup objects
 * it acquired. Recover it from version control if the feature is revived.
 */
static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) {
  (void)pMnode;
  (void)pReq;
  return NULL;
}

/*
 * Processes one query-type heartbeat request and appends its response to
 * pBatchRsp->rsps.
 *
 * Query part (only when pHbReq->query is set): acquires the connection named
 * in the request, or recreates it when it is no longer cached, stores the
 * reported query list on it and fills an SQueryHbRspBasic with kill flags,
 * the connection id and the mnode ep set.
 *
 * Info part: every key/value in pHbReq->info is validated by the matching
 * mndValidate* routine; non-empty results are added to the response's info
 * array. An unknown key sets the response status to TSDB_CODE_MND_APP_ERROR.
 *
 * Returns TSDB_CODE_SUCCESS once the response was pushed, or -1 with terrno
 * set when the connection is unusable or an allocation fails (no response is
 * pushed in that case).
 */
static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHbReq *pHbReq,
                                        SClientHbBatchRsp *pBatchRsp) {
  SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL};

  if (pHbReq->query) {
    SQueryHbReqBasic *pBasic = pHbReq->query;

    SRpcConnInfo connInfo = {0};
    rpcGetConnInfo(pMsg->handle, &connInfo);

    SConnObj *pConn = mndAcquireConn(pMnode, pBasic->connId);
    if (pConn == NULL) {
      // The cached entry is gone; create a fresh one for this client.
      pConn = mndCreateConn(pMnode, connInfo.user, CONN_TYPE__QUERY, connInfo.clientIp, connInfo.clientPort,
                            pBasic->pid, pBasic->app, 0);
      if (pConn == NULL) {
        mError("user:%s, conn:%u is freed and failed to create new since %s", connInfo.user, pBasic->connId, terrstr());
        return -1;
      } else {
        mDebug("user:%s, conn:%u is freed and create a new conn:%u", connInfo.user, pBasic->connId, pConn->id);
      }
    } else if (pConn->killed) {
      mError("user:%s, conn:%u is already killed", connInfo.user, pConn->id);
      mndReleaseConn(pMnode, pConn);
      terrno = TSDB_CODE_MND_INVALID_CONNECTION;
      return -1;
    }

    SQueryHbRspBasic *rspBasic = taosMemoryCalloc(1, sizeof(SQueryHbRspBasic));
    if (rspBasic == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      // Log while the connection is still held; pConn must not be touched
      // after it has been released.
      mError("user:%s, conn:%u failed to process hb while since %s", pConn->user, pBasic->connId, terrstr());
      mndReleaseConn(pMnode, pConn);
      return -1;
    }

    mndSaveQueryList(pConn, pBasic);
    if (pConn->killed != 0) {
      rspBasic->killConnection = 1;
    }

    // A pending kill-query request is delivered once, then cleared.
    if (pConn->killId != 0) {
      rspBasic->killRid = pConn->killId;
      pConn->killId = 0;
    }

    rspBasic->connId = pConn->id;
    rspBasic->totalDnodes = 1;   // TODO
    rspBasic->onlineDnodes = 1;  // TODO
    mndGetMnodeEpSet(pMnode, &rspBasic->epSet);
    mndReleaseConn(pMnode, pConn);

    hbRsp.query = rspBasic;
  }

  // Only query the hash size when there is a hash at all.
  int32_t kvNum = (pHbReq->info != NULL) ? taosHashGetSize(pHbReq->info) : 0;
  if (kvNum <= 0) {
    taosArrayPush(pBatchRsp->rsps, &hbRsp);
    return TSDB_CODE_SUCCESS;
  }

  hbRsp.info = taosArrayInit(kvNum, sizeof(SKv));
  if (NULL == hbRsp.info) {
    mError("taosArrayInit %d rsp kv failed", kvNum);
    // The response is not pushed, so nobody else will free the query part.
    taosMemoryFreeClear(hbRsp.query);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  void *pIter = taosHashIterate(pHbReq->info, NULL);
  while (pIter != NULL) {
    SKv *kv = pIter;

    switch (kv->key) {
      case HEARTBEAT_KEY_USER_AUTHINFO: {
        void   *rspMsg = NULL;
        int32_t rspLen = 0;
        mndValidateUserAuthInfo(pMnode, kv->value, kv->valueLen / sizeof(SUserAuthVersion), &rspMsg, &rspLen);
        if (rspMsg && rspLen > 0) {
          SKv kv1 = {.key = HEARTBEAT_KEY_USER_AUTHINFO, .valueLen = rspLen, .value = rspMsg};
          taosArrayPush(hbRsp.info, &kv1);
        }
        break;
      }
      case HEARTBEAT_KEY_DBINFO: {
        void   *rspMsg = NULL;
        int32_t rspLen = 0;
        mndValidateDbInfo(pMnode, kv->value, kv->valueLen / sizeof(SDbVgVersion), &rspMsg, &rspLen);
        if (rspMsg && rspLen > 0) {
          SKv kv1 = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg};
          taosArrayPush(hbRsp.info, &kv1);
        }
        break;
      }
      case HEARTBEAT_KEY_STBINFO: {
        void   *rspMsg = NULL;
        int32_t rspLen = 0;
        mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableMetaVersion), &rspMsg, &rspLen);
        if (rspMsg && rspLen > 0) {
          SKv kv1 = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg};
          taosArrayPush(hbRsp.info, &kv1);
        }
        break;
      }
      default:
        mError("invalid kv key:%d", kv->key);
        hbRsp.status = TSDB_CODE_MND_APP_ERROR;
        break;
    }

    pIter = taosHashIterate(pHbReq->info, pIter);
  }

  taosArrayPush(pBatchRsp->rsps, &hbRsp);

  return TSDB_CODE_SUCCESS;
}

/*
 * Handles TDMT_MND_HEARTBEAT: decodes the batch request, builds one response
 * per client entry (query connections via mndProcessQueryHeartBeat, TMQ
 * connections via mndMqHbBuildRsp), serializes the batch response into an rpc
 * buffer and attaches it to pReq.
 *
 * Returns 0 on success, -1 with terrno set when decoding, allocation or
 * serialization fails. All temporary request/response storage is released on
 * every path.
 */
static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) {
  SMnode *pMnode = pReq->pNode;

  SClientHbBatchReq batchReq = {0};
  if (tDeserializeSClientHbBatchReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &batchReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  SClientHbBatchRsp batchRsp = {0};
  batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp));
  if (batchRsp.rsps == NULL) {
    taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  int32_t sz = taosArrayGetSize(batchReq.reqs);
  for (int i = 0; i < sz; i++) {
    SClientHbReq *pHbReq = taosArrayGet(batchReq.reqs, i);
    if (pHbReq->connKey.connType == CONN_TYPE__QUERY) {
      // A failing entry simply contributes no response; the rest of the batch
      // is still answered.
      (void)mndProcessQueryHeartBeat(pMnode, &pReq->rpcMsg, pHbReq, &batchRsp);
    } else if (pHbReq->connKey.connType == CONN_TYPE__TMQ) {
      SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq);
      if (pRsp != NULL) {
        // The array stores a copy of the struct, so the heap shell can go.
        taosArrayPush(batchRsp.rsps, pRsp);
        taosMemoryFree(pRsp);
      }
    }
  }
  taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);

  // First pass computes the encoded size, second pass fills the buffer.
  int32_t code = -1;
  void   *buf = NULL;
  int32_t tlen = tSerializeSClientHbBatchRsp(NULL, 0, &batchRsp);
  if (tlen < 0) {
    terrno = TSDB_CODE_MND_APP_ERROR;
  } else if ((buf = rpcMallocCont(tlen)) == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
  } else {
    tSerializeSClientHbBatchRsp(buf, tlen, &batchRsp);
    pReq->rspLen = tlen;
    pReq->pRsp = buf;
    code = 0;
  }

  // Release everything the individual responses own: the kv payloads, the
  // info arrays and the query part allocated by mndProcessQueryHeartBeat.
  int32_t rspNum = (int32_t)taosArrayGetSize(batchRsp.rsps);
  for (int32_t i = 0; i < rspNum; ++i) {
    SClientHbRsp *rsp = taosArrayGet(batchRsp.rsps, i);
    int32_t       kvNum = (rsp->info) ? taosArrayGetSize(rsp->info) : 0;
    for (int32_t n = 0; n < kvNum; ++n) {
      SKv *kv = taosArrayGet(rsp->info, n);
      taosMemoryFreeClear(kv->value);
    }
    taosArrayDestroy(rsp->info);
    taosMemoryFreeClear(rsp->query);
  }

  taosArrayDestroy(batchRsp.rsps);

  return code;
}

/*
 * Handles TDMT_MND_KILL_QUERY: records the query id to kill on the target
 * connection; mndProcessQueryHeartBeat hands it to the client on the next
 * heartbeat.
 *
 * Only super users may kill queries (otherwise terrno =
 * TSDB_CODE_MND_NO_RIGHTS, returns -1). Returns -1 as well for an undecodable
 * message or an unknown connection id, 0 otherwise.
 */
static int32_t mndProcessKillQueryReq(SNodeMsg *pReq) {
  SMnode       *pMnode = pReq->pNode;
  SProfileMgmt *pProfile = &pMnode->profileMgmt;

  // NOTE(review): an unknown requesting user is reported as success (0)
  // without doing anything - confirm this is intended.
  SUserObj *pCaller = mndAcquireUser(pMnode, pReq->user);
  if (NULL == pCaller) {
    return 0;
  }

  const bool isSuper = pCaller->superUser;
  mndReleaseUser(pMnode, pCaller);
  if (!isSuper) {
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
  }

  SKillQueryReq killReq = {0};
  if (0 != tDeserializeSKillQueryReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &killReq)) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  mInfo("kill query msg is received, queryId:%d", killReq.queryId);

  SConnObj *pTarget = taosCacheAcquireByKey(pProfile->cache, &killReq.connId, sizeof(int32_t));
  if (NULL == pTarget) {
    mError("connId:%d, failed to kill queryId:%d, conn not exist", killReq.connId, killReq.queryId);
    terrno = TSDB_CODE_MND_INVALID_CONN_ID;
    return -1;
  }

  mInfo("connId:%d, queryId:%d is killed by user:%s", killReq.connId, killReq.queryId, pReq->user);
  pTarget->killId = killReq.queryId;
  taosCacheRelease(pProfile->cache, (void **)&pTarget, false);
  return 0;
}

/*
 * Handles TDMT_MND_KILL_CONN: marks the target connection as killed.
 *
 * Only super users may kill connections (otherwise terrno =
 * TSDB_CODE_MND_NO_RIGHTS, returns -1). Returns -1 as well for an undecodable
 * message or an unknown connection id, TSDB_CODE_SUCCESS otherwise.
 */
static int32_t mndProcessKillConnReq(SNodeMsg *pReq) {
  SMnode       *pMnode = pReq->pNode;
  SProfileMgmt *pProfile = &pMnode->profileMgmt;

  // NOTE(review): an unknown requesting user is reported as success (0)
  // without doing anything - confirm this is intended.
  SUserObj *pCaller = mndAcquireUser(pMnode, pReq->user);
  if (NULL == pCaller) {
    return 0;
  }

  const bool isSuper = pCaller->superUser;
  mndReleaseUser(pMnode, pCaller);
  if (!isSuper) {
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
  }

  SKillConnReq killReq = {0};
  if (0 != tDeserializeSKillConnReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &killReq)) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  SConnObj *pTarget = taosCacheAcquireByKey(pProfile->cache, &killReq.connId, sizeof(int32_t));
  if (NULL == pTarget) {
    mError("connId:%d, failed to kill connection, conn not exist", killReq.connId);
    terrno = TSDB_CODE_MND_INVALID_CONN_ID;
    return -1;
  }

  mInfo("connId:%d, is killed by user:%s", killReq.connId, pReq->user);
  pTarget->killed = 1;
  taosCacheRelease(pProfile->cache, (void **)&pTarget, false);
  return TSDB_CODE_SUCCESS;
}

/*
 * Show retrieve handler for the connections table.
 *
 * Walks the connection cache through pShow->pIter (created on first use) and
 * counts up to `rows` entries. Writing the column values into `data` is
 * currently disabled (`#if 0`), so only the row count is produced.
 *
 * mndGetNextConn destroys the iterator when it runs out of entries, so
 * pShow->pIter is reset to NULL in that case; otherwise the show framework's
 * free-iterator handler would destroy it a second time.
 *
 * Returns the number of rows counted in this call and adds it to
 * pShow->numOfRows.
 */
static int32_t mndRetrieveConns(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  SMnode   *pMnode = pReq->pNode;
  int32_t   numOfRows = 0;
  SConnObj *pConn = NULL;

  if (pShow->pIter == NULL) {
    SProfileMgmt *pMgmt = &pMnode->profileMgmt;
    pShow->pIter = taosCacheCreateIter(pMgmt->cache);
  }

  while (numOfRows < rows) {
    pConn = mndGetNextConn(pMnode, pShow->pIter);
    if (pConn == NULL) {
      // The iterator was already destroyed by mndGetNextConn.
      pShow->pIter = NULL;
      break;
    }

#if 0
    int32_t cols = 0;
    char   *pWrite;
    char    ipStr[TSDB_IPv4ADDR_LEN + 6];

    pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
    *(uint32_t *)pWrite = pConn->id;
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->user, pShow->pMeta->pSchemas[cols].bytes);
    cols++;

    // app name
    pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->app, pShow->pMeta->pSchemas[cols].bytes);
    cols++;

    // app pid
    pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
    *(int32_t *)pWrite = pConn->pid;
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
    taosIpPort2String(pConn->ip, pConn->port, ipStr);
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->pMeta->pSchemas[cols].bytes);
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
    *(int64_t *)pWrite = pConn->loginTimeMs;
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
    if (pConn->lastAccessTimeMs < pConn->loginTimeMs) pConn->lastAccessTimeMs = pConn->loginTimeMs;
    *(int64_t *)pWrite = pConn->lastAccessTimeMs;
    cols++;
#endif

    numOfRows++;
  }

  pShow->numOfRows += numOfRows;

  return numOfRows;
}

/*
 * Show retrieve handler for the queries table.
 *
 * Not implemented: always reports 0 rows and leaves `data` and pShow
 * untouched. The previously disabled (`#if 0`) body was removed because it no
 * longer matched the data model - it walked SConnObj.pQueries as a raw
 * SQueryDesc array although that field is an SArray. Recover it from version
 * control as a starting point when the handler is re-enabled.
 */
static int32_t mndRetrieveQueries(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  (void)pReq;
  (void)pShow;
  (void)data;
  (void)rows;
  return 0;
}

/*
 * Show free-iterator handler for the queries table: destroys a cache iterator
 * that is still alive. A NULL iterator is ignored.
 */
static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) {
  if (NULL == pIter) {
    return;
  }
  taosCacheDestroyIter(pIter);
}

/*
 * Returns the number of objects currently held in the connection cache.
 */
int32_t mndGetNumOfConnections(SMnode *pMnode) {
  return taosCacheGetNumOfObj(pMnode->profileMgmt.cache);
}