mndQnode.c 15.2 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "mndQnode.h"
#include "mndDnode.h"
#include "mndShow.h"
#include "mndTrans.h"

#define TSDB_QNODE_VER_NUMBER 1
#define TSDB_QNODE_RESERVE_SIZE 64

static SSdbRaw *mndQnodeActionEncode(SQnodeObj *pObj);
static SSdbRow *mndQnodeActionDecode(SSdbRaw *pRaw);
static int32_t  mndQnodeActionInsert(SSdb *pSdb, SQnodeObj *pObj);
static int32_t  mndQnodeActionDelete(SSdb *pSdb, SQnodeObj *pObj);
S
Shengliang Guan 已提交
29
static int32_t  mndQnodeActionUpdate(SSdb *pSdb, SQnodeObj *pOld, SQnodeObj *pNew);
S
Shengliang Guan 已提交
30 31 32 33
static int32_t  mndProcessCreateQnodeReq(SMnodeMsg *pReq);
static int32_t  mndProcessDropQnodeReq(SMnodeMsg *pReq);
static int32_t  mndProcessCreateQnodeRsp(SMnodeMsg *pRsp);
static int32_t  mndProcessDropQnodeRsp(SMnodeMsg *pRsp);
S
Shengliang Guan 已提交
34
static int32_t  mndGetQnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
S
Shengliang Guan 已提交
35
static int32_t  mndRetrieveQnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
S
Shengliang Guan 已提交
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
static void     mndCancelGetNextQnode(SMnode *pMnode, void *pIter);

int32_t mndInitQnode(SMnode *pMnode) {
  SSdbTable table = {.sdbType = SDB_QNODE,
                     .keyType = SDB_KEY_INT32,
                     .encodeFp = (SdbEncodeFp)mndQnodeActionEncode,
                     .decodeFp = (SdbDecodeFp)mndQnodeActionDecode,
                     .insertFp = (SdbInsertFp)mndQnodeActionInsert,
                     .updateFp = (SdbUpdateFp)mndQnodeActionUpdate,
                     .deleteFp = (SdbDeleteFp)mndQnodeActionDelete};

  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_QNODE, mndProcessCreateQnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_QNODE, mndProcessDropQnodeReq);
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_QNODE_RSP, mndProcessCreateQnodeRsp);
  mndSetMsgHandle(pMnode, TDMT_DND_DROP_QNODE_RSP, mndProcessDropQnodeRsp);

  mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_QNODE, mndGetQnodeMeta);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QNODE, mndRetrieveQnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QNODE, mndCancelGetNextQnode);

  return sdbSetTable(pMnode->pSdb, table);
}

void mndCleanupQnode(SMnode *pMnode) {}

static SQnodeObj *mndAcquireQnode(SMnode *pMnode, int32_t qnodeId) {
62 63
  SQnodeObj *pObj = sdbAcquire(pMnode->pSdb, SDB_QNODE, &qnodeId);
  if (pObj == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
S
Shengliang Guan 已提交
64 65 66 67 68 69 70 71 72 73 74
    terrno = TSDB_CODE_MND_QNODE_NOT_EXIST;
  }
  return pObj;
}

static void mndReleaseQnode(SMnode *pMnode, SQnodeObj *pObj) {
  SSdb *pSdb = pMnode->pSdb;
  sdbRelease(pSdb, pObj);
}

static SSdbRaw *mndQnodeActionEncode(SQnodeObj *pObj) {
75 76
  terrno = TSDB_CODE_OUT_OF_MEMORY;

S
Shengliang Guan 已提交
77
  SSdbRaw *pRaw = sdbAllocRaw(SDB_QNODE, TSDB_QNODE_VER_NUMBER, sizeof(SQnodeObj) + TSDB_QNODE_RESERVE_SIZE);
78
  if (pRaw == NULL) goto QNODE_ENCODE_OVER;
S
Shengliang Guan 已提交
79 80

  int32_t dataPos = 0;
81 82 83 84 85 86
  SDB_SET_INT32(pRaw, dataPos, pObj->id, QNODE_ENCODE_OVER)
  SDB_SET_INT64(pRaw, dataPos, pObj->createdTime, QNODE_ENCODE_OVER)
  SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, QNODE_ENCODE_OVER)
  SDB_SET_RESERVE(pRaw, dataPos, TSDB_QNODE_RESERVE_SIZE, QNODE_ENCODE_OVER)

  terrno = 0;
S
Shengliang Guan 已提交
87

