/* mndBnode.c */
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "mndBnode.h"
#include "mndDnode.h"
#include "mndShow.h"
#include "mndTrans.h"

#define TSDB_BNODE_VER_NUMBER 1
#define TSDB_BNODE_RESERVE_SIZE 64

static SSdbRaw *mndBnodeActionEncode(SBnodeObj *pObj);
static SSdbRow *mndBnodeActionDecode(SSdbRaw *pRaw);
static int32_t  mndBnodeActionInsert(SSdb *pSdb, SBnodeObj *pObj);
static int32_t  mndBnodeActionDelete(SSdb *pSdb, SBnodeObj *pObj);
S
Shengliang Guan 已提交
29
static int32_t  mndBnodeActionUpdate(SSdb *pSdb, SBnodeObj *pOld, SBnodeObj *pNew);
S
Shengliang Guan 已提交
30 31 32 33
static int32_t  mndProcessCreateBnodeReq(SMnodeMsg *pReq);
static int32_t  mndProcessDropBnodeReq(SMnodeMsg *pReq);
static int32_t  mndProcessCreateBnodeRsp(SMnodeMsg *pRsp);
static int32_t  mndProcessDropBnodeRsp(SMnodeMsg *pRsp);
S
Shengliang Guan 已提交
34
static int32_t  mndGetBnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
S
Shengliang Guan 已提交
35
static int32_t  mndRetrieveBnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
S
Shengliang Guan 已提交
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
static void     mndCancelGetNextBnode(SMnode *pMnode, void *pIter);

/*
 * Wires the bnode module into the mnode: installs the create/drop message
 * handlers, the show-table handlers, and finally registers the SDB_BNODE
 * table description with the SDB.
 *
 * Returns the value returned by sdbSetTable().
 */
int32_t mndInitBnode(SMnode *pMnode) {
  // Message handlers for client requests and dnode responses.
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_BNODE, mndProcessCreateBnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_BNODE, mndProcessDropBnodeReq);
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_BNODE_RSP, mndProcessCreateBnodeRsp);
  mndSetMsgHandle(pMnode, TDMT_DND_DROP_BNODE_RSP, mndProcessDropBnodeRsp);

  // Handlers backing the bnode show table.
  mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_BNODE, mndGetBnodeMeta);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_BNODE, mndRetrieveBnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_BNODE, mndCancelGetNextBnode);

  // Table description: rows are keyed by a 32-bit integer.
  SSdbTable bnodeTable = {0};
  bnodeTable.sdbType = SDB_BNODE;
  bnodeTable.keyType = SDB_KEY_INT32;
  bnodeTable.encodeFp = (SdbEncodeFp)mndBnodeActionEncode;
  bnodeTable.decodeFp = (SdbDecodeFp)mndBnodeActionDecode;
  bnodeTable.insertFp = (SdbInsertFp)mndBnodeActionInsert;
  bnodeTable.updateFp = (SdbUpdateFp)mndBnodeActionUpdate;
  bnodeTable.deleteFp = (SdbDeleteFp)mndBnodeActionDelete;

  return sdbSetTable(pMnode->pSdb, bnodeTable);
}

/* Intentionally empty: this function performs no work. */
void mndCleanupBnode(SMnode *pMnode) {
  (void)pMnode;
}

61
static SBnodeObj *mndAcquireBnode(SMnode *pMnode, int32_t bnodeId) {
62 63
  SBnodeObj *pObj = sdbAcquire(pMnode->pSdb, SDB_BNODE, &bnodeId);
  if (pObj == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
S
Shengliang Guan 已提交
64 65 66 67 68 69 70 71 72 73 74
    terrno = TSDB_CODE_MND_BNODE_NOT_EXIST;
  }
  return pObj;
}

/* Hands pObj back to the mnode's SDB via sdbRelease(). */
static void mndReleaseBnode(SMnode *pMnode, SBnodeObj *pObj) {
  sdbRelease(pMnode->pSdb, pObj);
}

