mndShow.c 10.8 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "mndShow.h"
S
Shengliang Guan 已提交
18

S
Shengliang Guan 已提交
19 20
#define SHOW_STEP_SIZE 100

S
Shengliang Guan 已提交
21
static SShowObj *mndCreateShowObj(SMnode *pMnode, SShowReq *pReq);
S
Shengliang Guan 已提交
22
static void      mndFreeShowObj(SShowObj *pShow);
23
static SShowObj *mndAcquireShowObj(SMnode *pMnode, int64_t showId);
S
Shengliang Guan 已提交
24
static void      mndReleaseShowObj(SShowObj *pShow, bool forceRemove);
S
Shengliang Guan 已提交
25
static bool      mndCheckRetrieveFinished(SShowObj *pShow);
S
Shengliang Guan 已提交
26
static int32_t   mndProcessRetrieveSysTableReq(SNodeMsg *pReq);
S
Shengliang Guan 已提交
27 28 29 30

/*
 * Initialize the show subsystem of a mnode.
 *
 * Creates the cache that holds show objects (mndFreeShowObj is registered as
 * the cache's free callback) and registers the handler for
 * TDMT_MND_SYSTABLE_RETRIEVE messages.
 *
 * Returns 0 on success. On cache creation failure sets terrno to
 * TSDB_CODE_OUT_OF_MEMORY and returns -1.
 */
int32_t mndInitShow(SMnode *pMnode) {
  SShowMgmt *pShowMgmt = &pMnode->showMgmt;

  pShowMgmt->cache = taosCacheInit(TSDB_DATA_TYPE_INT, 5, true, (__cache_free_fn_t)mndFreeShowObj, "show");
  if (pShowMgmt->cache != NULL) {
    mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE, mndProcessRetrieveSysTableReq);
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  mError("failed to alloc show cache since %s", terrstr());
  return -1;
}

S
Shengliang Guan 已提交
42 43 44 45 46 47 48 49
/*
 * Tear down the show subsystem: clean up the show-object cache, if one
 * exists, and clear the pointer so a repeated call is a no-op.
 */
void mndCleanupShow(SMnode *pMnode) {
  SShowMgmt *pShowMgmt = &pMnode->showMgmt;

  if (pShowMgmt->cache == NULL) {
    return;
  }

  taosCacheCleanup(pShowMgmt->cache);
  pShowMgmt->cache = NULL;
}

S
Shengliang Guan 已提交
50
/*
 * Create a new show object and put it into the show cache.
 *
 * pMnode  owning mnode; its showMgmt supplies the id counter and the cache
 * pReq    show request; type, payloadLen and db are copied into the object
 *
 * Returns the cached object as handed back by taosCachePut, or NULL with
 * terrno set to TSDB_CODE_OUT_OF_MEMORY when the put fails. Callers in this
 * file hand the object back through mndReleaseShowObj.
 */
static SShowObj *mndCreateShowObj(SMnode *pMnode, SShowReq *pReq) {
  SShowMgmt *pMgmt = &pMnode->showMgmt;

  // Id 0 is reserved: mndProcessRetrieveSysTableReq treats showId == 0 as
  // "create a new show object". If the counter wraps to 0, take the next
  // value and actually use it (the result used to be discarded, so the
  // object was still created with id 0).
  int64_t showId = atomic_add_fetch_64(&pMgmt->showId, 1);
  if (showId == 0) {
    showId = atomic_add_fetch_64(&pMgmt->showId, 1);
  }

  // NOTE(review): size may exceed sizeof(showObj) when payloadLen != 0 while
  // the source passed to taosCachePut is the stack object below. If the cache
  // copies `size` bytes from it, that reads past the object. The only caller
  // visible in this file passes payloadLen == 0 -- confirm before sending a
  // non-empty payload through here.
  int32_t  size = sizeof(SShowObj) + pReq->payloadLen;
  SShowObj showObj = {0};
  showObj.id = showId;
  showObj.pMnode = pMnode;
  showObj.type = pReq->type;
  showObj.payloadLen = pReq->payloadLen;
  memcpy(showObj.db, pReq->db, TSDB_DB_FNAME_LEN);

  int32_t   keepTime = tsShellActivityTimer * 6 * 1000;
  SShowObj *pShow = taosCachePut(pMgmt->cache, &showId, sizeof(int64_t), &showObj, size, keepTime);
  if (pShow == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("show:0x%" PRIx64 ", failed to put into cache since %s", showId, terrstr());
    return NULL;
  }

  mTrace("show:0x%" PRIx64 ", is created, data:%p", showId, pShow);
  return pShow;
}

