mndShow.c 12.5 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "mndShow.h"
18
#include "mndPrivilege.h"
19
#include "systable.h"
S
Shengliang Guan 已提交
20

S
Shengliang Guan 已提交
21
#define SHOW_STEP_SIZE 100
wmmhello's avatar
wmmhello 已提交
22
#define SHOW_COLS_STEP_SIZE 4096
23
#define SHOW_PRIVILEGES_STEP_SIZE 2048
S
Shengliang Guan 已提交
24

25
static SShowObj *mndCreateShowObj(SMnode *pMnode, SRetrieveTableReq *pReq);
S
Shengliang Guan 已提交
26
static void      mndFreeShowObj(SShowObj *pShow);
27
static SShowObj *mndAcquireShowObj(SMnode *pMnode, int64_t showId);
S
Shengliang Guan 已提交
28
static void      mndReleaseShowObj(SShowObj *pShow, bool forceRemove);
S
Shengliang Guan 已提交
29
static bool      mndCheckRetrieveFinished(SShowObj *pShow);
S
Shengliang Guan 已提交
30
static int32_t   mndProcessRetrieveSysTableReq(SRpcMsg *pReq);
S
Shengliang Guan 已提交
31 32 33 34

// Initialize the show subsystem of an mnode.
//
// Creates the cache that stores show objects (mndFreeShowObj is installed as
// its free callback) and registers mndProcessRetrieveSysTableReq as the handler
// for TDMT_MND_SYSTABLE_RETRIEVE.
//
// Returns 0 on success. If the cache cannot be created, terrno is set to
// TSDB_CODE_OUT_OF_MEMORY and -1 is returned.
int32_t mndInitShow(SMnode *pMnode) {
  SShowMgmt *pShowMgmt = &pMnode->showMgmt;

  pShowMgmt->cache = taosCacheInit(TSDB_DATA_TYPE_INT, 5000, true, (__cache_free_fn_t)mndFreeShowObj, "show");
  if (pShowMgmt->cache != NULL) {
    mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE, mndProcessRetrieveSysTableReq);
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  mError("failed to alloc show cache since %s", terrstr());
  return -1;
}

S
Shengliang Guan 已提交
46 47 48 49 50 51 52 53
// Tear down the show subsystem: clean up the show-object cache, if one exists,
// and clear the pointer so a repeated call does nothing.
void mndCleanupShow(SMnode *pMnode) {
  SShowMgmt *pShowMgmt = &pMnode->showMgmt;
  if (pShowMgmt->cache == NULL) return;

  taosCacheCleanup(pShowMgmt->cache);
  pShowMgmt->cache = NULL;
}

