mndProfile.c 24.5 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17 18
#include "mndProfile.h"
#include "mndDb.h"
S
Shengliang Guan 已提交
19
#include "mndDnode.h"
S
Shengliang Guan 已提交
20 21
#include "mndMnode.h"
#include "mndShow.h"
S
Shengliang Guan 已提交
22
#include "mndStb.h"
S
Shengliang Guan 已提交
23
#include "mndUser.h"
S
Shengliang Guan 已提交
24
#include "tglobal.h"
S
version  
Shengliang Guan 已提交
25
#include "version.h"
S
Shengliang Guan 已提交
26

S
Shengliang Guan 已提交
27
typedef struct {
L
Liu Jicong 已提交
28 29 30 31 32 33 34 35 36 37 38 39 40
  uint32_t id;
  int8_t   connType;
  char     user[TSDB_USER_LEN];
  char     app[TSDB_APP_NAME_LEN];  // app name that invokes taosc
  int64_t  appStartTimeMs;          // app start time
  int32_t  pid;                     // pid of app that invokes taosc
  uint32_t ip;
  uint16_t port;
  int8_t   killed;
  int64_t  loginTimeMs;
  int64_t  lastAccessTimeMs;
  uint64_t killId;
  int32_t  numOfQueries;
S
Shengliang Guan 已提交
41
  SArray  *pQueries;  // SArray<SQueryDesc>
S
Shengliang Guan 已提交
42 43
} SConnObj;

L
Liu Jicong 已提交
44 45
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
                               int32_t pid, const char *app, int64_t startTime);
S
Shengliang Guan 已提交
46
static void      mndFreeConn(SConnObj *pConn);
D
dapan1121 已提交
47
static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId);
S
Shengliang Guan 已提交
48
static void      mndReleaseConn(SMnode *pMnode, SConnObj *pConn);
S
Shengliang Guan 已提交
49
static void     *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter);
S
Shengliang Guan 已提交
50
static void      mndCancelGetNextConn(SMnode *pMnode, void *pIter);
S
Shengliang Guan 已提交
51 52 53 54 55 56
static int32_t   mndProcessHeartBeatReq(SRpcMsg *pReq);
static int32_t   mndProcessConnectReq(SRpcMsg *pReq);
static int32_t   mndProcessKillQueryReq(SRpcMsg *pReq);
static int32_t   mndProcessKillConnReq(SRpcMsg *pReq);
static int32_t   mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static int32_t   mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
S
Shengliang Guan 已提交
57 58 59 60 61
static void      mndCancelGetNextQuery(SMnode *pMnode, void *pIter);

/*
 * Create the connection cache and register the profile-related message and
 * show handlers on the mnode.
 *
 * Returns 0 on success. Returns -1 with terrno set to TSDB_CODE_OUT_OF_MEMORY
 * when the cache cannot be created.
 */
int32_t mndInitProfile(SMnode *pMnode) {
  SProfileMgmt *pProfile = &pMnode->profileMgmt;

  // in ms
  int32_t checkTimeMs = tsShellActivityTimer * 2 * 1000;

  // mndFreeConn is installed as the cache's free callback for connection objects
  pProfile->cache = taosCacheInit(TSDB_DATA_TYPE_INT, checkTimeMs, true, (__cache_free_fn_t)mndFreeConn, "conn");
  if (pProfile->cache == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to alloc profile cache since %s", terrstr());
    return -1;
  }

  // request handlers
  mndSetMsgHandle(pMnode, TDMT_MND_HEARTBEAT, mndProcessHeartBeatReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CONNECT, mndProcessConnectReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq);

  // show handlers: the retrieve handlers are currently not registered, only the iterator cleanup ones
  //  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn);
  //  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery);

  return 0;
}

/* Release the connection cache, if one was created, and clear the handle. */
void mndCleanupProfile(SMnode *pMnode) {
  SProfileMgmt *pProfile = &pMnode->profileMgmt;
  if (pProfile->cache == NULL) return;

  taosCacheCleanup(pProfile->cache);
  pProfile->cache = NULL;
}