S
Shengliang Guan 已提交
76
/*
 * Free callback registered with the show cache in mndInitShow.
 *
 * If a free-iterator handler is registered for the object's show type and the
 * object still holds an iterator, the handler is invoked on that iterator.
 * The object memory itself is not freed here.
 */
static void mndFreeShowObj(SShowObj *pShow) {
  SMnode *pOwner = pShow->pMnode;

  ShowFreeIterFp releaseIterFn = pOwner->showMgmt.freeIterFps[pShow->type];
  if (releaseIterFn != NULL && pShow->pIter != NULL) {
    (*releaseIterFn)(pOwner, pShow->pIter);
  }

  mTrace("show:0x%" PRIx64 ", is destroyed, data:%p", pShow->id, pShow);
}

90
/*
 * Look up a show object in the show cache by its id.
 *
 * Returns the object returned by taosCacheAcquireByKey, or NULL (after
 * logging an error) when the id is not in the cache.
 */
static SShowObj *mndAcquireShowObj(SMnode *pMnode, int64_t showId) {
  SShowObj *pCached = taosCacheAcquireByKey(pMnode->showMgmt.cache, &showId, sizeof(showId));

  if (pCached != NULL) {
    mTrace("show:0x%" PRIx64 ", acquired from cache, data:%p", pCached->id, pCached);
    return pCached;
  }

  mError("show:0x%" PRIx64 ", already destroyed", showId);
  return NULL;
}

/*
 * Hand a show object back to the show cache. A NULL pShow is ignored.
 *
 * The caller's forceRemove value is only logged: it is overridden to 0 before
 * the cache is called, which the original author attributes to a bug in
 * tcache.c.
 */
static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove) {
  if (pShow != NULL) {
    mTrace("show:0x%" PRIx64 ", released from cache, data:%p force:%d", pShow->id, pShow, forceRemove);

    // A bug in tcache.c
    forceRemove = 0;

    SShowMgmt *pShowMgmt = &pShow->pMnode->showMgmt;
    taosCacheRelease(pShowMgmt->cache, (void **)(&pShow), forceRemove);
  }
}
S
Shengliang Guan 已提交
114

S
Shengliang Guan 已提交
115 116
/*
 * Handler for TDMT_MND_SYSTABLE_RETRIEVE.
 *
 * Flow:
 *   1. Deserialize the request.
 *   2. showId == 0: create a new show object, bind it to the system-table
 *      meta looked up by table name, and precompute per-column offsets and
 *      sizes. Otherwise acquire the existing show object from the cache.
 *   3. Build an SSDataBlock matching the table schema and ask the retrieve
 *      callback registered for the show type for up to SHOW_STEP_SIZE rows.
 *   4. Serialize the schema and the block into an SRetrieveMetaTableRsp and
 *      attach it to pReq (pRsp / rspLen).
 *
 * Returns TSDB_CODE_SUCCESS on success, or -1 with terrno set on failure.
 * Every path taken after the show object has been obtained releases it, and
 * the temporary data block is destroyed on every path.
 */
