You need to sign in or sign up before continuing.
mndProfile.c 24.8 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17 18
#include "mndProfile.h"
#include "mndDb.h"
S
Shengliang Guan 已提交
19
#include "mndDnode.h"
S
Shengliang Guan 已提交
20 21
#include "mndMnode.h"
#include "mndShow.h"
S
Shengliang Guan 已提交
22
#include "mndStb.h"
S
Shengliang Guan 已提交
23
#include "mndUser.h"
S
Shengliang Guan 已提交
24
#include "tglobal.h"
S
version  
Shengliang Guan 已提交
25
#include "version.h"
S
Shengliang Guan 已提交
26

S
Shengliang Guan 已提交
27
typedef struct {
L
Liu Jicong 已提交
28 29 30 31 32 33 34 35 36 37 38 39 40
  uint32_t id;
  int8_t   connType;
  char     user[TSDB_USER_LEN];
  char     app[TSDB_APP_NAME_LEN];  // app name that invokes taosc
  int64_t  appStartTimeMs;          // app start time
  int32_t  pid;                     // pid of app that invokes taosc
  uint32_t ip;
  uint16_t port;
  int8_t   killed;
  int64_t  loginTimeMs;
  int64_t  lastAccessTimeMs;
  uint64_t killId;
  int32_t  numOfQueries;
S
Shengliang Guan 已提交
41
  SArray  *pQueries;  // SArray<SQueryDesc>
S
Shengliang Guan 已提交
42 43
} SConnObj;

L
Liu Jicong 已提交
44 45
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
                               int32_t pid, const char *app, int64_t startTime);
S
Shengliang Guan 已提交
46
static void      mndFreeConn(SConnObj *pConn);
D
dapan1121 已提交
47
static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId);
S
Shengliang Guan 已提交
48
static void      mndReleaseConn(SMnode *pMnode, SConnObj *pConn);
S
Shengliang Guan 已提交
49
static void     *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter);
S
Shengliang Guan 已提交
50
static void      mndCancelGetNextConn(SMnode *pMnode, void *pIter);
S
Shengliang Guan 已提交
51 52 53 54 55 56
static int32_t   mndProcessHeartBeatReq(SRpcMsg *pReq);
static int32_t   mndProcessConnectReq(SRpcMsg *pReq);
static int32_t   mndProcessKillQueryReq(SRpcMsg *pReq);
static int32_t   mndProcessKillConnReq(SRpcMsg *pReq);
static int32_t   mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static int32_t   mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
S
Shengliang Guan 已提交
57 58 59 60 61
static void      mndCancelGetNextQuery(SMnode *pMnode, void *pIter);

int32_t mndInitProfile(SMnode *pMnode) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

L
Liu Jicong 已提交
62 63
  // in ms
  int32_t connCheckTime = tsShellActivityTimer * 2 * 1000;
S
Shengliang Guan 已提交
64 65 66 67 68 69 70
  pMgmt->cache = taosCacheInit(TSDB_DATA_TYPE_INT, connCheckTime, true, (__cache_free_fn_t)mndFreeConn, "conn");
  if (pMgmt->cache == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to alloc profile cache since %s", terrstr());
    return -1;
  }

S
Shengliang Guan 已提交
71 72 73 74
  mndSetMsgHandle(pMnode, TDMT_MND_HEARTBEAT, mndProcessHeartBeatReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CONNECT, mndProcessConnectReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq);
S
Shengliang Guan 已提交
75

L
Liu Jicong 已提交
76
  //  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
S
Shengliang Guan 已提交
77
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn);
L
Liu Jicong 已提交
78
  //  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
S
Shengliang Guan 已提交
79
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery);
80

S
Shengliang Guan 已提交
81 82 83 84 85 86 87 88 89 90 91
  return 0;
}

void mndCleanupProfile(SMnode *pMnode) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
  if (pMgmt->cache != NULL) {
    taosCacheCleanup(pMgmt->cache);
    pMgmt->cache = NULL;
  }
}

