vnodeOpen.c 20.9 KB
Newer Older
H
save  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "vnd.h"
17
#include "tstreamUpdate.h"
H
save  
Hongze Cheng 已提交
18

S
Shengliang Guan 已提交
19
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
H
Hongze Cheng 已提交
20
  SVnodeInfo info = {0};
S
Shengliang Guan 已提交
21
  char       dir[TSDB_FILENAME_LEN] = {0};
H
Hongze Cheng 已提交
22 23 24

  // check config
  if (vnodeCheckCfg(pCfg) < 0) {
S
Shengliang Guan 已提交
25
    vError("vgId:%d, failed to create vnode since:%s", pCfg->vgId, tstrerror(terrno));
H
Hongze Cheng 已提交
26 27 28 29
    return -1;
  }

  // create vnode env
H
Hongze Cheng 已提交
30 31 32 33 34 35 36 37 38 39
  if (pTfs) {
    if (tfsMkdirAt(pTfs, path, (SDiskID){0}) < 0) {
      vError("vgId:%d, failed to create vnode since:%s", pCfg->vgId, tstrerror(terrno));
      return -1;
    }
    snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, path);
  } else {
    if (taosMkDir(path)) {
      return TAOS_SYSTEM_ERROR(errno);
    }
H
Hongze Cheng 已提交
40
    snprintf(dir, TSDB_FILENAME_LEN, "%s", path);
H
Hongze Cheng 已提交
41 42
  }

H
Hongze Cheng 已提交
43 44 45 46 47
  if (pCfg) {
    info.config = *pCfg;
  } else {
    info.config = vnodeCfgDefault;
  }
M
Minghao Li 已提交
48 49
  info.state.committed = -1;
  info.state.applied = -1;
H
Hongze Cheng 已提交
50
  info.state.commitID = 0;
H
Hongze Cheng 已提交
51

H
Hongze Cheng 已提交
52
  vInfo("vgId:%d, save config while create", info.config.vgId);
H
Hongze Cheng 已提交
53
  if (vnodeSaveInfo(dir, &info) < 0 || vnodeCommitInfo(dir) < 0) {
H
Hongze Cheng 已提交
54
    vError("vgId:%d, failed to save vnode config since %s", pCfg ? pCfg->vgId : 0, tstrerror(terrno));
H
Hongze Cheng 已提交
55 56 57
    return -1;
  }

H
Hongze Cheng 已提交
58
  vInfo("vgId:%d, vnode is created", info.config.vgId);
S
Shengliang Guan 已提交
59 60 61
  return 0;
}

62
int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs) {
S
Shengliang Guan 已提交
63 64 65 66 67 68 69 70 71
  SVnodeInfo info = {0};
  char       dir[TSDB_FILENAME_LEN] = {0};
  int32_t    ret = 0;

  if (pTfs) {
    snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, path);
  } else {
    snprintf(dir, TSDB_FILENAME_LEN, "%s", path);
  }
H
Hongze Cheng 已提交
72

S
Shengliang Guan 已提交
73 74 75 76 77 78 79
  ret = vnodeLoadInfo(dir, &info);
  if (ret < 0) {
    vError("vgId:%d, failed to read vnode config from %s since %s", pReq->vgId, path, tstrerror(terrno));
    return -1;
  }

  SSyncCfg *pCfg = &info.config.syncCfg;
C
cadem 已提交
80 81 82
 
  pCfg->replicaNum = 0;
  pCfg->totalReplicaNum = 0;
S
Shengliang Guan 已提交
83 84 85 86
  memset(&pCfg->nodeInfo, 0, sizeof(pCfg->nodeInfo));

  for (int i = 0; i < pReq->replica; ++i) {
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
87
    pNode->nodeId = pReq->replicas[i].id;
S
Shengliang Guan 已提交
88 89
    pNode->nodePort = pReq->replicas[i].port;
    tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
C
cadem 已提交
90
    pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
91
    (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
92
    vInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId);
C
cadem 已提交
93
    pCfg->replicaNum++;
S
Shengliang Guan 已提交
94
  }
