mndShow.c 11.6 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "mndShow.h"
S
Shengliang Guan 已提交
18

S
Shengliang Guan 已提交
19 20
#define SHOW_STEP_SIZE 100

21
static SShowObj *mndCreateShowObj(SMnode *pMnode, SRetrieveTableReq *pReq);
S
Shengliang Guan 已提交
22
static void      mndFreeShowObj(SShowObj *pShow);
23
static SShowObj *mndAcquireShowObj(SMnode *pMnode, int64_t showId);
S
Shengliang Guan 已提交
24
static void      mndReleaseShowObj(SShowObj *pShow, bool forceRemove);
S
Shengliang Guan 已提交
25
static bool      mndCheckRetrieveFinished(SShowObj *pShow);
S
Shengliang Guan 已提交
26
static int32_t   mndProcessRetrieveSysTableReq(SNodeMsg *pReq);
S
Shengliang Guan 已提交
27 28 29 30

int32_t mndInitShow(SMnode *pMnode) {
  SShowMgmt *pMgmt = &pMnode->showMgmt;

S
Shengliang Guan 已提交
31
  pMgmt->cache = taosCacheInit(TSDB_DATA_TYPE_INT, 5, true, (__cache_free_fn_t)mndFreeShowObj, "show");
S
Shengliang Guan 已提交
32 33 34 35 36 37
  if (pMgmt->cache == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to alloc show cache since %s", terrstr());
    return -1;
  }

38
  mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE, mndProcessRetrieveSysTableReq);
S
Shengliang Guan 已提交
39 40 41
  return 0;
}

S
Shengliang Guan 已提交
42 43 44 45 46 47 48 49
void mndCleanupShow(SMnode *pMnode) {
  SShowMgmt *pMgmt = &pMnode->showMgmt;
  if (pMgmt->cache != NULL) {
    taosCacheCleanup(pMgmt->cache);
    pMgmt->cache = NULL;
  }
}

50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
static int32_t convertToRetrieveType(char* name, int32_t len) {
  int32_t type = -1;

  if (strncasecmp(name, TSDB_INS_TABLE_DNODES, len) == 0) {
    type = TSDB_MGMT_TABLE_DNODE;
  } else if (strncasecmp(name, TSDB_INS_TABLE_MNODES, len) == 0) {
    type = TSDB_MGMT_TABLE_MNODE;
  } else if (strncasecmp(name, TSDB_INS_TABLE_MODULES, len) == 0) {
    type = TSDB_MGMT_TABLE_MODULE;
  } else if (strncasecmp(name, TSDB_INS_TABLE_QNODES, len) == 0) {
    type = TSDB_MGMT_TABLE_QNODE;
  } else if (strncasecmp(name, TSDB_INS_TABLE_BNODES, len) == 0) {
    type = TSDB_MGMT_TABLE_BNODE;
  } else if (strncasecmp(name, TSDB_INS_TABLE_SNODES, len) == 0) {
    type = TSDB_MGMT_TABLE_SNODE;
  } else if (strncasecmp(name, TSDB_INS_TABLE_CLUSTER, len) == 0) {
    type = TSDB_MGMT_TABLE_CLUSTER;
  } else if (strncasecmp(name, TSDB_INS_TABLE_USER_DATABASES, len) == 0) {
    type = TSDB_MGMT_TABLE_DB;
  } else if (strncasecmp(name, TSDB_INS_TABLE_USER_FUNCTIONS, len) == 0) {
    type = TSDB_MGMT_TABLE_FUNC;
  } else if (strncasecmp(name, TSDB_INS_TABLE_USER_INDEXES, len) == 0) {
    //    type = TSDB_MGMT_TABLE_INDEX;
  } else if (strncasecmp(name, TSDB_INS_TABLE_USER_STABLES, len) == 0) {
    type = TSDB_MGMT_TABLE_STB;
  } else if (strncasecmp(name, TSDB_INS_TABLE_USER_STREAMS, len) == 0) {
    type = TSDB_MGMT_TABLE_STREAMS;
  } else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, len) == 0) {
    type = TSDB_MGMT_TABLE_TABLE;
  } else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED, len) == 0) {
    //    type = TSDB_MGMT_TABLE_DIST;
  } else if (strncasecmp(name, TSDB_INS_TABLE_USER_USERS, len) == 0) {
    type = TSDB_MGMT_TABLE_USER;
  } else if (strncasecmp(name, TSDB_INS_TABLE_LICENCES, len) == 0) {
    type = TSDB_MGMT_TABLE_GRANTS;
  } else if (strncasecmp(name, TSDB_INS_TABLE_VGROUPS, len) == 0) {
    type = TSDB_MGMT_TABLE_VGROUP;
  } else if (strncasecmp(name, TSDB_INS_TABLE_TOPICS, len) == 0) {
    type = TSDB_MGMT_TABLE_TOPICS;
  } else if (strncasecmp(name, TSDB_INS_TABLE_CONSUMERS, len) == 0) {
    type = TSDB_MGMT_TABLE_CONSUMERS;
  } else if (strncasecmp(name, TSDB_INS_TABLE_SUBSCRIBES, len) == 0) {
    type = TSDB_MGMT_TABLE_SUBSCRIBES;
  } else if (strncasecmp(name, TSDB_INS_TABLE_TRANS, len) == 0) {
    type = TSDB_MGMT_TABLE_TRANS;
  } else if (strncasecmp(name, TSDB_INS_TABLE_SMAS, len) == 0) {
    type = TSDB_MGMT_TABLE_SMAS;
  } else if (strncasecmp(name, TSDB_INS_TABLE_CONFIGS, len) == 0) {
    type = TSDB_MGMT_TABLE_CONFIGS;
  } else if (strncasecmp(name, TSDB_INS_TABLE_CONNS, len) == 0) {
    type = TSDB_MGMT_TABLE_CONNS;
  } else if (strncasecmp(name, TSDB_INS_TABLE_QUERIES, len) == 0) {
    type = TSDB_MGMT_TABLE_QUERIES;
  }  else if (strncasecmp(name, TSDB_INS_TABLE_VNODES, len) == 0) {
    type = TSDB_MGMT_TABLE_VNODES;
  } else {
//    ASSERT(0);
  }

  return type;
}

