mndProfile.c 24.9 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17 18
#include "mndProfile.h"
#include "mndDb.h"
S
Shengliang Guan 已提交
19
#include "mndDnode.h"
S
Shengliang Guan 已提交
20
#include "mndMnode.h"
D
dapan1121 已提交
21
#include "mndQnode.h"
S
Shengliang Guan 已提交
22
#include "mndShow.h"
S
Shengliang Guan 已提交
23
#include "mndStb.h"
S
Shengliang Guan 已提交
24
#include "mndUser.h"
S
Shengliang Guan 已提交
25
#include "tglobal.h"
S
version  
Shengliang Guan 已提交
26
#include "version.h"
S
Shengliang Guan 已提交
27

S
Shengliang Guan 已提交
28
typedef struct {
L
Liu Jicong 已提交
29 30 31 32 33 34 35 36 37 38 39 40 41
  uint32_t id;
  int8_t   connType;
  char     user[TSDB_USER_LEN];
  char     app[TSDB_APP_NAME_LEN];  // app name that invokes taosc
  int64_t  appStartTimeMs;          // app start time
  int32_t  pid;                     // pid of app that invokes taosc
  uint32_t ip;
  uint16_t port;
  int8_t   killed;
  int64_t  loginTimeMs;
  int64_t  lastAccessTimeMs;
  uint64_t killId;
  int32_t  numOfQueries;
42
  SRWLatch queryLock;
S
Shengliang Guan 已提交
43
  SArray  *pQueries;  // SArray<SQueryDesc>
S
Shengliang Guan 已提交
44 45
} SConnObj;

L
Liu Jicong 已提交
46 47
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
                               int32_t pid, const char *app, int64_t startTime);
S
Shengliang Guan 已提交
48
static void      mndFreeConn(SConnObj *pConn);
D
dapan1121 已提交
49
static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId);
S
Shengliang Guan 已提交
50
static void      mndReleaseConn(SMnode *pMnode, SConnObj *pConn);
S
Shengliang Guan 已提交
51
static void     *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter);
S
Shengliang Guan 已提交
52
static void      mndCancelGetNextConn(SMnode *pMnode, void *pIter);
S
Shengliang Guan 已提交
53 54 55 56
static int32_t   mndProcessHeartBeatReq(SRpcMsg *pReq);
static int32_t   mndProcessConnectReq(SRpcMsg *pReq);
static int32_t   mndProcessKillQueryReq(SRpcMsg *pReq);
static int32_t   mndProcessKillConnReq(SRpcMsg *pReq);
57 58
static int32_t   mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static int32_t   mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
S
Shengliang Guan 已提交
59 60 61 62 63
static void      mndCancelGetNextQuery(SMnode *pMnode, void *pIter);

int32_t mndInitProfile(SMnode *pMnode) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

L
Liu Jicong 已提交
64 65
  // in ms
  int32_t connCheckTime = tsShellActivityTimer * 2 * 1000;
S
Shengliang Guan 已提交
66 67 68 69 70 71 72
  pMgmt->cache = taosCacheInit(TSDB_DATA_TYPE_INT, connCheckTime, true, (__cache_free_fn_t)mndFreeConn, "conn");
  if (pMgmt->cache == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to alloc profile cache since %s", terrstr());
    return -1;
  }

S
Shengliang Guan 已提交
73 74 75 76
  mndSetMsgHandle(pMnode, TDMT_MND_HEARTBEAT, mndProcessHeartBeatReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CONNECT, mndProcessConnectReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq);
S
Shengliang Guan 已提交
77

78
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
S
Shengliang Guan 已提交
79
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn);
D
dapan1121 已提交
80
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
S
Shengliang Guan 已提交
81
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery);
82

S
Shengliang Guan 已提交
83 84 85 86 87 88 89 90 91 92 93
  return 0;
}

void mndCleanupProfile(SMnode *pMnode) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
  if (pMgmt->cache != NULL) {
    taosCacheCleanup(pMgmt->cache);
    pMgmt->cache = NULL;
  }
}