C
cadem 已提交
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
  if(pReq->selfIndex != -1){
    pCfg->myIndex = pReq->selfIndex;
  }
  for (int i = pCfg->replicaNum; i < pReq->replica + pReq->learnerReplica; ++i) {
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
    pNode->nodeId = pReq->learnerReplicas[pCfg->totalReplicaNum].id;
    pNode->nodePort = pReq->learnerReplicas[pCfg->totalReplicaNum].port;
    pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
    tstrncpy(pNode->nodeFqdn, pReq->learnerReplicas[pCfg->totalReplicaNum].fqdn, sizeof(pNode->nodeFqdn));
    (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    vInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId);
    pCfg->totalReplicaNum++;
  }
  pCfg->totalReplicaNum += pReq->replica;
  if(pReq->learnerSelfIndex != -1){
    pCfg->myIndex = pReq->replica + pReq->learnerSelfIndex;
  }

  vInfo("vgId:%d, save config while alter, replicas:%d totalReplicas:%d selfIndex:%d", 
            pReq->vgId, pCfg->replicaNum, pCfg->totalReplicaNum, pCfg->myIndex);
S
Shengliang Guan 已提交
115

S
Shengliang Guan 已提交
116
  info.config.syncCfg = *pCfg;
S
Shengliang Guan 已提交
117 118 119 120 121 122
  ret = vnodeSaveInfo(dir, &info);
  if (ret < 0) {
    vError("vgId:%d, failed to save vnode config since %s", pReq->vgId, tstrerror(terrno));
    return -1;
  }

H
Hongze Cheng 已提交
123
  ret = vnodeCommitInfo(dir);
S
Shengliang Guan 已提交
124 125 126 127 128
  if (ret < 0) {
    vError("vgId:%d, failed to commit vnode config since %s", pReq->vgId, tstrerror(terrno));
    return -1;
  }

S
Shengliang Guan 已提交
129
  vInfo("vgId:%d, vnode config is saved", info.config.vgId);
H
Hongze Cheng 已提交
130 131 132
  return 0;
}

S
Shengliang Guan 已提交
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId, STfs *pTfs) {
  int32_t ret = tfsRename(pTfs, srcPath, dstPath);
  if (ret != 0) return ret;

  char oldRname[TSDB_FILENAME_LEN] = {0};
  char newRname[TSDB_FILENAME_LEN] = {0};
  char tsdbPath[TSDB_FILENAME_LEN] = {0};
  char tsdbFilePrefix[TSDB_FILENAME_LEN] = {0};
  snprintf(tsdbPath, TSDB_FILENAME_LEN, "%s%stsdb", dstPath, TD_DIRSEP);
  snprintf(tsdbFilePrefix, TSDB_FILENAME_LEN, "tsdb%sv", TD_DIRSEP);

  STfsDir *tsdbDir = tfsOpendir(pTfs, tsdbPath);
  if (tsdbDir == NULL) return 0;

  while (1) {
    const STfsFile *tsdbFile = tfsReaddir(tsdbDir);
    if (tsdbFile == NULL) break;
H
Hongze Cheng 已提交
150
    if (tsdbFile->rname[0] == '\0') continue;
S
Shengliang Guan 已提交
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177
    tstrncpy(oldRname, tsdbFile->rname, TSDB_FILENAME_LEN);

    char *tsdbFilePrefixPos = strstr(oldRname, tsdbFilePrefix);
    if (tsdbFilePrefixPos == NULL) continue;

    int32_t tsdbFileVgId = atoi(tsdbFilePrefixPos + 6);
    if (tsdbFileVgId == srcVgId) {
      char *tsdbFileSurfixPos = strstr(tsdbFilePrefixPos, "f");
      if (tsdbFileSurfixPos == NULL) continue;

      tsdbFilePrefixPos[6] = 0;
      snprintf(newRname, TSDB_FILENAME_LEN, "%s%d%s", oldRname, dstVgId, tsdbFileSurfixPos);
      vInfo("vgId:%d, rename file from %s to %s", dstVgId, tsdbFile->rname, newRname);

      ret = tfsRename(pTfs, tsdbFile->rname, newRname);
      if (ret != 0) {
        vInfo("vgId:%d, failed to rename file from %s to %s since %s", dstVgId, tsdbFile->rname, newRname, terrstr());
        tfsClosedir(tsdbDir);
        return ret;
      }
    }
  }

  tfsClosedir(tsdbDir);
  return 0;
}

