smaSnapshot.c 13.9 KB
Newer Older
C
Cary Xu 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "sma.h"

18 19
// Forward declarations of the file-local qtaskinfo helpers; both are defined further down in this file.
static int32_t rsmaSnapReadQTaskInfo(SRSmaSnapReader* pReader, uint8_t** ppData);
static int32_t rsmaSnapWriteQTaskInfo(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
C
Cary Xu 已提交
20

21 22
// SRSmaSnapReader ========================================
struct SRSmaSnapReader {
C
Cary Xu 已提交
23 24 25
  SSma*   pSma;
  int64_t sver;
  int64_t ever;
K
kailixu 已提交
26
  SRSmaFS fs;
C
Cary Xu 已提交
27 28 29 30 31 32 33

  // for data file
  int8_t           rsmaDataDone[TSDB_RETENTION_L2];
  STsdbSnapReader* pDataReader[TSDB_RETENTION_L2];

  // for qtaskinfo file
  int8_t         qTaskDone;
K
kailixu 已提交
34
  int32_t        fsIter;
C
Cary Xu 已提交
35 36 37
  SQTaskFReader* pQTaskFReader;
};

38
/**
 * Allocate and set up a snapshot reader for the rsma data of pSma.
 *
 * A tsdb snapshot reader is opened for every rsma level whose tsdb handle is present, a reference on the
 * rsma file set is taken while holding the FS read latch, and a qtaskinfo reader is allocated only when
 * that file set lists at least one qtaskinfo file.
 *
 * @param pSma     SMA handle to read from.
 * @param sver     start version; stored in the reader and forwarded to tsdbSnapReaderOpen.
 * @param ever     end version; stored in the reader, forwarded to tsdbSnapReaderOpen and used as the
 *                 qtaskinfo reader's version.
 * @param ppReader receives the new reader on success and NULL on failure.
 * @return 0 on success, otherwise the code of the step that failed.
 */
int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapReader** ppReader) {
  int32_t          code = 0;
  int32_t          lino = 0;
  SVnode*          pVnode = pSma->pVnode;
  SSmaEnv*         pRSmaEnv = SMA_RSMA_ENV(pSma);
  SRSmaStat*       pRSmaStat = (SRSmaStat*)SMA_ENV_STAT(pRSmaEnv);
  SRSmaSnapReader* pNew = (SRSmaSnapReader*)taosMemoryCalloc(1, sizeof(SRSmaSnapReader));

  if (!pNew) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
  pNew->pSma = pSma;
  pNew->sver = sver;
  pNew->ever = ever;

  // one tsdb snapshot reader per available rsma level (rsma1 / rsma2)
  for (int32_t level = 0; level < TSDB_RETENTION_L2; ++level) {
    if (!pSma->pRSmaTsdb[level]) {
      continue;
    }
    code = tsdbSnapReaderOpen(pSma->pRSmaTsdb[level], sver, ever, level != 0 ? SNAP_DATA_RSMA2 : SNAP_DATA_RSMA1,
                              &pNew->pDataReader[level]);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // reference the rsma file set; the read latch is held only for the duration of the call
  taosRLockLatch(RSMA_FS_LOCK(pRSmaStat));
  code = tdRSmaFSRef(pSma, &pNew->fs);
  taosRUnLockLatch(RSMA_FS_LOCK(pRSmaStat));
  TSDB_CHECK_CODE(code, lino, _exit);

  // a qtaskinfo reader is only needed when the file set lists qtaskinfo files
  if (taosArrayGetSize(pNew->fs.aQTaskInf) > 0) {
    pNew->pQTaskFReader = taosMemoryCalloc(1, sizeof(SQTaskFReader));
    if (pNew->pQTaskFReader == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      TSDB_CHECK_CODE(code, lino, _exit);
    }
    pNew->pQTaskFReader->pSma = pSma;
    pNew->pQTaskFReader->version = pNew->ever;
  }

  *ppReader = pNew;

_exit:
  if (code != 0) {
    // tear down whatever was set up so far and hand back nothing
    if (pNew != NULL) rsmaSnapReaderClose(&pNew);
    *ppReader = NULL;
    smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

91
/**
 * Read the next qtaskinfo file of the snapshot into one snapshot data packet.
 *
 * Starting at pReader->fsIter, the entries of pReader->fs.aQTaskInf are scanned for the first one whose
 * version equals pReader->ever and whose file exists on disk. That file is read completely into *ppBuf,
 * prefixed with an SSnapDataHdr (type SNAP_DATA_QTASK, flag = level, index = suid, size = file size).
 * fsIter is advanced past every examined entry, so successive calls return successive files.
 *
 * @param pReader snapshot reader; its pQTaskFReader may be NULL when there is nothing to read.
 * @param ppBuf   in: NULL or a buffer that may be reallocated; out: the packet, or NULL when no (more)
 *                qtaskinfo data is available or an error occurred.
 * @return 0 on success (including "nothing to read"), otherwise an error code.
 */
static int32_t rsmaSnapReadQTaskInfo(SRSmaSnapReader* pReader, uint8_t** ppBuf) {
  int32_t        code = 0;
  int32_t        lino = 0;
  SVnode*        pVnode = pReader->pSma->pVnode;
  SQTaskFReader* qReader = pReader->pQTaskFReader;
  SRSmaFS*       pFS = &pReader->fs;
  int64_t        n = 0;
  uint8_t*       pBuf = NULL;
  int64_t        version = pReader->ever;
  char           fname[TSDB_FILENAME_LEN];

  if (!qReader) {
    *ppBuf = NULL;
    smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, qTaskReader is NULL", TD_VID(pVnode));
    return 0;
  }

  if (pReader->fsIter >= taosArrayGetSize(pFS->aQTaskInf)) {
    *ppBuf = NULL;
    smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, fsIter reach end", TD_VID(pVnode));
    return 0;
  }

  while (pReader->fsIter < taosArrayGetSize(pFS->aQTaskInf)) {
    // Advance the cursor as the entry is fetched so that every `continue` below makes progress.
    SQTaskFile* qTaskF = taosArrayGet(pFS->aQTaskInf, pReader->fsIter++);
    if (qTaskF->version != version) {
      continue;
    }

    tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), qTaskF->suid, qTaskF->level, version, tfsGetPrimaryPath(pVnode->pTfs),
                               fname);
    if (!taosCheckExistFile(fname)) {
      smaWarn("vgId:%d, vnode snapshot rsma reader for qtaskinfo, table %" PRIi64 ", level %" PRIi8 ", version %" PRIi64
              " is not needed as %s not exist",
              TD_VID(pVnode), qTaskF->suid, qTaskF->level, version, fname);
      continue;
    }

    TdFilePtr fp = taosOpenFile(fname, TD_FILE_READ);
    if (!fp) {
      code = TAOS_SYSTEM_ERROR(errno);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
    qReader->pReadH = fp;
    qReader->level = qTaskF->level;
    qReader->suid = qTaskF->suid;
    // One file per call: stop scanning, the remaining entries are handled by later calls.
    break;
  }

  if (!qReader->pReadH) {
    *ppBuf = NULL;
    smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, not needed since readh is NULL", TD_VID(pVnode));
    return 0;
  }

  int64_t size = 0;
  if (taosFStatFile(qReader->pReadH, &size, NULL) < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // seek
  if (taosLSeekFile(qReader->pReadH, 0, SEEK_SET) < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // Grow through a temporary so that the old buffer is still reachable (and freed below) if this fails.
  if (*ppBuf) {
    pBuf = taosMemoryRealloc(*ppBuf, sizeof(SSnapDataHdr) + size);
  } else {
    pBuf = taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
  }
  if (!pBuf) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
  *ppBuf = pBuf;

  // read the file content right behind the packet header
  n = taosReadFile(qReader->pReadH, POINTER_SHIFT(*ppBuf, sizeof(SSnapDataHdr)), size);
  if (n < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    TSDB_CHECK_CODE(code, lino, _exit);
  } else if (n != size) {
    code = TSDB_CODE_FILE_CORRUPTED;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  smaInfo("vgId:%d, vnode snapshot rsma read qtaskinfo, version:%" PRIi64 ", size:%" PRIi64, TD_VID(pVnode), version,
          size);

  SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppBuf);
  pHdr->type = SNAP_DATA_QTASK;
  pHdr->flag = qReader->level;
  pHdr->index = qReader->suid;
  pHdr->size = size;

_exit:
  // The file is either fully consumed or unusable; do not carry its handle into the next call, where a
  // stale handle would be mistaken for a freshly opened file.
  if (qReader->pReadH) {
    taosCloseFile(&qReader->pReadH);
    qReader->pReadH = NULL;
  }
  if (code) {
    // Release the packet buffer instead of just dropping the only pointer to it.
    if (*ppBuf) {
      taosMemoryFree(*ppBuf);
    }
    *ppBuf = NULL;
    smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
  } else {
    smaInfo("vgId:%d, vnode snapshot rsma read qtaskinfo succeed", TD_VID(pVnode));
  }
  return code;
}

196
/**
 * Produce the next snapshot data packet of the rsma snapshot.
 *
 * The rsma levels are drained in order through their tsdb snapshot readers; once a level yields no data
 * it is marked done. After the levels, qtaskinfo data is read via rsmaSnapReadQTaskInfo and marked done
 * in the same way.
 *
 * @param pReader snapshot reader.
 * @param ppData  set to NULL on entry; receives the next packet, or stays NULL when nothing was produced.
 * @return 0 on success, otherwise the error code of the failing read.
 */
int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData) {
  int32_t code = 0;
  int32_t lino = 0;

  *ppData = NULL;

  smaInfo("vgId:%d, vnode snapshot rsma read entry", SMA_VID(pReader->pSma));

  // rsma1/rsma2 data files
  for (int32_t level = 0; level < TSDB_RETENTION_L2; ++level) {
    if (pReader->pDataReader[level] == NULL) {
      continue;
    }
    if (pReader->rsmaDataDone[level]) {
      smaInfo("vgId:%d, vnode snapshot rsma read level %d is done", SMA_VID(pReader->pSma), level);
      continue;
    }

    smaInfo("vgId:%d, vnode snapshot rsma read level %d not done", SMA_VID(pReader->pSma), level);
    code = tsdbSnapRead(pReader->pDataReader[level], ppData);
    TSDB_CHECK_CODE(code, lino, _exit);
    if (*ppData != NULL) {
      goto _exit;  // hand this packet out; the same level is polled again on the next call
    }
    pReader->rsmaDataDone[level] = 1;
  }

  // qtaskinfo files
  if (pReader->qTaskDone) {
    goto _exit;
  }
  smaInfo("vgId:%d, vnode snapshot rsma qtaskinfo not done", SMA_VID(pReader->pSma));
  code = rsmaSnapReadQTaskInfo(pReader, ppData);
  TSDB_CHECK_CODE(code, lino, _exit);
  if (*ppData == NULL) {
    pReader->qTaskDone = 1;
  }

_exit:
  if (code != 0) {
    smaError("vgId:%d, vnode snapshot rsma read failed since %s", SMA_VID(pReader->pSma), tstrerror(code));
  } else {
    smaInfo("vgId:%d, vnode snapshot rsma read succeed", SMA_VID(pReader->pSma));
  }
  return code;
}

244
/**
 * Release a snapshot reader created by rsmaSnapReaderOpen.
 *
 * Drops the reference on the rsma file set, closes and frees the qtaskinfo reader if one was allocated,
 * closes every open tsdb snapshot reader, then frees the reader and clears *ppReader.
 *
 * @param ppReader address of the reader pointer; a NULL address or a NULL reader is a no-op.
 * @return always 0.
 */
int32_t rsmaSnapReaderClose(SRSmaSnapReader** ppReader) {
  int32_t          code = 0;
  SRSmaSnapReader* pReader = NULL;

  // Nothing to close; mirrors the NULL tolerance of rsmaSnapWriterClose.
  if (ppReader == NULL || *ppReader == NULL) {
    return code;
  }
  pReader = *ppReader;

  tdRSmaFSUnRef(pReader->pSma, &pReader->fs);
  if (pReader->pQTaskFReader) {
    taosCloseFile(&pReader->pQTaskFReader->pReadH);
    taosMemoryFree(pReader->pQTaskFReader);
    pReader->pQTaskFReader = NULL;
  }

  for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
    if (pReader->pDataReader[i]) {
      tsdbSnapReaderClose(&pReader->pDataReader[i]);
    }
  }

  smaInfo("vgId:%d, vnode snapshot rsma reader closed", SMA_VID(pReader->pSma));

  taosMemoryFreeClear(*ppReader);
  return code;
}

266 267
// SRSmaSnapWriter ========================================
struct SRSmaSnapWriter {
C
Cary Xu 已提交
268 269 270
  SSma*   pSma;
  int64_t sver;
  int64_t ever;
K
kailixu 已提交
271
  SRSmaFS fs;
C
Cary Xu 已提交
272 273 274 275 276

  // for data file
  STsdbSnapWriter* pDataWriter[TSDB_RETENTION_L2];
};

277
/**
 * Allocate and set up a snapshot writer for the rsma data of pSma.
 *
 * A tsdb snapshot writer is opened for every rsma level whose tsdb handle is present, then the rsma file
 * set is copied into the writer with tdRSmaFSCopy.
 *
 * @param pSma     SMA handle to write into.
 * @param sver     start version; stored in the writer and forwarded to tsdbSnapWriterOpen.
 * @param ever     end version; stored in the writer and forwarded to tsdbSnapWriterOpen.
 * @param ppWriter receives the new writer on success and NULL on failure.
 * @return 0 on success, otherwise the code of the step that failed.
 */
int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapWriter** ppWriter) {
  int32_t          code = 0;
  int32_t          lino = 0;
  SVnode*          pVnode = pSma->pVnode;
  SRSmaSnapWriter* pNew = (SRSmaSnapWriter*)taosMemoryCalloc(1, sizeof(SRSmaSnapWriter));

  if (pNew == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
  pNew->pSma = pSma;
  pNew->sver = sver;
  pNew->ever = ever;

  // one tsdb snapshot writer per available rsma level (rsma1 / rsma2)
  for (int32_t level = 0; level < TSDB_RETENTION_L2; ++level) {
    if (pSma->pRSmaTsdb[level] == NULL) {
      continue;
    }
    code = tsdbSnapWriterOpen(pSma->pRSmaTsdb[level], sver, ever, &pNew->pDataWriter[level]);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // qtaskinfo: start from a copy of the rsma file set
  code = tdRSmaFSCopy(pSma, &pNew->fs);
  TSDB_CHECK_CODE(code, lino, _exit);

  *ppWriter = pNew;

_exit:
  if (code == 0) {
    smaInfo("vgId:%d, rsma snapshot writer open succeed", TD_VID(pSma->pVnode));
    return code;
  }

  // NOTE(review): the half-opened writer is closed with rollback == 0, which takes the commit path of
  // rsmaSnapWriterClose - confirm that rollback == 1 is not what is intended here.
  if (pNew != NULL) rsmaSnapWriterClose(&pNew, 0);
  *ppWriter = NULL;
  smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
  return code;
}
C
Cary Xu 已提交
317

K
kailixu 已提交
318 319 320 321 322 323 324 325 326 327 328 329 330
/**
 * Prepare the writer's rsma file set for commit by calling tdRSmaFSPrepareCommit on it.
 *
 * @param pWriter snapshot writer whose file set is prepared.
 * @return 0 on success, otherwise the code returned by tdRSmaFSPrepareCommit (also logged).
 */
int32_t rsmaSnapWriterPrepareClose(SRSmaSnapWriter* pWriter) {
  int32_t lino = 0;
  int32_t code = tdRSmaFSPrepareCommit(pWriter->pSma, &pWriter->fs);

  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code != 0) {
    smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pWriter->pSma), __func__, lino, tstrerror(code));
  }
  return code;
}