static SShowObj *mndCreateShowObj(SMnode *pMnode, SRetrieveTableReq *pReq) {
S
Shengliang Guan 已提交
113 114
  SShowMgmt *pMgmt = &pMnode->showMgmt;

115 116
  int64_t showId = atomic_add_fetch_64(&pMgmt->showId, 1);
  if (showId == 0) atomic_add_fetch_64(&pMgmt->showId, 1);
S
Shengliang Guan 已提交
117

118 119
  int32_t  size = sizeof(SShowObj);

S
Shengliang Guan 已提交
120
  SShowObj showObj = {0};
121
  showObj.id     = showId;
S
Shengliang Guan 已提交
122
  showObj.pMnode = pMnode;
123
  showObj.type   = convertToRetrieveType(pReq->tb, tListLen(pReq->tb));
S
Shengliang Guan 已提交
124
  memcpy(showObj.db, pReq->db, TSDB_DB_FNAME_LEN);
S
Shengliang Guan 已提交
125

S
Shengliang Guan 已提交
126
  int32_t   keepTime = tsShellActivityTimer * 6 * 1000;
127
  SShowObj *pShow = taosCachePut(pMgmt->cache, &showId, sizeof(int64_t), &showObj, size, keepTime);
S
Shengliang Guan 已提交
128
  if (pShow == NULL) {
S
Shengliang Guan 已提交
129
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
130
    mError("show:0x%" PRIx64 ", failed to put into cache since %s", showId, terrstr());
S
Shengliang Guan 已提交
131 132
    return NULL;
  }
S
Shengliang Guan 已提交
133

S
Shengliang Guan 已提交
134
  mTrace("show:0x%" PRIx64 ", is created, data:%p", showId, pShow);
S
Shengliang Guan 已提交
135
  return pShow;
S
Shengliang Guan 已提交
136 137
}

S
Shengliang Guan 已提交
138
static void mndFreeShowObj(SShowObj *pShow) {
S
Shengliang Guan 已提交
139 140 141 142 143 144 145 146 147 148
  SMnode    *pMnode = pShow->pMnode;
  SShowMgmt *pMgmt = &pMnode->showMgmt;

  ShowFreeIterFp freeFp = pMgmt->freeIterFps[pShow->type];
  if (freeFp != NULL) {
    if (pShow->pIter != NULL) {
      (*freeFp)(pMnode, pShow->pIter);
    }
  }

S
Shengliang Guan 已提交
149
  mTrace("show:0x%" PRIx64 ", is destroyed, data:%p", pShow->id, pShow);
S
Shengliang Guan 已提交
150 151
}