L
Liu Jicong 已提交
94 95
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
                               int32_t pid, const char *app, int64_t startTime) {
S
Shengliang Guan 已提交
96 97
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

L
Liu Jicong 已提交
98
  char    connStr[255] = {0};
D
dapan1121 已提交
99 100
  int32_t len = snprintf(connStr, sizeof(connStr), "%s%d%d%d%s", user, ip, port, pid, app);
  int32_t connId = mndGenerateUid(connStr, len);
S
Shengliang Guan 已提交
101
  if (startTime == 0) startTime = taosGetTimestampMs();
S
Shengliang Guan 已提交
102

S
Shengliang Guan 已提交
103
  SConnObj connObj = {.id = connId,
L
Liu Jicong 已提交
104
                      .connType = connType,
S
Shengliang Guan 已提交
105 106
                      .appStartTimeMs = startTime,
                      .pid = pid,
S
shm  
Shengliang Guan 已提交
107 108
                      .ip = ip,
                      .port = port,
S
Shengliang Guan 已提交
109
                      .killed = 0,
S
Shengliang Guan 已提交
110 111
                      .loginTimeMs = taosGetTimestampMs(),
                      .lastAccessTimeMs = 0,
D
dapan1121 已提交
112
                      .killId = 0,
S
Shengliang Guan 已提交
113 114 115
                      .numOfQueries = 0,
                      .pQueries = NULL};

S
Shengliang Guan 已提交
116
  connObj.lastAccessTimeMs = connObj.loginTimeMs;
S
shm  
Shengliang Guan 已提交
117
  tstrncpy(connObj.user, user, TSDB_USER_LEN);
S
Shengliang Guan 已提交
118 119
  tstrncpy(connObj.app, app, TSDB_APP_NAME_LEN);

S
Shengliang Guan 已提交
120
  int32_t   keepTime = tsShellActivityTimer * 3;
S
Shengliang Guan 已提交
121
  SConnObj *pConn = taosCachePut(pMgmt->cache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), keepTime * 1000);
S
Shengliang Guan 已提交
122 123
  if (pConn == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
shm  
Shengliang Guan 已提交
124
    mError("conn:%d, failed to put into cache since %s, user:%s", connId, user, terrstr());
S
Shengliang Guan 已提交
125 126
    return NULL;
  } else {
D
dapan1121 已提交
127
    mTrace("conn:%u, is created, data:%p user:%s", pConn->id, pConn, user);
S
Shengliang Guan 已提交
128 129
    return pConn;
  }
S
Shengliang Guan 已提交
130 131 132
}

static void mndFreeConn(SConnObj *pConn) {
133
  taosWLockLatch(&pConn->queryLock);
D
dapan1121 已提交
134
  taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc);
135
  taosWUnLockLatch(&pConn->queryLock);
D
dapan1121 已提交
136
  
D
dapan1121 已提交
137
  mTrace("conn:%u, is destroyed, data:%p", pConn->id, pConn);
S
Shengliang Guan 已提交
138 139
}

D
dapan1121 已提交
140
static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId) {
S
Shengliang Guan 已提交
141 142
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

D
dapan1121 已提交
143
  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(connId));
S
Shengliang Guan 已提交
144
  if (pConn == NULL) {
D
dapan1121 已提交
145
    mDebug("conn:%u, already destroyed", connId);
S
Shengliang Guan 已提交
146 147 148
    return NULL;
  }

S
Shengliang Guan 已提交
149
  int32_t keepTime = tsShellActivityTimer * 3;
S
Shengliang Guan 已提交
150
  pConn->lastAccessTimeMs = keepTime * 1000 + (uint64_t)taosGetTimestampMs();
S
Shengliang Guan 已提交
151

D
dapan1121 已提交
152
  mTrace("conn:%u, acquired from cache, data:%p", pConn->id, pConn);
S
Shengliang Guan 已提交
153 154 155 156 157
  return pConn;
}

static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) {
  if (pConn == NULL) return;
D
dapan1121 已提交
158
  mTrace("conn:%u, released from cache, data:%p", pConn->id, pConn);
S
Shengliang Guan 已提交
159

S
Shengliang Guan 已提交
160
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
S
Shengliang Guan 已提交
161 162 163
  taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
}

H
Haojun Liao 已提交
164
void *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter) {
L
Liu Jicong 已提交
165 166
  SConnObj *pConn = NULL;
  bool      hasNext = taosCacheIterNext(pIter);
H
Haojun Liao 已提交
167 168 169 170 171
  if (hasNext) {
    size_t dataLen = 0;
    pConn = taosCacheIterGetData(pIter, &dataLen);
  } else {
    taosCacheDestroyIter(pIter);
S
Shengliang Guan 已提交
172 173
  }

H
Haojun Liao 已提交
174
  return pConn;
S
Shengliang Guan 已提交
175 176 177
}

