vnodeOpen.c 6.5 KB
Newer Older
H
save  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "vnd.h"
H
save  
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
// Create the on-disk environment of a new vnode.
//
// The configuration in `pCfg` is validated with vnodeCheckCfg(), the directory
// `path` is created through `pTfs`, and an initial SVnodeInfo (copy of `*pCfg`,
// committed/applied = -1, commitID = 0) is saved and committed under
// "<primary path><TD_DIRSEP><path>".
//
// Returns 0 on success and -1 on failure; the failure reason is logged from
// terrno.
int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
  char       vnodeDir[TSDB_FILENAME_LEN];
  SVnodeInfo vnodeInfo = {0};

  // TODO: check if directory exists

  // reject an invalid configuration before anything is created
  if (vnodeCheckCfg(pCfg) < 0) {
    vError("vgId:%d, failed to create vnode since:%s", pCfg->vgId, tstrerror(terrno));
    return -1;
  }

  // create the vnode directory
  if (tfsMkdirAt(pTfs, path, (SDiskID){0}) < 0) {
    vError("vgId:%d, failed to create vnode since:%s", pCfg->vgId, tstrerror(terrno));
    return -1;
  }

  // initial vnode info: nothing committed or applied yet
  vnodeInfo.config = *pCfg;
  vnodeInfo.state.committed = -1;
  vnodeInfo.state.applied = -1;
  vnodeInfo.state.commitID = 0;

  // the info is persisted under the primary path of the file system
  snprintf(vnodeDir, sizeof(vnodeDir), "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, path);

  // save first, then commit; the commit is skipped when the save fails
  if (vnodeSaveInfo(vnodeDir, &vnodeInfo) < 0 || vnodeCommitInfo(vnodeDir, &vnodeInfo) < 0) {
    vError("vgId:%d, failed to save vnode config since %s", pCfg->vgId, tstrerror(terrno));
    return -1;
  }

  vInfo("vgId:%d, vnode is created", pCfg->vgId);

  return 0;
}

H
refact  
Hongze Cheng 已提交
52 53
// Remove the vnode directory `path` through the file system handle `pTfs`.
// The result of tfsRmdir() is not reported to the caller.
void vnodeDestroy(const char *path, STfs *pTfs) {
  tfsRmdir(pTfs, path);
}