L
Liu Jicong 已提交
92 93
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
                               int32_t pid, const char *app, int64_t startTime) {
S
Shengliang Guan 已提交
94 95
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

L
Liu Jicong 已提交
96
  char    connStr[255] = {0};
D
dapan1121 已提交
97 98
  int32_t len = snprintf(connStr, sizeof(connStr), "%s%d%d%d%s", user, ip, port, pid, app);
  int32_t connId = mndGenerateUid(connStr, len);
S
Shengliang Guan 已提交
99
  if (startTime == 0) startTime = taosGetTimestampMs();
S
Shengliang Guan 已提交
100

S
Shengliang Guan 已提交
101
  SConnObj connObj = {.id = connId,
L
Liu Jicong 已提交
102
                      .connType = connType,
S
Shengliang Guan 已提交
103 104
                      .appStartTimeMs = startTime,
                      .pid = pid,
S
shm  
Shengliang Guan 已提交
105 106
                      .ip = ip,
                      .port = port,
S
Shengliang Guan 已提交
107
                      .killed = 0,
S
Shengliang Guan 已提交
108 109
                      .loginTimeMs = taosGetTimestampMs(),
                      .lastAccessTimeMs = 0,
D
dapan1121 已提交
110
                      .killId = 0,
S
Shengliang Guan 已提交
111 112 113
                      .numOfQueries = 0,
                      .pQueries = NULL};

S
Shengliang Guan 已提交
114
  connObj.lastAccessTimeMs = connObj.loginTimeMs;
S
shm  
Shengliang Guan 已提交
115
  tstrncpy(connObj.user, user, TSDB_USER_LEN);
S
Shengliang Guan 已提交
116 117
  tstrncpy(connObj.app, app, TSDB_APP_NAME_LEN);

S
Shengliang Guan 已提交
118
  int32_t   keepTime = tsShellActivityTimer * 3;
S
Shengliang Guan 已提交
119
  SConnObj *pConn = taosCachePut(pMgmt->cache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), keepTime * 1000);
S
Shengliang Guan 已提交
120 121
  if (pConn == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
shm  
Shengliang Guan 已提交
122
    mError("conn:%d, failed to put into cache since %s, user:%s", connId, user, terrstr());
S
Shengliang Guan 已提交
123 124
    return NULL;
  } else {
D
dapan1121 已提交
125
    mTrace("conn:%u, is created, data:%p user:%s", pConn->id, pConn, user);
S
Shengliang Guan 已提交
126 127
    return pConn;
  }
S
Shengliang Guan 已提交
128 129 130
}

static void mndFreeConn(SConnObj *pConn) {
wafwerar's avatar
wafwerar 已提交
131
  taosMemoryFreeClear(pConn->pQueries);
D
dapan1121 已提交
132
  mTrace("conn:%u, is destroyed, data:%p", pConn->id, pConn);
S
Shengliang Guan 已提交
133 134
}

D
dapan1121 已提交
135
static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId) {
S
Shengliang Guan 已提交
136 137
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

D
dapan1121 已提交
138
  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(connId));
S
Shengliang Guan 已提交
139
  if (pConn == NULL) {
D
dapan1121 已提交
140
    mDebug("conn:%u, already destroyed", connId);
S
Shengliang Guan 已提交
141 142 143
    return NULL;
  }

S
Shengliang Guan 已提交
144
  int32_t keepTime = tsShellActivityTimer * 3;
S
Shengliang Guan 已提交
145
  pConn->lastAccessTimeMs = keepTime * 1000 + (uint64_t)taosGetTimestampMs();
S
Shengliang Guan 已提交
146

D
dapan1121 已提交
147
  mTrace("conn:%u, acquired from cache, data:%p", pConn->id, pConn);
S
Shengliang Guan 已提交
148 149 150 151 152
  return pConn;
}

static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) {
  if (pConn == NULL) return;
D
dapan1121 已提交
153
  mTrace("conn:%u, released from cache, data:%p", pConn->id, pConn);
S
Shengliang Guan 已提交
154

S
Shengliang Guan 已提交
155
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
S
Shengliang Guan 已提交
156 157 158
  taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
}

H
Haojun Liao 已提交
159
void *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter) {
L
Liu Jicong 已提交
160 161
  SConnObj *pConn = NULL;
  bool      hasNext = taosCacheIterNext(pIter);
H
Haojun Liao 已提交
162 163 164 165 166
  if (hasNext) {
    size_t dataLen = 0;
    pConn = taosCacheIterGetData(pIter, &dataLen);
  } else {
    taosCacheDestroyIter(pIter);
S
Shengliang Guan 已提交
167 168
  }

H
Haojun Liao 已提交
169
  return pConn;
S
Shengliang Guan 已提交
170 171 172
}

static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) {
H
Haojun Liao 已提交
173 174 175
  if (pIter != NULL) {
    taosCacheDestroyIter(pIter);
  }
S
Shengliang Guan 已提交
176 177
}

S
Shengliang Guan 已提交
178 179
static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
S
Shengliang Guan 已提交
180 181 182 183 184
  SUserObj   *pUser = NULL;
  SDbObj     *pDb = NULL;
  SConnObj   *pConn = NULL;
  int32_t     code = -1;
  SConnectReq connReq = {0};
S
shm  
Shengliang Guan 已提交
185
  char        ip[30] = {0};
S
Shengliang Guan 已提交
186