L
Liu Jicong 已提交
92 93
/*
 * Create a connection object and insert it into the connection cache.
 *
 * The connection id is derived from user, ip, port, pid and app name via
 * mndGenerateUid(). A startTime of 0 is replaced with the current time.
 *
 * Returns the cached object as handed back by taosCachePut(); the caller is
 * expected to release it with mndReleaseConn(). Returns NULL with terrno set
 * to TSDB_CODE_OUT_OF_MEMORY when the cache insert fails.
 */
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
                               int32_t pid, const char *app, int64_t startTime) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  char    connStr[255] = {0};
  // format string is kept unchanged on purpose: it determines the generated connection id
  int32_t len = snprintf(connStr, sizeof(connStr), "%s%d%d%d%s", user, ip, port, pid, app);
  // snprintf returns the length the full string would have had; clamp it to what
  // is really stored so the uid generator never reads past the buffer
  if (len < 0) {
    len = 0;
  } else if (len >= (int32_t)sizeof(connStr)) {
    len = (int32_t)sizeof(connStr) - 1;
  }

  int32_t connId = mndGenerateUid(connStr, len);
  if (startTime == 0) startTime = taosGetTimestampMs();

  SConnObj connObj = {.id = connId,
                      .connType = connType,
                      .appStartTimeMs = startTime,
                      .pid = pid,
                      .ip = ip,
                      .port = port,
                      .killed = 0,
                      .loginTimeMs = taosGetTimestampMs(),
                      .lastAccessTimeMs = 0,
                      .killId = 0,
                      .numOfQueries = 0,
                      .pQueries = NULL};

  connObj.lastAccessTimeMs = connObj.loginTimeMs;
  tstrncpy(connObj.user, user, TSDB_USER_LEN);
  tstrncpy(connObj.app, app, TSDB_APP_NAME_LEN);

  int32_t   keepTime = tsShellActivityTimer * 3;
  SConnObj *pConn = taosCachePut(pMgmt->cache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), keepTime * 1000);
  if (pConn == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // argument order matches the format: reason first, then user
    mError("conn:%d, failed to put into cache since %s, user:%s", connId, terrstr(), user);
    return NULL;
  }

  mTrace("conn:%u, is created, data:%p user:%s", pConn->id, pConn, user);
  return pConn;
}

/*
 * Free callback installed on the connection cache: destroys the query list
 * owned by the connection. The SConnObj storage itself is not freed here.
 */
static void mndFreeConn(SConnObj *pConn) {
  taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc);

  mTrace("conn:%u, is destroyed, data:%p", pConn->id, pConn);
}

D
dapan1121 已提交
136
/*
 * Look up a connection by id in the connection cache and refresh its
 * lastAccessTimeMs.
 *
 * Returns the acquired object (release it with mndReleaseConn()), or NULL
 * when the id is not present in the cache.
 */
static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId) {
  SProfileMgmt *pProfile = &pMnode->profileMgmt;

  SConnObj *pFound = taosCacheAcquireByKey(pProfile->cache, &connId, sizeof(connId));
  if (pFound == NULL) {
    mDebug("conn:%u, already destroyed", connId);
    return NULL;
  }

  // NOTE(review): this stores "now + keep time" rather than "now"; confirm that is intended
  int32_t keepTime = tsShellActivityTimer * 3;
  pFound->lastAccessTimeMs = keepTime * 1000 + (uint64_t)taosGetTimestampMs();

  mTrace("conn:%u, acquired from cache, data:%p", pFound->id, pFound);
  return pFound;
}

/* Hand a connection back to the cache (without removing it). NULL is ignored. */
static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) {
  if (pConn != NULL) {
    mTrace("conn:%u, released from cache, data:%p", pConn->id, pConn);

    SProfileMgmt *pProfile = &pMnode->profileMgmt;
    taosCacheRelease(pProfile->cache, (void **)&pConn, false);
  }
}

H
Haojun Liao 已提交
160
/*
 * Advance the cache iterator and return the next connection object.
 *
 * When the iterator is exhausted it is destroyed here and NULL is returned;
 * the caller must not use or destroy pIter again after a NULL return.
 */