/*
 * Serialises a bnode object into a newly allocated SDB raw record.
 *
 * Fields are written in this order: id (int32), createdTime (int64),
 * updateTime (int64), followed by TSDB_BNODE_RESERVE_SIZE reserved bytes.
 *
 * Returns the raw record on success (terrno == 0). On failure the record is
 * released with sdbFreeRaw(), NULL is returned and terrno holds the error.
 */
static SSdbRaw *mndBnodeActionEncode(SBnodeObj *pObj) {
  // Pessimistic default; cleared only after every field has been written.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_BNODE, TSDB_BNODE_VER_NUMBER, sizeof(SBnodeObj) + TSDB_BNODE_RESERVE_SIZE);
  if (pRaw == NULL) goto BNODE_ENCODE_OVER;

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, pObj->id, BNODE_ENCODE_OVER)
  SDB_SET_INT64(pRaw, dataPos, pObj->createdTime, BNODE_ENCODE_OVER)
  SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, BNODE_ENCODE_OVER)
  SDB_SET_RESERVE(pRaw, dataPos, TSDB_BNODE_RESERVE_SIZE, BNODE_ENCODE_OVER)

  terrno = 0;

BNODE_ENCODE_OVER:
  if (terrno != 0) {
    mError("bnode:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("bnode:%d, encode to raw:%p, row:%p", pObj->id, pRaw, pObj);
  return pRaw;
}

/*
 * Rebuilds a bnode row from an SDB raw record.
 *
 * The record's soft version must equal TSDB_BNODE_VER_NUMBER; otherwise terrno
 * is set to TSDB_CODE_SDB_INVALID_DATA_VER. Fields are read in the order they
 * are written by mndBnodeActionEncode(): id, createdTime, updateTime, reserve.
 *
 * Returns the new row on success (terrno == 0). On failure any allocated row
 * is freed, NULL is returned and terrno holds the error.
 */
static SSdbRow *mndBnodeActionDecode(SSdbRaw *pRaw) {
  // Pessimistic default; cleared only after every field has been read.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  // Declared and initialised before the first goto so that the cleanup path
  // never observes indeterminate pointers when an early check fails.
  SSdbRow   *pRow = NULL;
  SBnodeObj *pObj = NULL;
  int32_t    dataPos = 0;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto BNODE_DECODE_OVER;

  if (sver != TSDB_BNODE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto BNODE_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SBnodeObj));
  if (pRow == NULL) goto BNODE_DECODE_OVER;

  pObj = sdbGetRowObj(pRow);
  if (pObj == NULL) goto BNODE_DECODE_OVER;

  SDB_GET_INT32(pRaw, dataPos, &pObj->id, BNODE_DECODE_OVER)
  SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, BNODE_DECODE_OVER)
  SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, BNODE_DECODE_OVER)
  SDB_GET_RESERVE(pRaw, dataPos, TSDB_BNODE_RESERVE_SIZE, BNODE_DECODE_OVER)

  terrno = 0;

BNODE_DECODE_OVER:
  if (terrno != 0) {
    // pObj is still NULL when the failure happened before the row existed.
    mError("bnode:%d, failed to decode from raw:%p since %s", pObj != NULL ? pObj->id : 0, pRaw, terrstr());
    tfree(pRow);
    return NULL;
  }

  mTrace("bnode:%d, decode from raw:%p, row:%p", pObj->id, pRaw, pObj);
  return pRow;
}

/*
 * SDB insert callback for a bnode row.
 *
 * Acquires the SDB_DNODE row whose key equals the bnode id and stores it in
 * pObj->pDnode. The reference is given back in mndBnodeActionDelete().
 *
 * Returns 0 on success; -1 with terrno = TSDB_CODE_MND_DNODE_NOT_EXIST when
 * the dnode row cannot be acquired.
 */
static int32_t mndBnodeActionInsert(SSdb *pSdb, SBnodeObj *pObj) {
  mTrace("bnode:%d, perform insert action, row:%p", pObj->id, pObj);
  pObj->pDnode = sdbAcquire(pSdb, SDB_DNODE, &pObj->id);
  if (pObj->pDnode == NULL) {
    terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
    mError("bnode:%d, failed to perform insert action since %s", pObj->id, terrstr());
    return -1;
  }

  return 0;
}