S
Shengliang Guan 已提交
187
  if (tDeserializeSConnectReq(pReq->pCont, pReq->contLen, &connReq) != 0) {
S
Shengliang Guan 已提交
188 189 190
    terrno = TSDB_CODE_INVALID_MSG;
    goto CONN_OVER;
  }
S
Shengliang Guan 已提交
191

S
Shengliang Guan 已提交
192
  taosIp2String(pReq->conn.clientIp, ip);
S
Shengliang Guan 已提交
193

S
Shengliang Guan 已提交
194
  pUser = mndAcquireUser(pMnode, pReq->conn.user);
195
  if (pUser == NULL) {
S
Shengliang Guan 已提交
196
    mError("user:%s, failed to login while acquire user since %s", pReq->conn.user, terrstr());
197 198
    goto CONN_OVER;
  }
dengyihao's avatar
dengyihao 已提交
199
  if (0 != strncmp(connReq.passwd, pUser->pass, TSDB_PASSWORD_LEN - 1)) {
S
Shengliang Guan 已提交
200
    mError("user:%s, failed to auth while acquire user, input:%s", pReq->conn.user, connReq.passwd);
dengyihao's avatar
dengyihao 已提交
201 202 203
    code = TSDB_CODE_RPC_AUTH_FAILURE;
    goto CONN_OVER;
  }
204

S
Shengliang Guan 已提交
205
  if (connReq.db[0]) {
S
Shengliang Guan 已提交
206 207 208
    char db[TSDB_DB_FNAME_LEN];
    snprintf(db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, connReq.db);
    pDb = mndAcquireDb(pMnode, db);
S
Shengliang Guan 已提交
209 210
    if (pDb == NULL) {
      terrno = TSDB_CODE_MND_INVALID_DB;
S
Shengliang Guan 已提交
211
      mError("user:%s, failed to login from %s while use db:%s since %s", pReq->conn.user, ip, connReq.db, terrstr());
212
      goto CONN_OVER;
S
Shengliang Guan 已提交
213 214 215
    }
  }

S
Shengliang Guan 已提交
216 217
  pConn = mndCreateConn(pMnode, pReq->conn.user, connReq.connType, pReq->conn.clientIp, pReq->conn.clientPort,
                        connReq.pid, connReq.app, connReq.startTime);
S
Shengliang Guan 已提交
218
  if (pConn == NULL) {
S
Shengliang Guan 已提交
219
    mError("user:%s, failed to login from %s while create connection since %s", pReq->conn.user, ip, terrstr());
220
    goto CONN_OVER;
S
Shengliang Guan 已提交
221 222
  }

D
dapan1121 已提交
223 224
  mndAcquireConn(pMnode, pConn->id);

S
Shengliang Guan 已提交
225 226 227 228 229
  SConnectRsp connectRsp = {0};
  connectRsp.acctId = pUser->acctId;
  connectRsp.superUser = pUser->superUser;
  connectRsp.clusterId = pMnode->clusterId;
  connectRsp.connId = pConn->id;
L
Liu Jicong 已提交
230
  connectRsp.connType = connReq.connType;
D
dapan 已提交
231
  connectRsp.dnodeNum = mndGetDnodeSize(pMnode);
S
Shengliang Guan 已提交
232

S
Shengliang Guan 已提交
233 234 235
  snprintf(connectRsp.sVersion, sizeof(connectRsp.sVersion), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo,
           gitinfo);
  mndGetMnodeEpSet(pMnode, &connectRsp.epSet);
236

S
Shengliang Guan 已提交
237 238 239 240 241
  int32_t contLen = tSerializeSConnectRsp(NULL, 0, &connectRsp);
  if (contLen < 0) goto CONN_OVER;
  void *pRsp = rpcMallocCont(contLen);
  if (pRsp == NULL) goto CONN_OVER;
  tSerializeSConnectRsp(pRsp, contLen, &connectRsp);
S
Shengliang Guan 已提交
242

S
Shengliang Guan 已提交
243 244
  pReq->info.rspLen = contLen;
  pReq->info.rsp = pRsp;
245

S
Shengliang Guan 已提交
246
  mDebug("user:%s, login from %s:%d, conn:%u, app:%s", pReq->conn.user, ip, pConn->port, pConn->id, connReq.app);
247 248 249 250 251 252 253 254 255 256

  code = 0;

CONN_OVER:

  mndReleaseUser(pMnode, pUser);
  mndReleaseDb(pMnode, pDb);
  mndReleaseConn(pMnode, pConn);

  return code;
S
Shengliang Guan 已提交
257 258
}

D
dapan1121 已提交
259 260
static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) {
  taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc);
S
Shengliang Guan 已提交
261