static void *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter) {
  if (!taosCacheIterNext(pIter)) {
    taosCacheDestroyIter(pIter);
    return NULL;
  }

  size_t dataLen = 0;  // required by the iterator API; the length itself is not needed
  return taosCacheIterGetData(pIter, &dataLen);
}

/* Show free-iter handler for connections: destroys a pending cache iterator. */
static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) {
  if (pIter == NULL) return;
  taosCacheDestroyIter(pIter);
}

S
Shengliang Guan 已提交
179 180
/*
 * Handle TDMT_MND_CONNECT: authenticate the user, optionally validate the
 * requested database, create a connection object and build the connect
 * response in pReq->info.rsp.
 *
 * Returns 0 on success, TSDB_CODE_RPC_AUTH_FAILURE on a password mismatch,
 * and -1 on any other failure (terrno is set by the failing step where this
 * function or the callee sets it).
 */
static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SUserObj   *pUser = NULL;
  SDbObj     *pDb = NULL;
  SConnObj   *pConn = NULL;
  int32_t     code = -1;
  SConnectReq connReq = {0};
  char        ip[30] = {0};

  if (tDeserializeSConnectReq(pReq->pCont, pReq->contLen, &connReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto CONN_OVER;
  }

  taosIp2String(pReq->conn.clientIp, ip);

  pUser = mndAcquireUser(pMnode, pReq->conn.user);
  if (pUser == NULL) {
    mError("user:%s, failed to login while acquire user since %s", pReq->conn.user, terrstr());
    goto CONN_OVER;
  }

  if (0 != strncmp(connReq.passwd, pUser->pass, TSDB_PASSWORD_LEN - 1)) {
    // never write the supplied credential to the log
    mError("user:%s, failed to auth while acquire user", pReq->conn.user);
    code = TSDB_CODE_RPC_AUTH_FAILURE;
    goto CONN_OVER;
  }

  if (connReq.db[0]) {
    // full db name is "<acctId><delimiter><db>"
    char db[TSDB_DB_FNAME_LEN];
    snprintf(db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, connReq.db);
    pDb = mndAcquireDb(pMnode, db);
    if (pDb == NULL) {
      terrno = TSDB_CODE_MND_INVALID_DB;
      mError("user:%s, failed to login from %s while use db:%s since %s", pReq->conn.user, ip, connReq.db, terrstr());
      goto CONN_OVER;
    }
  }

  pConn = mndCreateConn(pMnode, pReq->conn.user, connReq.connType, pReq->conn.clientIp, pReq->conn.clientPort,
                        connReq.pid, connReq.app, connReq.startTime);
  if (pConn == NULL) {
    mError("user:%s, failed to login from %s while create connection since %s", pReq->conn.user, ip, terrstr());
    goto CONN_OVER;
  }

  // NOTE(review): this takes a second acquire on the connection while only one release
  // happens below; confirm the extra reference is intended
  mndAcquireConn(pMnode, pConn->id);

  SConnectRsp connectRsp = {0};
  connectRsp.acctId = pUser->acctId;
  connectRsp.superUser = pUser->superUser;
  connectRsp.clusterId = pMnode->clusterId;
  connectRsp.connId = pConn->id;
  connectRsp.connType = connReq.connType;
  connectRsp.dnodeNum = mndGetDnodeSize(pMnode);

  snprintf(connectRsp.sVersion, sizeof(connectRsp.sVersion), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo,
           gitinfo);
  mndGetMnodeEpSet(pMnode, &connectRsp.epSet);

  // first call computes the serialized size, second call fills the buffer
  int32_t contLen = tSerializeSConnectRsp(NULL, 0, &connectRsp);
  if (contLen < 0) goto CONN_OVER;
  void *pRsp = rpcMallocCont(contLen);
  if (pRsp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto CONN_OVER;
  }
  tSerializeSConnectRsp(pRsp, contLen, &connectRsp);

  pReq->info.rspLen = contLen;
  pReq->info.rsp = pRsp;

  mDebug("user:%s, login from %s:%d, conn:%u, app:%s", pReq->conn.user, ip, pConn->port, pConn->id, connReq.app);

  code = 0;

CONN_OVER:

  mndReleaseUser(pMnode, pUser);
  mndReleaseDb(pMnode, pDb);
  mndReleaseConn(pMnode, pConn);

  return code;
}