static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) {
H
Haojun Liao 已提交
178 179 180
  if (pIter != NULL) {
    taosCacheDestroyIter(pIter);
  }
S
Shengliang Guan 已提交
181 182
}

S
Shengliang Guan 已提交
183 184
static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
S
Shengliang Guan 已提交
185 186 187 188 189
  SUserObj   *pUser = NULL;
  SDbObj     *pDb = NULL;
  SConnObj   *pConn = NULL;
  int32_t     code = -1;
  SConnectReq connReq = {0};
S
shm  
Shengliang Guan 已提交
190
  char        ip[30] = {0};
S
Shengliang Guan 已提交
191

S
Shengliang Guan 已提交
192
  if (tDeserializeSConnectReq(pReq->pCont, pReq->contLen, &connReq) != 0) {
S
Shengliang Guan 已提交
193 194 195
    terrno = TSDB_CODE_INVALID_MSG;
    goto CONN_OVER;
  }
S
Shengliang Guan 已提交
196

S
Shengliang Guan 已提交
197
  taosIp2String(pReq->conn.clientIp, ip);
S
Shengliang Guan 已提交
198

S
Shengliang Guan 已提交
199
  pUser = mndAcquireUser(pMnode, pReq->conn.user);
200
  if (pUser == NULL) {
S
Shengliang Guan 已提交
201
    mError("user:%s, failed to login while acquire user since %s", pReq->conn.user, terrstr());
202 203
    goto CONN_OVER;
  }
dengyihao's avatar
dengyihao 已提交
204
  if (0 != strncmp(connReq.passwd, pUser->pass, TSDB_PASSWORD_LEN - 1)) {
S
Shengliang Guan 已提交
205
    mError("user:%s, failed to auth while acquire user, input:%s", pReq->conn.user, connReq.passwd);
dengyihao's avatar
dengyihao 已提交
206 207 208
    code = TSDB_CODE_RPC_AUTH_FAILURE;
    goto CONN_OVER;
  }
209

S
Shengliang Guan 已提交
210
  if (connReq.db[0]) {
S
Shengliang Guan 已提交
211 212 213
    char db[TSDB_DB_FNAME_LEN];
    snprintf(db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, connReq.db);
    pDb = mndAcquireDb(pMnode, db);
S
Shengliang Guan 已提交
214 215
    if (pDb == NULL) {
      terrno = TSDB_CODE_MND_INVALID_DB;
S
Shengliang Guan 已提交
216
      mError("user:%s, failed to login from %s while use db:%s since %s", pReq->conn.user, ip, connReq.db, terrstr());
217
      goto CONN_OVER;
S
Shengliang Guan 已提交
218 219 220
    }
  }

S
Shengliang Guan 已提交
221 222
  pConn = mndCreateConn(pMnode, pReq->conn.user, connReq.connType, pReq->conn.clientIp, pReq->conn.clientPort,
                        connReq.pid, connReq.app, connReq.startTime);
S
Shengliang Guan 已提交
223
  if (pConn == NULL) {
S
Shengliang Guan 已提交
224
    mError("user:%s, failed to login from %s while create connection since %s", pReq->conn.user, ip, terrstr());
225
    goto CONN_OVER;
S
Shengliang Guan 已提交
226 227
  }

S
Shengliang Guan 已提交
228 229 230 231 232
  SConnectRsp connectRsp = {0};
  connectRsp.acctId = pUser->acctId;
  connectRsp.superUser = pUser->superUser;
  connectRsp.clusterId = pMnode->clusterId;
  connectRsp.connId = pConn->id;
L
Liu Jicong 已提交
233
  connectRsp.connType = connReq.connType;
D
dapan 已提交
234
  connectRsp.dnodeNum = mndGetDnodeSize(pMnode);
S
Shengliang Guan 已提交
235

S
Shengliang Guan 已提交
236 237 238
  snprintf(connectRsp.sVersion, sizeof(connectRsp.sVersion), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo,
           gitinfo);
  mndGetMnodeEpSet(pMnode, &connectRsp.epSet);
239

S
Shengliang Guan 已提交
240 241 242 243 244
  int32_t contLen = tSerializeSConnectRsp(NULL, 0, &connectRsp);
  if (contLen < 0) goto CONN_OVER;
  void *pRsp = rpcMallocCont(contLen);
  if (pRsp == NULL) goto CONN_OVER;
  tSerializeSConnectRsp(pRsp, contLen, &connectRsp);
S
Shengliang Guan 已提交
245

S
Shengliang Guan 已提交
246 247
  pReq->info.rspLen = contLen;
  pReq->info.rsp = pRsp;