/*
 * SDB delete callback for a bnode row.
 *
 * Releases the dnode reference held in pObj->pDnode (if any) and clears the
 * pointer so it is not released twice. Always returns 0.
 */
static int32_t mndBnodeActionDelete(SSdb *pSdb, SBnodeObj *pObj) {
  mTrace("bnode:%d, perform delete action, row:%p", pObj->id, pObj);
  if (pObj->pDnode != NULL) {
    sdbRelease(pSdb, pObj->pDnode);
    pObj->pDnode = NULL;
  }

  return 0;
}

S
Shengliang Guan 已提交
157
static int32_t mndBnodeActionUpdate(SSdb *pSdb, SBnodeObj *pOld, SBnodeObj *pNew) {
S
Shengliang Guan 已提交
158
  mTrace("bnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
S
Shengliang Guan 已提交
159
  pOld->updateTime = pNew->updateTime;
S
Shengliang Guan 已提交
160 161 162 163 164 165 166 167 168 169 170
  return 0;
}

/*
 * Encodes pObj, appends the record to the transaction's redo log and marks it
 * SDB_STATUS_CREATING. Returns 0 on success, -1 if any step fails.
 */
static int32_t mndSetCreateBnodeRedoLogs(STrans *pTrans, SBnodeObj *pObj) {
  SSdbRaw *pRaw = mndBnodeActionEncode(pObj);

  // Short-circuit evaluation keeps the steps in order: encode, append, set status.
  if (pRaw == NULL || mndTransAppendRedolog(pTrans, pRaw) != 0 ||
      sdbSetRawStatus(pRaw, SDB_STATUS_CREATING) != 0) {
    return -1;
  }

  return 0;
}

171 172 173 174 175 176 177 178
static int32_t mndSetCreateBnodeUndoLogs(STrans *pTrans, SBnodeObj *pObj) {
  SSdbRaw *pUndoRaw = mndBnodeActionEncode(pObj);
  if (pUndoRaw == NULL) return -1;
  if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1;
  if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) return -1;
  return 0;
}

S
Shengliang Guan 已提交
179 180 181 182 183 184 185 186 187
static int32_t mndSetCreateBnodeCommitLogs(STrans *pTrans, SBnodeObj *pObj) {
  SSdbRaw *pCommitRaw = mndBnodeActionEncode(pObj);
  if (pCommitRaw == NULL) return -1;
  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
  if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
  return 0;
}

/*
 * Adds the redo action of a create-bnode transaction: a TDMT_DND_CREATE_BNODE
 * message addressed to pDnode's endpoint set.
 *
 * The request carries the dnode id passed through htonl().
 * TSDB_CODE_DND_BNODE_ALREADY_DEPLOYED is set as the action's acceptable code.
 * pObj is not used.
 *
 * Returns 0 on success; -1 on allocation failure (terrno =
 * TSDB_CODE_OUT_OF_MEMORY) or when the action cannot be appended, in which
 * case the request buffer is freed here.
 */
static int32_t mndSetCreateBnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SBnodeObj *pObj) {
  SDCreateBnodeReq *pReq = malloc(sizeof(SDCreateBnodeReq));
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pReq->dnodeId = htonl(pDnode->id);

  STransAction action = {0};
  action.epSet = mndGetDnodeEpset(pDnode);
  action.pCont = pReq;
  action.contLen = sizeof(SDCreateBnodeReq);
  action.msgType = TDMT_DND_CREATE_BNODE;
  action.acceptableCode = TSDB_CODE_DND_BNODE_ALREADY_DEPLOYED;

  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
    // The buffer was not handed over, so it is released here.
    free(pReq);
    return -1;
  }

  return 0;
}