D
dapan1121 已提交
260 261
/*
 * Replace the connection's query list with the one carried by the heartbeat.
 *
 * Ownership of pBasic->queryDesc moves to pConn (the request field is set to
 * NULL so it is not freed twice); the previous list is destroyed.
 * Always returns TSDB_CODE_SUCCESS.
 */
static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) {
  taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc);

  pConn->pQueries = pBasic->queryDesc;
  pBasic->queryDesc = NULL;

  // count from the list now owned by the connection; the request pointer was just cleared
  pConn->numOfQueries = pConn->pQueries ? taosArrayGetSize(pConn->pQueries) : 0;

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
271
/*
 * Build the heartbeat response for a TMQ connection.
 *
 * The implementation is compiled out; the function currently always returns
 * NULL, which the caller treats as "no response for this request".
 */
static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) {
#if 0
  // Disabled draft. TODO before re-enabling: release batchRsp/innerBatchRsp arrays on
  // the error paths and check pConsumer / pConsumerTopic for NULL.
  SClientHbRsp* pRsp = taosMemoryMalloc(sizeof(SClientHbRsp));
  if (pRsp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  pRsp->connKey = pReq->connKey;
  SMqHbBatchRsp batchRsp;
  batchRsp.batchRsps = taosArrayInit(0, sizeof(SMqHbRsp));
  if (batchRsp.batchRsps == NULL) {
    taosMemoryFree(pRsp);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  SClientHbKey connKey = pReq->connKey;
  SHashObj* pObj =  pReq->info;
  SKv* pKv = taosHashGet(pObj, "mq-tmp", strlen("mq-tmp") + 1);
  if (pKv == NULL) {
    taosMemoryFree(pRsp);
    return NULL;
  }
  SMqHbMsg mqHb;
  taosDecodeSMqMsg(pKv->value, &mqHb);
  /*int64_t clientUid = htonl(pKv->value);*/
  /*if (mqHb.epoch )*/
  int sz = taosArrayGetSize(mqHb.pTopics);
  SMqConsumerObj* pConsumer = mndAcquireConsumer(pMnode, mqHb.consumerId);
  for (int i = 0; i < sz; i++) {
    SMqHbOneTopicBatchRsp innerBatchRsp;
    innerBatchRsp.rsps = taosArrayInit(sz, sizeof(SMqHbRsp));
    if (innerBatchRsp.rsps == NULL) {
      //TODO
      taosMemoryFree(pRsp);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
    SMqHbTopicInfo* topicInfo = taosArrayGet(mqHb.pTopics, i);
    SMqConsumerTopic* pConsumerTopic = taosHashGet(pConsumer->topicHash, topicInfo->name, strlen(topicInfo->name)+1);
    if (pConsumerTopic->epoch != topicInfo->epoch) {
      //add new vgids into rsp
      int vgSz = taosArrayGetSize(topicInfo->pVgInfo);
      for (int j = 0; j < vgSz; j++) {
        SMqHbRsp innerRsp;
        // index with the inner loop variable (was the outer one)
        SMqHbVgInfo* pVgInfo = taosArrayGet(topicInfo->pVgInfo, j);
        SVgObj* pVgObj = mndAcquireVgroup(pMnode, pVgInfo->vgId);
        innerRsp.epSet = mndGetVgroupEpset(pMnode, pVgObj);
        taosArrayPush(innerBatchRsp.rsps, &innerRsp);
      }
    }
    taosArrayPush(batchRsp.batchRsps, &innerBatchRsp);
  }
  int32_t tlen = taosEncodeSMqHbBatchRsp(NULL, &batchRsp);
  void* buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    //TODO
    taosMemoryFree(pRsp);
    return NULL;
  }
  void* abuf = buf;
  taosEncodeSMqHbBatchRsp(&abuf, &batchRsp);
  pRsp->body = buf;
  pRsp->bodyLen = tlen;
  return pRsp;
#endif
  return NULL;
}

L
Liu Jicong 已提交
336 337
/*
 * Process one query-type heartbeat request and append its response to
 * pBatchRsp->rsps.
 *
 * If the request carries query info, the connection is looked up (and
 * re-created when it is no longer cached), its query list is replaced and a
 * SQueryHbRspBasic is filled in. Each key/value in pHbReq->info is then
 * validated by the matching mndValidate* routine and any returned payload is
 * added to the response.
 *
 * Returns TSDB_CODE_SUCCESS when a response was appended, -1 otherwise (no
 * response is appended in that case).
 */
static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHbReq *pHbReq,
                                        SClientHbBatchRsp *pBatchRsp) {
  SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL};

  if (pHbReq->query) {
    SQueryHbReqBasic *pBasic = pHbReq->query;

    SRpcConnInfo connInfo = pMsg->conn;

    SConnObj *pConn = mndAcquireConn(pMnode, pBasic->connId);
    if (pConn == NULL) {
      // the cached connection is gone: create a fresh one for this client
      pConn = mndCreateConn(pMnode, connInfo.user, CONN_TYPE__QUERY, connInfo.clientIp, connInfo.clientPort,
                            pBasic->pid, pBasic->app, 0);
      if (pConn == NULL) {
        mError("user:%s, conn:%u is freed and failed to create new since %s", connInfo.user, pBasic->connId, terrstr());
        return -1;
      } else {
        mDebug("user:%s, conn:%u is freed and create a new conn:%u", connInfo.user, pBasic->connId, pConn->id);
      }
    } else if (pConn->killed) {
      mError("user:%s, conn:%u is already killed", connInfo.user, pConn->id);
      mndReleaseConn(pMnode, pConn);
      terrno = TSDB_CODE_MND_INVALID_CONNECTION;
      return -1;
    }

    SQueryHbRspBasic *rspBasic = taosMemoryCalloc(1, sizeof(SQueryHbRspBasic));
    if (rspBasic == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      // log while the connection is still held; it must not be touched after release
      mError("user:%s, conn:%u failed to process hb while since %s", pConn->user, pBasic->connId, terrstr());
      mndReleaseConn(pMnode, pConn);
      return -1;
    }

    mndSaveQueryList(pConn, pBasic);
    if (pConn->killed != 0) {
      rspBasic->killConnection = 1;
    }

    if (pConn->killId != 0) {
      // report the pending kill once, then clear it
      rspBasic->killRid = pConn->killId;
      pConn->killId = 0;
    }

    rspBasic->connId = pConn->id;
    rspBasic->totalDnodes = mndGetDnodeSize(pMnode);
    rspBasic->onlineDnodes = 1;  // TODO
    mndGetMnodeEpSet(pMnode, &rspBasic->epSet);
    mndReleaseConn(pMnode, pConn);

    hbRsp.query = rspBasic;
  }

  // check for a missing hash before asking for its size
  int32_t kvNum = (pHbReq->info != NULL) ? taosHashGetSize(pHbReq->info) : 0;
  if (kvNum <= 0) {
    taosArrayPush(pBatchRsp->rsps, &hbRsp);
    return TSDB_CODE_SUCCESS;
  }

  hbRsp.info = taosArrayInit(kvNum, sizeof(SKv));
  if (NULL == hbRsp.info) {
    mError("taosArrayInit %d rsp kv failed", kvNum);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tFreeClientHbRsp(&hbRsp);
    return -1;
  }

  void *pIter = taosHashIterate(pHbReq->info, NULL);
  while (pIter != NULL) {
    SKv *kv = pIter;

    switch (kv->key) {
      case HEARTBEAT_KEY_USER_AUTHINFO: {
        void   *rspMsg = NULL;
        int32_t rspLen = 0;
        mndValidateUserAuthInfo(pMnode, kv->value, kv->valueLen / sizeof(SUserAuthVersion), &rspMsg, &rspLen);
        if (rspMsg && rspLen > 0) {
          SKv kv1 = {.key = HEARTBEAT_KEY_USER_AUTHINFO, .valueLen = rspLen, .value = rspMsg};
          taosArrayPush(hbRsp.info, &kv1);
        }
        break;
      }
      case HEARTBEAT_KEY_DBINFO: {
        void   *rspMsg = NULL;
        int32_t rspLen = 0;
        mndValidateDbInfo(pMnode, kv->value, kv->valueLen / sizeof(SDbVgVersion), &rspMsg, &rspLen);
        if (rspMsg && rspLen > 0) {
          SKv kv1 = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg};
          taosArrayPush(hbRsp.info, &kv1);
        }
        break;
      }
      case HEARTBEAT_KEY_STBINFO: {
        void   *rspMsg = NULL;
        int32_t rspLen = 0;
        mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableMetaVersion), &rspMsg, &rspLen);
        if (rspMsg && rspLen > 0) {
          SKv kv1 = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg};
          taosArrayPush(hbRsp.info, &kv1);
        }
        break;
      }
      default:
        // unknown keys are reported through the response status; iteration continues
        mError("invalid kv key:%d", kv->key);
        hbRsp.status = TSDB_CODE_MND_APP_ERROR;
        break;
    }

    pIter = taosHashIterate(pHbReq->info, pIter);
  }

  taosArrayPush(pBatchRsp->rsps, &hbRsp);

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
453 454
/*
 * Handle TDMT_MND_HEARTBEAT: decode the batched heartbeat, process each
 * request by connection type, and serialize the batched response into
 * pReq->info.rsp.
 *
 * A sub-request that fails simply contributes no entry to the response.
 * Returns 0 on success and -1 on failure. terrno is set here for decode and
 * allocation failures.
 */