248

S
Shengliang Guan 已提交
249
  mDebug("user:%s, login from %s:%d, conn:%u, app:%s", pReq->conn.user, ip, pConn->port, pConn->id, connReq.app);
250 251 252 253 254 255 256 257 258 259

  code = 0;

CONN_OVER:

  mndReleaseUser(pMnode, pUser);
  mndReleaseDb(pMnode, pDb);
  mndReleaseConn(pMnode, pConn);

  return code;
S
Shengliang Guan 已提交
260 261
}

D
dapan1121 已提交
262
static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) {
263 264
  taosWLockLatch(&pConn->queryLock);

D
dapan1121 已提交
265
  taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc);
S
Shengliang Guan 已提交
266

D
dapan1121 已提交
267
  pConn->pQueries = pBasic->queryDesc;
268
  pConn->numOfQueries = pBasic->queryDesc ? taosArrayGetSize(pBasic->queryDesc) : 0;
D
dapan1121 已提交
269
  pBasic->queryDesc = NULL;
L
Liu Jicong 已提交
270

271 272 273
  mDebug("queries updated in conn %d, num:%d", pConn->id, pConn->numOfQueries);

  taosWUnLockLatch(&pConn->queryLock);
S
Shengliang Guan 已提交
274 275 276 277

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
278
static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) {
L
Liu Jicong 已提交
279
#if 0
wafwerar's avatar
wafwerar 已提交
280
  SClientHbRsp* pRsp = taosMemoryMalloc(sizeof(SClientHbRsp));
L
Liu Jicong 已提交
281 282 283 284 285 286 287 288 289 290 291 292 293 294 295
  if (pRsp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  pRsp->connKey = pReq->connKey;
  SMqHbBatchRsp batchRsp;
  batchRsp.batchRsps = taosArrayInit(0, sizeof(SMqHbRsp));
  if (batchRsp.batchRsps == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  SClientHbKey connKey = pReq->connKey;
  SHashObj* pObj =  pReq->info;
  SKv* pKv = taosHashGet(pObj, "mq-tmp", strlen("mq-tmp") + 1);
  if (pKv == NULL) {
wafwerar's avatar
wafwerar 已提交
296
    taosMemoryFree(pRsp);
L
Liu Jicong 已提交
297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328
    return NULL;
  }
  SMqHbMsg mqHb;
  taosDecodeSMqMsg(pKv->value, &mqHb);
  /*int64_t clientUid = htonl(pKv->value);*/
  /*if (mqHb.epoch )*/
  int sz = taosArrayGetSize(mqHb.pTopics);
  SMqConsumerObj* pConsumer = mndAcquireConsumer(pMnode, mqHb.consumerId); 
  for (int i = 0; i < sz; i++) {
    SMqHbOneTopicBatchRsp innerBatchRsp;
    innerBatchRsp.rsps = taosArrayInit(sz, sizeof(SMqHbRsp));
    if (innerBatchRsp.rsps == NULL) {
      //TODO
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
    SMqHbTopicInfo* topicInfo = taosArrayGet(mqHb.pTopics, i);
    SMqConsumerTopic* pConsumerTopic = taosHashGet(pConsumer->topicHash, topicInfo->name, strlen(topicInfo->name)+1);
    if (pConsumerTopic->epoch != topicInfo->epoch) {
      //add new vgids into rsp
      int vgSz = taosArrayGetSize(topicInfo->pVgInfo);
      for (int j = 0; j < vgSz; j++) {
        SMqHbRsp innerRsp;
        SMqHbVgInfo* pVgInfo = taosArrayGet(topicInfo->pVgInfo, i);
        SVgObj* pVgObj = mndAcquireVgroup(pMnode, pVgInfo->vgId);
        innerRsp.epSet = mndGetVgroupEpset(pMnode, pVgObj);
        taosArrayPush(innerBatchRsp.rsps, &innerRsp);
      }
    }
    taosArrayPush(batchRsp.batchRsps, &innerBatchRsp);
  }
  int32_t tlen = taosEncodeSMqHbBatchRsp(NULL, &batchRsp);
wafwerar's avatar
wafwerar 已提交
329
  void* buf = taosMemoryMalloc(tlen);
L
Liu Jicong 已提交
330 331 332 333 334 335 336 337 338
  if (buf == NULL) {
    //TODO
    return NULL;
  }
  void* abuf = buf;
  taosEncodeSMqHbBatchRsp(&abuf, &batchRsp);
  pRsp->body = buf;
  pRsp->bodyLen = tlen;
  return pRsp;
L
Liu Jicong 已提交
339 340
#endif
  return NULL;
L
Liu Jicong 已提交
341 342
}

L
Liu Jicong 已提交
343 344
static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHbReq *pHbReq,
                                        SClientHbBatchRsp *pBatchRsp) {
D
dapan1121 已提交
345
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
L
Liu Jicong 已提交
346
  SClientHbRsp  hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL};
D
dapan1121 已提交
347 348 349 350

  if (pHbReq->query) {
    SQueryHbReqBasic *pBasic = pHbReq->query;

D
dapan1121 已提交
351
    SRpcConnInfo connInfo = pMsg->conn;
D
dapan1121 已提交
352 353

    SConnObj *pConn = mndAcquireConn(pMnode, pBasic->connId);
L
Liu Jicong 已提交
354 355 356
    if (pConn == NULL) {
      pConn = mndCreateConn(pMnode, connInfo.user, CONN_TYPE__QUERY, connInfo.clientIp, connInfo.clientPort,
                            pBasic->pid, pBasic->app, 0);
D
dapan1121 已提交
357 358 359 360 361 362 363
      if (pConn == NULL) {
        mError("user:%s, conn:%u is freed and failed to create new since %s", connInfo.user, pBasic->connId, terrstr());
        return -1;
      } else {
        mDebug("user:%s, conn:%u is freed and create a new conn:%u", connInfo.user, pBasic->connId, pConn->id);
      }
    }
364
    
D
dapan1121 已提交
365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383
    SQueryHbRspBasic *rspBasic = taosMemoryCalloc(1, sizeof(SQueryHbRspBasic));
    if (rspBasic == NULL) {
      mndReleaseConn(pMnode, pConn);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      mError("user:%s, conn:%u failed to process hb while since %s", pConn->user, pBasic->connId, terrstr());
      return -1;
    }

    mndSaveQueryList(pConn, pBasic);
    if (pConn->killed != 0) {
      rspBasic->killConnection = 1;
    }

    if (pConn->killId != 0) {
      rspBasic->killRid = pConn->killId;
      pConn->killId = 0;
    }

    rspBasic->connId = pConn->id;
D
dapan1121 已提交
384
    rspBasic->totalDnodes = mndGetDnodeSize(pMnode);
L
Liu Jicong 已提交
385
    rspBasic->onlineDnodes = 1;  // TODO
D
dapan1121 已提交
386
    mndGetMnodeEpSet(pMnode, &rspBasic->epSet);
D
dapan1121 已提交
387 388 389

    mndCreateQnodeList(pMnode, &rspBasic->pQnodeList, -1);
    
D
dapan1121 已提交
390 391 392
    mndReleaseConn(pMnode, pConn);

    hbRsp.query = rspBasic;
393 394
  } else {
    mDebug("no query info in hb msg");
D
dapan1121 已提交
395 396 397 398
  }

  int32_t kvNum = taosHashGetSize(pHbReq->info);
  if (NULL == pHbReq->info || kvNum <= 0) {
L
Liu Jicong 已提交
399
    taosArrayPush(pBatchRsp->rsps, &hbRsp);
D
dapan1121 已提交
400 401 402 403 404 405 406
    return TSDB_CODE_SUCCESS;
  }

  hbRsp.info = taosArrayInit(kvNum, sizeof(SKv));
  if (NULL == hbRsp.info) {
    mError("taosArrayInit %d rsp kv failed", kvNum);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
D
dapan1121 已提交
407
    tFreeClientHbRsp(&hbRsp);
D
dapan1121 已提交
408 409 410 411 412 413 414 415
    return -1;
  }

  void *pIter = taosHashIterate(pHbReq->info, NULL);
  while (pIter != NULL) {
    SKv *kv = pIter;

    switch (kv->key) {
D
dapan 已提交
416
      case HEARTBEAT_KEY_USER_AUTHINFO: {
S
Shengliang Guan 已提交
417
        void   *rspMsg = NULL;
D
dapan 已提交
418 419 420 421 422 423 424 425
        int32_t rspLen = 0;
        mndValidateUserAuthInfo(pMnode, kv->value, kv->valueLen / sizeof(SUserAuthVersion), &rspMsg, &rspLen);
        if (rspMsg && rspLen > 0) {
          SKv kv1 = {.key = HEARTBEAT_KEY_USER_AUTHINFO, .valueLen = rspLen, .value = rspMsg};
          taosArrayPush(hbRsp.info, &kv1);
        }
        break;
      }
D
dapan1121 已提交
426
      case HEARTBEAT_KEY_DBINFO: {
S
Shengliang Guan 已提交
427
        void   *rspMsg = NULL;
D
dapan1121 已提交
428 429 430 431 432 433 434 435 436
        int32_t rspLen = 0;
        mndValidateDbInfo(pMnode, kv->value, kv->valueLen / sizeof(SDbVgVersion), &rspMsg, &rspLen);
        if (rspMsg && rspLen > 0) {
          SKv kv1 = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg};
          taosArrayPush(hbRsp.info, &kv1);
        }
        break;
      }
      case HEARTBEAT_KEY_STBINFO: {
S
Shengliang Guan 已提交
437
        void   *rspMsg = NULL;
D
dapan1121 已提交
438
        int32_t rspLen = 0;
D
dapan1121 已提交
439
        mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableVersion), &rspMsg, &rspLen);
D
dapan1121 已提交
440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459
        if (rspMsg && rspLen > 0) {
          SKv kv1 = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg};
          taosArrayPush(hbRsp.info, &kv1);
        }
        break;
      }
      default:
        mError("invalid kv key:%d", kv->key);
        hbRsp.status = TSDB_CODE_MND_APP_ERROR;
        break;
    }

    pIter = taosHashIterate(pHbReq->info, pIter);
  }

  taosArrayPush(pBatchRsp->rsps, &hbRsp);

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
460 461
static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
S
Shengliang Guan 已提交
462