static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq) {
  SMnode    *pMnode = pReq->pNode;
  SShowMgmt *pMgmt = &pMnode->showMgmt;
  int32_t    rowsToRead = SHOW_STEP_SIZE;
  int32_t    size = 0;
  int32_t    rowsRead = 0;

  SRetrieveTableReq retrieveReq = {0};
  if (tDeserializeSRetrieveTableReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &retrieveReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  SShowObj *pShow = NULL;

  if (retrieveReq.showId == 0) {
    // The type comes from the client and is later used to index the handler
    // tables, so reject anything outside of them up front.
    // NOTE(review): tListLen is used on retrieveFps on the assumption that it
    // is an array member of SShowMgmt (not a pointer) -- confirm.
    int32_t showType = retrieveReq.type;
    if (showType < 0 || showType >= (int32_t)tListLen(pMgmt->retrieveFps)) {
      terrno = TSDB_CODE_INVALID_MSG;
      mError("failed to process show-meta req since %s", terrstr());
      return -1;
    }

    SShowReq req = {0};
    req.type = retrieveReq.type;
    // req is zero-initialized; copying at most len-1 bytes keeps req.db
    // NUL-terminated even when the source fills its buffer.
    strncpy(req.db, retrieveReq.db, tListLen(req.db) - 1);

    pShow = mndCreateShowObj(pMnode, &req);
    if (pShow == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      mError("failed to process show-meta req since %s", terrstr());
      return -1;
    }

    pShow->pMeta = (STableMetaRsp *)taosHashGet(pMnode->infosMeta, retrieveReq.tb, strlen(retrieveReq.tb));
    if (pShow->pMeta == NULL) {
      // Unknown system table name: fail the request instead of dereferencing
      // a NULL meta pointer below.
      mError("show:0x%" PRIx64 ", failed to process show-meta req since table meta not found", pShow->id);
      mndReleaseShowObj(pShow, true);
      terrno = TSDB_CODE_INVALID_MSG;
      return -1;
    }

    pShow->numOfColumns = pShow->pMeta->numOfColumns;
    int32_t offset = 0;

    for (int32_t i = 0; i < pShow->pMeta->numOfColumns; ++i) {
      pShow->offset[i] = offset;

      int32_t bytes = pShow->pMeta->pSchemas[i].bytes;
      pShow->rowSize += bytes;
      pShow->bytes[i] = bytes;
      offset += bytes;
    }
  } else {
    pShow = mndAcquireShowObj(pMnode, retrieveReq.showId);
    if (pShow == NULL) {
      terrno = TSDB_CODE_MND_INVALID_SHOWOBJ;
      mError("failed to process show-retrieve req:%p since %s", pShow, terrstr());
      return -1;
    }
  }

  // Captured once so that error paths never read pShow after releasing it.
  int64_t showId = pShow->id;

  ShowRetrieveFp retrieveFp = pMgmt->retrieveFps[pShow->type];
  if (retrieveFp == NULL) {
    mndReleaseShowObj(pShow, false);
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    mError("show:0x%" PRIx64 ", failed to retrieve data since %s", showId, terrstr());
    return -1;
  }

  mDebug("show:0x%" PRIx64 ", start retrieve data, type:%s", pShow->id, mndShowStr(pShow->type));

  int32_t numOfCols = pShow->pMeta->numOfColumns;

  SSDataBlock *pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (pBlock == NULL) {
    mndReleaseShowObj(pShow, false);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("show:0x%" PRIx64 ", failed to retrieve data since %s", showId, terrstr());
    return -1;
  }

  pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
  if (pBlock->pDataBlock == NULL) {
    // The block has no column array yet, so free the bare struct directly.
    taosMemoryFree(pBlock);
    mndReleaseShowObj(pShow, false);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("show:0x%" PRIx64 ", failed to retrieve data since %s", showId, terrstr());
    return -1;
  }
  pBlock->info.numOfCols = numOfCols;

  // One column descriptor per schema column of the system table.
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData idata = {0};
    SSchema        *p = &pShow->pMeta->pSchemas[i];

    idata.info.bytes = p->bytes;
    idata.info.type = p->type;
    idata.info.colId = p->colId;

    taosArrayPush(pBlock->pDataBlock, &idata);
    if (IS_VAR_DATA_TYPE(p->type)) {
      pBlock->info.hasVarCol = true;
    }
  }

  blockDataEnsureCapacity(pBlock, rowsToRead);
  if (mndCheckRetrieveFinished(pShow)) {
    mDebug("show:0x%" PRIx64 ", read finished, numOfRows:%d", pShow->id, pShow->numOfRows);
    rowsRead = 0;
  } else {
    rowsRead = (*retrieveFp)(pReq, pShow, pBlock, rowsToRead);
    if (rowsRead < 0) {
      // A negative return value from the callback is used as the error code.
      terrno = rowsRead;
      mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id);
      mndReleaseShowObj(pShow, true);
      blockDataDestroy(pBlock);  // was leaked on this path
      return -1;
    }

    pBlock->info.rows = rowsRead;
    mDebug("show:0x%" PRIx64 ", stop retrieve data, rowsRead:%d numOfRows:%d", pShow->id, rowsRead, pShow->numOfRows);
  }

  // numOfCols + sizeof(SSysTableSchema) * numOfCols + data payload
  size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * pShow->pMeta->numOfColumns +
         blockDataGetSize(pBlock) + blockDataGetSerialMetaSize(pBlock);

  SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
  if (pRsp == NULL) {
    mndReleaseShowObj(pShow, false);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("show:0x%" PRIx64 ", failed to retrieve data since %s", showId, terrstr());
    blockDataDestroy(pBlock);
    return -1;
  }

  pRsp->handle = htobe64(pShow->id);

  // The schema header and the encoded block are only written when rows were
  // actually read; an empty response carries just the fixed header fields.
  if (rowsRead > 0) {
    char    *pStart = pRsp->data;
    SSchema *ps = pShow->pMeta->pSchemas;

    *(int32_t *)pStart = htonl(pShow->pMeta->numOfColumns);
    pStart += sizeof(int32_t);  // number of columns

    for (int32_t i = 0; i < pShow->pMeta->numOfColumns; ++i) {
      SSysTableSchema *pSchema = (SSysTableSchema *)pStart;
      pSchema->bytes = htonl(ps[i].bytes);
      pSchema->colId = htons(ps[i].colId);
      pSchema->type = ps[i].type;

      pStart += sizeof(SSysTableSchema);
    }

    int32_t len = 0;
    blockCompressEncode(pBlock, pStart, &len, pShow->pMeta->numOfColumns, false);
  }

  pRsp->numOfRows = htonl(rowsRead);
  pRsp->precision = TSDB_TIME_PRECISION_MILLI;  // millisecond time precision
  pReq->pRsp = pRsp;
  pReq->rspLen = size;

  // Fewer rows than requested (or none) means the result set is exhausted.
  if (rowsRead == 0 || rowsRead < rowsToRead) {
    pRsp->completed = 1;
    mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id);
    mndReleaseShowObj(pShow, true);
  } else {
    pRsp->completed = 0;  // set explicitly rather than relying on the allocator
    mDebug("show:0x%" PRIx64 ", retrieve not completed yet", pShow->id);
    mndReleaseShowObj(pShow, false);
  }

  blockDataDestroy(pBlock);
  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