static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
  int32_t code = -1;
  int32_t tlen = 0;
  void   *buf = NULL;

  SClientHbBatchReq batchReq = {0};
  SClientHbBatchRsp batchRsp = {0};

  if (tDeserializeSClientHbBatchReq(pReq->pCont, pReq->contLen, &batchReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto HB_OVER;
  }

  batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp));
  if (batchRsp.rsps == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto HB_OVER;
  }

  int32_t sz = taosArrayGetSize(batchReq.reqs);
  for (int i = 0; i < sz; i++) {
    SClientHbReq *pHbReq = taosArrayGet(batchReq.reqs, i);
    if (pHbReq->connKey.connType == CONN_TYPE__QUERY) {
      mndProcessQueryHeartBeat(pMnode, pReq, pHbReq, &batchRsp);
    } else if (pHbReq->connKey.connType == CONN_TYPE__TMQ) {
      SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq);
      if (pRsp != NULL) {
        // the array stores a copy, so the temporary can be freed right away
        taosArrayPush(batchRsp.rsps, pRsp);
        taosMemoryFree(pRsp);
      }
    }
  }

  // first call computes the serialized size, second call fills the buffer
  tlen = tSerializeSClientHbBatchRsp(NULL, 0, &batchRsp);
  if (tlen < 0) goto HB_OVER;  // terrno is left as the serializer set it, if it did

  buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto HB_OVER;
  }
  tSerializeSClientHbBatchRsp(buf, tlen, &batchRsp);

  pReq->info.rspLen = tlen;
  pReq->info.rsp = buf;
  code = 0;