L
Liu Jicong 已提交
463
  SClientHbBatchReq batchReq = {0};
S
Shengliang Guan 已提交
464
  if (tDeserializeSClientHbBatchReq(pReq->pCont, pReq->contLen, &batchReq) != 0) {
D
dapan1121 已提交
465
    taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);
S
Shengliang Guan 已提交
466 467 468 469
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

L
Liu Jicong 已提交
470 471
  SClientHbBatchRsp batchRsp = {0};
  batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp));
L
Liu Jicong 已提交
472

S
Shengliang Guan 已提交
473
  int32_t sz = taosArrayGetSize(batchReq.reqs);
L
Liu Jicong 已提交
474
  for (int i = 0; i < sz; i++) {
S
Shengliang Guan 已提交
475
    SClientHbReq *pHbReq = taosArrayGet(batchReq.reqs, i);
D
dapan1121 已提交
476
    if (pHbReq->connKey.connType == CONN_TYPE__QUERY) {
S
Shengliang Guan 已提交
477
      mndProcessQueryHeartBeat(pMnode, pReq, pHbReq, &batchRsp);
D
dapan1121 已提交
478
    } else if (pHbReq->connKey.connType == CONN_TYPE__TMQ) {
L
Liu Jicong 已提交
479 480 481
      SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq);
      if (pRsp != NULL) {
        taosArrayPush(batchRsp.rsps, pRsp);
wafwerar's avatar
wafwerar 已提交
482
        taosMemoryFree(pRsp);
L
Liu Jicong 已提交
483
      }
L
Liu Jicong 已提交
484 485
    }
  }