152
static SShowObj *mndAcquireShowObj(SMnode *pMnode, int64_t showId) {
S
Shengliang Guan 已提交
153 154
  SShowMgmt *pMgmt = &pMnode->showMgmt;

155
  SShowObj *pShow = taosCacheAcquireByKey(pMgmt->cache, &showId, sizeof(showId));
S
Shengliang Guan 已提交
156
  if (pShow == NULL) {
S
Shengliang Guan 已提交
157
    mError("show:0x%" PRIx64 ", already destroyed", showId);
S
Shengliang Guan 已提交
158 159 160
    return NULL;
  }

S
Shengliang Guan 已提交
161
  mTrace("show:0x%" PRIx64 ", acquired from cache, data:%p", pShow->id, pShow);
S
Shengliang Guan 已提交
162 163 164 165 166
  return pShow;
}

static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove) {
  if (pShow == NULL) return;
S
Shengliang Guan 已提交
167
  mTrace("show:0x%" PRIx64 ", released from cache, data:%p force:%d", pShow->id, pShow, forceRemove);
S
Shengliang Guan 已提交
168

S
Shengliang Guan 已提交
169 170
  // A bug in tcache.c
  forceRemove = 0;
S
Shengliang Guan 已提交
171 172 173 174

  SMnode    *pMnode = pShow->pMnode;
  SShowMgmt *pMgmt = &pMnode->showMgmt;
  taosCacheRelease(pMgmt->cache, (void **)(&pShow), forceRemove);
S
Shengliang Guan 已提交
175
}
S
Shengliang Guan 已提交
176