HB_OVER:
  taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);
  if (batchRsp.rsps != NULL) {
    tFreeClientHbBatchRsp(&batchRsp);
  }

  return code;
}

S
Shengliang Guan 已提交
492 493
/*
 * Handle TDMT_MND_KILL_QUERY: record the query id to kill on the target
 * connection (killId).
 *
 * Only super users may issue the request. Returns 0 on success, and also when
 * the requesting user cannot be acquired (nothing is done in that case).
 * Returns -1 with terrno set for missing rights, an undecodable message, or
 * an unknown connection id.
 */
static int32_t mndProcessKillQueryReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  SProfileMgmt *pProfile = &pMnode->profileMgmt;

  SUserObj *pOperator = mndAcquireUser(pMnode, pReq->conn.user);
  if (pOperator == NULL) return 0;

  bool isSuperUser = pOperator->superUser ? true : false;
  mndReleaseUser(pMnode, pOperator);
  if (!isSuperUser) {
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
  }

  SKillQueryReq killReq = {0};
  if (tDeserializeSKillQueryReq(pReq->pCont, pReq->contLen, &killReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  mInfo("kill query msg is received, queryId:%d", killReq.queryId);

  SConnObj *pTarget = taosCacheAcquireByKey(pProfile->cache, &killReq.connId, sizeof(int32_t));
  if (pTarget == NULL) {
    mError("connId:%d, failed to kill queryId:%d, conn not exist", killReq.connId, killReq.queryId);
    terrno = TSDB_CODE_MND_INVALID_CONN_ID;
    return -1;
  }

  mInfo("connId:%d, queryId:%d is killed by user:%s", killReq.connId, killReq.queryId, pReq->conn.user);
  pTarget->killId = killReq.queryId;
  taosCacheRelease(pProfile->cache, (void **)&pTarget, false);
  return 0;
}

S
Shengliang Guan 已提交
526 527
/*
 * Handle TDMT_MND_KILL_CONN: mark the target connection as killed.
 *
 * Only super users may issue the request. Returns 0 when the requesting user
 * cannot be acquired (nothing is done in that case) and TSDB_CODE_SUCCESS
 * when the flag was set. Returns -1 with terrno set for missing rights, an
 * undecodable message, or an unknown connection id.
 */
static int32_t mndProcessKillConnReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  SProfileMgmt *pProfile = &pMnode->profileMgmt;

  SUserObj *pOperator = mndAcquireUser(pMnode, pReq->conn.user);
  if (pOperator == NULL) return 0;

  bool isSuperUser = pOperator->superUser ? true : false;
  mndReleaseUser(pMnode, pOperator);
  if (!isSuperUser) {
    terrno = TSDB_CODE_MND_NO_RIGHTS;
    return -1;
  }

  SKillConnReq killReq = {0};
  if (tDeserializeSKillConnReq(pReq->pCont, pReq->contLen, &killReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  SConnObj *pTarget = taosCacheAcquireByKey(pProfile->cache, &killReq.connId, sizeof(int32_t));
  if (pTarget == NULL) {
    mError("connId:%d, failed to kill connection, conn not exist", killReq.connId);
    terrno = TSDB_CODE_MND_INVALID_CONN_ID;
    return -1;
  }

  mInfo("connId:%d, is killed by user:%s", killReq.connId, pReq->conn.user);
  pTarget->killed = 1;
  taosCacheRelease(pProfile->cache, (void **)&pTarget, false);
  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
558 559
/*
 * Show retrieval handler for connections.
 *
 * Walks the connection cache using the iterator kept in pShow->pIter
 * (created on first use) and counts up to `rows` connections. The code that
 * writes the column values into `data` is currently compiled out, so only
 * the row count is produced.
 *
 * Returns the number of rows counted in this call; pShow->numOfRows is
 * increased by the same amount.
 */
static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  SMnode   *pMnode = pReq->info.node;
  int32_t   numOfRows = 0;
  SConnObj *pConn = NULL;
  int32_t   cols = 0;
  char     *pWrite;
  char      ipStr[TSDB_IPv4ADDR_LEN + 6];

  if (pShow->pIter == NULL) {
    SProfileMgmt *pMgmt = &pMnode->profileMgmt;
    pShow->pIter = taosCacheCreateIter(pMgmt->cache);
  }

  while (numOfRows < rows) {
    pConn = mndGetNextConn(pMnode, pShow->pIter);
    if (pConn == NULL) {
      // mndGetNextConn already destroyed the exhausted iterator; drop the stale
      // pointer so the free-iter handler does not destroy it a second time
      pShow->pIter = NULL;
      break;
    }

    cols = 0;
#if 0
    pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
    *(uint32_t *)pWrite = pConn->id;
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->user, pShow->pMeta->pSchemas[cols].bytes);
    cols++;

    // app name
    pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->app, pShow->pMeta->pSchemas[cols].bytes);
    cols++;

    // app pid
    pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
    *(int32_t *)pWrite = pConn->pid;
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
    taosIpPort2String(pConn->ip, pConn->port, ipStr);
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->pMeta->pSchemas[cols].bytes);
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
    *(int64_t *)pWrite = pConn->loginTimeMs;
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
    if (pConn->lastAccessTimeMs < pConn->loginTimeMs) pConn->lastAccessTimeMs = pConn->loginTimeMs;
    *(int64_t *)pWrite = pConn->lastAccessTimeMs;
    cols++;
#endif

    numOfRows++;
  }

  pShow->numOfRows += numOfRows;

  return numOfRows;
}

