mndShow.c 10.9 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "mndShow.h"
S
Shengliang Guan 已提交
18

S
Shengliang Guan 已提交
19 20
#define SHOW_STEP_SIZE 100

S
Shengliang Guan 已提交
21
/* Forward declarations of the file-local helpers defined further down. */

/* Show-object lifecycle in the show cache. */
static SShowObj *mndCreateShowObj(SMnode *pMnode, SShowReq *pReq);
static void      mndFreeShowObj(SShowObj *pShow);
static SShowObj *mndAcquireShowObj(SMnode *pMnode, int64_t showId);
static void      mndReleaseShowObj(SShowObj *pShow, bool forceRemove);

/* Message handlers registered by mndInitShow(). */
static int32_t   mndProcessShowReq(SMnodeMsg *pReq);
static int32_t   mndProcessRetrieveReq(SMnodeMsg *pReq);

/* Retrieval progress check used by mndProcessRetrieveReq(). */
static bool      mndCheckRetrieveFinished(SShowObj *pShow);
S
Shengliang Guan 已提交
28 29 30 31

/*
 * Initialise the show subsystem of a mnode.
 *
 * Creates the cache that stores show objects (with mndFreeShowObj as the
 * per-entry free callback) and registers the handlers for the
 * TDMT_MND_SHOW and TDMT_MND_SHOW_RETRIEVE messages.
 *
 * Returns 0 on success. If the cache cannot be created, terrno is set to
 * TSDB_CODE_OUT_OF_MEMORY, an error is logged and -1 is returned; no handlers
 * are registered in that case.
 *
 * NOTE(review): the cache is created with TSDB_DATA_TYPE_INT as key type while
 * the keys used elsewhere in this file are int64_t show ids - confirm intended.
 */
