/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "vnd.h"
H
Hongze Cheng 已提交
17

// SVSnapReader ========================================================
struct SVSnapReader {
H
Hongze Cheng 已提交
20 21 22
  SVnode *pVnode;
  int64_t sver;
  int64_t ever;
H
Hongze Cheng 已提交
23
  int64_t index;
H
Hongze Cheng 已提交
24 25
  // meta
  int8_t           metaDone;
H
Hongze Cheng 已提交
26
  SMetaSnapReader *pMetaReader;
H
Hongze Cheng 已提交
27 28
  // tsdb
  int8_t           tsdbDone;
H
Hongze Cheng 已提交
29
  STsdbSnapReader *pTsdbReader;
C
Cary Xu 已提交
30
  // rsma
C
Cary Xu 已提交
31 32
  int8_t           rsmaDone;
  SRsmaSnapReader *pRsmaReader;
H
Hongze Cheng 已提交
33 34
};

H
Hongze Cheng 已提交
35
/*
 * Create a snapshot reader for a vnode.
 *
 * pVnode    vnode to read from
 * sver/ever version pair stored in the reader and later handed to the
 *           meta/tsdb/rsma sub-readers
 * ppReader  out: the new reader on success, NULL on failure
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if the reader cannot be
 * allocated. The sub-readers are not opened here; vnodeSnapRead opens them
 * on demand.
 */
int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapReader **ppReader) {
  int32_t       code = 0;
  SVSnapReader *pReader = NULL;

  pReader = (SVSnapReader *)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    // The failure must be visible in the return value: previously only terrno
    // was set, so the function returned 0 with *ppReader == NULL.
    code = TSDB_CODE_OUT_OF_MEMORY;
    terrno = code;
    goto _err;
  }
  pReader->pVnode = pVnode;
  pReader->sver = sver;
  pReader->ever = ever;

  vInfo("vgId:%d vnode snapshot reader opened, sver:%" PRId64 " ever:%" PRId64, TD_VID(pVnode), sver, ever);
  *ppReader = pReader;
  return code;

_err:
  vError("vgId:%d vnode snapshot reader open failed since %s", TD_VID(pVnode), tstrerror(code));
  *ppReader = NULL;
  return code;
}

H
Hongze Cheng 已提交
58
/*
 * Destroy a snapshot reader.
 *
 * Any sub-reader that is still open is closed (rsma, then tsdb, then meta),
 * after which the reader itself is freed. The return values of the sub-reader
 * close calls are not inspected; this function always returns 0.
 */
int32_t vnodeSnapReaderClose(SVSnapReader *pReader) {
  if (pReader->pRsmaReader != NULL) rsmaSnapReaderClose(&pReader->pRsmaReader);
  if (pReader->pTsdbReader != NULL) tsdbSnapReaderClose(&pReader->pTsdbReader);
  if (pReader->pMetaReader != NULL) metaSnapReaderClose(&pReader->pMetaReader);

  // Log before freeing: the message still needs pReader->pVnode.
  vInfo("vgId:%d vnode snapshot reader closed", TD_VID(pReader->pVnode));
  taosMemoryFree(pReader);

  return 0;
}

H
Hongze Cheng 已提交
78
/*
 * Fetch the next block of snapshot data from a vnode.
 *
 * Parts are drained in a fixed order: meta, then tsdb, then rsma (the rsma
 * part only when VND_IS_RSMA(pVnode) holds). Each sub-reader is opened on
 * first use and closed as soon as it yields no more data, at which point the
 * matching "done" flag is set so the part is skipped on later calls.
 *
 * pReader  reader created by vnodeSnapReaderOpen
 * ppData   out: next data block, or NULL once every part is exhausted
 * nData    out: sizeof(SSnapDataHdr) + header size field of *ppData, or 0 at
 *          end of data; not written on error
 *
 * Returns 0 on success (including end of data), otherwise the error code of
 * the failing sub-reader call.
 */
