vnodeOpen.c 8.6 KB
Newer Older
H
save  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "vnd.h"
H
save  
Hongze Cheng 已提交
17

S
Shengliang Guan 已提交
18
/*
 * Create the on-disk skeleton of a vnode: its directory and its initial info
 * file (config + state with committed/applied = -1 and commitID = 0).
 *
 * @param path  vnode directory; joined to the tfs primary path when pTfs is
 *              non-NULL, otherwise used as-is.
 * @param pCfg  config to persist; vnodeCfgDefault is used when NULL.
 * @param pTfs  tiered file system handle, or NULL to use the local file system.
 * @return 0 on success; -1 on failure with terrno set.
 */
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
  SVnodeInfo info = {0};
  char       dir[TSDB_FILENAME_LEN] = {0};
  // pCfg may legitimately be NULL (see the default-config fallback below), so
  // never dereference it just to build a log message.
  int32_t    vgId = pCfg ? pCfg->vgId : 0;

  // check config
  if (vnodeCheckCfg(pCfg) < 0) {
    vError("vgId:%d, failed to create vnode since:%s", vgId, tstrerror(terrno));
    return -1;
  }

  // create vnode env
  if (pTfs) {
    if (tfsMkdirAt(pTfs, path, (SDiskID){0}) < 0) {
      vError("vgId:%d, failed to create vnode since:%s", vgId, tstrerror(terrno));
      return -1;
    }
    snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, path);
  } else {
    if (taosMkDir(path)) {
      // Report through terrno like every other failure path here; callers read
      // terrno after a negative return.
      terrno = TAOS_SYSTEM_ERROR(errno);
      vError("vgId:%d, failed to create vnode since:%s", vgId, tstrerror(terrno));
      return -1;
    }
    snprintf(dir, TSDB_FILENAME_LEN, "%s", path);
  }

  if (pCfg) {
    info.config = *pCfg;
  } else {
    info.config = vnodeCfgDefault;
  }
  info.state.committed = -1;
  info.state.applied = -1;
  info.state.commitID = 0;

  if (vnodeSaveInfo(dir, &info) < 0 || vnodeCommitInfo(dir, &info) < 0) {
    vError("vgId:%d, failed to save vnode config since %s", vgId, tstrerror(terrno));
    return -1;
  }

  vInfo("vgId:%d, vnode is created", info.config.vgId);
  return 0;
}

/*
 * Rewrite the replica (sync) configuration persisted in an existing vnode's
 * info file from an alter-replica request, then save and commit the info.
 *
 * @param path  vnode directory; joined to the tfs primary path when pTfs is
 *              non-NULL, otherwise used as-is.
 * @param pReq  alter request: vgId, replica count, this node's index and the
 *              replica endpoints.
 * @param pTfs  tiered file system handle, or NULL to use the local file system.
 * @return 0 on success; -1 on failure with terrno set.
 */
int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs) {
  SVnodeInfo    info = {0};
  char          dir[TSDB_FILENAME_LEN] = {0};
  int32_t       ret = 0;
  // capacity of the fixed-size replica table persisted in the sync config
  const int32_t maxReplica =
      (int32_t)(sizeof(info.config.syncCfg.nodeInfo) / sizeof(info.config.syncCfg.nodeInfo[0]));

  // pReq->replica drives writes into nodeInfo[] below; reject counts that would
  // index past the array instead of corrupting memory.
  if (pReq->replica < 0 || pReq->replica > maxReplica) {
    terrno = TSDB_CODE_INVALID_MSG;
    vError("vgId:%d, failed to alter vnode config since invalid replica:%d", pReq->vgId, (int32_t)pReq->replica);
    return -1;
  }

  if (pTfs) {
    snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, path);
  } else {
    snprintf(dir, TSDB_FILENAME_LEN, "%s", path);
  }

  ret = vnodeLoadInfo(dir, &info);
  if (ret < 0) {
    vError("vgId:%d, failed to read vnode config from %s since %s", pReq->vgId, path, tstrerror(terrno));
    return -1;
  }

  // Edit the loaded sync config in place; no copy-back is needed afterwards.
  SSyncCfg *pCfg = &info.config.syncCfg;
  pCfg->myIndex = pReq->selfIndex;
  pCfg->replicaNum = pReq->replica;
  memset(&pCfg->nodeInfo, 0, sizeof(pCfg->nodeInfo));

  vInfo("vgId:%d, save config, replicas:%d selfIndex:%d", pReq->vgId, pCfg->replicaNum, pCfg->myIndex);
  for (int i = 0; i < pReq->replica; ++i) {
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
    pNode->nodeId = pReq->replicas[i].id;
    pNode->nodePort = pReq->replicas[i].port;
    tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
    (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    vInfo("vgId:%d, save config, replica:%d ep:%s:%u", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort);
  }

  ret = vnodeSaveInfo(dir, &info);
  if (ret < 0) {
    vError("vgId:%d, failed to save vnode config since %s", pReq->vgId, tstrerror(terrno));
    return -1;
  }

  ret = vnodeCommitInfo(dir, &info);
  if (ret < 0) {
    vError("vgId:%d, failed to commit vnode config since %s", pReq->vgId, tstrerror(terrno));
    return -1;
  }

  vInfo("vgId:%d, vnode config is saved", info.config.vgId);
  return 0;
}