D
dapan1121 已提交
262 263
  pConn->pQueries = pBasic->queryDesc;
  pBasic->queryDesc = NULL;
L
Liu Jicong 已提交
264 265

  pConn->numOfQueries = pBasic->queryDesc ? taosArrayGetSize(pBasic->queryDesc) : 0;
S
Shengliang Guan 已提交
266 267 268 269

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
270
static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) {
L
Liu Jicong 已提交
271
#if 0
wafwerar's avatar
wafwerar 已提交
272
  SClientHbRsp* pRsp = taosMemoryMalloc(sizeof(SClientHbRsp));
L
Liu Jicong 已提交
273 274 275 276 277 278 279 280 281 282 283 284 285 286 287
  if (pRsp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  pRsp->connKey = pReq->connKey;
  SMqHbBatchRsp batchRsp;
  batchRsp.batchRsps = taosArrayInit(0, sizeof(SMqHbRsp));
  if (batchRsp.batchRsps == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  SClientHbKey connKey = pReq->connKey;
  SHashObj* pObj =  pReq->info;
  SKv* pKv = taosHashGet(pObj, "mq-tmp", strlen("mq-tmp") + 1);
  if (pKv == NULL) {
wafwerar's avatar
wafwerar 已提交
288
    taosMemoryFree(pRsp);
L
Liu Jicong 已提交
289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320
    return NULL;
  }
  SMqHbMsg mqHb;
  taosDecodeSMqMsg(pKv->value, &mqHb);
  /*int64_t clientUid = htonl(pKv->value);*/
  /*if (mqHb.epoch )*/
  int sz = taosArrayGetSize(mqHb.pTopics);
  SMqConsumerObj* pConsumer = mndAcquireConsumer(pMnode, mqHb.consumerId); 
  for (int i = 0; i < sz; i++) {
    SMqHbOneTopicBatchRsp innerBatchRsp;
    innerBatchRsp.rsps = taosArrayInit(sz, sizeof(SMqHbRsp));
    if (innerBatchRsp.rsps == NULL) {
      //TODO
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
    SMqHbTopicInfo* topicInfo = taosArrayGet(mqHb.pTopics, i);
    SMqConsumerTopic* pConsumerTopic = taosHashGet(pConsumer->topicHash, topicInfo->name, strlen(topicInfo->name)+1);
    if (pConsumerTopic->epoch != topicInfo->epoch) {
      //add new vgids into rsp
      int vgSz = taosArrayGetSize(topicInfo->pVgInfo);
      for (int j = 0; j < vgSz; j++) {
        SMqHbRsp innerRsp;
        SMqHbVgInfo* pVgInfo = taosArrayGet(topicInfo->pVgInfo, i);
        SVgObj* pVgObj = mndAcquireVgroup(pMnode, pVgInfo->vgId);
        innerRsp.epSet = mndGetVgroupEpset(pMnode, pVgObj);
        taosArrayPush(innerBatchRsp.rsps, &innerRsp);
      }
    }
    taosArrayPush(batchRsp.batchRsps, &innerBatchRsp);
  }
  int32_t tlen = taosEncodeSMqHbBatchRsp(NULL, &batchRsp);
wafwerar's avatar
wafwerar 已提交
321
  void* buf = taosMemoryMalloc(tlen);
L
Liu Jicong 已提交
322 323 324 325 326 327 328 329 330
  if (buf == NULL) {
    //TODO
    return NULL;
  }
  void* abuf = buf;
  taosEncodeSMqHbBatchRsp(&abuf, &batchRsp);
  pRsp->body = buf;
  pRsp->bodyLen = tlen;
  return pRsp;
L
Liu Jicong 已提交
331 332
#endif
  return NULL;
L
Liu Jicong 已提交
333 334
}

L
Liu Jicong 已提交
335 336
static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHbReq *pHbReq,
                                        SClientHbBatchRsp *pBatchRsp) {
D
dapan1121 已提交
337
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
L
Liu Jicong 已提交
338
  SClientHbRsp  hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL};
D
dapan1121 已提交
339 340 341 342 343

  if (pHbReq->query) {
    SQueryHbReqBasic *pBasic = pHbReq->query;

    SRpcConnInfo connInfo = {0};
S
Shengliang Guan 已提交
344
    rpcGetConnInfo(pMsg->info.handle, &connInfo);
D
dapan1121 已提交
345 346

    SConnObj *pConn = mndAcquireConn(pMnode, pBasic->connId);
L
Liu Jicong 已提交
347 348 349
    if (pConn == NULL) {
      pConn = mndCreateConn(pMnode, connInfo.user, CONN_TYPE__QUERY, connInfo.clientIp, connInfo.clientPort,
                            pBasic->pid, pBasic->app, 0);
D
dapan1121 已提交
350 351 352 353 354 355 356 357
      if (pConn == NULL) {
        mError("user:%s, conn:%u is freed and failed to create new since %s", connInfo.user, pBasic->connId, terrstr());
        return -1;
      } else {
        mDebug("user:%s, conn:%u is freed and create a new conn:%u", connInfo.user, pBasic->connId, pConn->id);
      }
    } else if (pConn->killed) {
      mError("user:%s, conn:%u is already killed", connInfo.user, pConn->id);
L
Liu Jicong 已提交
358
      mndReleaseConn(pMnode, pConn);
D
dapan1121 已提交
359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381
      terrno = TSDB_CODE_MND_INVALID_CONNECTION;
      return -1;
    }

    SQueryHbRspBasic *rspBasic = taosMemoryCalloc(1, sizeof(SQueryHbRspBasic));
    if (rspBasic == NULL) {
      mndReleaseConn(pMnode, pConn);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      mError("user:%s, conn:%u failed to process hb while since %s", pConn->user, pBasic->connId, terrstr());
      return -1;
    }

    mndSaveQueryList(pConn, pBasic);
    if (pConn->killed != 0) {
      rspBasic->killConnection = 1;
    }

    if (pConn->killId != 0) {
      rspBasic->killRid = pConn->killId;
      pConn->killId = 0;
    }

    rspBasic->connId = pConn->id;
L
Liu Jicong 已提交
382 383
    rspBasic->totalDnodes = 1;   // TODO
    rspBasic->onlineDnodes = 1;  // TODO
D
dapan1121 已提交
384 385 386 387 388 389 390 391
    mndGetMnodeEpSet(pMnode, &rspBasic->epSet);
    mndReleaseConn(pMnode, pConn);

    hbRsp.query = rspBasic;
  }

  int32_t kvNum = taosHashGetSize(pHbReq->info);
  if (NULL == pHbReq->info || kvNum <= 0) {
L
Liu Jicong 已提交
392
    taosArrayPush(pBatchRsp->rsps, &hbRsp);
D
dapan1121 已提交
393 394 395 396 397 398 399 400 401 402 403 404 405 406 407
    return TSDB_CODE_SUCCESS;
  }

  hbRsp.info = taosArrayInit(kvNum, sizeof(SKv));
  if (NULL == hbRsp.info) {
    mError("taosArrayInit %d rsp kv failed", kvNum);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  void *pIter = taosHashIterate(pHbReq->info, NULL);
  while (pIter != NULL) {
    SKv *kv = pIter;

    switch (kv->key) {
D
dapan 已提交
408
      case HEARTBEAT_KEY_USER_AUTHINFO: {
S
Shengliang Guan 已提交
409
        void   *rspMsg = NULL;
D
dapan 已提交
410 411 412 413 414 415 416 417
        int32_t rspLen = 0;
        mndValidateUserAuthInfo(pMnode, kv->value, kv->valueLen / sizeof(SUserAuthVersion), &rspMsg, &rspLen);
        if (rspMsg && rspLen > 0) {
          SKv kv1 = {.key = HEARTBEAT_KEY_USER_AUTHINFO, .valueLen = rspLen, .value = rspMsg};
          taosArrayPush(hbRsp.info, &kv1);
        }
        break;
      }
D
dapan1121 已提交
418
      case HEARTBEAT_KEY_DBINFO: {
S
Shengliang Guan 已提交
419
        void   *rspMsg = NULL;
D
dapan1121 已提交
420 421 422 423 424 425 426 427 428
        int32_t rspLen = 0;
        mndValidateDbInfo(pMnode, kv->value, kv->valueLen / sizeof(SDbVgVersion), &rspMsg, &rspLen);
        if (rspMsg && rspLen > 0) {
          SKv kv1 = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg};
          taosArrayPush(hbRsp.info, &kv1);
        }
        break;
      }
      case HEARTBEAT_KEY_STBINFO: {
S
Shengliang Guan 已提交
429
        void   *rspMsg = NULL;
D
dapan1121 已提交
430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451
        int32_t rspLen = 0;
        mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableMetaVersion), &rspMsg, &rspLen);
        if (rspMsg && rspLen > 0) {
          SKv kv1 = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg};
          taosArrayPush(hbRsp.info, &kv1);
        }
        break;
      }
      default:
        mError("invalid kv key:%d", kv->key);
        hbRsp.status = TSDB_CODE_MND_APP_ERROR;
        break;
    }

    pIter = taosHashIterate(pHbReq->info, pIter);
  }

  taosArrayPush(pBatchRsp->rsps, &hbRsp);

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
452 453
static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
S
Shengliang Guan 已提交
454

