mndShow.c 11.0 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "mndShow.h"
S
Shengliang Guan 已提交
18

S
Shengliang Guan 已提交
19 20
#define SHOW_STEP_SIZE 100

S
Shengliang Guan 已提交
21 22 23 24
static SShowObj *mndCreateShowObj(SMnode *pMnode, SShowMsg *pMsg);
static void      mndFreeShowObj(SShowObj *pShow);
static SShowObj *mndAcquireShowObj(SMnode *pMnode, int32_t showId);
static void      mndReleaseShowObj(SShowObj *pShow, bool forceRemove);
S
Shengliang Guan 已提交
25 26 27
static int32_t   mndProcessShowMsg(SMnodeMsg *pMnodeMsg);
static int32_t   mndProcessRetrieveMsg(SMnodeMsg *pMsg);
static bool      mndCheckRetrieveFinished(SShowObj *pShow);
S
Shengliang Guan 已提交
28 29 30 31

int32_t mndInitShow(SMnode *pMnode) {
  SShowMgmt *pMgmt = &pMnode->showMgmt;

S
Shengliang Guan 已提交
32
  pMgmt->cache = taosCacheInit(TSDB_DATA_TYPE_INT, 5, true, (__cache_free_fn_t)mndFreeShowObj, "show");
S
Shengliang Guan 已提交
33 34 35 36 37 38 39 40 41 42 43
  if (pMgmt->cache == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to alloc show cache since %s", terrstr());
    return -1;
  }

  mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_SHOW, mndProcessShowMsg);
  mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_SHOW_RETRIEVE, mndProcessRetrieveMsg);
  return 0;
}

S
Shengliang Guan 已提交
44 45 46 47 48 49 50 51
void mndCleanupShow(SMnode *pMnode) {
  SShowMgmt *pMgmt = &pMnode->showMgmt;
  if (pMgmt->cache != NULL) {
    taosCacheCleanup(pMgmt->cache);
    pMgmt->cache = NULL;
  }
}

S
Shengliang Guan 已提交
52
static SShowObj *mndCreateShowObj(SMnode *pMnode, SShowMsg *pMsg) {
S
Shengliang Guan 已提交
53 54
  SShowMgmt *pMgmt = &pMnode->showMgmt;

S
Shengliang Guan 已提交
55 56
  int32_t showId = atomic_add_fetch_32(&pMgmt->showId, 1);
  if (showId == 0) atomic_add_fetch_32(&pMgmt->showId, 1);
S
Shengliang Guan 已提交
57

S
Shengliang Guan 已提交
58 59 60 61 62 63 64 65 66 67
  int32_t   size = sizeof(SShowObj) + pMsg->payloadLen;
  SShowObj *pShow = calloc(1, size);
  if (pShow != NULL) {
    pShow->id = showId;
    pShow->pMnode = pMnode;
    pShow->type = pMsg->type;
    pShow->payloadLen = pMsg->payloadLen;
    memcpy(pShow->db, pMsg->db, TSDB_FULL_DB_NAME_LEN);
    memcpy(pShow->payload, pMsg->payload, pMsg->payloadLen);
  } else {
S
Shengliang Guan 已提交
68
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
69 70
    mError("failed to process show-meta msg:%s since %s", mndShowStr(pMsg->type), terrstr());
    return NULL;
S
Shengliang Guan 已提交
71 72
  }

S
Shengliang Guan 已提交
73
  int32_t   keepTime = pMnode->cfg.shellActivityTimer * 6 * 1000;
S
Shengliang Guan 已提交
74 75 76 77 78 79 80 81 82 83
  SShowObj *pShowRet = taosCachePut(pMgmt->cache, &showId, sizeof(int32_t), pShow, size, keepTime);
  free(pShow);
  if (pShowRet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("show:%d, failed to put into cache since %s", showId, terrstr());
    return NULL;
  } else {
    mTrace("show:%d, data:%p created", showId, pShowRet);
    return pShowRet;
  }
S
Shengliang Guan 已提交
84 85
}

S
Shengliang Guan 已提交
86
static void mndFreeShowObj(SShowObj *pShow) {
S
Shengliang Guan 已提交
87 88 89 90 91 92 93 94 95 96
  SMnode    *pMnode = pShow->pMnode;
  SShowMgmt *pMgmt = &pMnode->showMgmt;

  ShowFreeIterFp freeFp = pMgmt->freeIterFps[pShow->type];
  if (freeFp != NULL) {
    if (pShow->pIter != NULL) {
      (*freeFp)(pMnode, pShow->pIter);
    }
  }

S
Shengliang Guan 已提交
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
  mTrace("show:%d, data:%p destroyed", pShow->id, pShow);
}