H
Hongze Cheng 已提交
54 55 56 57
// Open the vnode stored under "<primary path of pTfs><TD_DIRSEP><path>".
//
// Loads the persisted vnode info, allocates the SVnode handle (the copy of
// `path` lives in the same allocation, right behind the struct), initialises
// its locks/semaphores and then opens the sub-modules in this order:
// buffer pool, meta, tsdb (skipped for rsma vnodes), sma, wal, tq, query,
// vnodeBegin(), sync.
//
// Returns the opened handle, or NULL on failure. On failure every sub-module
// that was already opened is closed again, all synchronisation primitives are
// destroyed and the handle is freed.
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
  SVnode    *pVnode = NULL;
  SVnodeInfo info = {0};
  char       dir[TSDB_FILENAME_LEN];
  char       tdir[TSDB_FILENAME_LEN * 2];

  snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, path);

  // start from the default config; vnodeLoadInfo() fills in what was persisted
  info.config = vnodeCfgDefault;

  // load vnode info
  if (vnodeLoadInfo(dir, &info) < 0) {
    vError("failed to open vnode from %s since %s", path, tstrerror(terrno));
    return NULL;
  }

  // create handle (zero-initialised, so all sub-module pointers start as NULL)
  pVnode = (SVnode *)taosMemoryCalloc(1, sizeof(*pVnode) + strlen(path) + 1);
  if (pVnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    vError("vgId:%d, failed to open vnode since %s", info.config.vgId, tstrerror(terrno));
    return NULL;
  }

  pVnode->path = (char *)&pVnode[1];
  strcpy(pVnode->path, path);
  pVnode->config = info.config;
  pVnode->state.committed = info.state.committed;
  pVnode->state.commitTerm = info.state.commitTerm;
  // the applied index restarts from the last committed index
  pVnode->state.applied = info.state.committed;
  pVnode->state.commitID = info.state.commitID;
  pVnode->pTfs = pTfs;
  pVnode->msgCb = msgCb;
  taosThreadMutexInit(&pVnode->lock, NULL);
  pVnode->blocked = false;

  tsem_init(&pVnode->syncSem, 0, 0);
  tsem_init(&(pVnode->canCommit), 0, 1);
  taosThreadMutexInit(&pVnode->mutex, NULL);
  taosThreadCondInit(&pVnode->poolNotEmpty, NULL);

  // NOTE: every primitive above is initialised before the first `goto _err`,
  // so the error path may destroy all of them unconditionally.

  // open buffer pool
  if (vnodeOpenBufPool(pVnode) < 0) {
    vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno));
    goto _err;
  }

  // open meta
  if (metaOpen(pVnode, &pVnode->pMeta) < 0) {
    vError("vgId:%d, failed to open vnode meta since %s", TD_VID(pVnode), tstrerror(terrno));
    goto _err;
  }

  // open tsdb
  if (!VND_IS_RSMA(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, NULL) < 0) {
    vError("vgId:%d, failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
    goto _err;
  }

  // open sma
  if (smaOpen(pVnode)) {
    vError("vgId:%d, failed to open vnode sma since %s", TD_VID(pVnode), tstrerror(terrno));
    goto _err;
  }

  // open wal
  snprintf(tdir, sizeof(tdir), "%s%s%s", dir, TD_DIRSEP, VNODE_WAL_DIR);
  taosRealPath(tdir, NULL, sizeof(tdir));

// for test tsdb snapshot
#if 0
  pVnode->config.walCfg.segSize = 200;
  pVnode->config.walCfg.retentionSize = 2000;
#endif

  pVnode->pWal = walOpen(tdir, &(pVnode->config.walCfg));
  if (pVnode->pWal == NULL) {
    vError("vgId:%d, failed to open vnode wal since %s", TD_VID(pVnode), tstrerror(terrno));
    goto _err;
  }

  // open tq
  snprintf(tdir, sizeof(tdir), "%s%s%s", dir, TD_DIRSEP, VNODE_TQ_DIR);
  taosRealPath(tdir, NULL, sizeof(tdir));
  pVnode->pTq = tqOpen(tdir, pVnode);
  if (pVnode->pTq == NULL) {
    vError("vgId:%d, failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno));
    goto _err;
  }

  // open query
  if (vnodeQueryOpen(pVnode)) {
    vError("vgId:%d, failed to open vnode query since %s", TD_VID(pVnode), tstrerror(terrno));
    // NOTE(review): the error reported to the caller is forced to OOM here,
    // regardless of what was just logged - confirm this is intended.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  // vnode begin
  if (vnodeBegin(pVnode) < 0) {
    vError("vgId:%d, failed to begin since %s", TD_VID(pVnode), tstrerror(terrno));
    // NOTE(review): same forced OOM code as above - confirm this is intended.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  // open sync
  if (vnodeSyncOpen(pVnode, dir)) {
    vError("vgId:%d, failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno));
    goto _err;
  }

  return pVnode;

_err:
  // close whatever was opened
  if (pVnode->pQuery) vnodeQueryClose(pVnode);
  if (pVnode->pTq) tqClose(pVnode->pTq);
  if (pVnode->pWal) walClose(pVnode->pWal);
  if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
  if (pVnode->pSma) smaClose(pVnode->pSma);
  if (pVnode->pMeta) metaClose(pVnode->pMeta);
  if (pVnode->pPool) vnodeCloseBufPool(pVnode);

  // destroy handle: release every primitive initialised above, mirroring
  // vnodeClose(), so a failed open does not leak them
  tsem_destroy(&(pVnode->canCommit));
  tsem_destroy(&pVnode->syncSem);
  taosThreadCondDestroy(&pVnode->poolNotEmpty);
  taosThreadMutexDestroy(&pVnode->mutex);
  taosThreadMutexDestroy(&pVnode->lock);
  taosMemoryFree(pVnode);
  return NULL;
}

183 184 185 186 187 188
// Called ahead of vnodeClose(): invokes syncLeaderTransfer() on the vnode's
// sync handle. A NULL vnode is ignored.
void vnodePreClose(SVnode *pVnode) {
  if (pVnode == NULL) {
    return;
  }

  syncLeaderTransfer(pVnode->sync);
}

H
Hongze Cheng 已提交
189
// Close a vnode handle returned by vnodeOpen() and free it.
//
// Commits the vnode, closes its sub-modules (sync, query, wal, tq, tsdb, sma,
// meta, buffer pool - in that order), destroys the handle's semaphores,
// condition variable and mutexes, and finally releases the handle memory.
// A NULL vnode is ignored.
void vnodeClose(SVnode *pVnode) {
  if (pVnode == NULL) {
    return;
  }

  // commit before any sub-module is shut down
  vnodeCommit(pVnode);

  // close sub-modules
  vnodeSyncClose(pVnode);
  vnodeQueryClose(pVnode);
  walClose(pVnode->pWal);
  tqClose(pVnode->pTq);
  if (pVnode->pTsdb) {
    // pTsdb is only closed when it is set
    tsdbClose(&pVnode->pTsdb);
  }
  smaClose(pVnode->pSma);
  metaClose(pVnode->pMeta);
  vnodeCloseBufPool(pVnode);

  // destroy handle
  tsem_destroy(&(pVnode->canCommit));
  tsem_destroy(&pVnode->syncSem);
  taosThreadCondDestroy(&pVnode->poolNotEmpty);
  taosThreadMutexDestroy(&pVnode->mutex);
  taosThreadMutexDestroy(&pVnode->lock);
  taosMemoryFree(pVnode);
}
H
Hongze Cheng 已提交
209

210 211 212 213 214 215 216 217
// Start the vnode by calling vnodeSyncStart(); this is meant to run once the
// queue is ready, so that the sync timer is started afterwards.
// Always returns 0.
int32_t vnodeStart(SVnode *pVnode) {
  const int32_t code = 0;

  vnodeSyncStart(pVnode);

  return code;
}

// Stop the vnode. Currently a no-op.
void vnodeStop(SVnode *pVnode) {
  (void)pVnode;  // nothing to do yet
}

H
Hongze Cheng 已提交
218 219
// Return the sync handle stored in the vnode (`pVnode->sync`).
// `pVnode` must not be NULL.
int64_t vnodeGetSyncHandle(SVnode *pVnode) {
  int64_t syncHandle = pVnode->sync;

  return syncHandle;
}

H
Hongze Cheng 已提交
220 221 222 223 224 225
// Fill `pSnapshot` from the vnode's committed state.
//
// lastApplyIndex/lastApplyTerm are taken from state.committed/state.commitTerm;
// no payload is attached (data = NULL) and lastConfigIndex is set to -1.
void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot) {
  // position of the snapshot: the last committed index and its term
  pSnapshot->lastApplyTerm = pVnode->state.commitTerm;
  pSnapshot->lastApplyIndex = pVnode->state.committed;

  // fields that always get fixed values here
  pSnapshot->lastConfigIndex = -1;
  pSnapshot->data = NULL;
}