L
Liu Jicong 已提交
54
static int32_t convertToRetrieveType(char *name, int32_t len) {
55 56 57 58 59 60 61 62 63 64 65 66 67 68
  int32_t type = -1;

  if (strncasecmp(name, TSDB_INS_TABLE_DNODES, len) == 0) {
    type = TSDB_MGMT_TABLE_DNODE;
  } else if (strncasecmp(name, TSDB_INS_TABLE_MNODES, len) == 0) {
    type = TSDB_MGMT_TABLE_MNODE;
  } else if (strncasecmp(name, TSDB_INS_TABLE_MODULES, len) == 0) {
    type = TSDB_MGMT_TABLE_MODULE;
  } else if (strncasecmp(name, TSDB_INS_TABLE_QNODES, len) == 0) {
    type = TSDB_MGMT_TABLE_QNODE;
  } else if (strncasecmp(name, TSDB_INS_TABLE_SNODES, len) == 0) {
    type = TSDB_MGMT_TABLE_SNODE;
  } else if (strncasecmp(name, TSDB_INS_TABLE_CLUSTER, len) == 0) {
    type = TSDB_MGMT_TABLE_CLUSTER;
D
dapan1121 已提交
69
  } else if (strncasecmp(name, TSDB_INS_TABLE_DATABASES, len) == 0) {
70
    type = TSDB_MGMT_TABLE_DB;
D
dapan1121 已提交
71
  } else if (strncasecmp(name, TSDB_INS_TABLE_FUNCTIONS, len) == 0) {
72
    type = TSDB_MGMT_TABLE_FUNC;
D
dapan1121 已提交
73
  } else if (strncasecmp(name, TSDB_INS_TABLE_INDEXES, len) == 0) {
X
Xiaoyu Wang 已提交
74
    type = TSDB_MGMT_TABLE_INDEX;
D
dapan1121 已提交
75
  } else if (strncasecmp(name, TSDB_INS_TABLE_STABLES, len) == 0) {
76
    type = TSDB_MGMT_TABLE_STB;
D
dapan1121 已提交
77
  } else if (strncasecmp(name, TSDB_INS_TABLE_TABLES, len) == 0) {
78
    type = TSDB_MGMT_TABLE_TABLE;
D
dapan1121 已提交
79
  } else if (strncasecmp(name, TSDB_INS_TABLE_TAGS, len) == 0) {
80
    type = TSDB_MGMT_TABLE_TAG;
81 82
  } else if (strncasecmp(name, TSDB_INS_TABLE_COLS, len) == 0) {
    type = TSDB_MGMT_TABLE_COL;
D
dapan1121 已提交
83
  } else if (strncasecmp(name, TSDB_INS_TABLE_TABLE_DISTRIBUTED, len) == 0) {
84
    //    type = TSDB_MGMT_TABLE_DIST;
D
dapan1121 已提交
85
  } else if (strncasecmp(name, TSDB_INS_TABLE_USERS, len) == 0) {
86 87 88 89 90
    type = TSDB_MGMT_TABLE_USER;
  } else if (strncasecmp(name, TSDB_INS_TABLE_LICENCES, len) == 0) {
    type = TSDB_MGMT_TABLE_GRANTS;
  } else if (strncasecmp(name, TSDB_INS_TABLE_VGROUPS, len) == 0) {
    type = TSDB_MGMT_TABLE_VGROUP;
91
  } else if (strncasecmp(name, TSDB_PERFS_TABLE_CONSUMERS, len) == 0) {
92
    type = TSDB_MGMT_TABLE_CONSUMERS;
93
  } else if (strncasecmp(name, TSDB_INS_TABLE_SUBSCRIPTIONS, len) == 0) {
L
Liu Jicong 已提交
94
    type = TSDB_MGMT_TABLE_SUBSCRIPTIONS;
95
  } else if (strncasecmp(name, TSDB_PERFS_TABLE_TRANS, len) == 0) {
96
    type = TSDB_MGMT_TABLE_TRANS;
97
  } else if (strncasecmp(name, TSDB_PERFS_TABLE_SMAS, len) == 0) {
98 99 100
    type = TSDB_MGMT_TABLE_SMAS;
  } else if (strncasecmp(name, TSDB_INS_TABLE_CONFIGS, len) == 0) {
    type = TSDB_MGMT_TABLE_CONFIGS;
101
  } else if (strncasecmp(name, TSDB_PERFS_TABLE_CONNECTIONS, len) == 0) {
102
    type = TSDB_MGMT_TABLE_CONNS;
103
  } else if (strncasecmp(name, TSDB_PERFS_TABLE_QUERIES, len) == 0) {
104
    type = TSDB_MGMT_TABLE_QUERIES;
L
Liu Jicong 已提交
105
  } else if (strncasecmp(name, TSDB_INS_TABLE_VNODES, len) == 0) {
106
    type = TSDB_MGMT_TABLE_VNODES;
107
  } else if (strncasecmp(name, TSDB_INS_TABLE_TOPICS, len) == 0) {
108
    type = TSDB_MGMT_TABLE_TOPICS;
109
  } else if (strncasecmp(name, TSDB_INS_TABLE_STREAMS, len) == 0) {
L
Liu Jicong 已提交
110
    type = TSDB_MGMT_TABLE_STREAMS;
D
dapan1121 已提交
111 112
  } else if (strncasecmp(name, TSDB_PERFS_TABLE_APPS, len) == 0) {
    type = TSDB_MGMT_TABLE_APPS;
113 114
  } else if (strncasecmp(name, TSDB_INS_TABLE_STREAM_TASKS, len) == 0) {
    type = TSDB_MGMT_TABLE_STREAM_TASKS;
115 116
  } else if (strncasecmp(name, TSDB_INS_TABLE_USER_PRIVILEGES, len) == 0) {
    type = TSDB_MGMT_TABLE_PRIVILEGES;
117
  } else {
S
Shengliang Guan 已提交
118
    mError("invalid show name:%s len:%d", name, len);
119 120 121 122 123 124
  }

  return type;
}