static SShowObj *mndAcquireShowObj(SMnode *pMnode, int32_t showId) {
  SShowMgmt *pMgmt = &pMnode->showMgmt;

  SShowObj *pShow = taosCacheAcquireByKey(pMgmt->cache, &showId, sizeof(int32_t));
  if (pShow == NULL) {
    mError("show:%d, already destroyed", showId);
    return NULL;
  }

  mTrace("show:%d, data:%p acquired from cache", pShow->id, pShow);
  return pShow;
}

static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove) {
  if (pShow == NULL) return;
  mTrace("show:%d, data:%p released from cache, force:%d", pShow->id, pShow, forceRemove);
S
Shengliang Guan 已提交
116 117 118
  
  // A bug in tcache.c
  forceRemove = 0;
S
Shengliang Guan 已提交
119 120 121 122

  SMnode    *pMnode = pShow->pMnode;
  SShowMgmt *pMgmt = &pMnode->showMgmt;
  taosCacheRelease(pMgmt->cache, (void **)(&pShow), forceRemove);
S
Shengliang Guan 已提交
123
}
S
Shengliang Guan 已提交
124

125 126
static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg) {
  SMnode    *pMnode = pMnodeMsg->pMnode;
S
Shengliang Guan 已提交
127 128 129
  SShowMgmt *pMgmt = &pMnode->showMgmt;
  SShowMsg  *pMsg = pMnodeMsg->rpcMsg.pCont;
  int8_t     type = pMsg->type;
S
Shengliang Guan 已提交
130
  int16_t    payloadLen = htonl(pMsg->payloadLen);
S
Shengliang Guan 已提交
131 132 133 134 135 136 137 138 139 140 141 142 143 144

  if (type <= TSDB_MGMT_TABLE_START || type >= TSDB_MGMT_TABLE_MAX) {
    terrno = TSDB_CODE_MND_INVALID_MSG_TYPE;
    mError("failed to process show msg since %s", terrstr());
    return -1;
  }

  ShowMetaFp metaFp = pMgmt->metaFps[type];
  if (metaFp == NULL) {
    terrno = TSDB_CODE_MND_INVALID_MSG_TYPE;
    mError("failed to process show-meta msg:%s since no message handle", mndShowStr(type));
    return -1;
  }

S
Shengliang Guan 已提交
145 146
  SShowObj *pShow = mndCreateShowObj(pMnode, pMsg);
  if (pShow == NULL) {
S
Shengliang Guan 已提交
147 148 149 150
    mError("failed to process show-meta msg:%s since %s", mndShowStr(type), terrstr());
    return -1;
  }

S
Shengliang Guan 已提交
151
  int32_t   size = sizeof(SShowRsp) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_EXTRA_PAYLOAD_SIZE;
S
Shengliang Guan 已提交
152 153 154 155 156 157 158 159
  SShowRsp *pRsp = rpcMallocCont(size);
  if (pRsp == NULL) {
    mndReleaseShowObj(pShow, true);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("show:%d, failed to process show-meta msg:%s since malloc rsp error", pShow->id, mndShowStr(type));
    return -1;
  }

S
Shengliang Guan 已提交
160 161 162
  int32_t code = (*metaFp)(pMnodeMsg, pShow, &pRsp->tableMeta);
  mDebug("show:%d, data:%p get meta finished, numOfRows:%d cols:%d type:%s result:%s", pShow->id, pShow,
         pShow->numOfRows, pShow->numOfColumns, mndShowStr(type), tstrerror(code));
S
Shengliang Guan 已提交
163 164 165 166

  if (code == TSDB_CODE_SUCCESS) {
    pMnodeMsg->contLen = sizeof(SShowRsp) + sizeof(SSchema) * pShow->numOfColumns;
    pMnodeMsg->pCont = pRsp;
S
Shengliang Guan 已提交
167
    pRsp->showId = htonl(pShow->id);
S
Shengliang Guan 已提交
168 169 170 171 172 173 174 175 176
    mndReleaseShowObj(pShow, false);
    return TSDB_CODE_SUCCESS;
  } else {
    rpcFreeCont(pRsp);
    mndReleaseShowObj(pShow, true);
    return code;
  }
}