178 179 180 181 182 183 184 185 186 187 188
int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq, STfs *pTfs) {
  SVnodeInfo info = {0};
  char       dir[TSDB_FILENAME_LEN] = {0};
  int32_t    ret = 0;

  if (pTfs) {
    snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, srcPath);
  } else {
    snprintf(dir, TSDB_FILENAME_LEN, "%s", srcPath);
  }

S
Shengliang Guan 已提交
189 190
  // todo add stat file to handle exception while vnode open

191 192 193 194 195 196
  ret = vnodeLoadInfo(dir, &info);
  if (ret < 0) {
    vError("vgId:%d, failed to read vnode config from %s since %s", pReq->srcVgId, srcPath, tstrerror(terrno));
    return -1;
  }

197 198
  vInfo("vgId:%d, alter hashrange from [%u, %u] to [%u, %u]", pReq->srcVgId, info.config.hashBegin, info.config.hashEnd,
        pReq->hashBegin, pReq->hashEnd);
199 200 201
  info.config.vgId = pReq->dstVgId;
  info.config.hashBegin = pReq->hashBegin;
  info.config.hashEnd = pReq->hashEnd;
B
Benguang Zhao 已提交
202
  info.config.hashChange = true;
203 204 205 206 207
  info.config.walCfg.vgId = pReq->dstVgId;

  SSyncCfg *pCfg = &info.config.syncCfg;
  pCfg->myIndex = 0;
  pCfg->replicaNum = 1;
C
cadem 已提交
208
  pCfg->totalReplicaNum = 1;
209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225
  memset(&pCfg->nodeInfo, 0, sizeof(pCfg->nodeInfo));

  vInfo("vgId:%d, alter vnode replicas to 1", pReq->srcVgId);
  SNodeInfo *pNode = &pCfg->nodeInfo[0];
  pNode->nodePort = tsServerPort;
  tstrncpy(pNode->nodeFqdn, tsLocalFqdn, TSDB_FQDN_LEN);
  (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
  vInfo("vgId:%d, ep:%s:%u dnode:%d", pReq->srcVgId, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId);

  info.config.syncCfg = *pCfg;

  ret = vnodeSaveInfo(dir, &info);
  if (ret < 0) {
    vError("vgId:%d, failed to save vnode config since %s", pReq->dstVgId, tstrerror(terrno));
    return -1;
  }

H
Hongze Cheng 已提交
226
  ret = vnodeCommitInfo(dir);
227 228 229 230 231
  if (ret < 0) {
    vError("vgId:%d, failed to commit vnode config since %s", pReq->dstVgId, tstrerror(terrno));
    return -1;
  }

232
  vInfo("vgId:%d, rename %s to %s", pReq->dstVgId, srcPath, dstPath);
S
Shengliang Guan 已提交
233
  ret = vnodeRenameVgroupId(srcPath, dstPath, pReq->srcVgId, pReq->dstVgId, pTfs);
234 235 236 237 238 239 240 241 242 243
  if (ret < 0) {
    vError("vgId:%d, failed to rename vnode from %s to %s since %s", pReq->dstVgId, srcPath, dstPath,
           tstrerror(terrno));
    return -1;
  }

  vInfo("vgId:%d, vnode hashrange is altered", info.config.vgId);
  return 0;
}

S
Shengliang Guan 已提交
244 245 246 247
void vnodeDestroy(const char *path, STfs *pTfs) {
  vInfo("path:%s is removed while destroy vnode", path);
  tfsRmdir(pTfs, path);
}
H
refact  
Hongze Cheng 已提交
248

H
Hongze Cheng 已提交
249 250 251
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
  SVnode    *pVnode = NULL;
  SVnodeInfo info = {0};