334
/**
 * Finish a snapshot write and release the writer.
 *
 * Closes every open tsdb snapshot writer with the given rollback flag, then either rolls the rsma file
 * set back (rollback != 0) or commits it while holding the FS write latch, and finally runs
 * tdRSmaRestore. The writer is freed and *ppWriter cleared whether or not these steps succeed.
 *
 * @param ppWriter address of the writer pointer; a NULL address or a NULL writer is a no-op.
 * @param rollback non-zero to discard the received snapshot, zero to commit it.
 * @return 0 on success, otherwise the code of the first failing step.
 */
int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) {
  int32_t          code = 0;
  int32_t          lino = 0;
  SSma*            pSma = NULL;
  SSmaEnv*         pEnv = NULL;
  SRSmaStat*       pStat = NULL;
  SRSmaSnapWriter* pWriter = NULL;

  // Return before any logging: there is no SSma to take the vgId from when there is no writer.
  if (ppWriter == NULL || *ppWriter == NULL) {
    return code;
  }
  pWriter = *ppWriter;

  pSma = pWriter->pSma;
  pEnv = SMA_RSMA_ENV(pSma);
  pStat = (SRSmaStat*)SMA_ENV_STAT(pEnv);

  // rsma1/rsma2
  // NOTE(review): when closing one level fails, the writers of the remaining levels are left unclosed;
  // confirm whether they should be closed with rollback in that case.
  for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
    if (pWriter->pDataWriter[i]) {
      code = tsdbSnapWriterClose(&pWriter->pDataWriter[i], rollback);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  // qtaskinfo
  if (rollback) {
    tdRSmaFSRollback(pSma);
    // remove qTaskFiles
  } else {
    // commit under the FS write latch; the latch is released before the result is examined
    taosWLockLatch(RSMA_FS_LOCK(pStat));
    code = tdRSmaFSCommit(pSma);
    taosWUnLockLatch(RSMA_FS_LOCK(pStat));
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // rsma restore
  code = tdRSmaRestore(pWriter->pSma, RSMA_RESTORE_SYNC, pWriter->ever, rollback);
  TSDB_CHECK_CODE(code, lino, _exit);
  smaInfo("vgId:%d, vnode snapshot rsma writer restore from sync succeed", SMA_VID(pSma));

_exit:
  taosMemoryFree(pWriter);
  *ppWriter = NULL;
  if (code) {
    smaError("vgId:%d, vnode snapshot rsma writer close failed since %s", SMA_VID(pSma), tstrerror(code));
  } else {
    smaInfo("vgId:%d, vnode snapshot rsma writer close succeed", SMA_VID(pSma));
  }

  return code;
}

391
/**
 * Dispatch one received snapshot data packet to the writer that handles its type.
 *
 * SNAP_DATA_RSMA1 / SNAP_DATA_RSMA2 packets are relabelled in place as SNAP_DATA_TSDB and passed to the
 * tsdb snapshot writer of level 0 / 1; SNAP_DATA_QTASK packets go to rsmaSnapWriteQTaskInfo; every other
 * type is rejected with TSDB_CODE_RSMA_FS_SYNC.
 *
 * @param pWriter snapshot writer.
 * @param pData   packet, starting with an SSnapDataHdr; the header type may be rewritten.
 * @param nData   packet length in bytes, forwarded unchanged.
 * @return 0 on success, otherwise the error code of the chosen writer or TSDB_CODE_RSMA_FS_SYNC.
 */
int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
  int32_t       code = 0;
  int32_t       lino = 0;
  SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;

  if (pHdr->type == SNAP_DATA_RSMA1 || pHdr->type == SNAP_DATA_RSMA2) {
    // rsma1/rsma2: pick the level's writer before the type is overwritten
    STsdbSnapWriter* pLevelWriter = pWriter->pDataWriter[pHdr->type == SNAP_DATA_RSMA1 ? 0 : 1];
    pHdr->type = SNAP_DATA_TSDB;
    code = tsdbSnapWrite(pLevelWriter, pData, nData);
  } else if (pHdr->type == SNAP_DATA_QTASK) {
    code = rsmaSnapWriteQTaskInfo(pWriter, pData, nData);
  } else {
    code = TSDB_CODE_RSMA_FS_SYNC;
  }
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code != 0) {
    smaError("vgId:%d, %s failed at line %d since %s, data type %" PRIi8, SMA_VID(pWriter->pSma), __func__, lino,
             tstrerror(code), pHdr->type);
  } else {
    smaInfo("vgId:%d, rsma snapshot write for data type %" PRIi8 " succeed", SMA_VID(pWriter->pSma), pHdr->type);
  }
  return code;
}