177 178
static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMnodeMsg) {
  SMnode    *pMnode = pMnodeMsg->pMnode;
S
Shengliang Guan 已提交
179 180 181 182 183 184
  SShowMgmt *pMgmt = &pMnode->showMgmt;
  int32_t    rowsToRead = 0;
  int32_t    size = 0;
  int32_t    rowsRead = 0;

  SRetrieveTableMsg *pRetrieve = pMnodeMsg->rpcMsg.pCont;
S
Shengliang Guan 已提交
185
  int32_t            showId = htonl(pRetrieve->showId);
S
Shengliang Guan 已提交
186

S
Shengliang Guan 已提交
187 188
  SShowObj *pShow = mndAcquireShowObj(pMnode, showId);
  if (pShow == NULL) {
S
Shengliang Guan 已提交
189 190 191 192 193 194 195 196 197
    terrno = TSDB_CODE_MND_INVALID_SHOWOBJ;
    mError("failed to process show-retrieve msg:%p since %s", pShow, terrstr());
    return -1;
  }

  ShowRetrieveFp retrieveFp = pMgmt->retrieveFps[pShow->type];
  if (retrieveFp == NULL) {
    mndReleaseShowObj(pShow, false);
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
S
Shengliang Guan 已提交
198
    mError("show:%d, data:%p failed to retrieve data since %s", pShow->id, pShow, terrstr());
S
Shengliang Guan 已提交
199 200 201
    return -1;
  }

S
Shengliang Guan 已提交
202 203
  mDebug("show:%d, data:%p start retrieve data, numOfReads:%d numOfRows:%d type:%s", pShow->id, pShow,
         pShow->numOfReads, pShow->numOfRows, mndShowStr(pShow->type));
S
Shengliang Guan 已提交
204 205

  if (mndCheckRetrieveFinished(pShow)) {
S
Shengliang Guan 已提交
206 207
    mDebug("show:%d, data:%p read finished, numOfReads:%d numOfRows:%d", pShow->id, pShow, pShow->numOfReads,
           pShow->numOfRows);
S
Shengliang Guan 已提交
208 209 210 211 212 213 214 215
    pShow->numOfReads = pShow->numOfRows;
  }

  if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) {
    rowsToRead = pShow->numOfRows - pShow->numOfReads;
  }

  /* return no more than 100 tables in one round trip */
S
Shengliang Guan 已提交
216
  if (rowsToRead > SHOW_STEP_SIZE) rowsToRead = SHOW_STEP_SIZE;
S
Shengliang Guan 已提交
217 218 219 220 221 222 223 224

  /*
   * the actual number of table may be larger than the value of pShow->numOfRows, if a query is
   * issued during a continuous create table operation. Therefore, rowToRead may be less than 0.
   */
  if (rowsToRead < 0) rowsToRead = 0;
  size = pShow->rowSize * rowsToRead;

S
Shengliang Guan 已提交
225
  size += SHOW_STEP_SIZE;
S
Shengliang Guan 已提交
226 227 228 229
  SRetrieveTableRsp *pRsp = rpcMallocCont(size);
  if (pRsp == NULL) {
    mndReleaseShowObj(pShow, false);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
230
    mError("show:%d, data:%p failed to retrieve data since %s", pShow->id, pShow, terrstr());
S
Shengliang Guan 已提交
231 232 233 234 235
    return -1;
  }

  // if free flag is set, client wants to clean the resources
  if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) {
S
Shengliang Guan 已提交
236
    rowsRead = (*retrieveFp)(pMnodeMsg, pShow, pRsp->data, rowsToRead);
S
Shengliang Guan 已提交
237 238
  }

S
Shengliang Guan 已提交
239
  mDebug("show:%d, data:%p stop retrieve data, rowsRead:%d rowsToRead:%d", pShow->id, pShow, rowsRead, rowsToRead);
S
Shengliang Guan 已提交
240 241

  pRsp->numOfRows = htonl(rowsRead);
S
Shengliang Guan 已提交
242
  pRsp->precision = TSDB_TIME_PRECISION_MILLI;  // millisecond time precision
S
Shengliang Guan 已提交
243 244 245 246

  pMnodeMsg->pCont = pRsp;
  pMnodeMsg->contLen = size;