S
Shengliang Guan 已提交
177 178
static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq) {
  SMnode    *pMnode = pReq->pNode;
179
  SShowMgmt *pMgmt = &pMnode->showMgmt;
180
  SShowObj  *pShow = NULL;
181
  int32_t    rowsToRead = SHOW_STEP_SIZE;
182 183 184 185 186 187 188 189 190 191
  int32_t    size = 0;
  int32_t    rowsRead = 0;

  SRetrieveTableReq retrieveReq = {0};
  if (tDeserializeSRetrieveTableReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &retrieveReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  if (retrieveReq.showId == 0) {
S
Shengliang Guan 已提交
192 193 194 195 196 197 198
    STableMetaRsp *pMeta = (STableMetaRsp *)taosHashGet(pMnode->infosMeta, retrieveReq.tb, strlen(retrieveReq.tb) + 1);
    if (pMeta == NULL) {
      terrno = TSDB_CODE_MND_INVALID_INFOS_TBL;
      mError("failed to process show-retrieve req:%p since %s", pShow, terrstr());
      return -1;
    }

199
    pShow = mndCreateShowObj(pMnode, &retrieveReq);
200 201 202 203 204
    if (pShow == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      mError("failed to process show-meta req since %s", terrstr());
      return -1;
    }
H
Haojun Liao 已提交
205

S
Shengliang Guan 已提交
206
    pShow->pMeta = pMeta;
207
    pShow->numOfColumns = pShow->pMeta->numOfColumns;
H
Haojun Liao 已提交
208
    int32_t offset = 0;
209

210
    for (int32_t i = 0; i < pShow->pMeta->numOfColumns; ++i) {
H
Haojun Liao 已提交
211 212
      pShow->offset[i] = offset;

213
      int32_t bytes = pShow->pMeta->pSchemas[i].bytes;
H
Haojun Liao 已提交
214 215 216 217
      pShow->rowSize += bytes;
      pShow->bytes[i] = bytes;
      offset += bytes;
    }
218 219 220 221 222 223 224 225 226 227 228
  } else {
    pShow = mndAcquireShowObj(pMnode, retrieveReq.showId);
    if (pShow == NULL) {
      terrno = TSDB_CODE_MND_INVALID_SHOWOBJ;
      mError("failed to process show-retrieve req:%p since %s", pShow, terrstr());
      return -1;
    }
  }

  ShowRetrieveFp retrieveFp = pMgmt->retrieveFps[pShow->type];
  if (retrieveFp == NULL) {
229
    mndReleaseShowObj(pShow, false);
230 231 232 233 234
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    mError("show:0x%" PRIx64 ", failed to retrieve data since %s", pShow->id, terrstr());
    return -1;
  }

235
  mDebug("show:0x%" PRIx64 ", start retrieve data, type:%d", pShow->id, pShow->type);
236

237 238
  int32_t      numOfCols = pShow->pMeta->numOfColumns;
  SSDataBlock *pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
239 240
  pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
  pBlock->info.numOfCols = numOfCols;
241

242
  for (int32_t i = 0; i < numOfCols; ++i) {
243
    SColumnInfoData idata = {0};
244
    SSchema        *p = &pShow->pMeta->pSchemas[i];
245 246

    idata.info.bytes = p->bytes;
247
    idata.info.type = p->type;
248 249 250 251 252 253 254
    idata.info.colId = p->colId;

    taosArrayPush(pBlock->pDataBlock, &idata);
    if (IS_VAR_DATA_TYPE(p->type)) {
      pBlock->info.hasVarCol = true;
    }
  }
H
Haojun Liao 已提交
255

256
  blockDataEnsureCapacity(pBlock, rowsToRead);
257
  if (mndCheckRetrieveFinished(pShow)) {
258 259 260
    mDebug("show:0x%" PRIx64 ", read finished, numOfRows:%d", pShow->id, pShow->numOfRows);
    rowsRead = 0;
  } else {
261
    rowsRead = (*retrieveFp)(pReq, pShow, pBlock, rowsToRead);
D
dapan1121 已提交
262 263 264
    if (rowsRead < 0) {
      terrno = rowsRead;
      mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id);
265
      mndReleaseShowObj(pShow, true);
D
dapan1121 已提交
266 267
      return -1;
    }
268

269
    pBlock->info.rows = rowsRead;
270 271
    mDebug("show:0x%" PRIx64 ", stop retrieve data, rowsRead:%d numOfRows:%d", pShow->id, rowsRead, pShow->numOfRows);
  }
272

273 274
  size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * pShow->pMeta->numOfColumns +
         blockDataGetSize(pBlock) + blockDataGetSerialMetaSize(pBlock);
275 276 277

  SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
  if (pRsp == NULL) {
278
    mndReleaseShowObj(pShow, false);
279 280 281 282 283 284 285 286 287
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("show:0x%" PRIx64 ", failed to retrieve data since %s", pShow->id, terrstr());
    blockDataDestroy(pBlock);
    return -1;
  }

  pRsp->handle = htobe64(pShow->id);

  if (rowsRead > 0) {
288
    char    *pStart = pRsp->data;
289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306
    SSchema *ps = pShow->pMeta->pSchemas;

    *(int32_t *)pStart = htonl(pShow->pMeta->numOfColumns);
    pStart += sizeof(int32_t);  // number of columns

    for (int32_t i = 0; i < pShow->pMeta->numOfColumns; ++i) {
      SSysTableSchema *pSchema = (SSysTableSchema *)pStart;
      pSchema->bytes = htonl(ps[i].bytes);
      pSchema->colId = htons(ps[i].colId);
      pSchema->type = ps[i].type;

      pStart += sizeof(SSysTableSchema);
    }

    int32_t len = 0;
    blockCompressEncode(pBlock, pStart, &len, pShow->pMeta->numOfColumns, false);
  }

307 308
  pRsp->numOfRows = htonl(rowsRead);
  pRsp->precision = TSDB_TIME_PRECISION_MILLI;  // millisecond time precision
309 310
  pReq->pRsp = pRsp;
  pReq->rspLen = size;
311

312
  if (rowsRead == 0 || rowsRead < rowsToRead) {
313 314
    pRsp->completed = 1;
    mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id);
315
    mndReleaseShowObj(pShow, true);
316 317
  } else {
    mDebug("show:0x%" PRIx64 ", retrieve not completed yet", pShow->id);
318
    mndReleaseShowObj(pShow, false);
319 320
  }

321
  blockDataDestroy(pBlock);
322 323 324
  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
325
static bool mndCheckRetrieveFinished(SShowObj *pShow) {
326
  if (pShow->pIter == NULL && pShow->numOfRows != 0) {
S
Shengliang Guan 已提交
327
    return true;
S
Shengliang Guan 已提交
328
  }
S
Shengliang Guan 已提交
329 330 331
  return false;
}

S
Shengliang Guan 已提交
332
void mndAddShowRetrieveHandle(SMnode *pMnode, EShowType showType, ShowRetrieveFp fp) {
S
Shengliang Guan 已提交
333 334 335 336
  SShowMgmt *pMgmt = &pMnode->showMgmt;
  pMgmt->retrieveFps[showType] = fp;
}

S
Shengliang Guan 已提交
337
void mndAddShowFreeIterHandle(SMnode *pMnode, EShowType showType, ShowFreeIterFp fp) {
S
Shengliang Guan 已提交
338 339 340
  SShowMgmt *pMgmt = &pMnode->showMgmt;
  pMgmt->freeIterFps[showType] = fp;
}