// Create a show object for a retrieve request and store it in the show cache.
//
// pMnode: owning mnode; its showMgmt supplies the id counter and the cache.
// pReq:   request whose tb, db and filterTb fields are copied into the object.
//
// Returns the cached object, which the caller must hand back through
// mndReleaseShowObj. Returns NULL with terrno set when
//   - the table name does not map to a show type
//     (TSDB_CODE_MND_INVALID_SYS_TABLENAME), or
//   - the object cannot be put into the cache (TSDB_CODE_OUT_OF_MEMORY).
static SShowObj *mndCreateShowObj(SMnode *pMnode, SRetrieveTableReq *pReq) {
  SShowMgmt *pMgmt = &pMnode->showMgmt;

  // Id 0 is reserved: the retrieve handler treats showId == 0 as "create a new
  // object". If the counter lands on 0, take the next value and actually use it.
  int64_t showId = atomic_add_fetch_64(&pMgmt->showId, 1);
  if (showId == 0) showId = atomic_add_fetch_64(&pMgmt->showId, 1);

  // The type is later used as an index into the retrieve / free-iterator
  // handler tables, so an unmapped name (-1) must be rejected here.
  int32_t type = convertToRetrieveType(pReq->tb, tListLen(pReq->tb));
  if (type < 0) {
    terrno = TSDB_CODE_MND_INVALID_SYS_TABLENAME;
    mError("show:0x%" PRIx64 ", failed to create show obj since %s", showId, terrstr());
    return NULL;
  }

  SShowObj showObj = {0};
  showObj.id = showId;
  showObj.pMnode = pMnode;
  showObj.type = type;
  memcpy(showObj.db, pReq->db, TSDB_DB_FNAME_LEN);
  tstrncpy(showObj.filterTb, pReq->filterTb, TSDB_TABLE_NAME_LEN);

  int32_t   size = (int32_t)sizeof(showObj);
  int32_t   keepTime = tsShellActivityTimer * 6 * 1000;
  SShowObj *pShow = taosCachePut(pMgmt->cache, &showId, sizeof(int64_t), &showObj, size, keepTime);
  if (pShow == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("show:0x%" PRIx64 ", failed to put into cache since %s", showId, terrstr());
    return NULL;
  }

  mTrace("show:0x%" PRIx64 ", is created, data:%p", showId, pShow);
  return pShow;
}

S
Shengliang Guan 已提交
152
// Destroy callback for a show object (registered with the show cache in
// mndInitShow). If the object still holds an iterator and a free-iterator
// handler is registered for its type, the handler is invoked to release it.
static void mndFreeShowObj(SShowObj *pShow) {
  SMnode    *pMnode = pShow->pMnode;
  SShowMgmt *pMgmt = &pMnode->showMgmt;

  // convertToRetrieveType reports unknown names as -1; never use a negative
  // type as an index into the handler table.
  if (pShow->pIter != NULL && pShow->type >= 0) {
    ShowFreeIterFp freeFp = pMgmt->freeIterFps[pShow->type];
    if (freeFp != NULL) {
      (*freeFp)(pMnode, pShow->pIter);
    }
  }

  mTrace("show:0x%" PRIx64 ", is destroyed, data:%p", pShow->id, pShow);
}

166
// Look up a show object in the show cache by id.
//
// Returns the acquired object (to be handed back through mndReleaseShowObj),
// or NULL, after logging an error, when the cache holds no entry for showId.
static SShowObj *mndAcquireShowObj(SMnode *pMnode, int64_t showId) {
  SShowObj *pObj = taosCacheAcquireByKey(pMnode->showMgmt.cache, &showId, sizeof(showId));
  if (pObj != NULL) {
    mTrace("show:0x%" PRIx64 ", acquired from cache, data:%p", pObj->id, pObj);
    return pObj;
  }

  mError("show:0x%" PRIx64 ", already destroyed", showId);
  return NULL;
}

// Hand a show object back to the show cache. A NULL pShow is ignored.
//
// forceRemove is logged as requested, but the cache is always asked for a
// non-forced release: forced removal is disabled as a workaround for a bug in
// tcache.c.
static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove) {
  if (pShow != NULL) {
    mTrace("show:0x%" PRIx64 ", released from cache, data:%p force:%d", pShow->id, pShow, forceRemove);

    // A bug in tcache.c: never pass the caller's forceRemove through
    taosCacheRelease(pShow->pMnode->showMgmt.cache, (void **)(&pShow), false);
  }
}
S
Shengliang Guan 已提交
190