88 89 90 91 92 93 94 95
QNODE_ENCODE_OVER:
  if (terrno != 0) {
    mError("qnode:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("qnode:%d, encode to raw:%p, row:%p", pObj->id, pRaw, pObj);
S
Shengliang Guan 已提交
96 97 98 99
  return pRaw;
}

static SSdbRow *mndQnodeActionDecode(SSdbRaw *pRaw) {
100 101
  terrno = TSDB_CODE_OUT_OF_MEMORY;

S
Shengliang Guan 已提交
102
  int8_t sver = 0;
103
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto QNODE_DECODE_OVER;
S
Shengliang Guan 已提交
104 105 106

  if (sver != TSDB_QNODE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
107
    goto QNODE_DECODE_OVER;
S
Shengliang Guan 已提交
108 109
  }

110 111 112
  SSdbRow *pRow = sdbAllocRow(sizeof(SQnodeObj));
  if (pRow == NULL) goto QNODE_DECODE_OVER;

S
Shengliang Guan 已提交
113
  SQnodeObj *pObj = sdbGetRowObj(pRow);
114
  if (pObj == NULL) goto QNODE_DECODE_OVER;
S
Shengliang Guan 已提交
115 116

  int32_t dataPos = 0;
117 118 119 120 121 122
  SDB_GET_INT32(pRaw, dataPos, &pObj->id, QNODE_DECODE_OVER)
  SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, QNODE_DECODE_OVER)
  SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, QNODE_DECODE_OVER)
  SDB_GET_RESERVE(pRaw, dataPos, TSDB_QNODE_RESERVE_SIZE, QNODE_DECODE_OVER)

  terrno = 0;
S
Shengliang Guan 已提交
123

124 125 126 127 128 129 130 131
QNODE_DECODE_OVER:
  if (terrno != 0) {
    mError("qnode:%d, failed to decode from raw:%p since %s", pObj->id, pRaw, terrstr());
    tfree(pRow);
    return NULL;
  }

  mTrace("qnode:%d, decode from raw:%p, row:%p", pObj->id, pRaw, pObj);
S
Shengliang Guan 已提交
132 133 134 135
  return pRow;
}

static int32_t mndQnodeActionInsert(SSdb *pSdb, SQnodeObj *pObj) {
136
  mTrace("qnode:%d, perform insert action, row:%p", pObj->id, pObj);
S
Shengliang Guan 已提交
137 138 139 140 141 142 143 144 145 146 147
  pObj->pDnode = sdbAcquire(pSdb, SDB_DNODE, &pObj->id);
  if (pObj->pDnode == NULL) {
    terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
    mError("qnode:%d, failed to perform insert action since %s", pObj->id, terrstr());
    return -1;
  }

  return 0;
}

static int32_t mndQnodeActionDelete(SSdb *pSdb, SQnodeObj *pObj) {
148
  mTrace("qnode:%d, perform delete action, row:%p", pObj->id, pObj);
S
Shengliang Guan 已提交
149 150 151 152 153 154 155 156
  if (pObj->pDnode != NULL) {
    sdbRelease(pSdb, pObj->pDnode);
    pObj->pDnode = NULL;
  }

  return 0;
}

S
Shengliang Guan 已提交
157
static int32_t mndQnodeActionUpdate(SSdb *pSdb, SQnodeObj *pOld, SQnodeObj *pNew) {
S
Shengliang Guan 已提交
158
  mTrace("qnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
S
Shengliang Guan 已提交
159
  pOld->updateTime = pNew->updateTime;
S
Shengliang Guan 已提交
160 161 162 163 164 165 166 167 168 169 170
  return 0;
}

static int32_t mndSetCreateQnodeRedoLogs(STrans *pTrans, SQnodeObj *pObj) {
  SSdbRaw *pRedoRaw = mndQnodeActionEncode(pObj);
  if (pRedoRaw == NULL) return -1;
  if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
  if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) return -1;
  return 0;
}

171 172 173 174 175 176 177 178
static int32_t mndSetCreateQnodeUndoLogs(STrans *pTrans, SQnodeObj *pObj) {
  SSdbRaw *pUndoRaw = mndQnodeActionEncode(pObj);
  if (pUndoRaw == NULL) return -1;
  if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1;
  if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) return -1;
  return 0;
}

S
Shengliang Guan 已提交
179 180 181 182 183 184 185 186 187
static int32_t mndSetCreateQnodeCommitLogs(STrans *pTrans, SQnodeObj *pObj) {
  SSdbRaw *pCommitRaw = mndQnodeActionEncode(pObj);
  if (pCommitRaw == NULL) return -1;
  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
  return 0;
}