264
char *mndShowStr(int32_t showType) {
S
Shengliang Guan 已提交
265 266 267 268 269 270 271 272 273 274 275 276 277
  switch (showType) {
    case TSDB_MGMT_TABLE_ACCT:
      return "show accounts";
    case TSDB_MGMT_TABLE_USER:
      return "show users";
    case TSDB_MGMT_TABLE_DB:
      return "show databases";
    case TSDB_MGMT_TABLE_TABLE:
      return "show tables";
    case TSDB_MGMT_TABLE_DNODE:
      return "show dnodes";
    case TSDB_MGMT_TABLE_MNODE:
      return "show mnodes";
S
Shengliang Guan 已提交
278 279 280 281 282 283
    case TSDB_MGMT_TABLE_QNODE:
      return "show qnodes";
    case TSDB_MGMT_TABLE_SNODE:
      return "show snodes";
    case TSDB_MGMT_TABLE_BNODE:
      return "show bnodes";
S
Shengliang Guan 已提交
284 285
    case TSDB_MGMT_TABLE_VGROUP:
      return "show vgroups";
S
Shengliang Guan 已提交
286
    case TSDB_MGMT_TABLE_STB:
S
Shengliang Guan 已提交
287 288 289 290 291 292 293 294 295 296 297
      return "show stables";
    case TSDB_MGMT_TABLE_MODULE:
      return "show modules";
    case TSDB_MGMT_TABLE_QUERIES:
      return "show queries";
    case TSDB_MGMT_TABLE_STREAMS:
      return "show streams";
    case TSDB_MGMT_TABLE_VARIABLES:
      return "show configs";
    case TSDB_MGMT_TABLE_CONNS:
      return "show connections";
S
Shengliang Guan 已提交
298 299
    case TSDB_MGMT_TABLE_TRANS:
      return "show trans";
S
Shengliang Guan 已提交
300 301 302 303 304
    case TSDB_MGMT_TABLE_GRANTS:
      return "show grants";
    case TSDB_MGMT_TABLE_VNODES:
      return "show vnodes";
    case TSDB_MGMT_TABLE_CLUSTER:
S
Shengliang Guan 已提交
305
      return "show cluster";
S
Shengliang Guan 已提交
306 307 308 309
    case TSDB_MGMT_TABLE_STREAMTABLES:
      return "show streamtables";
    case TSDB_MGMT_TABLE_TP:
      return "show topics";
S
Shengliang 已提交
310
    case TSDB_MGMT_TABLE_FUNC:
S
Shengliang Guan 已提交
311
      return "show functions";
S
sma  
Shengliang Guan 已提交
312 313
      case TSDB_MGMT_TABLE_INDEX:
      return "show indexes";
S
Shengliang Guan 已提交
314 315 316 317 318 319
    default:
      return "undefined";
  }
}