S
Shengliang Guan 已提交
618 619 620
/*
 * Show retrieval handler for running queries.
 *
 * The whole implementation is compiled out at the moment, so this function
 * currently produces no rows and always returns 0.
 */
static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  SMnode *pMnode = pReq->info.node;
  int32_t numOfRows = 0;
#if 0
  SConnObj *pConn = NULL;
  int32_t   cols = 0;
  char     *pWrite;
  void     *pIter;
  char      str[TSDB_IPv4ADDR_LEN + 6] = {0};

  if (pShow->pIter == NULL) {
    SProfileMgmt *pMgmt = &pMnode->profileMgmt;
    pShow->pIter = taosCacheCreateIter(pMgmt->cache);
  }

  while (numOfRows < rows) {
    pConn = mndGetNextConn(pMnode, pShow->pIter);
    if (pConn == NULL) {
      pShow->pIter = NULL;
      break;
    }

    if (numOfRows + pConn->numOfQueries >= rows) {
      taosCacheDestroyIter(pShow->pIter);
      pShow->pIter = NULL;
      break;
    }

    for (int32_t i = 0; i < pConn->numOfQueries; ++i) {
      SQueryDesc *pDesc = pConn->pQueries + i;
      cols = 0;

      pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
      *(int64_t *)pWrite = htobe64(pDesc->queryId);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
      *(int64_t *)pWrite = htobe64(pConn->id);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->user, pShow->pMeta->pSchemas[cols].bytes);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
      snprintf(str, tListLen(str), "%s:%u", taosIpStr(pConn->ip), pConn->port);
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, str, pShow->pMeta->pSchemas[cols].bytes);
      cols++;

      char handleBuf[24] = {0};
      snprintf(handleBuf, tListLen(handleBuf), "%" PRIu64, htobe64(pDesc->qId));
      pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;

      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, handleBuf, pShow->pMeta->pSchemas[cols].bytes);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
      *(int64_t *)pWrite = htobe64(pDesc->stime);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
      *(int64_t *)pWrite = htobe64(pDesc->useconds);
      cols++;

      snprintf(str, tListLen(str), "0x%" PRIx64, htobe64(pDesc->sqlObjId));
      pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, str, pShow->pMeta->pSchemas[cols].bytes);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
      *(int32_t *)pWrite = htonl(pDesc->pid);
      cols++;

      char epBuf[TSDB_EP_LEN + 1] = {0};
      snprintf(epBuf, tListLen(epBuf), "%s:%u", pDesc->fqdn, pConn->port);
      pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, epBuf, pShow->pMeta->pSchemas[cols].bytes);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
      *(bool *)pWrite = pDesc->stableQuery;
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
      *(int32_t *)pWrite = htonl(pDesc->numOfSub);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDesc->subSqlInfo, pShow->pMeta->pSchemas[cols].bytes);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDesc->sql, pShow->pMeta->pSchemas[cols].bytes);
      cols++;

      numOfRows++;
    }
  }

  pShow->numOfRows += numOfRows;
#endif
  return numOfRows;
}

/* Show free-iter handler for queries: destroys a pending cache iterator. */
static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) {
  if (pIter == NULL) return;
  taosCacheDestroyIter(pIter);
}
S
Shengliang Guan 已提交
727

S
Shengliang Guan 已提交
728 729 730
/* Number of objects currently held in the connection cache. */
int32_t mndGetNumOfConnections(SMnode *pMnode) {
  return taosCacheGetNumOfObj(pMnode->profileMgmt.cache);
}