S
Shengliang Guan 已提交
109 110 111 112
/*
 * Remove a vnode's directory.
 *
 * @param path  vnode directory; with a tfs handle it is passed to tfsRmdir,
 *              otherwise it is treated as a plain local path.
 * @param pTfs  tiered file system handle, or NULL to use the local file system
 *              (the same convention as vnodeCreate/vnodeOpen/vnodeAlter).
 */
void vnodeDestroy(const char *path, STfs *pTfs) {
  vInfo("path:%s is removed while destroy vnode", path);
  if (pTfs) {
    tfsRmdir(pTfs, path);
  } else {
    // Without tfs, vnodeCreate makes `path` directly via taosMkDir, so remove
    // it directly as well instead of handing a NULL handle to tfsRmdir.
    taosRemoveDir(path);
  }
}
H
refact  
Hongze Cheng 已提交
113

H
Hongze Cheng 已提交
114 115 116
/*
 * Open an existing vnode. It loads the persisted info, allocates the handle,
 * then opens the buffer pool, meta, tsdb (non-RSMA only), sma, wal, tq, query,
 * begins the vnode and opens sync, in that order.
 *
 * @param path   vnode directory; joined to the tfs primary path when pTfs is
 *               non-NULL, otherwise used as-is.
 * @param pTfs   tiered file system handle, or NULL to use the local file system.
 * @param msgCb  message callbacks stored in the handle.
 * @return the opened vnode, or NULL on failure (terrno describes the error).
 */
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
  SVnode    *pVnode = NULL;
  SVnodeInfo info = {0};
  char       dir[TSDB_FILENAME_LEN] = {0};
  char       tdir[TSDB_FILENAME_LEN * 2] = {0};
  int32_t    ret = 0;

  if (pTfs) {
    snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, path);
  } else {
    snprintf(dir, TSDB_FILENAME_LEN, "%s", path);
  }

  info.config = vnodeCfgDefault;

  // load vnode info
  ret = vnodeLoadInfo(dir, &info);
  if (ret < 0) {
    vError("failed to open vnode from %s since %s", path, tstrerror(terrno));
    return NULL;
  }

  // create handle; the path string is stored inline right after the struct
  pVnode = taosMemoryCalloc(1, sizeof(*pVnode) + strlen(path) + 1);
  if (pVnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    vError("vgId:%d, failed to open vnode since %s", info.config.vgId, tstrerror(terrno));
    return NULL;
  }

  pVnode->path = (char *)&pVnode[1];
  strcpy(pVnode->path, path);  // room for strlen(path) + 1 was allocated above
  pVnode->config = info.config;
  pVnode->state.committed = info.state.committed;
  pVnode->state.commitTerm = info.state.commitTerm;
  pVnode->state.commitID = info.state.commitID;
  // applied position starts from the last committed position
  pVnode->state.applied = info.state.committed;
  pVnode->state.applyTerm = info.state.commitTerm;
  pVnode->pTfs = pTfs;
  pVnode->msgCb = msgCb;
  taosThreadMutexInit(&pVnode->lock, NULL);
  pVnode->blocked = false;

  tsem_init(&pVnode->syncSem, 0, 0);
  tsem_init(&(pVnode->canCommit), 0, 1);
  taosThreadMutexInit(&pVnode->mutex, NULL);
  taosThreadCondInit(&pVnode->poolNotEmpty, NULL);

  int8_t rollback = vnodeShouldRollback(pVnode);

  // open buffer pool
  if (vnodeOpenBufPool(pVnode) < 0) {
    vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno));
    goto _err;
  }

  // open meta
  if (metaOpen(pVnode, &pVnode->pMeta, rollback) < 0) {
    vError("vgId:%d, failed to open vnode meta since %s", TD_VID(pVnode), tstrerror(terrno));
    goto _err;
  }

  // open tsdb
  if (!VND_IS_RSMA(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, NULL, rollback) < 0) {
    vError("vgId:%d, failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
    goto _err;
  }

  // open sma
  if (smaOpen(pVnode, rollback)) {
    vError("vgId:%d, failed to open vnode sma since %s", TD_VID(pVnode), tstrerror(terrno));
    goto _err;
  }

  // open wal
  snprintf(tdir, sizeof(tdir), "%s%s%s", dir, TD_DIRSEP, VNODE_WAL_DIR);
  taosRealPath(tdir, NULL, sizeof(tdir));

  pVnode->pWal = walOpen(tdir, &(pVnode->config.walCfg));
  if (pVnode->pWal == NULL) {
    vError("vgId:%d, failed to open vnode wal since %s. wal:%s", TD_VID(pVnode), tstrerror(terrno), tdir);
    goto _err;
  }

  // open tq
  snprintf(tdir, sizeof(tdir), "%s%s%s", dir, TD_DIRSEP, VNODE_TQ_DIR);
  taosRealPath(tdir, NULL, sizeof(tdir));
  pVnode->pTq = tqOpen(tdir, pVnode);
  if (pVnode->pTq == NULL) {
    vError("vgId:%d, failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno));
    goto _err;
  }

  // open query
  if (vnodeQueryOpen(pVnode)) {
    vError("vgId:%d, failed to open vnode query since %s", TD_VID(pVnode), tstrerror(terrno));
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  // vnode begin
  if (vnodeBegin(pVnode) < 0) {
    vError("vgId:%d, failed to begin since %s", TD_VID(pVnode), tstrerror(terrno));
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  // open sync
  if (vnodeSyncOpen(pVnode, dir)) {
    vError("vgId:%d, failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno));
    goto _err;
  }

  if (rollback) {
    vnodeRollback(pVnode);
  }

  return pVnode;

_err:
  if (pVnode->pQuery) vnodeQueryClose(pVnode);
  if (pVnode->pTq) tqClose(pVnode->pTq);
  if (pVnode->pWal) walClose(pVnode->pWal);
  if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
  if (pVnode->pSma) smaClose(pVnode->pSma);
  if (pVnode->pMeta) metaClose(pVnode->pMeta);
  if (pVnode->pPool) vnodeCloseBufPool(pVnode);

  // Every primitive below is initialized before the first `goto _err`, so all
  // of them must be released here (the same set vnodeClose destroys).
  tsem_destroy(&(pVnode->canCommit));
  tsem_destroy(&pVnode->syncSem);
  taosThreadCondDestroy(&pVnode->poolNotEmpty);
  taosThreadMutexDestroy(&pVnode->mutex);
  taosThreadMutexDestroy(&pVnode->lock);
  taosMemoryFree(pVnode);
  return NULL;
}

H
Hongze Cheng 已提交
247
/*
 * Pre-close step for a vnode: runs the query pre-close hook and then the sync
 * pre-close hook, in that order. Presumably invoked ahead of vnodeClose() —
 * confirm with callers.
 */
void vnodePreClose(SVnode *pVnode) {
  /* query side first ... */
  vnodeQueryPreClose(pVnode);
  /* ... then sync */
  vnodeSyncPreClose(pVnode);
}
251

H
Hongze Cheng 已提交
252
/*
 * Close a vnode and free its handle; a NULL handle is ignored.
 *
 * vnodeSyncCommit is called first. Then sync, query, wal, tq, tsdb (only if it
 * was opened), sma, meta and the buffer pool are closed in that order. Finally
 * the handle's semaphores, condition variable and mutexes are destroyed and the
 * handle memory is released.
 */
void vnodeClose(SVnode *pVnode) {
  if (pVnode == NULL) {
    return;
  }

  vnodeSyncCommit(pVnode);
  vnodeSyncClose(pVnode);
  vnodeQueryClose(pVnode);

  walClose(pVnode->pWal);
  tqClose(pVnode->pTq);
  if (pVnode->pTsdb != NULL) {
    tsdbClose(&pVnode->pTsdb);  // tsdb is not opened for RSMA vnodes
  }
  smaClose(pVnode->pSma);
  metaClose(pVnode->pMeta);
  vnodeCloseBufPool(pVnode);

  /* tear down the synchronization primitives owned by the handle */
  tsem_destroy(&pVnode->canCommit);
  tsem_destroy(&pVnode->syncSem);
  taosThreadCondDestroy(&pVnode->poolNotEmpty);
  taosThreadMutexDestroy(&pVnode->mutex);
  taosThreadMutexDestroy(&pVnode->lock);

  taosMemoryFree(pVnode);
}
H
Hongze Cheng 已提交
272

273
// start the sync timer after the queue is ready
274
int32_t vnodeStart(SVnode *pVnode) { return vnodeSyncStart(pVnode); }
275 276 277

void vnodeStop(SVnode *pVnode) {}

H
Hongze Cheng 已提交
278 279
int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; }

H
Hongze Cheng 已提交
280 281 282 283 284 285
void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot) {
  pSnapshot->data = NULL;
  pSnapshot->lastApplyIndex = pVnode->state.committed;
  pSnapshot->lastApplyTerm = pVnode->state.commitTerm;
  pSnapshot->lastConfigIndex = -1;
}