S
Shengliang Guan 已提交
252 253 254
  char       dir[TSDB_FILENAME_LEN] = {0};
  char       tdir[TSDB_FILENAME_LEN * 2] = {0};
  int32_t    ret = 0;
H
more  
Hongze Cheng 已提交
255

H
Hongze Cheng 已提交
256 257 258 259 260
  if (pTfs) {
    snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, path);
  } else {
    snprintf(dir, TSDB_FILENAME_LEN, "%s", path);
  }
H
Hongze Cheng 已提交
261

H
Hongze Cheng 已提交
262 263
  info.config = vnodeCfgDefault;

H
Hongze Cheng 已提交
264 265 266 267
  // load vnode info
  ret = vnodeLoadInfo(dir, &info);
  if (ret < 0) {
    vError("failed to open vnode from %s since %s", path, tstrerror(terrno));
H
more  
Hongze Cheng 已提交
268 269 270
    return NULL;
  }

271 272 273
  // save vnode info on dnode ep changed
  bool      updated = false;
  SSyncCfg *pCfg = &info.config.syncCfg;
C
cadem 已提交
274
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
275 276 277 278 279 280 281 282
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
    if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) {
      updated = true;
    }
  }
  if (updated) {
    vInfo("vgId:%d, save vnode info since dnode info changed", info.config.vgId);
    (void)vnodeSaveInfo(dir, &info);
H
Hongze Cheng 已提交
283
    (void)vnodeCommitInfo(dir);
284 285
  }

H
Hongze Cheng 已提交
286
  // create handle
S
Shengliang Guan 已提交
287
  pVnode = taosMemoryCalloc(1, sizeof(*pVnode) + strlen(path) + 1);
H
more  
Hongze Cheng 已提交
288
  if (pVnode == NULL) {
H
Hongze Cheng 已提交
289
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
290
    vError("vgId:%d, failed to open vnode since %s", info.config.vgId, tstrerror(terrno));
H
more  
Hongze Cheng 已提交
291 292 293
    return NULL;
  }

H
Hongze Cheng 已提交
294 295
  pVnode->path = (char *)&pVnode[1];
  strcpy(pVnode->path, path);
H
Hongze Cheng 已提交
296
  pVnode->config = info.config;
H
Hongze Cheng 已提交
297
  pVnode->state.committed = info.state.committed;
H
Hongze Cheng 已提交
298
  pVnode->state.commitTerm = info.state.commitTerm;
H
Hongze Cheng 已提交
299
  pVnode->state.commitID = info.state.commitID;
300 301
  pVnode->state.applied = info.state.committed;
  pVnode->state.applyTerm = info.state.commitTerm;
H
Hongze Cheng 已提交
302 303
  pVnode->pTfs = pTfs;
  pVnode->msgCb = msgCb;
304
  taosThreadMutexInit(&pVnode->lock, NULL);
305
  pVnode->blocked = false;
H
Hongze Cheng 已提交
306

307
  tsem_init(&pVnode->syncSem, 0, 0);
H
Hongze Cheng 已提交
308
  tsem_init(&(pVnode->canCommit), 0, 1);
H
Hongze Cheng 已提交
309 310
  taosThreadMutexInit(&pVnode->mutex, NULL);
  taosThreadCondInit(&pVnode->poolNotEmpty, NULL);
H
Hongze Cheng 已提交
311

H
Hongze Cheng 已提交
312 313
  int8_t rollback = vnodeShouldRollback(pVnode);

H
Hongze Cheng 已提交
314
  // open buffer pool
H
Hongze Cheng 已提交
315
  if (vnodeOpenBufPool(pVnode) < 0) {
S
Shengliang Guan 已提交
316
    vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno));
H
Hongze Cheng 已提交
317
    goto _err;
H
more  
Hongze Cheng 已提交
318 319
  }

H
Hongze Cheng 已提交
320
  // open meta