S
Shengliang Guan 已提交
247
  if (rowsRead == 0 || rowsToRead == 0 || (rowsRead == rowsToRead && pShow->numOfRows == pShow->numOfReads)) {
S
Shengliang Guan 已提交
248
    pRsp->completed = 1;
S
Shengliang Guan 已提交
249
    mDebug("show:%d, data:%p retrieve completed", pShow->id, pShow);
S
Shengliang Guan 已提交
250 251
    mndReleaseShowObj(pShow, true);
  } else {
S
Shengliang Guan 已提交
252
    mDebug("show:%d, data:%p retrieve not completed yet", pShow->id, pShow);
S
Shengliang Guan 已提交
253 254 255 256 257 258
    mndReleaseShowObj(pShow, false);
  }

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
259
char *mndShowStr(int32_t showType) {
S
Shengliang Guan 已提交
260 261 262 263 264 265 266 267 268 269 270 271 272 273 274
  switch (showType) {
    case TSDB_MGMT_TABLE_ACCT:
      return "show accounts";
    case TSDB_MGMT_TABLE_USER:
      return "show users";
    case TSDB_MGMT_TABLE_DB:
      return "show databases";
    case TSDB_MGMT_TABLE_TABLE:
      return "show tables";
    case TSDB_MGMT_TABLE_DNODE:
      return "show dnodes";
    case TSDB_MGMT_TABLE_MNODE:
      return "show mnodes";
    case TSDB_MGMT_TABLE_VGROUP:
      return "show vgroups";
S
Shengliang Guan 已提交
275
    case TSDB_MGMT_TABLE_STB:
S
Shengliang Guan 已提交
276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298
      return "show stables";
    case TSDB_MGMT_TABLE_MODULE:
      return "show modules";
    case TSDB_MGMT_TABLE_QUERIES:
      return "show queries";
    case TSDB_MGMT_TABLE_STREAMS:
      return "show streams";
    case TSDB_MGMT_TABLE_VARIABLES:
      return "show configs";
    case TSDB_MGMT_TABLE_CONNS:
      return "show connections";
    case TSDB_MGMT_TABLE_SCORES:
      return "show scores";
    case TSDB_MGMT_TABLE_GRANTS:
      return "show grants";
    case TSDB_MGMT_TABLE_VNODES:
      return "show vnodes";
    case TSDB_MGMT_TABLE_CLUSTER:
      return "show clusters";
    case TSDB_MGMT_TABLE_STREAMTABLES:
      return "show streamtables";
    case TSDB_MGMT_TABLE_TP:
      return "show topics";
S
Shengliang Guan 已提交
299 300
    case TSDB_MGMT_TABLE_FUNCTION:
      return "show functions";
S
Shengliang Guan 已提交
301 302 303 304 305 306 307 308
    default:
      return "undefined";
  }
}

static bool mndCheckRetrieveFinished(SShowObj *pShow) {
  if (pShow->pIter == NULL && pShow->numOfReads != 0) {
    return true;
S
Shengliang Guan 已提交
309
  }
S
Shengliang Guan 已提交
310 311 312
  return false;
}

S
Shengliang Guan 已提交
313
void mndVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow) {
S
Shengliang Guan 已提交
314 315 316 317 318 319 320
  if (rows < capacity) {
    for (int32_t i = 0; i < numOfCols; ++i) {
      memmove(data + pShow->offset[i] * rows, data + pShow->offset[i] * capacity, pShow->bytes[i] * rows);
    }
  }
}

S
Shengliang Guan 已提交
321
void mndAddShowMetaHandle(SMnode *pMnode, EShowType showType, ShowMetaFp fp) {
S
Shengliang Guan 已提交
322 323 324 325
  SShowMgmt *pMgmt = &pMnode->showMgmt;
  pMgmt->metaFps[showType] = fp;
}

S
Shengliang Guan 已提交
326
void mndAddShowRetrieveHandle(SMnode *pMnode, EShowType showType, ShowRetrieveFp fp) {
S
Shengliang Guan 已提交
327 328 329 330
  SShowMgmt *pMgmt = &pMnode->showMgmt;
  pMgmt->retrieveFps[showType] = fp;
}

S
Shengliang Guan 已提交
331
void mndAddShowFreeIterHandle(SMnode *pMnode, EShowType showType, ShowFreeIterFp fp) {
S
Shengliang Guan 已提交
332 333 334
  SShowMgmt *pMgmt = &pMnode->showMgmt;
  pMgmt->freeIterFps[showType] = fp;
}