L
Liu Jicong 已提交
455
  SClientHbBatchReq batchReq = {0};
S
Shengliang Guan 已提交
456
  if (tDeserializeSClientHbBatchReq(pReq->pCont, pReq->contLen, &batchReq) != 0) {
S
Shengliang Guan 已提交
457 458 459 460
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

L
Liu Jicong 已提交
461 462
  SClientHbBatchRsp batchRsp = {0};
  batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp));
L
Liu Jicong 已提交
463

S
Shengliang Guan 已提交
464
  int32_t sz = taosArrayGetSize(batchReq.reqs);
L
Liu Jicong 已提交
465
  for (int i = 0; i < sz; i++) {
S
Shengliang Guan 已提交
466
    SClientHbReq *pHbReq = taosArrayGet(batchReq.reqs, i);
D
dapan1121 已提交
467
    if (pHbReq->connKey.connType == CONN_TYPE__QUERY) {
S
Shengliang Guan 已提交
468
      mndProcessQueryHeartBeat(pMnode, pReq, pHbReq, &batchRsp);
D
dapan1121 已提交
469
    } else if (pHbReq->connKey.connType == CONN_TYPE__TMQ) {
L
Liu Jicong 已提交
470 471 472
      SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq);
      if (pRsp != NULL) {
        taosArrayPush(batchRsp.rsps, pRsp);
wafwerar's avatar
wafwerar 已提交
473
        taosMemoryFree(pRsp);
L
Liu Jicong 已提交
474
      }
L
Liu Jicong 已提交
475 476
    }
  }