H
Hongze Cheng 已提交
321
  if (metaOpen(pVnode, &pVnode->pMeta, rollback) < 0) {
S
Shengliang Guan 已提交
322
    vError("vgId:%d, failed to open vnode meta since %s", TD_VID(pVnode), tstrerror(terrno));
H
Hongze Cheng 已提交
323
    goto _err;
H
refact  
Hongze Cheng 已提交
324 325
  }

H
Hongze Cheng 已提交
326
  // open tsdb
H
Hongze Cheng 已提交
327
  if (!VND_IS_RSMA(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, NULL, rollback) < 0) {
S
Shengliang Guan 已提交
328
    vError("vgId:%d, failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
329 330 331 332
    goto _err;
  }

  // open sma
H
Hongze Cheng 已提交
333
  if (smaOpen(pVnode, rollback)) {
S
Shengliang Guan 已提交
334
    vError("vgId:%d, failed to open vnode sma since %s", TD_VID(pVnode), tstrerror(terrno));
335
    goto _err;
H
refact  
Hongze Cheng 已提交
336 337
  }

H
Hongze Cheng 已提交
338 339
  // open wal
  sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_WAL_DIR);
wafwerar's avatar
wafwerar 已提交
340
  taosRealPath(tdir, NULL, sizeof(tdir));
341

H
Hongze Cheng 已提交
342
  pVnode->pWal = walOpen(tdir, &(pVnode->config.walCfg));
H
merge  
Hongze Cheng 已提交
343
  if (pVnode->pWal == NULL) {
344
    vError("vgId:%d, failed to open vnode wal since %s. wal:%s", TD_VID(pVnode), tstrerror(terrno), tdir);
H
Hongze Cheng 已提交
345
    goto _err;
H
merge  
Hongze Cheng 已提交
346
  }
H
refact  
Hongze Cheng 已提交
347

H
Hongze Cheng 已提交
348 349
  // open tq
  sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_TQ_DIR);
wafwerar's avatar
wafwerar 已提交
350
  taosRealPath(tdir, NULL, sizeof(tdir));
L
Liu Jicong 已提交
351
  pVnode->pTq = tqOpen(tdir, pVnode);