int32_t mndInitShow(SMnode *pMnode) {
  SShowMgmt *pShowMgmt = &pMnode->showMgmt;

  pShowMgmt->cache = taosCacheInit(TSDB_DATA_TYPE_INT, 5, true, (__cache_free_fn_t)mndFreeShowObj, "show");
  if (pShowMgmt->cache != NULL) {
    mndSetMsgHandle(pMnode, TDMT_MND_SHOW, mndProcessShowReq);
    mndSetMsgHandle(pMnode, TDMT_MND_SHOW_RETRIEVE, mndProcessRetrieveReq);
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  mError("failed to alloc show cache since %s", terrstr());
  return -1;
}

S
Shengliang Guan 已提交
44 45 46 47 48 49 50 51
/*
 * Tear down the show subsystem: destroy the show cache if one exists and
 * clear the pointer so a repeated call is a no-op.
 */
void mndCleanupShow(SMnode *pMnode) {
  SShowMgmt *pShowMgmt = &pMnode->showMgmt;
  if (pShowMgmt->cache == NULL) return;

  taosCacheCleanup(pShowMgmt->cache);
  pShowMgmt->cache = NULL;
}

S
Shengliang Guan 已提交
52
/*
 * Create a show object for the given request and store it in the show cache.
 *
 * pMnode  mnode that owns the show cache and the show-id counter.
 * pReq    deserialised show request; its type, db and payload are copied.
 *
 * Returns the cached object (acquired; the caller must hand it back with
 * mndReleaseShowObj), or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY when the
 * staging buffer cannot be allocated or the cache insert fails.
 */
static SShowObj *mndCreateShowObj(SMnode *pMnode, SShowReq *pReq) {
  SShowMgmt *pMgmt = &pMnode->showMgmt;

  // Id 0 is skipped. The re-drawn value must be kept, otherwise the object
  // would still be created with id 0.
  int64_t showId = atomic_add_fetch_64(&pMgmt->showId, 1);
  if (showId == 0) showId = atomic_add_fetch_64(&pMgmt->showId, 1);

  // The cached entry is the fixed-size object followed by payloadLen bytes of
  // payload. Stage it in a heap buffer of exactly that size: a plain SShowObj
  // local is only sizeof(SShowObj) bytes, so copying the payload into it and
  // handing `size` bytes to the cache would run past the end of the local.
  int32_t   size = sizeof(SShowObj) + pReq->payloadLen;
  SShowObj *pStaged = calloc(1, size);
  if (pStaged == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("show:0x%" PRIx64 ", failed to alloc show obj since %s", showId, terrstr());
    return NULL;
  }

  pStaged->id = showId;
  pStaged->pMnode = pMnode;
  pStaged->type = pReq->type;
  pStaged->payloadLen = pReq->payloadLen;
  memcpy(pStaged->db, pReq->db, TSDB_DB_FNAME_LEN);
  if (pReq->payloadLen > 0) {
    memcpy(pStaged->payload, pReq->payload, pReq->payloadLen);
  }

  int32_t   keepTime = pMnode->cfg.shellActivityTimer * 6 * 1000;
  SShowObj *pShow = taosCachePut(pMgmt->cache, &showId, sizeof(int64_t), pStaged, size, keepTime);

  // NOTE(review): the previous code passed the address of a stack local here
  // and kept using the returned pointer, so the cache is taken to copy the
  // bytes it is given; the staging buffer is therefore released right away.
  free(pStaged);

  if (pShow == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("show:0x%" PRIx64 ", failed to put into cache since %s", showId, terrstr());
    return NULL;
  }

  mTrace("show:0x%" PRIx64 ", is created, data:%p", showId, pShow);
  return pShow;
}

S
Shengliang Guan 已提交
79
/*
 * Free callback installed on the show cache (see mndInitShow).
 *
 * If a free-iterator handler is registered for the object's show type and the
 * object still holds an iterator, the handler is invoked on it. The object
 * memory itself is not released here.
 */
static void mndFreeShowObj(SShowObj *pShow) {
  SMnode        *pMnode = pShow->pMnode;
  ShowFreeIterFp releaseIter = pMnode->showMgmt.freeIterFps[pShow->type];

  if (releaseIter != NULL && pShow->pIter != NULL) {
    (*releaseIter)(pMnode, pShow->pIter);
  }

  mTrace("show:0x%" PRIx64 ", is destroyed, data:%p", pShow->id, pShow);
}

93
/*
 * Look up a show object in the show cache by its id.
 *
 * Returns the acquired object (to be handed back with mndReleaseShowObj), or
 * NULL after logging an error when no entry exists for showId.
 */
static SShowObj *mndAcquireShowObj(SMnode *pMnode, int64_t showId) {
  SShowObj *pFound = taosCacheAcquireByKey(pMnode->showMgmt.cache, &showId, sizeof(showId));
  if (pFound != NULL) {
    mTrace("show:0x%" PRIx64 ", acquired from cache, data:%p", pFound->id, pFound);
    return pFound;
  }

  mError("show:0x%" PRIx64 ", already destroyed", showId);
  return NULL;
}

/*
 * Hand a show object back to the show cache. A NULL object is ignored.
 *
 * forceRemove is logged as requested by the caller, but the release is always
 * performed without forced removal (see the comment in the body).
 */
static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove) {
  if (pShow != NULL) {
    mTrace("show:0x%" PRIx64 ", released from cache, data:%p force:%d", pShow->id, pShow, forceRemove);

    // A bug in tcache.c: forced removal is switched off, so the entry is
    // always released with the flag cleared regardless of the request.
    taosCacheRelease(pShow->pMnode->showMgmt.cache, (void **)(&pShow), false);
  }
}
S
Shengliang Guan 已提交
117

S
Shengliang Guan 已提交
118 119
/*
 * Handler for TDMT_MND_SHOW.
 *
 * Deserialises the show request, validates its type, creates a show object,
 * asks the registered meta handler to fill in the table meta, and on success
 * serialises the response into a freshly allocated RPC buffer that is attached
 * to pReq (pCont / contLen).
 *
 * Returns 0 on success. On failure returns a non-zero code: -1 with terrno set
 * for local errors (invalid message, unknown/unregistered show type, out of
 * memory), or whatever the meta handler returned.
 */