S
Shengliang Guan 已提交
486
  taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);
L
Liu Jicong 已提交
487

S
Shengliang Guan 已提交
488 489 490 491
  int32_t tlen = tSerializeSClientHbBatchRsp(NULL, 0, &batchRsp);
  void   *buf = rpcMallocCont(tlen);
  tSerializeSClientHbBatchRsp(buf, tlen, &batchRsp);

D
dapan1121 已提交
492
  tFreeClientHbBatchRsp(&batchRsp);
S
Shengliang Guan 已提交
493 494
  pReq->info.rspLen = tlen;
  pReq->info.rsp = buf;
S
Shengliang Guan 已提交
495 496 497 498

  return 0;
}

S
Shengliang Guan 已提交
499 500
static int32_t mndProcessKillQueryReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
501 502
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
503
  SUserObj *pUser = mndAcquireUser(pMnode, pReq->conn.user);
504
  if (pUser == NULL) return 0;
505
  if (!pUser->superUser) {
506 507 508 509 510 511
    mndReleaseUser(pMnode, pUser);
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
  }
  mndReleaseUser(pMnode, pUser);

S
Shengliang Guan 已提交
512
  SKillQueryReq killReq = {0};
S
Shengliang Guan 已提交
513
  if (tDeserializeSKillQueryReq(pReq->pCont, pReq->contLen, &killReq) != 0) {
S
Shengliang Guan 已提交
514 515 516 517 518
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  mInfo("kill query msg is received, queryId:%d", killReq.queryId);
519

S
Shengliang Guan 已提交
520
  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &killReq.connId, sizeof(int32_t));