static int32_t mndSetCreateQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQnodeObj *pObj) {
S
Shengliang Guan 已提交
188 189
  SDCreateQnodeReq *pReq = malloc(sizeof(SDCreateQnodeReq));
  if (pReq == NULL) {
S
Shengliang Guan 已提交
190 191 192
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
S
Shengliang Guan 已提交
193
  pReq->dnodeId = htonl(pDnode->id);
S
Shengliang Guan 已提交
194 195 196

  STransAction action = {0};
  action.epSet = mndGetDnodeEpset(pDnode);
S
Shengliang Guan 已提交
197
  action.pCont = pReq;
S
Shengliang Guan 已提交
198
  action.contLen = sizeof(SDCreateQnodeReq);
S
Shengliang Guan 已提交
199
  action.msgType = TDMT_DND_CREATE_QNODE;
200
  action.acceptableCode = TSDB_CODE_DND_QNODE_ALREADY_DEPLOYED;
S
Shengliang Guan 已提交
201 202

  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
S
Shengliang Guan 已提交
203
    free(pReq);
S
Shengliang Guan 已提交
204 205 206 207 208 209
    return -1;
  }

  return 0;
}

210
static int32_t mndSetCreateQnodeUndoActions(STrans *pTrans, SDnodeObj *pDnode, SQnodeObj *pObj) {
S
Shengliang Guan 已提交
211 212
  SDDropQnodeReq *pReq = malloc(sizeof(SDDropQnodeReq));
  if (pReq == NULL) {
213 214 215
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
S
Shengliang Guan 已提交
216
  pReq->dnodeId = htonl(pDnode->id);
217 218 219

  STransAction action = {0};
  action.epSet = mndGetDnodeEpset(pDnode);
S
Shengliang Guan 已提交
220
  action.pCont = pReq;
221 222 223 224 225
  action.contLen = sizeof(SDDropQnodeReq);
  action.msgType = TDMT_DND_DROP_QNODE;
  action.acceptableCode = TSDB_CODE_DND_QNODE_NOT_DEPLOYED;

  if (mndTransAppendUndoAction(pTrans, &action) != 0) {
S
Shengliang Guan 已提交
226
    free(pReq);
227 228 229 230 231 232
    return -1;
  }

  return 0;
}

S
Shengliang Guan 已提交
233
static int32_t mndCreateQnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode, SMCreateQnodeReq *pCreate) {
234 235
  int32_t code = -1;

S
Shengliang Guan 已提交
236 237 238 239 240
  SQnodeObj qnodeObj = {0};
  qnodeObj.id = pDnode->id;
  qnodeObj.createdTime = taosGetTimestampMs();
  qnodeObj.updateTime = qnodeObj.createdTime;

S
Shengliang Guan 已提交
241
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
242
  if (pTrans == NULL) goto CREATE_QNODE_OVER;
S
Shengliang Guan 已提交
243

244 245 246 247 248 249 250
  mDebug("trans:%d, used to create qnode:%d", pTrans->id, pCreate->dnodeId);
  if (mndSetCreateQnodeRedoLogs(pTrans, &qnodeObj) != 0) goto CREATE_QNODE_OVER;
  if (mndSetCreateQnodeUndoLogs(pTrans, &qnodeObj) != 0) goto CREATE_QNODE_OVER;
  if (mndSetCreateQnodeCommitLogs(pTrans, &qnodeObj) != 0) goto CREATE_QNODE_OVER;
  if (mndSetCreateQnodeRedoActions(pTrans, pDnode, &qnodeObj) != 0) goto CREATE_QNODE_OVER;
  if (mndSetCreateQnodeUndoActions(pTrans, pDnode, &qnodeObj) != 0) goto CREATE_QNODE_OVER;
  if (mndTransPrepare(pMnode, pTrans) != 0) goto CREATE_QNODE_OVER;
S
Shengliang Guan 已提交
251 252 253 254 255 256 257 258

  code = 0;

CREATE_QNODE_OVER:
  mndTransDrop(pTrans);
  return code;
}