static int32_t mndProcessShowReq(SMnodeMsg *pReq) {
  SMnode    *pMnode = pReq->pMnode;
  SShowMgmt *pMgmt = &pMnode->showMgmt;
  int32_t    code = -1;
  SShowReq   showReq = {0};
  SShowRsp   showRsp = {0};

  if (tDeserializeSShowReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &showReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto SHOW_OVER;
  }

  // The type indexes the handler tables below, so reject anything out of range first.
  if (showReq.type <= TSDB_MGMT_TABLE_START || showReq.type >= TSDB_MGMT_TABLE_MAX) {
    terrno = TSDB_CODE_MND_INVALID_MSG_TYPE;
    goto SHOW_OVER;
  }

  ShowMetaFp metaFp = pMgmt->metaFps[showReq.type];
  if (metaFp == NULL) {
    terrno = TSDB_CODE_MND_INVALID_MSG_TYPE;
    goto SHOW_OVER;
  }

  SShowObj *pShow = mndCreateShowObj(pMnode, &showReq);
  if (pShow == NULL) {
    goto SHOW_OVER;
  }

  showRsp.showId = pShow->id;
  showRsp.tableMeta.pSchemas = calloc(TSDB_MAX_COLUMNS, sizeof(SSchema));
  if (showRsp.tableMeta.pSchemas == NULL) {
    mndReleaseShowObj(pShow, true);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto SHOW_OVER;
  }

  code = (*metaFp)(pReq, pShow, &showRsp.tableMeta);
  mDebug("show:0x%" PRIx64 ", get meta finished, numOfRows:%d cols:%d showReq.type:%s, result:%s", pShow->id,
         pShow->numOfRows, pShow->numOfColumns, mndShowStr(showReq.type), tstrerror(code));

  if (code == 0) {
    // First call only computes the encoded length, second call encodes into the buffer.
    int32_t bufLen = tSerializeSShowRsp(NULL, 0, &showRsp);
    void   *pBuf = rpcMallocCont(bufLen);
    if (pBuf == NULL) {
      // Do not serialise into, or hand back, a NULL response buffer.
      mndReleaseShowObj(pShow, true);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = -1;
      goto SHOW_OVER;
    }

    tSerializeSShowRsp(pBuf, bufLen, &showRsp);
    pReq->contLen = bufLen;
    pReq->pCont = pBuf;
    mndReleaseShowObj(pShow, false);
  } else {
    mndReleaseShowObj(pShow, true);
  }

SHOW_OVER:
  if (code != 0) {
    mError("failed to process show-meta req since %s", terrstr());
  }

  tFreeSShowReq(&showReq);
  tFreeSShowRsp(&showRsp);
  return code;
}

S
Shengliang Guan 已提交
179 180
/*
 * Handler for TDMT_MND_SHOW_RETRIEVE.
 *
 * Looks up the show object named by the request, reads up to SHOW_STEP_SIZE
 * rows through the retrieve handler registered for the object's type, and
 * attaches the response buffer to pReq (pCont / contLen). When the client set
 * the free-resource flag no rows are read.
 *
 * Returns TSDB_CODE_SUCCESS when a response was produced, or -1 with terrno
 * set (invalid message, unknown show object, no retrieve handler, out of
 * memory).
 */