420
/**
 * Store one received qtaskinfo packet as a file and register it in the writer's rsma file set.
 *
 * The file name is derived from the header's index (suid) and flag (level) and from pWriter->ever. The
 * payload is written and fsynced, the file is closed, and a matching SQTaskFile entry is upserted into
 * pWriter->fs. If a step fails while the file is still open, the file is closed and removed again.
 *
 * @param pWriter snapshot writer.
 * @param pData   packet: an SSnapDataHdr followed by pHdr->size payload bytes.
 * @param nData   total packet length in bytes; must equal sizeof(SSnapDataHdr) + pHdr->size.
 * @return 0 on success, TSDB_CODE_RSMA_FS_SYNC for a malformed packet, otherwise the failing step's code.
 */
static int32_t rsmaSnapWriteQTaskInfo(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
  int32_t       code = 0;
  int32_t       lino = 0;
  SSma*         pSma = pWriter->pSma;
  SVnode*       pVnode = pSma->pVnode;
  char          fname[TSDB_FILENAME_LEN];
  TdFilePtr     fp = NULL;
  SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;

  fname[0] = '\0';

  // Check the length first: a packet shorter than the header must not be dereferenced, and the unsigned
  // subtraction below would wrap around.
  if (nData < sizeof(SSnapDataHdr) || pHdr->size != (nData - sizeof(SSnapDataHdr))) {
    code = TSDB_CODE_RSMA_FS_SYNC;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  SQTaskFile qTaskFile = {
      .nRef = 1, .level = pHdr->flag, .suid = pHdr->index, .version = pWriter->ever, .size = pHdr->size};

  tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pHdr->index, pHdr->flag, qTaskFile.version,
                             tfsGetPrimaryPath(pVnode->pTfs), fname);

  fp = taosCreateFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (!fp) {
    code = TAOS_SYSTEM_ERROR(errno);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  int64_t contLen = taosWriteFile(fp, pHdr->data, pHdr->size);
  if (contLen != pHdr->size) {
    code = TAOS_SYSTEM_ERROR(errno);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // mtime is best effort: it is only recorded when the stat call succeeds
  uint32_t mtime = 0;
  if (taosFStatFile(fp, NULL, &mtime) == 0) {
    qTaskFile.mtime = mtime;
  }

  if (taosFsyncFile(fp) < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  taosCloseFile(&fp);
  fp = NULL;

  code = tdRSmaFSUpsertQTaskFile(pSma, &pWriter->fs, &qTaskFile, 1);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    if (fp) {
      // Close before removing so that the handle is not leaked on the failure path.
      taosCloseFile(&fp);
      (void)taosRemoveFile(fname);
    }
    smaError("vgId:%d, %s failed at line %d since %s, file:%s", TD_VID(pVnode), __func__, lino, tstrerror(code), fname);
  } else {
    smaInfo("vgId:%d, vnode snapshot rsma write qtaskinfo %s succeed", TD_VID(pVnode), fname);
  }

  return code;
}