L
Liu Jicong 已提交
352
  if (pVnode->pTq == NULL) {
S
Shengliang Guan 已提交
353
    vError("vgId:%d, failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno));
H
Hongze Cheng 已提交
354
    goto _err;
L
Liu Jicong 已提交
355 356
  }

H
Hongze Cheng 已提交
357
  // open query
D
dapan1121 已提交
358
  if (vnodeQueryOpen(pVnode)) {
S
Shengliang Guan 已提交
359
    vError("vgId:%d, failed to open vnode query since %s", TD_VID(pVnode), tstrerror(terrno));
360
    terrno = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
361
    goto _err;
D
dapan1121 已提交
362 363
  }

H
Hongze Cheng 已提交
364 365
  // vnode begin
  if (vnodeBegin(pVnode) < 0) {
S
Shengliang Guan 已提交
366
    vError("vgId:%d, failed to begin since %s", TD_VID(pVnode), tstrerror(terrno));
367
    terrno = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
368 369 370
    goto _err;
  }

371 372
  // open sync
  if (vnodeSyncOpen(pVnode, dir)) {
S
Shengliang Guan 已提交
373
    vError("vgId:%d, failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno));
H
Hongze Cheng 已提交
374 375
    goto _err;
  }
H
Hongze Cheng 已提交
376 377 378 379

  if (rollback) {
    vnodeRollback(pVnode);
  }
H
Hongze Cheng 已提交
380 381 382 383 384 385 386

  return pVnode;

_err:
  if (pVnode->pQuery) vnodeQueryClose(pVnode);
  if (pVnode->pTq) tqClose(pVnode->pTq);
  if (pVnode->pWal) walClose(pVnode->pWal);
387
  if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
C
Cary Xu 已提交
388
  if (pVnode->pSma) smaClose(pVnode->pSma);
389
  if (pVnode->pMeta) metaClose(&pVnode->pMeta);
H
Hongze Cheng 已提交
390
  if (pVnode->freeList) vnodeCloseBufPool(pVnode);
C
Cary Xu 已提交
391

H
Hongze Cheng 已提交
392 393 394
  tsem_destroy(&(pVnode->canCommit));
  taosMemoryFree(pVnode);
  return NULL;
H
refact  
Hongze Cheng 已提交
395 396
}

H
Hongze Cheng 已提交
397
void vnodePreClose(SVnode *pVnode) {
D
dapan1121 已提交
398
  vnodeQueryPreClose(pVnode);
H
Hongze Cheng 已提交
399
  vnodeSyncPreClose(pVnode);
D
dapan1121 已提交
400
}
401

402
void vnodePostClose(SVnode *pVnode) { vnodeSyncPostClose(pVnode); }
403

H
Hongze Cheng 已提交
404
void vnodeClose(SVnode *pVnode) {
H
refact  
Hongze Cheng 已提交
405
  if (pVnode) {
406
    tsem_wait(&pVnode->canCommit);
M
Minghao Li 已提交
407
    vnodeSyncClose(pVnode);
D
dapan1121 已提交
408
    vnodeQueryClose(pVnode);
H
Hongze Cheng 已提交
409 410
    walClose(pVnode->pWal);
    tqClose(pVnode->pTq);
411
    if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
C
Cary Xu 已提交
412
    smaClose(pVnode->pSma);
413
    if (pVnode->pMeta) metaClose(&pVnode->pMeta);
H
Hongze Cheng 已提交
414
    vnodeCloseBufPool(pVnode);
B
Benguang Zhao 已提交
415 416
    tsem_post(&pVnode->canCommit);

H
Hongze Cheng 已提交
417 418
    // destroy handle
    tsem_destroy(&(pVnode->canCommit));
419
    tsem_destroy(&pVnode->syncSem);
H
Hongze Cheng 已提交
420 421
    taosThreadCondDestroy(&pVnode->poolNotEmpty);
    taosThreadMutexDestroy(&pVnode->mutex);
422
    taosThreadMutexDestroy(&pVnode->lock);
H
Hongze Cheng 已提交
423
    taosMemoryFree(pVnode);
H
refact  
Hongze Cheng 已提交
424
  }
L
Liu Jicong 已提交
425
}
H
Hongze Cheng 已提交
426

427
// start the sync timer after the queue is ready
428
int32_t vnodeStart(SVnode *pVnode) { return vnodeSyncStart(pVnode); }
429

C
cadem 已提交
430 431 432 433
int32_t vnodeIsCatchUp(SVnode *pVnode){
  return syncIsCatchUp(pVnode->sync);
}

C
cadem 已提交
434 435 436 437
ESyncRole vnodeGetRole(SVnode *pVnode){
  return syncGetRole(pVnode->sync);
}

438 439
void vnodeStop(SVnode *pVnode) {}

H
Hongze Cheng 已提交
440 441
int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; }

H
Hongze Cheng 已提交
442 443 444 445 446 447
void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot) {
  pSnapshot->data = NULL;
  pSnapshot->lastApplyIndex = pVnode->state.committed;
  pSnapshot->lastApplyTerm = pVnode->state.commitTerm;
  pSnapshot->lastConfigIndex = -1;
}
448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621

static void initTsdbReaderAPI(TsdReader* pReader);
static void initMetadataAPI(SStoreMeta* pMeta);
static void initTqAPI(SStoreTqReader* pTq);
static void initStateStoreAPI(SStateStore* pStore);
static void initMetaReaderAPI(SStoreMetaReader* pMetaReader);

void initStorageAPI(SStorageAPI* pAPI) {
  initTsdbReaderAPI(&pAPI->tsdReader);
  initMetadataAPI(&pAPI->metaFn);
  initTqAPI(&pAPI->tqReaderFn);
  initStateStoreAPI(&pAPI->stateStore);
  initMetaReaderAPI(&pAPI->metaReaderFn);
}