210
static int32_t mndSetCreateBnodeUndoActions(STrans *pTrans, SDnodeObj *pDnode, SBnodeObj *pObj) {
S
Shengliang Guan 已提交
211 212
  SDDropBnodeReq *pReq = malloc(sizeof(SDDropBnodeReq));
  if (pReq == NULL) {
213 214 215
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
S
Shengliang Guan 已提交
216
  pReq->dnodeId = htonl(pDnode->id);
217 218 219

  STransAction action = {0};
  action.epSet = mndGetDnodeEpset(pDnode);
S
Shengliang Guan 已提交
220
  action.pCont = pReq;
221 222 223 224 225
  action.contLen = sizeof(SDDropBnodeReq);
  action.msgType = TDMT_DND_DROP_BNODE;
  action.acceptableCode = TSDB_CODE_DND_BNODE_NOT_DEPLOYED;

  if (mndTransAppendUndoAction(pTrans, &action) != 0) {
S
Shengliang Guan 已提交
226
    free(pReq);
227 228 229 230 231 232
    return -1;
  }

  return 0;
}

S
Shengliang Guan 已提交
233
static int32_t mndCreateBnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode, SMCreateBnodeReq *pCreate) {
234 235
  int32_t code = -1;

236 237 238 239
  SBnodeObj bnodeObj = {0};
  bnodeObj.id = pDnode->id;
  bnodeObj.createdTime = taosGetTimestampMs();
  bnodeObj.updateTime = bnodeObj.createdTime;
S
Shengliang Guan 已提交
240

S
Shengliang Guan 已提交
241
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
242
  if (pTrans == NULL) goto CREATE_BNODE_OVER;
S
Shengliang Guan 已提交
243

244 245 246 247 248 249 250
  mDebug("trans:%d, used to create bnode:%d", pTrans->id, pCreate->dnodeId);
  if (mndSetCreateBnodeRedoLogs(pTrans, &bnodeObj) != 0) goto CREATE_BNODE_OVER;
  if (mndSetCreateBnodeUndoLogs(pTrans, &bnodeObj) != 0) goto CREATE_BNODE_OVER;
  if (mndSetCreateBnodeCommitLogs(pTrans, &bnodeObj) != 0) goto CREATE_BNODE_OVER;
  if (mndSetCreateBnodeRedoActions(pTrans, pDnode, &bnodeObj) != 0) goto CREATE_BNODE_OVER;
  if (mndSetCreateBnodeUndoActions(pTrans, pDnode, &bnodeObj) != 0) goto CREATE_BNODE_OVER;
  if (mndTransPrepare(pMnode, pTrans) != 0) goto CREATE_BNODE_OVER;
S
Shengliang Guan 已提交
251 252 253 254 255 256 257 258

  code = 0;

CREATE_BNODE_OVER:
  mndTransDrop(pTrans);
  return code;
}

S
Shengliang Guan 已提交
259 260 261
static int32_t mndProcessCreateBnodeReq(SMnodeMsg *pReq) {
  SMnode           *pMnode = pReq->pMnode;
  SMCreateBnodeReq *pCreate = pReq->rpcMsg.pCont;
S
Shengliang Guan 已提交
262 263 264

  pCreate->dnodeId = htonl(pCreate->dnodeId);

265
  mDebug("bnode:%d, start to create", pCreate->dnodeId);
S
Shengliang Guan 已提交
266 267 268

  SBnodeObj *pObj = mndAcquireBnode(pMnode, pCreate->dnodeId);
  if (pObj != NULL) {
269
    mError("bnode:%d, bnode already exist", pObj->id);
S
Shengliang Guan 已提交
270
    terrno = TSDB_CODE_MND_BNODE_ALREADY_EXIST;
S
Shengliang Guan 已提交
271 272
    mndReleaseBnode(pMnode, pObj);
    return -1;
273 274 275
  } else if (terrno != TSDB_CODE_MND_BNODE_NOT_EXIST) {
    mError("bnode:%d, failed to create bnode since %s", pCreate->dnodeId, terrstr());
    return -1;
S
Shengliang Guan 已提交
276 277 278 279
  }

  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pCreate->dnodeId);
  if (pDnode == NULL) {
280
    mError("bnode:%d, dnode not exist", pCreate->dnodeId);
S
Shengliang Guan 已提交
281 282 283 284
    terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
    return -1;
  }

S
Shengliang Guan 已提交
285
  int32_t code = mndCreateBnode(pMnode, pReq, pDnode, pCreate);
S
Shengliang Guan 已提交
286 287 288
  mndReleaseDnode(pMnode, pDnode);

  if (code != 0) {
289
    mError("bnode:%d, failed to create since %s", pCreate->dnodeId, terrstr());
S
Shengliang Guan 已提交
290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312
    return -1;
  }

  return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}