int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) {
  int32_t code = 0;

  // META ==============
  if (!pReader->metaDone) {
    // open reader if not
    if (pReader->pMetaReader == NULL) {
      code = metaSnapReaderOpen(pReader->pVnode->pMeta, pReader->sver, pReader->ever, &pReader->pMetaReader);
      if (code) goto _err;
    }

    code = metaSnapRead(pReader->pMetaReader, ppData);
    if (code) goto _err;
    if (*ppData) goto _exit;

    // no more meta data: mark the part finished and release its reader
    pReader->metaDone = 1;
    code = metaSnapReaderClose(&pReader->pMetaReader);
    if (code) goto _err;
  }

  // TSDB ==============
  if (!pReader->tsdbDone) {
    // open if not
    if (pReader->pTsdbReader == NULL) {
      code = tsdbSnapReaderOpen(pReader->pVnode->pTsdb, pReader->sver, pReader->ever, SNAP_DATA_TSDB,
                                &pReader->pTsdbReader);
      if (code) goto _err;
    }

    code = tsdbSnapRead(pReader->pTsdbReader, ppData);
    if (code) goto _err;
    if (*ppData) goto _exit;

    pReader->tsdbDone = 1;
    code = tsdbSnapReaderClose(&pReader->pTsdbReader);
    if (code) goto _err;
  }

  // RSMA ==============
  if (VND_IS_RSMA(pReader->pVnode) && !pReader->rsmaDone) {
    // open if not
    if (pReader->pRsmaReader == NULL) {
      code = rsmaSnapReaderOpen(pReader->pVnode->pSma, pReader->sver, pReader->ever, &pReader->pRsmaReader);
      if (code) goto _err;
    }

    code = rsmaSnapRead(pReader->pRsmaReader, ppData);
    if (code) goto _err;
    if (*ppData) goto _exit;

    // Mark the rsma part (not tsdb) as finished; otherwise a later call would
    // reopen the rsma reader and read the part again.
    pReader->rsmaDone = 1;
    code = rsmaSnapReaderClose(&pReader->pRsmaReader);
    if (code) goto _err;
  }

  // every part is exhausted
  *ppData = NULL;
  *nData = 0;

_exit:
  if (*ppData) {
    SSnapDataHdr *pHdr = (SSnapDataHdr *)(*ppData);

    // stamp the block with its sequence number
    pReader->index++;
    *nData = sizeof(SSnapDataHdr) + pHdr->size;
    pHdr->index = pReader->index;
    vInfo("vgId:%d vnode snapshot read data,index:%" PRId64 " type:%d nData:%d ", TD_VID(pReader->pVnode),
          pReader->index, pHdr->type, *nData);
  } else {
    vInfo("vgId:%d vnode snapshot read data end, index:%" PRId64, TD_VID(pReader->pVnode), pReader->index);
  }
  return code;

_err:
  vError("vgId:%d vnode snapshot read failed since %s", TD_VID(pReader->pVnode), tstrerror(code));
  return code;
}

// SVSnapWriter ========================================================
struct SVSnapWriter {
  SVnode *pVnode;
  int64_t sver;
  int64_t ever;
H
Hongze Cheng 已提交
174
  int64_t commitID;
H
Hongze Cheng 已提交
175
  int64_t index;
H
Hongze Cheng 已提交
176 177 178 179
  // meta
  SMetaSnapWriter *pMetaSnapWriter;
  // tsdb
  STsdbSnapWriter *pTsdbSnapWriter;
C
Cary Xu 已提交
180 181
  // rsma
  SRsmaSnapWriter *pRsmaSnapWriter;
H
Hongze Cheng 已提交
182 183
};

H
Hongze Cheng 已提交
184
/*
 * Create a snapshot writer for a vnode.
 *
 * The vnode is committed via vnodeCommit() first, then its commit ID is
 * incremented and recorded in the writer.
 *
 * pVnode    vnode to write into
 * sver/ever version pair stored in the writer and later handed to the
 *           meta/tsdb/rsma sub-writers
 * ppWriter  out: the new writer on success, NULL on failure
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if the writer cannot be
 * allocated, or the error code returned by vnodeCommit().
 */