S
Shengliang Guan 已提交
477
  taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);
L
Liu Jicong 已提交
478

S
Shengliang Guan 已提交
479 480 481 482
  int32_t tlen = tSerializeSClientHbBatchRsp(NULL, 0, &batchRsp);
  void   *buf = rpcMallocCont(tlen);
  tSerializeSClientHbBatchRsp(buf, tlen, &batchRsp);

D
dapan1121 已提交
483 484 485
  int32_t rspNum = (int32_t)taosArrayGetSize(batchRsp.rsps);
  for (int32_t i = 0; i < rspNum; ++i) {
    SClientHbRsp *rsp = taosArrayGet(batchRsp.rsps, i);
S
Shengliang Guan 已提交
486
    int32_t       kvNum = (rsp->info) ? taosArrayGetSize(rsp->info) : 0;
D
dapan1121 已提交
487 488
    for (int32_t n = 0; n < kvNum; ++n) {
      SKv *kv = taosArrayGet(rsp->info, n);
wafwerar's avatar
wafwerar 已提交
489
      taosMemoryFreeClear(kv->value);
D
dapan1121 已提交
490 491 492 493
    }
    taosArrayDestroy(rsp->info);
  }

L
Liu Jicong 已提交
494
  taosArrayDestroy(batchRsp.rsps);
S
Shengliang Guan 已提交
495 496
  pReq->info.rspLen = tlen;
  pReq->info.rsp = buf;
S
Shengliang Guan 已提交
497 498 499 500

  return 0;
}

S
Shengliang Guan 已提交
501 502
static int32_t mndProcessKillQueryReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
503 504
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
505
  SUserObj *pUser = mndAcquireUser(pMnode, pReq->conn.user);
506
  if (pUser == NULL) return 0;
507
  if (!pUser->superUser) {
508 509 510 511 512 513
    mndReleaseUser(pMnode, pUser);
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
  }
  mndReleaseUser(pMnode, pUser);

S
Shengliang Guan 已提交
514
  SKillQueryReq killReq = {0};
S
Shengliang Guan 已提交
515
  if (tDeserializeSKillQueryReq(pReq->pCont, pReq->contLen, &killReq) != 0) {
S
Shengliang Guan 已提交
516 517 518 519 520
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  mInfo("kill query msg is received, queryId:%d", killReq.queryId);
521

S
Shengliang Guan 已提交
522
  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &killReq.connId, sizeof(int32_t));
523
  if (pConn == NULL) {
S
Shengliang Guan 已提交
524
    mError("connId:%d, failed to kill queryId:%d, conn not exist", killReq.connId, killReq.queryId);
525 526 527
    terrno = TSDB_CODE_MND_INVALID_CONN_ID;
    return -1;
  } else {
S
Shengliang Guan 已提交
528
    mInfo("connId:%d, queryId:%d is killed by user:%s", killReq.connId, killReq.queryId, pReq->conn.user);
D
dapan1121 已提交
529
    pConn->killId = killReq.queryId;
530 531 532 533 534
    taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
    return 0;
  }
}

S
Shengliang Guan 已提交
535 536
static int32_t mndProcessKillConnReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
537 538
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

S
Shengliang Guan 已提交
539
  SUserObj *pUser = mndAcquireUser(pMnode, pReq->conn.user);