S
Shengliang Guan 已提交
191 192
// Handler for TDMT_MND_SYSTABLE_RETRIEVE: return one batch of rows of a system
// table.
//
// Flow:
//   1. Deserialize the request. showId == 0 starts a new retrieval: the table
//      meta is looked up in infosMeta, then perfsMeta, and a show object is
//      created. Any other showId continues the retrieval of a cached object.
//   2. Choose the batch size and the retrieve handler from the show type.
//   3. Check the user's privilege when a database name is supplied.
//   4. Build a data block matching the table schema and let the handler fill it.
//   5. Build the response: an SRetrieveMetaTableRsp header and, when rows were
//      read, the column count, one SSysTableSchema per column and the encoded
//      block.
//
// Returns TSDB_CODE_SUCCESS with pReq->info.rsp / rspLen filled in, or -1 with
// terrno set. The show object is released on every exit path after it has been
// created or acquired.
static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  SShowMgmt *pMgmt = &pMnode->showMgmt;
  SShowObj  *pShow = NULL;
  int32_t    rowsToRead = SHOW_STEP_SIZE;
  int32_t    size = 0;
  int32_t    rowsRead = 0;
  mDebug("mndProcessRetrieveSysTableReq start");
  SRetrieveTableReq retrieveReq = {0};
  if (tDeserializeSRetrieveTableReq(pReq->pCont, pReq->contLen, &retrieveReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  mDebug("mndProcessRetrieveSysTableReq tb:%s", retrieveReq.tb);

  if (retrieveReq.showId == 0) {
    size_t         tbLen = strlen(retrieveReq.tb);
    STableMetaRsp *pMeta = taosHashGet(pMnode->infosMeta, retrieveReq.tb, tbLen);
    if (pMeta == NULL) {
      pMeta = taosHashGet(pMnode->perfsMeta, retrieveReq.tb, tbLen);
      if (pMeta == NULL) {
        terrno = TSDB_CODE_MND_INVALID_SYS_TABLENAME;
        mError("failed to process show-retrieve req:%p since %s", pShow, terrstr());
        return -1;
      }
    }

    pShow = mndCreateShowObj(pMnode, &retrieveReq);
    if (pShow == NULL) {
      // terrno has been set by mndCreateShowObj on its failure path
      mError("failed to process show-meta req since %s", terrstr());
      return -1;
    }

    pShow->pMeta = pMeta;
    pShow->numOfColumns = pShow->pMeta->numOfColumns;
  } else {
    pShow = mndAcquireShowObj(pMnode, retrieveReq.showId);
    if (pShow == NULL) {
      terrno = TSDB_CODE_MND_INVALID_SHOWOBJ;
      mError("failed to process show-retrieve req:%p since %s", pShow, terrstr());
      return -1;
    }
  }

  if (pShow->type == TSDB_MGMT_TABLE_COL) {  // expand capacity for ins_columns
    rowsToRead = SHOW_COLS_STEP_SIZE;
  } else if (pShow->type == TSDB_MGMT_TABLE_PRIVILEGES) {
    rowsToRead = SHOW_PRIVILEGES_STEP_SIZE;
  }

  // A negative type (unmapped table name) must not be used as an array index;
  // treat it the same as "no handler registered".
  ShowRetrieveFp retrieveFp = (pShow->type >= 0) ? pMgmt->retrieveFps[pShow->type] : NULL;
  if (retrieveFp == NULL) {
    int64_t showId = pShow->id;  // copy before release; pShow must not be read afterwards
    mndReleaseShowObj(pShow, false);
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    mError("show:0x%" PRIx64 ", failed to retrieve data since %s", showId, terrstr());
    return -1;
  }

  mDebug("show:0x%" PRIx64 ", start retrieve data, type:%d", pShow->id, pShow->type);
  if (retrieveReq.user[0] != 0) {
    memcpy(pReq->info.conn.user, retrieveReq.user, TSDB_USER_LEN);
  } else {
    memcpy(pReq->info.conn.user, TSDB_DEFAULT_USER, strlen(TSDB_DEFAULT_USER) + 1);
  }
  if (retrieveReq.db[0] && mndCheckShowPrivilege(pMnode, pReq->info.conn.user, pShow->type, retrieveReq.db) != 0) {
    // Give the cache reference back, keeping the error code of the failed check
    int32_t code = terrno;
    mndReleaseShowObj(pShow, false);
    terrno = code;
    return -1;
  }

  int32_t numOfCols = pShow->pMeta->numOfColumns;

  SSDataBlock *pBlock = createDataBlock();
  if (pBlock == NULL) {
    int64_t showId = pShow->id;
    mndReleaseShowObj(pShow, false);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("show:0x%" PRIx64 ", failed to retrieve data since %s", showId, terrstr());
    return -1;
  }

  // One block column per schema column of the system table
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData idata = {0};

    SSchema *p = &pShow->pMeta->pSchemas[i];

    idata.info.bytes = p->bytes;
    idata.info.type = p->type;
    idata.info.colId = p->colId;
    blockDataAppendColInfo(pBlock, &idata);
  }

  blockDataEnsureCapacity(pBlock, rowsToRead);

  if (mndCheckRetrieveFinished(pShow)) {
    mDebug("show:0x%" PRIx64 ", read finished, numOfRows:%d", pShow->id, pShow->numOfRows);
    rowsRead = 0;
  } else {
    rowsRead = (*retrieveFp)(pReq, pShow, pBlock, rowsToRead);
    if (rowsRead < 0) {
      // A negative return value from the handler is used as the error code
      terrno = rowsRead;
      mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id);
      mndReleaseShowObj(pShow, true);
      blockDataDestroy(pBlock);
      return -1;
    }

    pBlock->info.rows = rowsRead;
    mDebug("show:0x%" PRIx64 ", stop retrieve data, rowsRead:%d numOfRows:%d", pShow->id, rowsRead, pShow->numOfRows);
  }

  size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * pShow->pMeta->numOfColumns +
         blockDataGetSize(pBlock) + blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock));

  SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
  if (pRsp == NULL) {
    int64_t showId = pShow->id;  // copy before release; pShow must not be read afterwards
    mndReleaseShowObj(pShow, false);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("show:0x%" PRIx64 ", failed to retrieve data since %s", showId, terrstr());
    blockDataDestroy(pBlock);
    return -1;
  }

  pRsp->handle = htobe64(pShow->id);

  if (rowsRead > 0) {
    char    *pStart = pRsp->data;
    SSchema *ps = pShow->pMeta->pSchemas;

    *(int32_t *)pStart = htonl(pShow->pMeta->numOfColumns);
    pStart += sizeof(int32_t);  // number of columns

    for (int32_t i = 0; i < pShow->pMeta->numOfColumns; ++i) {
      SSysTableSchema *pSchema = (SSysTableSchema *)pStart;
      pSchema->bytes = htonl(ps[i].bytes);
      pSchema->colId = htons(ps[i].colId);
      pSchema->type = ps[i].type;

      pStart += sizeof(SSysTableSchema);
    }

    (void)blockEncode(pBlock, pStart, pShow->pMeta->numOfColumns);
  }

  pRsp->numOfRows = htonl(rowsRead);
  pRsp->precision = TSDB_TIME_PRECISION_MILLI;  // millisecond time precision
  pReq->info.rsp = pRsp;
  pReq->info.rspLen = size;

  // Completed when nothing was read, or when a short batch was read and the
  // show object is not flagged with `restore`.
  if (rowsRead == 0 || ((rowsRead < rowsToRead) && !pShow->restore)) {
    pRsp->completed = 1;
    mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id);
    mndReleaseShowObj(pShow, true);
  } else {
    mDebug("show:0x%" PRIx64 ", retrieve not completed yet", pShow->id);
    mndReleaseShowObj(pShow, false);
  }

  blockDataDestroy(pBlock);
  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
343
// Report whether a show object has nothing more to retrieve: true exactly when
// its iterator (pIter) is NULL and its numOfRows is non-zero.
static bool mndCheckRetrieveFinished(SShowObj *pShow) {
  return (pShow->pIter == NULL) && (pShow->numOfRows != 0);
}

S
Shengliang Guan 已提交
350
// Register fp as the retrieve handler for showType, replacing whatever was
// stored in that slot before.
void mndAddShowRetrieveHandle(SMnode *pMnode, EShowType showType, ShowRetrieveFp fp) {
  pMnode->showMgmt.retrieveFps[showType] = fp;
}

S
Shengliang Guan 已提交
355
// Register fp as the free-iterator handler for showType, replacing whatever
// was stored in that slot before.
void mndAddShowFreeIterHandle(SMnode *pMnode, EShowType showType, ShowFreeIterFp fp) {
  pMnode->showMgmt.freeIterFps[showType] = fp;
}