/*
 * Encodes pObj, appends the record to the transaction's redo log and marks it
 * SDB_STATUS_DROPPING. Returns 0 on success, -1 if any step fails.
 */
static int32_t mndSetDropBnodeRedoLogs(STrans *pTrans, SBnodeObj *pObj) {
  SSdbRaw *pRaw = mndBnodeActionEncode(pObj);

  // Short-circuit evaluation keeps the steps in order: encode, append, set status.
  if (pRaw == NULL || mndTransAppendRedolog(pTrans, pRaw) != 0 ||
      sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING) != 0) {
    return -1;
  }

  return 0;
}

/*
 * Encodes pObj, appends the record to the transaction's commit log and marks
 * it SDB_STATUS_DROPPED. Returns 0 on success, -1 if any step fails.
 */
static int32_t mndSetDropBnodeCommitLogs(STrans *pTrans, SBnodeObj *pObj) {
  SSdbRaw *pRaw = mndBnodeActionEncode(pObj);

  // Short-circuit evaluation keeps the steps in order: encode, append, set status.
  if (pRaw == NULL || mndTransAppendCommitlog(pTrans, pRaw) != 0 ||
      sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED) != 0) {
    return -1;
  }

  return 0;
}

/*
 * Adds the redo action of a drop-bnode transaction: a TDMT_DND_DROP_BNODE
 * message addressed to pDnode's endpoint set.
 *
 * The request carries the dnode id passed through htonl().
 * TSDB_CODE_DND_BNODE_NOT_DEPLOYED is set as the action's acceptable code.
 * pObj is not used.
 *
 * Returns 0 on success; -1 on allocation failure (terrno =
 * TSDB_CODE_OUT_OF_MEMORY) or when the action cannot be appended, in which
 * case the request buffer is freed here.
 */
static int32_t mndSetDropBnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SBnodeObj *pObj) {
  SDDropBnodeReq *pReq = malloc(sizeof(SDDropBnodeReq));
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pReq->dnodeId = htonl(pDnode->id);

  STransAction action = {0};
  action.epSet = mndGetDnodeEpset(pDnode);
  action.pCont = pReq;
  action.contLen = sizeof(SDDropBnodeReq);
  action.msgType = TDMT_DND_DROP_BNODE;
  action.acceptableCode = TSDB_CODE_DND_BNODE_NOT_DEPLOYED;

  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
    // The buffer was not handed over, so it is released here.
    free(pReq);
    return -1;
  }

  return 0;
}

S
Shengliang Guan 已提交
335
static int32_t mndDropBnode(SMnode *pMnode, SMnodeMsg *pReq, SBnodeObj *pObj) {
S
Shengliang Guan 已提交
336
  int32_t code = -1;
337

S
Shengliang Guan 已提交
338
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg);
339
  if (pTrans == NULL) goto DROP_BNODE_OVER;
S
Shengliang Guan 已提交
340

341
  mDebug("trans:%d, used to drop bnode:%d", pTrans->id, pObj->id);
342 343 344 345
  if (mndSetDropBnodeRedoLogs(pTrans, pObj) != 0) goto DROP_BNODE_OVER;
  if (mndSetDropBnodeCommitLogs(pTrans, pObj) != 0) goto DROP_BNODE_OVER;
  if (mndSetDropBnodeRedoActions(pTrans, pObj->pDnode, pObj) != 0) goto DROP_BNODE_OVER;
  if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_BNODE_OVER;
S
Shengliang Guan 已提交
346 347 348 349 350 351 352 353

  code = 0;

DROP_BNODE_OVER:
  mndTransDrop(pTrans);
  return code;
}