540
  if (pUser == NULL) return 0;
541
  if (!pUser->superUser) {
542 543 544 545 546 547
    mndReleaseUser(pMnode, pUser);
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
  }
  mndReleaseUser(pMnode, pUser);

S
Shengliang Guan 已提交
548
  SKillConnReq killReq = {0};
S
Shengliang Guan 已提交
549
  if (tDeserializeSKillConnReq(pReq->pCont, pReq->contLen, &killReq) != 0) {
S
Shengliang Guan 已提交
550 551 552
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }
S
Shengliang Guan 已提交
553

S
Shengliang Guan 已提交
554
  SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &killReq.connId, sizeof(int32_t));
555
  if (pConn == NULL) {
S
Shengliang Guan 已提交
556
    mError("connId:%d, failed to kill connection, conn not exist", killReq.connId);
557 558 559
    terrno = TSDB_CODE_MND_INVALID_CONN_ID;
    return -1;
  } else {
S
Shengliang Guan 已提交
560
    mInfo("connId:%d, is killed by user:%s", killReq.connId, pReq->conn.user);
561 562 563 564 565 566
    pConn->killed = 1;
    taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
    return TSDB_CODE_SUCCESS;
  }
}

S
Shengliang Guan 已提交
567 568
static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  SMnode   *pMnode = pReq->info.node;
S
Shengliang Guan 已提交
569
  int32_t   numOfRows = 0;
S
Shengliang Guan 已提交
570
  SConnObj *pConn = NULL;
S
Shengliang Guan 已提交
571 572 573 574
  int32_t   cols = 0;
  char     *pWrite;
  char      ipStr[TSDB_IPv4ADDR_LEN + 6];

H
Haojun Liao 已提交
575 576 577 578 579
  if (pShow->pIter == NULL) {
    SProfileMgmt *pMgmt = &pMnode->profileMgmt;
    pShow->pIter = taosCacheCreateIter(pMgmt->cache);
  }

S
Shengliang Guan 已提交
580
  while (numOfRows < rows) {
H
Haojun Liao 已提交
581
    pConn = mndGetNextConn(pMnode, pShow->pIter);
S
Shengliang Guan 已提交
582
    if (pConn == NULL) break;
S
Shengliang Guan 已提交
583 584

    cols = 0;
585 586
#if 0
    pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
D
dapan1121 已提交
587
    *(uint32_t *)pWrite = pConn->id;
S
Shengliang Guan 已提交
588 589
    cols++;

590 591
    pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->user, pShow->pMeta->pSchemas[cols].bytes);
S
Shengliang Guan 已提交
592 593 594
    cols++;

    // app name
595 596
    pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->app, pShow->pMeta->pSchemas[cols].bytes);
S
Shengliang Guan 已提交
597 598 599
    cols++;

    // app pid
600
    pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
S
Shengliang Guan 已提交
601
    *(int32_t *)pWrite = pConn->pid;
S
Shengliang Guan 已提交
602 603
    cols++;

604
    pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
S
Shengliang Guan 已提交
605
    taosIpPort2String(pConn->ip, pConn->port, ipStr);
606
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->pMeta->pSchemas[cols].bytes);
S
Shengliang Guan 已提交
607 608
    cols++;

609
    pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
S
Shengliang Guan 已提交
610
    *(int64_t *)pWrite = pConn->loginTimeMs;
S
Shengliang Guan 已提交
611 612
    cols++;

613
    pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
S
Shengliang Guan 已提交
614 615
    if (pConn->lastAccessTimeMs < pConn->loginTimeMs) pConn->lastAccessTimeMs = pConn->loginTimeMs;
    *(int64_t *)pWrite = pConn->lastAccessTimeMs;
S
Shengliang Guan 已提交
616
    cols++;
617
#endif
S
Shengliang Guan 已提交
618 619 620 621

    numOfRows++;
  }

622
  pShow->numOfRows += numOfRows;
S
Shengliang Guan 已提交
623 624 625 626

  return numOfRows;
}

S
Shengliang Guan 已提交
627 628 629
static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  SMnode *pMnode = pReq->info.node;
  int32_t numOfRows = 0;
D
dapan1121 已提交
630
#if 0
S
Shengliang Guan 已提交
631
  SConnObj *pConn = NULL;
S
Shengliang Guan 已提交
632
  int32_t   cols = 0;
633 634
  char     *pWrite;
  void     *pIter;
S
Shengliang Guan 已提交
635 636
  char      str[TSDB_IPv4ADDR_LEN + 6] = {0};