521
  if (pConn == NULL) {
S
Shengliang Guan 已提交
522
    mError("connId:%d, failed to kill queryId:%d, conn not exist", killReq.connId, killReq.queryId);
523 524 525
    terrno = TSDB_CODE_MND_INVALID_CONN_ID;
    return -1;
  } else {
S
Shengliang Guan 已提交
526
    mInfo("connId:%d, queryId:%d is killed by user:%s", killReq.connId, killReq.queryId, pReq->conn.user);
D
dapan1121 已提交
527
    pConn->killId = killReq.queryId;
528 529 530 531 532
    taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
    return 0;
  }
}

S
Shengliang Guan 已提交
533 534
static int32_t mndProcessKillConnReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
535 536
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
537
  SUserObj *pUser = mndAcquireUser(pMnode, pReq->conn.user);
538
  if (pUser == NULL) return 0;
539
  if (!pUser->superUser) {
540 541 542 543 544 545
    mndReleaseUser(pMnode, pUser);
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
  }
  mndReleaseUser(pMnode, pUser);

S
Shengliang Guan 已提交
546
  SKillConnReq killReq = {0};
S
Shengliang Guan 已提交
547
  if (tDeserializeSKillConnReq(pReq->pCont, pReq->contLen, &killReq) != 0) {
S
Shengliang Guan 已提交
548 549 550
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }
S
Shengliang Guan 已提交
551

S
Shengliang Guan 已提交
552
  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &killReq.connId, sizeof(int32_t));
553
  if (pConn == NULL) {
S
Shengliang Guan 已提交
554
    mError("connId:%d, failed to kill connection, conn not exist", killReq.connId);
555 556 557
    terrno = TSDB_CODE_MND_INVALID_CONN_ID;
    return -1;
  } else {
S
Shengliang Guan 已提交
558
    mInfo("connId:%d, is killed by user:%s", killReq.connId, pReq->conn.user);
559 560 561 562 563 564
    pConn->killed = 1;
    taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
    return TSDB_CODE_SUCCESS;
  }
}

565 566 567 568 569 570 571
static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  int32_t    cols = 0;
  SConnObj  *pConn = NULL;
  
H
Haojun Liao 已提交
572 573 574 575 576
  if (pShow->pIter == NULL) {
    SProfileMgmt *pMgmt = &pMnode->profileMgmt;
    pShow->pIter = taosCacheCreateIter(pMgmt->cache);
  }

S
Shengliang Guan 已提交
577
  while (numOfRows < rows) {
H
Haojun Liao 已提交
578
    pConn = mndGetNextConn(pMnode, pShow->pIter);
579 580 581 582
    if (pConn == NULL) {
      pShow->pIter = NULL;
      break;
    }
S
Shengliang Guan 已提交
583 584

    cols = 0;
585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612

    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pConn->id, false);

    char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_TO_VARSTR(user, pConn->user);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)user, false);

    char app[TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE];
    STR_TO_VARSTR(app, pConn->app);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)app, false);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pConn->pid, false);

    char endpoint[TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
    sprintf(&endpoint[VARSTR_HEADER_SIZE], "%s:%d", taosIpStr(pConn->ip), pConn->port);
    varDataLen(endpoint) = strlen(&endpoint[VARSTR_HEADER_SIZE]);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)endpoint, false);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pConn->loginTimeMs, false);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pConn->lastAccessTimeMs, false);
S
Shengliang Guan 已提交
613 614 615 616

    numOfRows++;
  }

617
  pShow->numOfRows += numOfRows;
S
Shengliang Guan 已提交
618 619 620
  return numOfRows;
}

D
dapan1121 已提交
621 622 623 624 625 626 627
static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  int32_t    cols = 0;
  SConnObj  *pConn = NULL;
  
H
Haojun Liao 已提交
628 629 630 631 632
  if (pShow->pIter == NULL) {
    SProfileMgmt *pMgmt = &pMnode->profileMgmt;
    pShow->pIter = taosCacheCreateIter(pMgmt->cache);
  }