S
Shengliang Guan 已提交
354 355 356
static int32_t mndProcessDropBnodeReq(SMnodeMsg *pReq) {
  SMnode         *pMnode = pReq->pMnode;
  SMDropBnodeReq *pDrop = pReq->rpcMsg.pCont;
S
Shengliang Guan 已提交
357 358
  pDrop->dnodeId = htonl(pDrop->dnodeId);

359
  mDebug("bnode:%d, start to drop", pDrop->dnodeId);
S
Shengliang Guan 已提交
360 361 362

  if (pDrop->dnodeId <= 0) {
    terrno = TSDB_CODE_SDB_APP_ERROR;
363
    mError("bnode:%d, failed to drop since %s", pDrop->dnodeId, terrstr());
S
Shengliang Guan 已提交
364 365 366 367 368
    return -1;
  }

  SBnodeObj *pObj = mndAcquireBnode(pMnode, pDrop->dnodeId);
  if (pObj == NULL) {
369
    mError("bnode:%d, failed to drop since %s", pDrop->dnodeId, terrstr());
S
Shengliang Guan 已提交
370 371 372
    return -1;
  }

S
Shengliang Guan 已提交
373
  int32_t code = mndDropBnode(pMnode, pReq, pObj);
S
Shengliang Guan 已提交
374
  if (code != 0) {
S
Shengliang Guan 已提交
375
    sdbRelease(pMnode->pSdb, pObj);
376
    mError("bnode:%d, failed to drop since %s", pMnode->dnodeId, terrstr());
S
Shengliang Guan 已提交
377 378 379
    return -1;
  }

S
Shengliang Guan 已提交
380
  sdbRelease(pMnode->pSdb, pObj);
S
Shengliang Guan 已提交
381 382 383
  return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}

S
Shengliang Guan 已提交
384 385
static int32_t mndProcessCreateBnodeRsp(SMnodeMsg *pRsp) {
  mndTransProcessRsp(pRsp);
S
Shengliang Guan 已提交
386 387 388
  return 0;
}

S
Shengliang Guan 已提交
389 390
static int32_t mndProcessDropBnodeRsp(SMnodeMsg *pRsp) {
  mndTransProcessRsp(pRsp);
S
Shengliang Guan 已提交
391 392 393
  return 0;
}

S
Shengliang Guan 已提交
394
static int32_t mndGetBnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
S
Shengliang Guan 已提交
395
  SMnode *pMnode = pReq->pMnode;
S
Shengliang Guan 已提交
396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433
  SSdb   *pSdb = pMnode->pSdb;

  int32_t  cols = 0;
  SSchema *pSchema = pMeta->pSchema;

  pShow->bytes[cols] = 2;
  pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
  strcpy(pSchema[cols].name, "id");
  pSchema[cols].bytes = htonl(pShow->bytes[cols]);
  cols++;

  pShow->bytes[cols] = TSDB_EP_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "endpoint");
  pSchema[cols].bytes = htonl(pShow->bytes[cols]);
  cols++;

  pShow->bytes[cols] = 8;
  pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
  strcpy(pSchema[cols].name, "create_time");
  pSchema[cols].bytes = htonl(pShow->bytes[cols]);
  cols++;

  pMeta->numOfColumns = htonl(cols);
  pShow->numOfColumns = cols;

  pShow->offset[0] = 0;
  for (int32_t i = 1; i < cols; ++i) {
    pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
  }

  pShow->numOfRows = sdbGetSize(pSdb, SDB_BNODE);
  pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
  strcpy(pMeta->tbFname, mndShowStr(pShow->type));

  return 0;
}

S
Shengliang Guan 已提交
434 435
static int32_t mndRetrieveBnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  SMnode    *pMnode = pReq->pMnode;
S
Shengliang Guan 已提交
436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474
  SSdb      *pSdb = pMnode->pSdb;
  int32_t    numOfRows = 0;
  int32_t    cols = 0;
  SBnodeObj *pObj = NULL;
  char      *pWrite;

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_BNODE, pShow->pIter, (void **)&pObj);
    if (pShow->pIter == NULL) break;

    cols = 0;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    *(int16_t *)pWrite = pObj->id;
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pObj->pDnode->ep, pShow->bytes[cols]);

    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    *(int64_t *)pWrite = pObj->createdTime;
    cols++;

    numOfRows++;
    sdbRelease(pSdb, pObj);
  }

  mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
  pShow->numOfReads += numOfRows;

  return numOfRows;
}

/* Cancels an in-progress bnode iteration by passing pIter to sdbCancelFetch(). */
static void mndCancelGetNextBnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}