/*
 * Report whether a show object has nothing left to retrieve: it no longer
 * holds an iterator and its row counter is non-zero.
 */
static bool mndCheckRetrieveFinished(SShowObj *pShow) {
  return pShow->pIter == NULL && pShow->numOfRows != 0;
}

S
Shengliang Guan 已提交
326
/*
 * Compact a result buffer in place when fewer rows than `capacity` were
 * produced.
 *
 * For every column i, `rows` values of pShow->bytes[i] bytes each are moved
 * from data + offset[i] * capacity to data + offset[i] * rows. memmove is used
 * because source and destination ranges may overlap. Nothing is done when
 * rows >= capacity.
 */
void mndVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow) {
  if (rows >= capacity) {
    return;
  }

  for (int32_t col = 0; col < numOfCols; ++col) {
    char *dst = data + pShow->offset[col] * rows;
    char *src = data + pShow->offset[col] * capacity;
    memmove(dst, src, pShow->bytes[col] * rows);
  }
}

S
Shengliang Guan 已提交
334
/*
 * Register `fp` as the retrieve callback for `showType`, replacing any
 * callback stored for that type before.
 */
void mndAddShowRetrieveHandle(SMnode *pMnode, EShowType showType, ShowRetrieveFp fp) {
  pMnode->showMgmt.retrieveFps[showType] = fp;
}

S
Shengliang Guan 已提交
339
/*
 * Register `fp` as the free-iterator callback for `showType`, replacing any
 * callback stored for that type before. mndFreeShowObj invokes it for show
 * objects of that type that still hold an iterator.
 */
void mndAddShowFreeIterHandle(SMnode *pMnode, EShowType showType, ShowFreeIterFp fp) {
  pMnode->showMgmt.freeIterFps[showType] = fp;
}