static int32_t mndProcessRetrieveReq(SMnodeMsg *pReq) {
  SMnode    *pMnode = pReq->pMnode;
  SShowMgmt *pMgmt = &pMnode->showMgmt;
  int32_t    rowsToRead = 0;
  int32_t    size = 0;
  int32_t    rowsRead = 0;

  SRetrieveTableReq retrieveReq = {0};
  if (tDeserializeSRetrieveTableReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &retrieveReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  SShowObj *pShow = mndAcquireShowObj(pMnode, retrieveReq.showId);
  if (pShow == NULL) {
    terrno = TSDB_CODE_MND_INVALID_SHOWOBJ;
    mError("failed to process show-retrieve req:%p since %s", pShow, terrstr());
    return -1;
  }

  ShowRetrieveFp retrieveFp = pMgmt->retrieveFps[pShow->type];
  if (retrieveFp == NULL) {
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    // Log before releasing: pShow must not be dereferenced once it is handed back to the cache.
    mError("show:0x%" PRIx64 ", failed to retrieve data since %s", pShow->id, terrstr());
    mndReleaseShowObj(pShow, false);
    return -1;
  }

  mDebug("show:0x%" PRIx64 ", start retrieve data, numOfReads:%d numOfRows:%d type:%s", pShow->id, pShow->numOfReads,
         pShow->numOfRows, mndShowStr(pShow->type));

  if (mndCheckRetrieveFinished(pShow)) {
    mDebug("show:0x%" PRIx64 ", read finished, numOfReads:%d numOfRows:%d", pShow->id, pShow->numOfReads,
           pShow->numOfRows);
    pShow->numOfReads = pShow->numOfRows;
  }

  // rowsToRead stays 0 when the client only wants the resources freed.
  if ((retrieveReq.free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) {
    rowsToRead = pShow->numOfRows - pShow->numOfReads;
  }

  /* return no more than SHOW_STEP_SIZE rows in one round trip */
  if (rowsToRead > SHOW_STEP_SIZE) rowsToRead = SHOW_STEP_SIZE;

  /*
   * the actual number of table may be larger than the value of pShow->numOfRows, if a query is
   * issued during a continuous create table operation. Therefore, rowToRead may be less than 0.
   */
  if (rowsToRead < 0) rowsToRead = 0;
  size = pShow->rowSize * rowsToRead;

  // NOTE(review): SHOW_STEP_SIZE extra bytes are added on top of the row data,
  // presumably as room for the SRetrieveTableRsp header - confirm it is large enough.
  size += SHOW_STEP_SIZE;
  SRetrieveTableRsp *pRsp = rpcMallocCont(size);
  if (pRsp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // Log before releasing, for the same reason as above.
    mError("show:0x%" PRIx64 ", failed to retrieve data since %s", pShow->id, terrstr());
    mndReleaseShowObj(pShow, false);
    return -1;
  }

  // if free flag is set, client wants to clean the resources
  if ((retrieveReq.free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) {
    rowsRead = (*retrieveFp)(pReq, pShow, pRsp->data, rowsToRead);
  }

  mDebug("show:0x%" PRIx64 ", stop retrieve data, rowsRead:%d rowsToRead:%d", pShow->id, rowsRead, rowsToRead);

  pRsp->numOfRows = htonl(rowsRead);
  pRsp->precision = TSDB_TIME_PRECISION_MILLI;  // millisecond time precision

  pReq->pCont = pRsp;
  pReq->contLen = size;

  if (rowsRead == 0 || rowsToRead == 0 || (rowsRead == rowsToRead && pShow->numOfRows == pShow->numOfReads)) {
    pRsp->completed = 1;
    mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id);
    mndReleaseShowObj(pShow, true);
  } else {
    mDebug("show:0x%" PRIx64 ", retrieve not completed yet", pShow->id);
    mndReleaseShowObj(pShow, false);
  }

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
264
char *mndShowStr(int32_t showType) {
S
Shengliang Guan 已提交
265 266 267 268 269 270 271 272 273 274 275 276 277
  switch (showType) {
    case TSDB_MGMT_TABLE_ACCT:
      return "show accounts";
    case TSDB_MGMT_TABLE_USER:
      return "show users";
    case TSDB_MGMT_TABLE_DB:
      return "show databases";
    case TSDB_MGMT_TABLE_TABLE:
      return "show tables";
    case TSDB_MGMT_TABLE_DNODE:
      return "show dnodes";
    case TSDB_MGMT_TABLE_MNODE:
      return "show mnodes";
S
Shengliang Guan 已提交
278 279 280 281 282 283
    case TSDB_MGMT_TABLE_QNODE:
      return "show qnodes";
    case TSDB_MGMT_TABLE_SNODE:
      return "show snodes";
    case TSDB_MGMT_TABLE_BNODE:
      return "show bnodes";
S
Shengliang Guan 已提交
284 285
    case TSDB_MGMT_TABLE_VGROUP:
      return "show vgroups";
S
Shengliang Guan 已提交
286
    case TSDB_MGMT_TABLE_STB:
S
Shengliang Guan 已提交
287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304
      return "show stables";
    case TSDB_MGMT_TABLE_MODULE:
      return "show modules";
    case TSDB_MGMT_TABLE_QUERIES:
      return "show queries";
    case TSDB_MGMT_TABLE_STREAMS:
      return "show streams";
    case TSDB_MGMT_TABLE_VARIABLES:
      return "show configs";
    case TSDB_MGMT_TABLE_CONNS:
      return "show connections";
    case TSDB_MGMT_TABLE_SCORES:
      return "show scores";
    case TSDB_MGMT_TABLE_GRANTS:
      return "show grants";
    case TSDB_MGMT_TABLE_VNODES:
      return "show vnodes";
    case TSDB_MGMT_TABLE_CLUSTER:
S
Shengliang Guan 已提交
305
      return "show cluster";
S
Shengliang Guan 已提交
306 307 308 309
    case TSDB_MGMT_TABLE_STREAMTABLES:
      return "show streamtables";
    case TSDB_MGMT_TABLE_TP:
      return "show topics";
S
Shengliang 已提交
310
    case TSDB_MGMT_TABLE_FUNC:
S
Shengliang Guan 已提交
311
      return "show functions";
S
Shengliang Guan 已提交
312 313 314 315 316 317 318 319
    default:
      return "undefined";
  }
}

/*
 * Report whether retrieval on this show object is finished: true when the
 * object no longer holds an iterator and at least one row has been read.
 */
static bool mndCheckRetrieveFinished(SShowObj *pShow) {
  return (pShow->pIter == NULL) && (pShow->numOfReads != 0);
}

S
Shengliang Guan 已提交
324
/*
 * Compact a result buffer that was laid out for `capacity` rows but holds only
 * `rows` rows.
 *
 * For each of the numOfCols columns, pShow->bytes[i] * rows bytes are moved
 * from data + pShow->offset[i] * capacity to data + pShow->offset[i] * rows.
 * Nothing is done when rows >= capacity.
 *
 * NOTE(review): presumably offset[i] is the per-row byte offset of column i in
 * a column-major buffer - confirm against the retrieve handlers.
 */
void mndVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow) {
  if (rows >= capacity) return;

  for (int32_t col = 0; col < numOfCols; ++col) {
    char *dst = data + pShow->offset[col] * rows;
    char *src = data + pShow->offset[col] * capacity;
    // memmove, not memcpy: source and destination ranges may overlap.
    memmove(dst, src, pShow->bytes[col] * rows);
  }
}

S
Shengliang Guan 已提交
332
/*
 * Register the meta handler for a show type. It is the handler that
 * mndProcessShowReq looks up by request type and invokes.
 * showType is used directly as the table index and is not range-checked here.
 */
void mndAddShowMetaHandle(SMnode *pMnode, EShowType showType, ShowMetaFp fp) {
  pMnode->showMgmt.metaFps[showType] = fp;
}

S
Shengliang Guan 已提交
337
/*
 * Register the retrieve handler for a show type. It is the handler that
 * mndProcessRetrieveReq looks up by show-object type and invokes.
 * showType is used directly as the table index and is not range-checked here.
 */
void mndAddShowRetrieveHandle(SMnode *pMnode, EShowType showType, ShowRetrieveFp fp) {
  pMnode->showMgmt.retrieveFps[showType] = fp;
}

S
Shengliang Guan 已提交
342
/*
 * Register the free-iterator handler for a show type. It is the handler that
 * mndFreeShowObj invokes on an object's leftover iterator.
 * showType is used directly as the table index and is not range-checked here.
 */
void mndAddShowFreeIterHandle(SMnode *pMnode, EShowType showType, ShowFreeIterFp fp) {
  pMnode->showMgmt.freeIterFps[showType] = fp;
}