void initTsdbReaderAPI(TsdReader* pReader) {
  pReader->tsdReaderOpen = (__store_reader_open_fn_t)tsdbReaderOpen;
  pReader->tsdReaderClose = tsdbReaderClose;

  pReader->tsdNextDataBlock = tsdbNextDataBlock;

  pReader->tsdReaderRetrieveDataBlock = tsdbRetrieveDataBlock;
  pReader->tsdReaderReleaseDataBlock = tsdbReleaseDataBlock;

  pReader->tsdReaderRetrieveBlockSMAInfo = tsdbRetrieveDatablockSMA;

  pReader->tsdReaderNotifyClosing = tsdbReaderSetCloseFlag;
  pReader->tsdReaderResetStatus = tsdbReaderReset;

  pReader->tsdReaderGetDataBlockDistInfo = tsdbGetFileBlocksDistInfo;
  pReader->tsdReaderGetNumOfInMemRows = tsdbGetNumOfRowsInMemTable;    // todo this function should be moved away

  pReader->tsdSetQueryTableList = tsdbSetTableList;
  pReader->tsdSetReaderTaskId = (void (*)(void *, const char *))tsdbReaderSetId;
}

void initMetadataAPI(SStoreMeta* pMeta) {
  pMeta->isTableExisted = metaIsTableExist;

  pMeta->openTableMetaCursor = metaOpenTbCursor;
  pMeta->closeTableMetaCursor = metaCloseTbCursor;
  pMeta->cursorNext = metaTbCursorNext;
  pMeta->cursorPrev = metaTbCursorPrev;

  pMeta->getBasicInfo = vnodeGetInfo;
  pMeta->getNumOfChildTables = metaGetStbStats;

  pMeta->getChildTableList = vnodeGetCtbIdList;

  pMeta->storeGetIndexInfo = vnodeGetIdx;
  pMeta->getInvertIndex = vnodeGetIvtIdx;

  pMeta->extractTagVal = (const void *(*)(const void *, int16_t, STagVal *))metaGetTableTagVal;

}

void initTqAPI(SStoreTqReader* pTq) {
  pTq->tqReaderOpen = tqReaderOpen;
  pTq->tqReaderSetColIdList = tqReaderSetColIdList;

  pTq->tqReaderClose = tqReaderClose;
  pTq->tqReaderSeek = tqReaderSeek;
  pTq->tqRetrieveBlock = tqRetrieveDataBlock;

  pTq->tqReaderNextBlockInWal = tqNextBlockInWal;

  pTq->tqNextBlockImpl = tqNextBlockImpl;// todo remove it

  pTq->tqReaderAddTables = tqReaderAddTbUidList;
  pTq->tqReaderSetQueryTableList = tqReaderSetTbUidList;

  pTq->tqReaderRemoveTables = tqReaderRemoveTbUidList;

  pTq->tqReaderIsQueriedTable = tqReaderIsQueriedTable;
  pTq->tqReaderCurrentBlockConsumed = tqCurrentBlockConsumed;

  pTq->tqReaderGetWalReader = tqGetWalReader;  // todo remove it
  pTq->tqReaderRetrieveTaosXBlock = tqRetrieveTaosxBlock;          // todo remove it

  pTq->tqReaderSetSubmitMsg = tqReaderSetSubmitMsg; // todo remove it
  pTq->tqReaderNextBlockFilterOut = tqNextDataBlockFilterOut;
}