S
Shengliang Guan 已提交
259 260 261
static int32_t mndProcessCreateQnodeReq(SMnodeMsg *pReq) {
  SMnode           *pMnode = pReq->pMnode;
  SMCreateQnodeReq *pCreate = pReq->rpcMsg.pCont;
S
Shengliang Guan 已提交
262 263 264 265 266 267 268 269

  pCreate->dnodeId = htonl(pCreate->dnodeId);

  mDebug("qnode:%d, start to create", pCreate->dnodeId);

  SQnodeObj *pObj = mndAcquireQnode(pMnode, pCreate->dnodeId);
  if (pObj != NULL) {
    mError("qnode:%d, qnode already exist", pObj->id);
S
Shengliang Guan 已提交
270
    terrno = TSDB_CODE_MND_QNODE_ALREADY_EXIST;
S
Shengliang Guan 已提交
271 272
    mndReleaseQnode(pMnode, pObj);
    return -1;
273 274 275
  } else if (terrno != TSDB_CODE_MND_QNODE_NOT_EXIST) {
    mError("qnode:%d, failed to create qnode since %s", pCreate->dnodeId, terrstr());
    return -1;
S
Shengliang Guan 已提交
276 277 278 279 280 281 282 283 284
  }

  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pCreate->dnodeId);
  if (pDnode == NULL) {
    mError("qnode:%d, dnode not exist", pCreate->dnodeId);
    terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
    return -1;
  }

S
Shengliang Guan 已提交
285
  int32_t code = mndCreateQnode(pMnode, pReq, pDnode, pCreate);
S
Shengliang Guan 已提交
286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312
  mndReleaseDnode(pMnode, pDnode);

  if (code != 0) {
    mError("qnode:%d, failed to create since %s", pCreate->dnodeId, terrstr());
    return -1;
  }

  return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}

static int32_t mndSetDropQnodeRedoLogs(STrans *pTrans, SQnodeObj *pObj) {
  SSdbRaw *pRedoRaw = mndQnodeActionEncode(pObj);
  if (pRedoRaw == NULL) return -1;
  if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
  if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1;
  return 0;
}

static int32_t mndSetDropQnodeCommitLogs(STrans *pTrans, SQnodeObj *pObj) {
  SSdbRaw *pCommitRaw = mndQnodeActionEncode(pObj);
  if (pCommitRaw == NULL) return -1;
  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;
  return 0;
}

static int32_t mndSetDropQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQnodeObj *pObj) {
S
Shengliang Guan 已提交
313 314
  SDDropQnodeReq *pReq = malloc(sizeof(SDDropQnodeReq));
  if (pReq == NULL) {
S
Shengliang Guan 已提交
315 316 317
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
S
Shengliang Guan 已提交
318
  pReq->dnodeId = htonl(pDnode->id);
S
Shengliang Guan 已提交
319 320 321

  STransAction action = {0};
  action.epSet = mndGetDnodeEpset(pDnode);
S
Shengliang Guan 已提交
322
  action.pCont = pReq;
S
Shengliang Guan 已提交
323
  action.contLen = sizeof(SDDropQnodeReq);
S
Shengliang Guan 已提交
324
  action.msgType = TDMT_DND_DROP_QNODE;
325
  action.acceptableCode = TSDB_CODE_DND_QNODE_NOT_DEPLOYED;
S
Shengliang Guan 已提交
326 327

  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
S
Shengliang Guan 已提交
328
    free(pReq);
S
Shengliang Guan 已提交
329 330 331 332 333 334
    return -1;
  }

  return 0;
}

S
Shengliang Guan 已提交
335
static int32_t mndDropQnode(SMnode *pMnode, SMnodeMsg *pReq, SQnodeObj *pObj) {
S
Shengliang Guan 已提交
336
  int32_t code = -1;
337

S
Shengliang Guan 已提交
338
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg);
339
  if (pTrans == NULL) goto DROP_QNODE_OVER;
S
Shengliang Guan 已提交
340 341

  mDebug("trans:%d, used to drop qnode:%d", pTrans->id, pObj->id);
342 343 344 345
  if (mndSetDropQnodeRedoLogs(pTrans, pObj) != 0) goto DROP_QNODE_OVER;
  if (mndSetDropQnodeCommitLogs(pTrans, pObj) != 0) goto DROP_QNODE_OVER;
  if (mndSetDropQnodeRedoActions(pTrans, pObj->pDnode, pObj) != 0) goto DROP_QNODE_OVER;
  if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_QNODE_OVER;
S
Shengliang Guan 已提交
346 347 348 349 350 351 352 353

  code = 0;

DROP_QNODE_OVER:
  mndTransDrop(pTrans);
  return code;
}