int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWriter **ppWriter) {
  int32_t       code = 0;
  SVSnapWriter *pWriter = NULL;

  // alloc
  pWriter = (SVSnapWriter *)taosMemoryCalloc(1, sizeof(*pWriter));
  if (pWriter == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  pWriter->pVnode = pVnode;
  pWriter->sver = sver;
  pWriter->ever = ever;

  // commit it
  code = vnodeCommit(pVnode);
  if (code) goto _err;

  // inc commit ID
  pVnode->state.commitID++;
  pWriter->commitID = pVnode->state.commitID;

  vInfo("vgId:%d vnode snapshot writer opened, sver:%" PRId64 " ever:%" PRId64 " commit id:%" PRId64, TD_VID(pVnode),
        sver, ever, pWriter->commitID);
  *ppWriter = pWriter;
  return code;

_err:
  vError("vgId:%d vnode snapshot writer open failed since %s", TD_VID(pVnode), tstrerror(code));
  // The writer may already be allocated when vnodeCommit() fails; the caller
  // only receives NULL, so it has to be released here to avoid a leak.
  if (pWriter != NULL) {
    taosMemoryFree(pWriter);
  }
  *ppWriter = NULL;
  return code;
}

217
/*
 * Finish a snapshot write session.
 *
 * Every sub-writer that was opened is closed (meta, tsdb, rsma) with the
 * given rollback flag. When rollback is 0 the vnode state is advanced to
 * pWriter->ever / pSnapshot->lastApplyTerm, the vnode info is saved and
 * committed under the vnode's primary path, and vnodeBegin() is called.
 * A non-zero rollback hits ASSERT(0).
 *
 * Returns 0 on success, in which case pWriter has been freed; otherwise the
 * error code of the first failing step.
 *
 * NOTE(review): on the error path pWriter is not freed here — confirm whether
 * the caller is expected to release or retry it.
 */
int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *pSnapshot) {
  int32_t code = 0;
  SVnode *pVnode = pWriter->pVnode;

  if (pWriter->pMetaSnapWriter != NULL) {
    code = metaSnapWriterClose(&pWriter->pMetaSnapWriter, rollback);
    if (code != 0) goto _err;
  }

  if (pWriter->pTsdbSnapWriter != NULL) {
    code = tsdbSnapWriterClose(&pWriter->pTsdbSnapWriter, rollback);
    if (code != 0) goto _err;
  }

  if (pWriter->pRsmaSnapWriter != NULL) {
    code = rsmaSnapWriterClose(&pWriter->pRsmaSnapWriter, rollback);
    if (code != 0) goto _err;
  }

  if (rollback) {
    ASSERT(0);
  } else {
    char       vnodeDir[TSDB_FILENAME_LEN];
    SVnodeInfo vnodeInfo = {0};

    // advance the in-memory vnode state to the end of the snapshot
    pVnode->state.committed = pWriter->ever;
    pVnode->state.applied = pWriter->ever;
    pVnode->state.applyTerm = pSnapshot->lastApplyTerm;
    pVnode->state.commitTerm = pSnapshot->lastApplyTerm;

    // build the info record from the updated state
    vnodeInfo.config = pVnode->config;
    vnodeInfo.state.committed = pVnode->state.applied;
    vnodeInfo.state.commitTerm = pVnode->state.applyTerm;
    vnodeInfo.state.commitID = pVnode->state.commitID;

    snprintf(vnodeDir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);

    code = vnodeSaveInfo(vnodeDir, &vnodeInfo);
    if (code != 0) goto _err;

    code = vnodeCommitInfo(vnodeDir, &vnodeInfo);
    if (code != 0) goto _err;

    vnodeBegin(pVnode);
  }

  vInfo("vgId:%d vnode snapshot writer closed, rollback:%d", TD_VID(pVnode), rollback);
  taosMemoryFree(pWriter);
  return code;

_err:
  vError("vgId:%d vnode snapshot writer close failed since %s", TD_VID(pWriter->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
271
/*
 * Apply one block of snapshot data to the vnode.
 *
 * pData starts with an SSnapDataHdr. The header is asserted to describe
 * exactly nData bytes and to carry the index following the last accepted one.
 * The block is then dispatched on the header type to the meta, tsdb or rsma
 * sub-writer, which is opened on first use. Blocks of any other type are
 * ignored.
 *
 * Returns 0 on success, otherwise the error code of the failing sub-writer
 * open/write call.
 */
int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
  SVnode       *pVnode = pWriter->pVnode;
  SSnapDataHdr *pHdr = (SSnapDataHdr *)pData;
  int32_t       code = 0;

  ASSERT(pHdr->size + sizeof(SSnapDataHdr) == nData);
  ASSERT(pHdr->index == pWriter->index + 1);
  pWriter->index = pHdr->index;

  vInfo("vgId:%d vnode snapshot write data, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode), pHdr->index,
        pHdr->type, nData);

  switch (pHdr->type) {
    case SNAP_DATA_META: {
      if (pWriter->pMetaSnapWriter == NULL) {
        code = metaSnapWriterOpen(pVnode->pMeta, pWriter->sver, pWriter->ever, &pWriter->pMetaSnapWriter);
        if (code != 0) goto _err;
      }

      code = metaSnapWrite(pWriter->pMetaSnapWriter, pData, nData);
      if (code != 0) goto _err;
      break;
    }

    case SNAP_DATA_TSDB: {
      if (pWriter->pTsdbSnapWriter == NULL) {
        code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, &pWriter->pTsdbSnapWriter);
        if (code != 0) goto _err;
      }

      code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pData, nData);
      if (code != 0) goto _err;
      break;
    }

    // rsma1/rsma2 data and the rsma qtask blocks all go through the rsma writer
    case SNAP_DATA_RSMA1:
    case SNAP_DATA_RSMA2:
    case SNAP_DATA_QTASK: {
      if (pWriter->pRsmaSnapWriter == NULL) {
        code = rsmaSnapWriterOpen(pVnode->pSma, pWriter->sver, pWriter->ever, &pWriter->pRsmaSnapWriter);
        if (code != 0) goto _err;
      }

      code = rsmaSnapWrite(pWriter->pRsmaSnapWriter, pData, nData);
      if (code != 0) goto _err;
      break;
    }

    default:
      // other block types are skipped without error
      break;
  }

  return code;

_err:
  vError("vgId:%d vnode snapshot write failed since %s, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode),
         tstrerror(code), pHdr->index, pHdr->type, nData);
  return code;
}