void initStateStoreAPI(SStateStore* pStore) {
  pStore->streamFileStateInit = streamFileStateInit;
  pStore->updateInfoDestoryColseWinSBF = updateInfoDestoryColseWinSBF;

  pStore->streamStateGetByPos = streamStateGetByPos;

  pStore->streamStatePutParName = streamStatePutParName;
  pStore->streamStateGetParName = streamStateGetParName;

  pStore->streamStateAddIfNotExist = streamStateAddIfNotExist;
  pStore->streamStateReleaseBuf = streamStateReleaseBuf;
  pStore->streamStateFreeVal = streamStateFreeVal;

  pStore->streamStatePut = streamStatePut;
  pStore->streamStateGet = streamStateGet;
  pStore->streamStateCheck = streamStateCheck;
  pStore->streamStateGetByPos = streamStateGetByPos;
  pStore->streamStateDel = streamStateDel;
  pStore->streamStateClear = streamStateClear;
  pStore->streamStateSaveInfo = streamStateSaveInfo;
  pStore->streamStateGetInfo = streamStateGetInfo;
  pStore->streamStateSetNumber = streamStateSetNumber;

  pStore->streamStateFillPut = streamStateFillPut;
  pStore->streamStateFillGet = streamStateFillGet;
  pStore->streamStateFillDel = streamStateFillDel;

  pStore->streamStateCurNext = streamStateCurNext;
  pStore->streamStateCurPrev = streamStateCurPrev;

  pStore->streamStateGetAndCheckCur = streamStateGetAndCheckCur;
  pStore->streamStateSeekKeyNext = streamStateSeekKeyNext;
  pStore->streamStateFillSeekKeyNext = streamStateFillSeekKeyNext;
  pStore->streamStateFillSeekKeyPrev = streamStateFillSeekKeyPrev;
  pStore->streamStateFreeCur = streamStateFreeCur;

  pStore->streamStateGetGroupKVByCur = streamStateGetGroupKVByCur;
  pStore->streamStateGetKVByCur = streamStateGetKVByCur;

  pStore->streamStateSessionAddIfNotExist = streamStateSessionAddIfNotExist;
  pStore->streamStateSessionPut = streamStateSessionPut;
  pStore->streamStateSessionGet = streamStateSessionGet;
  pStore->streamStateSessionDel = streamStateSessionDel;
  pStore->streamStateSessionClear = streamStateSessionClear;
  pStore->streamStateSessionGetKVByCur = streamStateSessionGetKVByCur;
  pStore->streamStateStateAddIfNotExist = streamStateStateAddIfNotExist;
  pStore->streamStateSessionGetKeyByRange = streamStateSessionGetKeyByRange;

  pStore->updateInfoInit = updateInfoInit;
  pStore->updateInfoFillBlockData = updateInfoFillBlockData;
  pStore->updateInfoIsUpdated = updateInfoIsUpdated;
  pStore->updateInfoIsTableInserted = updateInfoIsTableInserted;
  pStore->updateInfoDestroy = updateInfoDestroy;

  pStore->updateInfoInitP = updateInfoInitP;
  pStore->updateInfoAddCloseWindowSBF = updateInfoAddCloseWindowSBF;
  pStore->updateInfoDestoryColseWinSBF = updateInfoDestoryColseWinSBF;
  pStore->updateInfoSerialize = updateInfoSerialize;
  pStore->updateInfoDeserialize = updateInfoDeserialize;

  pStore->streamStateSessionSeekKeyNext = streamStateSessionSeekKeyNext;
  pStore->streamStateSessionSeekKeyCurrentPrev = streamStateSessionSeekKeyCurrentPrev;
  pStore->streamStateSessionSeekKeyCurrentNext = streamStateSessionSeekKeyCurrentNext;

  pStore->streamFileStateInit = streamFileStateInit;

  pStore->streamFileStateDestroy = streamFileStateDestroy;
  pStore->streamFileStateClear = streamFileStateClear;
  pStore->needClearDiskBuff = needClearDiskBuff;

  pStore->streamStateOpen = streamStateOpen;
  pStore->streamStateClose = streamStateClose;
  pStore->streamStateBegin = streamStateBegin;
  pStore->streamStateCommit = streamStateCommit;
  pStore->streamStateDestroy= streamStateDestroy;
  pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint;
}

void initMetaReaderAPI(SStoreMetaReader* pMetaReader) {
  pMetaReader->initReader = _metaReaderInit;
  pMetaReader->clearReader = metaReaderClear;

  pMetaReader->getTableEntryByUid = metaReaderGetTableEntryByUid;
  pMetaReader->clearReader = metaReaderClear;

  pMetaReader->getEntryGetUidCache = metaReaderGetTableEntryByUidCache;
  pMetaReader->getTableEntryByName = metaGetTableEntryByName;

  pMetaReader->readerReleaseLock = metaReaderReleaseLock;
}