S
Shengliang Guan 已提交
354 355 356
static int32_t mndProcessDropQnodeReq(SMnodeMsg *pReq) {
  SMnode         *pMnode = pReq->pMnode;
  SMDropQnodeReq *pDrop = pReq->rpcMsg.pCont;
S
Shengliang Guan 已提交
357 358 359 360 361 362 363 364 365 366 367 368
  pDrop->dnodeId = htonl(pDrop->dnodeId);

  mDebug("qnode:%d, start to drop", pDrop->dnodeId);

  if (pDrop->dnodeId <= 0) {
    terrno = TSDB_CODE_SDB_APP_ERROR;
    mError("qnode:%d, failed to drop since %s", pDrop->dnodeId, terrstr());
    return -1;
  }

  SQnodeObj *pObj = mndAcquireQnode(pMnode, pDrop->dnodeId);
  if (pObj == NULL) {
369
    mError("qnode:%d, failed to drop since %s", pDrop->dnodeId, terrstr());
S
Shengliang Guan 已提交
370 371 372
    return -1;
  }

S
Shengliang Guan 已提交
373
  int32_t code = mndDropQnode(pMnode, pReq, pObj);
S
Shengliang Guan 已提交
374
  if (code != 0) {
S
Shengliang Guan 已提交
375
    sdbRelease(pMnode->pSdb, pObj);
S
Shengliang Guan 已提交
376 377 378 379
    mError("qnode:%d, failed to drop since %s", pMnode->dnodeId, terrstr());
    return -1;
  }

S
Shengliang Guan 已提交
380
  sdbRelease(pMnode->pSdb, pObj);
S
Shengliang Guan 已提交
381 382 383
  return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}

S
Shengliang Guan 已提交
384 385
static int32_t mndProcessCreateQnodeRsp(SMnodeMsg *pRsp) {
  mndTransProcessRsp(pRsp);
S
Shengliang Guan 已提交
386 387 388
  return 0;
}

S
Shengliang Guan 已提交
389 390
static int32_t mndProcessDropQnodeRsp(SMnodeMsg *pRsp) {
  mndTransProcessRsp(pRsp);
S
Shengliang Guan 已提交
391 392 393
  return 0;
}

S
Shengliang Guan 已提交
394
static int32_t mndGetQnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
S
Shengliang Guan 已提交
395
  SMnode *pMnode = pReq->pMnode;
S
Shengliang Guan 已提交
396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428
  SSdb   *pSdb = pMnode->pSdb;

  int32_t  cols = 0;
  SSchema *pSchema = pMeta->pSchema;

  pShow->bytes[cols] = 2;
  pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
  strcpy(pSchema[cols].name, "id");
  pSchema[cols].bytes = htonl(pShow->bytes[cols]);
  cols++;

  pShow->bytes[cols] = TSDB_EP_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "endpoint");
  pSchema[cols].bytes = htonl(pShow->bytes[cols]);
  cols++;

  pShow->bytes[cols] = 8;
  pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
  strcpy(pSchema[cols].name, "create_time");
  pSchema[cols].bytes = htonl(pShow->bytes[cols]);
  cols++;

  pMeta->numOfColumns = htonl(cols);
  pShow->numOfColumns = cols;

  pShow->offset[0] = 0;
  for (int32_t i = 1; i < cols; ++i) {
    pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
  }

  pShow->numOfRows = sdbGetSize(pSdb, SDB_QNODE);
  pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
D
dapan1121 已提交
429
  strcpy(pMeta->tbName, mndShowStr(pShow->type));
S
Shengliang Guan 已提交
430 431 432 433

  return 0;
}

S
Shengliang Guan 已提交
434 435
static int32_t mndRetrieveQnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  SMnode    *pMnode = pReq->pMnode;
S
Shengliang Guan 已提交
436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  int32_t    cols = 0;
  SQnodeObj *pObj = NULL;
  char      *pWrite;

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_QNODE, pShow->pIter, (void **)&pObj);
    if (pShow->pIter == NULL) break;

    cols = 0;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    *(int16_t *)pWrite = pObj->id;
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pObj->pDnode->ep, pShow->bytes[cols]);

    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    *(int64_t *)pWrite = pObj->createdTime;
    cols++;

    numOfRows++;
    sdbRelease(pSdb, pObj);
  }

  mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
  pShow->numOfReads += numOfRows;

  return numOfRows;
}

static void mndCancelGetNextQnode(SMnode *pMnode, void *pIter) {
  SSdb *pSdb = pMnode->pSdb;
  sdbCancelFetch(pSdb, pIter);
}