S
Shengliang Guan 已提交
633
  while (numOfRows < rows) {
H
Haojun Liao 已提交
634
    pConn = mndGetNextConn(pMnode, pShow->pIter);
635 636 637 638
    if (pConn == NULL) {
      pShow->pIter = NULL;
      break;
    }
S
Shengliang Guan 已提交
639

640
    taosRLockLatch(&pConn->queryLock);
D
dapan1121 已提交
641
    if (NULL == pConn->pQueries || taosArrayGetSize(pConn->pQueries) <= 0) {
642
      taosRUnLockLatch(&pConn->queryLock);
D
dapan1121 已提交
643
      continue;
S
Shengliang Guan 已提交
644 645
    }

D
dapan1121 已提交
646 647
    int32_t numOfQueries = taosArrayGetSize(pConn->pQueries);
    for (int32_t i = 0; i < numOfQueries; ++i) {
D
dapan1121 已提交
648
      SQueryDesc* pQuery = taosArrayGet(pConn->pQueries, i);
S
Shengliang Guan 已提交
649 650
      cols = 0;

651 652 653
      char queryId[26 + VARSTR_HEADER_SIZE] = {0};
      sprintf(&queryId[VARSTR_HEADER_SIZE], "%x:%" PRIx64, pConn->id, pQuery->reqRid);
      varDataLen(queryId) = strlen(&queryId[VARSTR_HEADER_SIZE]);
D
dapan1121 已提交
654
      SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
655 656 657
      colDataAppend(pColInfo, numOfRows, (const char *)queryId, false);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
658
      colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->queryId, false);
S
Shengliang Guan 已提交
659

D
dapan1121 已提交
660
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
661
      colDataAppend(pColInfo, numOfRows, (const char *)&pConn->id, false);
S
Shengliang Guan 已提交
662

663 664 665 666 667 668 669 670
      char app[TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE];
      STR_TO_VARSTR(app, pConn->app);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)app, false);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->pid, false);

D
dapan1121 已提交
671
      char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
672
      STR_TO_VARSTR(user, pConn->user);
D
dapan1121 已提交
673
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
674
      colDataAppend(pColInfo, numOfRows, (const char *)user, false);
S
Shengliang Guan 已提交
675

D
dapan1121 已提交
676 677 678
      char endpoint[TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
      sprintf(&endpoint[VARSTR_HEADER_SIZE], "%s:%d", taosIpStr(pConn->ip), pConn->port);
      varDataLen(endpoint) = strlen(&endpoint[VARSTR_HEADER_SIZE]);
D
dapan1121 已提交
679
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
D
dapan1121 已提交
680
      colDataAppend(pColInfo, numOfRows, (const char *)endpoint, false);
S
Shengliang Guan 已提交
681

D
dapan1121 已提交
682
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
683
      colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->stime, false);
S
Shengliang Guan 已提交
684

D
dapan1121 已提交
685
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
686
      colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->useconds, false);
S
Shengliang Guan 已提交
687

688 689
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->stableQuery, false);
S
Shengliang Guan 已提交
690

D
dapan1121 已提交
691
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711
      colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->subPlanNum, false);

      char subStatus[TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE] = {0};
      int32_t strSize = sizeof(subStatus);
      int32_t offset = VARSTR_HEADER_SIZE;
      for (int32_t i = 0; i < pQuery->subPlanNum && offset < strSize; ++i) {
        if (i) {
          offset += snprintf(subStatus + offset, strSize - offset - 1, ",");
        }
        SQuerySubDesc* pDesc = taosArrayGet(pQuery->subDesc, i);
        offset += snprintf(subStatus + offset, strSize - offset - 1, "%" PRIu64 ":%s", pDesc->tid, pDesc->status);
      }
      varDataLen(subStatus) = strlen(&subStatus[VARSTR_HEADER_SIZE]);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, subStatus, false);

      char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(sql, pQuery->sql);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)sql, false);
S
Shengliang Guan 已提交
712 713 714

      numOfRows++;
    }
715 716
    
    taosRUnLockLatch(&pConn->queryLock);
S
Shengliang Guan 已提交
717 718
  }

719
  pShow->numOfRows += numOfRows;
S
Shengliang Guan 已提交
720 721 722 723
  return numOfRows;
}

static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) {
H
Haojun Liao 已提交
724 725 726
  if (pIter != NULL) {
    taosCacheDestroyIter(pIter);
  }
S
Shengliang Guan 已提交
727
}
S
Shengliang Guan 已提交
728

S
Shengliang Guan 已提交
729 730 731
int32_t mndGetNumOfConnections(SMnode *pMnode) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
  return taosCacheGetNumOfObj(pMgmt->cache);
D
dapan1121 已提交
732
}