H
Haojun Liao 已提交
637 638 639 640 641
  if (pShow->pIter == NULL) {
    SProfileMgmt *pMgmt = &pMnode->profileMgmt;
    pShow->pIter = taosCacheCreateIter(pMgmt->cache);
  }

S
Shengliang Guan 已提交
642
  while (numOfRows < rows) {
H
Haojun Liao 已提交
643
    pConn = mndGetNextConn(pMnode, pShow->pIter);
S
Shengliang Guan 已提交
644
    if (pConn == NULL) {
H
Haojun Liao 已提交
645
      pShow->pIter = NULL;
S
Shengliang Guan 已提交
646 647 648
      break;
    }

S
Shengliang Guan 已提交
649
    if (numOfRows + pConn->numOfQueries >= rows) {
H
Haojun Liao 已提交
650 651
      taosCacheDestroyIter(pShow->pIter);
      pShow->pIter = NULL;
S
Shengliang Guan 已提交
652 653 654
      break;
    }

S
Shengliang Guan 已提交
655 656
    for (int32_t i = 0; i < pConn->numOfQueries; ++i) {
      SQueryDesc *pDesc = pConn->pQueries + i;
S
Shengliang Guan 已提交
657 658
      cols = 0;

659
      pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
S
Shengliang Guan 已提交
660 661 662
      *(int64_t *)pWrite = htobe64(pDesc->queryId);
      cols++;

663
      pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
S
Shengliang Guan 已提交
664
      *(int64_t *)pWrite = htobe64(pConn->id);
S
Shengliang Guan 已提交
665 666
      cols++;

667 668
      pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->user, pShow->pMeta->pSchemas[cols].bytes);
S
Shengliang Guan 已提交
669 670
      cols++;

671
      pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
S
Shengliang Guan 已提交
672
      snprintf(str, tListLen(str), "%s:%u", taosIpStr(pConn->ip), pConn->port);
673
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, str, pShow->pMeta->pSchemas[cols].bytes);
S
Shengliang Guan 已提交
674 675 676 677
      cols++;

      char handleBuf[24] = {0};
      snprintf(handleBuf, tListLen(handleBuf), "%" PRIu64, htobe64(pDesc->qId));
678
      pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
S
Shengliang Guan 已提交
679

680
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, handleBuf, pShow->pMeta->pSchemas[cols].bytes);
S
Shengliang Guan 已提交
681 682
      cols++;

683
      pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
S
Shengliang Guan 已提交
684 685 686
      *(int64_t *)pWrite = htobe64(pDesc->stime);
      cols++;

687
      pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
S
Shengliang Guan 已提交
688 689 690 691
      *(int64_t *)pWrite = htobe64(pDesc->useconds);
      cols++;

      snprintf(str, tListLen(str), "0x%" PRIx64, htobe64(pDesc->sqlObjId));
692 693
      pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, str, pShow->pMeta->pSchemas[cols].bytes);
S
Shengliang Guan 已提交
694 695
      cols++;

696
      pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
S
Shengliang Guan 已提交
697 698 699 700
      *(int32_t *)pWrite = htonl(pDesc->pid);
      cols++;

      char epBuf[TSDB_EP_LEN + 1] = {0};
S
Shengliang Guan 已提交
701
      snprintf(epBuf, tListLen(epBuf), "%s:%u", pDesc->fqdn, pConn->port);
702 703
      pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, epBuf, pShow->pMeta->pSchemas[cols].bytes);
S
Shengliang Guan 已提交
704 705
      cols++;

706
      pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
S
Shengliang Guan 已提交
707 708 709
      *(bool *)pWrite = pDesc->stableQuery;
      cols++;

710
      pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
S
Shengliang Guan 已提交
711 712 713
      *(int32_t *)pWrite = htonl(pDesc->numOfSub);
      cols++;

714 715
      pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDesc->subSqlInfo, pShow->pMeta->pSchemas[cols].bytes);
S
Shengliang Guan 已提交
716 717
      cols++;

718 719
      pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDesc->sql, pShow->pMeta->pSchemas[cols].bytes);
S
Shengliang Guan 已提交
720 721 722 723 724 725
      cols++;

      numOfRows++;
    }
  }

726
  pShow->numOfRows += numOfRows;
L
Liu Jicong 已提交
727
#endif
S
Shengliang Guan 已提交
728 729 730 731
  return numOfRows;
}

static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) {
H
Haojun Liao 已提交
732 733 734
  if (pIter != NULL) {
    taosCacheDestroyIter(pIter);
  }
S
Shengliang Guan 已提交
735
}
S
Shengliang Guan 已提交
736

S
Shengliang Guan 已提交
737 738 739
int32_t mndGetNumOfConnections(SMnode *pMnode) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;
  return taosCacheGetNumOfObj(pMgmt->cache);
D
dapan1121 已提交
740
}