tsdbReaderWriter.c 41.5 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"
H
Hongze Cheng 已提交
17 18 19

// Page size, in bytes, handed to tsdbOpenFile() by the data file writer/reader open paths in this file.
#define TSDB_DEFAULT_PAGE_SIZE 4096

H
Hongze Cheng 已提交
20
// =============== PAGE-WISE FILE ===============
H
Hongze Cheng 已提交
21 22 23
/*
 * Open `path` as a page-wise file and allocate its single page buffer.
 *
 * path   - file to open
 * szPage - page size in bytes; also the size of the page buffer
 * opt    - TD_FILE_* flags forwarded to taosOpenFile()
 * ppFD   - receives the new handle on success, NULL on failure
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if an allocation fails, or
 * TAOS_SYSTEM_ERROR(errno) if the file cannot be opened.  On failure nothing
 * is leaked: the partially built handle and any opened file are released.
 */
static int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t opt, STsdbFD **ppFD) {
  int32_t  code = 0;
  STsdbFD *pFD = NULL;

  *ppFD = NULL;

  pFD = (STsdbFD *)taosMemoryCalloc(1, sizeof(*pFD));
  if (pFD == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  pFD->pFD = taosOpenFile(path, opt);
  if (pFD->pFD == NULL) {
    // capture errno before any cleanup call can overwrite it
    code = TAOS_SYSTEM_ERROR(errno);
    goto _exit;
  }

  pFD->szPage = szPage;
  // NOTE(review): tsdbReadFile() treats pFD->pgno as "the page held in pBuf"; starting at 0 with an
  // unfilled buffer makes page 0 look cached -- confirm the intended "no page loaded" value.
  pFD->pgno = 0;
  pFD->nBuf = 0;
  pFD->pBuf = taosMemoryMalloc(pFD->szPage);
  if (pFD->pBuf == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // success: hand ownership to the caller
  *ppFD = pFD;
  pFD = NULL;

_exit:
  if (pFD != NULL) {
    // failure after the handle was allocated: undo what was acquired so far
    if (pFD->pFD != NULL) {
      taosCloseFile(&pFD->pFD);
    }
    taosMemoryFree(pFD);
  }
  return code;
}
H
Hongze Cheng 已提交
52

H
Hongze Cheng 已提交
53 54
/*
 * Release a handle created by tsdbOpenFile(): free the page buffer, close the
 * underlying file, free the handle and reset *ppFD to NULL.
 *
 * A NULL handle (*ppFD == NULL) is accepted and ignored, so cleanup paths can
 * call this for files that were never opened.
 *
 * NOTE(review): bytes still sitting in the page buffer (nBuf > 0) are not
 * written here; tsdbWriteFile() only writes full pages -- confirm that a
 * partial trailing page is flushed elsewhere.
 */
static void tsdbCloseFile(STsdbFD **ppFD) {
  STsdbFD *pFD = *ppFD;

  if (pFD == NULL) {
    return;
  }

  taosMemoryFree(pFD->pBuf);
  taosCloseFile(&pFD->pFD);
  taosMemoryFree(pFD);
  *ppFD = NULL;
}

H
Hongze Cheng 已提交
61
/*
 * Call taosFsyncFile() on the file behind `pFD`.
 *
 * Returns 0 on success or TAOS_SYSTEM_ERROR(errno) when taosFsyncFile()
 * reports a negative result.
 */
static int32_t tsdbFsyncFile(STsdbFD *pFD) {
  if (taosFsyncFile(pFD->pFD) < 0) {
    return TAOS_SYSTEM_ERROR(errno);
  }
  return 0;
}

H
Hongze Cheng 已提交
73
/*
 * Append `nBuf` bytes from `pBuf` to the page-wise file.
 *
 * Data is staged in pFD->pBuf.  The last sizeof(TSCKSUM) bytes of every page
 * are kept free for the checksum; once the payload area of the page is full,
 * taosCalcChecksumAppend() is invoked on the page and the whole page is
 * written with taosWriteFile().  A partially filled page stays buffered.
 *
 * `offset` is currently unused.
 *
 * Returns 0 on success or TAOS_SYSTEM_ERROR(errno) if a page write fails.
 */
static int32_t tsdbWriteFile(STsdbFD *pFD, uint8_t *pBuf, int32_t nBuf, int64_t *offset) {
  int32_t nDone = 0;

  while (nDone < nBuf) {
    // payload room left in the current page, then how much of it this call can fill
    int32_t room = pFD->szPage - pFD->nBuf - sizeof(TSCKSUM);
    int32_t chunk = TMIN(room, nBuf - nDone);

    memcpy(pFD->pBuf + pFD->nBuf, pBuf + nDone, chunk);
    nDone += chunk;
    pFD->nBuf += chunk;

    // page not full yet: keep buffering
    if (pFD->nBuf + sizeof(TSCKSUM) != pFD->szPage) continue;

    taosCalcChecksumAppend(0, pFD->pBuf, pFD->szPage);

    int64_t nWritten = taosWriteFile(pFD->pFD, pFD->pBuf, pFD->szPage);
    if (nWritten < 0) {
      return TAOS_SYSTEM_ERROR(errno);
    }

    pFD->nBuf = 0;
  }

  return 0;
}

H
Hongze Cheng 已提交
102
/*
 * Load page `pgno` (file offset pgno * szPage) into pFD->pBuf and verify it
 * with taosCheckChecksumWhole().
 *
 * Returns 0 on success and records `pgno` in pFD->pgno.  Returns
 * TAOS_SYSTEM_ERROR(errno) if the seek or read fails, and
 * TSDB_CODE_FILE_CORRUPTED on a short read or a checksum mismatch.
 *
 * Note: on a failure after the read started, pFD->pBuf may have been
 * overwritten while pFD->pgno still names the previously loaded page.
 */
static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) {
  int64_t rc = taosLSeekFile(pFD->pFD, pgno * pFD->szPage, SEEK_SET);
  if (rc < 0) {
    return TAOS_SYSTEM_ERROR(errno);
  }

  rc = taosReadFile(pFD->pFD, pFD->pBuf, pFD->szPage);
  if (rc < 0) {
    return TAOS_SYSTEM_ERROR(errno);
  }
  if (rc < pFD->szPage) {
    // fewer bytes than a full page
    return TSDB_CODE_FILE_CORRUPTED;
  }

  if (!taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
    return TSDB_CODE_FILE_CORRUPTED;
  }

  pFD->pgno = pgno;
  return 0;
}
H
Hongze Cheng 已提交
130

H
Hongze Cheng 已提交
131
/*
 * Read `count` payload bytes starting at file offset `offset` into `pBuf`,
 * going through the one-page cache in `pFD`.
 *
 * Each page contributes at most szPage - sizeof(TSCKSUM) payload bytes; the
 * checksum bytes at the end of a page are never copied out.  The in-page
 * offset applies to the first page only; every following page is consumed
 * from its start.  A page is (re)loaded with tsdbReadFilePage() only when it
 * is not the one recorded in pFD->pgno.
 *
 * Returns 0 on success or the error code from tsdbReadFilePage().
 *
 * NOTE(review): correctness of the cache test relies on pFD->pgno naming the
 * page actually held in pFD->pBuf; tsdbOpenFile() starts pgno at 0 before any
 * page is loaded -- confirm the intended "nothing cached" value.
 */
static int64_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t count) {
  int32_t       code = 0;
  const int64_t szCont = (int64_t)pFD->szPage - (int64_t)sizeof(TSCKSUM);  // payload bytes per page
  int64_t       pgno = offset / pFD->szPage;
  int64_t       bOff = offset % pFD->szPage;
  int64_t       n = 0;

  // An offset landing inside the checksum bytes yields no payload from that page;
  // clamping keeps the copy length non-negative.
  if (bOff > szCont) bOff = szCont;

  while (n < count) {
    if (pFD->pgno != pgno) {
      code = tsdbReadFilePage(pFD, pgno);
      if (code) goto _exit;
    }

    int64_t nRead = TMIN(szCont - bOff, count - n);
    memcpy(pBuf + n, pFD->pBuf + bOff, nRead);
    n += nRead;

    // continue with the start of the next page
    pgno++;
    bOff = 0;
  }

_exit:
  return code;
}

H
Hongze Cheng 已提交
158 159 160 161 162 163
/*
 * Placeholder for seeking within a page-wise file: not implemented.
 * It unconditionally hits ASSERT(0) and, if that does not stop execution,
 * returns 0.  Both parameters are unused.
 */
static int32_t tsdbLSeekFile(STsdbFD *pFD, int64_t offset) {
  ASSERT(0);
  return 0;
}

H
Hongze Cheng 已提交
164 165
// SDataFWriter ====================================================
/*
 * Create a writer for the file set described by `pSet`.
 *
 * The writer keeps its own copies of the head/data/sma/sst file descriptors
 * (wSet points at those copies).  The head file and the last sst file are
 * always created/truncated; the data and sma files are created only when
 * their recorded size is 0, otherwise they are opened for read/write.  Every
 * freshly created file gets a zeroed TSDB_FHDR_SIZE-byte header and its
 * recorded size is advanced by TSDB_FHDR_SIZE.
 *
 * On success *ppWriter receives the writer and 0 is returned.  On failure the
 * error is logged, every file opened so far is closed, the writer is freed,
 * *ppWriter is set to NULL and the error code is returned.
 */
int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet) {
  int32_t       code = 0;
  int32_t       flag;
  int32_t       szPage = TSDB_DEFAULT_PAGE_SIZE;
  SDataFWriter *pWriter = NULL;
  char          fname[TSDB_FILENAME_LEN];
  uint8_t       hdr[TSDB_FHDR_SIZE] = {0};

  // alloc
  pWriter = taosMemoryCalloc(1, sizeof(*pWriter));
  if (pWriter == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  pWriter->pTsdb = pTsdb;
  pWriter->wSet = (SDFileSet){.diskId = pSet->diskId,
                              .fid = pSet->fid,
                              .pHeadF = &pWriter->fHead,
                              .pDataF = &pWriter->fData,
                              .pSmaF = &pWriter->fSma,
                              .nSstF = pSet->nSstF};
  pWriter->fHead = *pSet->pHeadF;
  pWriter->fData = *pSet->pDataF;
  pWriter->fSma = *pSet->pSmaF;
  for (int8_t iSst = 0; iSst < pSet->nSstF; iSst++) {
    pWriter->wSet.aSstF[iSst] = &pWriter->fSst[iSst];
    pWriter->fSst[iSst] = *pSet->aSstF[iSst];
  }

  // head
  flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
  tsdbHeadFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fHead, fname);
  code = tsdbOpenFile(fname, szPage, flag, &pWriter->pHeadFD);
  if (code) goto _err;

  code = tsdbWriteFile(pWriter->pHeadFD, hdr, TSDB_FHDR_SIZE, NULL);
  if (code) goto _err;

  pWriter->fHead.size += TSDB_FHDR_SIZE;

  // data
  if (pWriter->fData.size == 0) {
    flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
  } else {
    flag = TD_FILE_READ | TD_FILE_WRITE;
  }
  tsdbDataFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fData, fname);
  code = tsdbOpenFile(fname, szPage, flag, &pWriter->pDataFD);
  if (code) goto _err;
  if (pWriter->fData.size == 0) {
    code = tsdbWriteFile(pWriter->pDataFD, hdr, TSDB_FHDR_SIZE, NULL);
    if (code) goto _err;
    pWriter->fData.size += TSDB_FHDR_SIZE;
  } else {
    // TODO: an existing data file is not positioned at its end here (the seek is not implemented yet)
  }

  // sma
  if (pWriter->fSma.size == 0) {
    flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
  } else {
    flag = TD_FILE_READ | TD_FILE_WRITE;
  }
  tsdbSmaFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fSma, fname);
  code = tsdbOpenFile(fname, szPage, flag, &pWriter->pSmaFD);
  if (code) goto _err;
  if (pWriter->fSma.size == 0) {
    code = tsdbWriteFile(pWriter->pSmaFD, hdr, TSDB_FHDR_SIZE, NULL);
    if (code) goto _err;

    pWriter->fSma.size += TSDB_FHDR_SIZE;
  } else {
    // NOTE(review): tsdbLSeekFile() is currently a stub that asserts; the data branch above skips it -- confirm.
    code = tsdbLSeekFile(pWriter->pSmaFD, 0);
    if (code) goto _err;
  }

  // sst: only the last sst file of the set is written, and it must be new
  ASSERT(pWriter->fSst[pSet->nSstF - 1].size == 0);
  flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
  tsdbSstFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fSst[pSet->nSstF - 1], fname);
  code = tsdbOpenFile(fname, szPage, flag, &pWriter->pLastFD);
  if (code) goto _err;
  code = tsdbWriteFile(pWriter->pLastFD, hdr, TSDB_FHDR_SIZE, NULL);
  if (code) goto _err;
  pWriter->fSst[pWriter->wSet.nSstF - 1].size += TSDB_FHDR_SIZE;

  *ppWriter = pWriter;
  return code;

_err:
  tsdbError("vgId:%d, tsdb data file writer open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  if (pWriter != NULL) {
    // The writer was zero-allocated and tsdbOpenFile() leaves NULL on failure,
    // so a non-NULL handle is exactly one that was opened successfully.
    if (pWriter->pHeadFD != NULL) tsdbCloseFile(&pWriter->pHeadFD);
    if (pWriter->pDataFD != NULL) tsdbCloseFile(&pWriter->pDataFD);
    if (pWriter->pSmaFD != NULL) tsdbCloseFile(&pWriter->pSmaFD);
    if (pWriter->pLastFD != NULL) tsdbCloseFile(&pWriter->pLastFD);
    taosMemoryFree(pWriter);
  }
  *ppWriter = NULL;
  return code;
}

H
Hongze Cheng 已提交
264
/*
 * Close a writer created by tsdbDataFWriterOpen().
 *
 * If `sync` is non-zero, the head, data, sma and last-sst files are fsynced
 * first, in that order.  Then all four files are closed, the writer's scratch
 * buffers are released, the writer is freed and *ppWriter is set to NULL.
 * A NULL *ppWriter is accepted and returns 0.
 *
 * If an fsync fails, the code returned by tsdbFsyncFile() is logged and
 * returned as-is; in that case nothing is closed or freed and *ppWriter is
 * left untouched.
 */
int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync) {
  int32_t       code = 0;
  SDataFWriter *pWriter = *ppWriter;

  if (pWriter == NULL) goto _exit;

  if (sync) {
    STsdbFD *aFD[] = {pWriter->pHeadFD, pWriter->pDataFD, pWriter->pSmaFD, pWriter->pLastFD};

    for (size_t iFD = 0; iFD < sizeof(aFD) / sizeof(aFD[0]); iFD++) {
      // keep the code computed at the point of failure instead of re-reading errno later
      code = tsdbFsyncFile(aFD[iFD]);
      if (code) goto _err;
    }
  }

  tsdbCloseFile(&pWriter->pHeadFD);
  tsdbCloseFile(&pWriter->pDataFD);
  tsdbCloseFile(&pWriter->pSmaFD);
  tsdbCloseFile(&pWriter->pLastFD);

  for (size_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(pWriter->aBuf[0]); iBuf++) {
    tFree(pWriter->aBuf[iBuf]);
  }
  taosMemoryFree(pWriter);

_exit:
  *ppWriter = NULL;
  return code;

_err:
  tsdbError("vgId:%d, data file writer close failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
311 312 313 314
int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter) {
  int32_t code = 0;
  int64_t n;
  char    hdr[TSDB_FHDR_SIZE];
H
Hongze Cheng 已提交
315

H
Hongze Cheng 已提交
316 317 318
  // head ==============
  memset(hdr, 0, TSDB_FHDR_SIZE);
  tPutHeadFile(hdr, &pWriter->fHead);
H
Hongze Cheng 已提交
319

H
Hongze Cheng 已提交
320 321
  code = tsdbLSeekFile(pWriter->pHeadFD, 0);
  if (code) goto _err;
H
Hongze Cheng 已提交
322

H
Hongze Cheng 已提交
323 324
  code = tsdbWriteFile(pWriter->pHeadFD, hdr, TSDB_FHDR_SIZE, NULL);
  if (code) goto _err;
H
Hongze Cheng 已提交
325 326 327 328 329

  // data ==============
  memset(hdr, 0, TSDB_FHDR_SIZE);
  tPutDataFile(hdr, &pWriter->fData);

H
Hongze Cheng 已提交
330 331
  code = tsdbLSeekFile(pWriter->pDataFD, 0);
  if (code) goto _err;
H
Hongze Cheng 已提交
332

H
Hongze Cheng 已提交
333 334
  code = tsdbWriteFile(pWriter->pDataFD, hdr, TSDB_FHDR_SIZE, NULL);
  if (code) goto _err;
H
Hongze Cheng 已提交
335

H
Hongze Cheng 已提交
336 337 338
  // sma ==============
  memset(hdr, 0, TSDB_FHDR_SIZE);
  tPutSmaFile(hdr, &pWriter->fSma);
H
Hongze Cheng 已提交
339

H
Hongze Cheng 已提交
340 341
  code = tsdbLSeekFile(pWriter->pSmaFD, 0);
  if (code) goto _err;
H
Hongze Cheng 已提交
342

H
Hongze Cheng 已提交
343 344
  code = tsdbWriteFile(pWriter->pSmaFD, hdr, TSDB_FHDR_SIZE, NULL);
  if (code) goto _err;
H
Hongze Cheng 已提交
345

H
Hongze Cheng 已提交
346 347 348 349
  // sst ==============
  memset(hdr, 0, TSDB_FHDR_SIZE);
  tPutSstFile(hdr, &pWriter->fSst[pWriter->wSet.nSstF - 1]);

H
Hongze Cheng 已提交
350 351
  code = tsdbLSeekFile(pWriter->pLastFD, 0);
  if (code) goto _err;
H
Hongze Cheng 已提交
352

H
Hongze Cheng 已提交
353 354
  code = tsdbWriteFile(pWriter->pLastFD, hdr, TSDB_FHDR_SIZE, NULL);
  if (code) goto _err;
H
Hongze Cheng 已提交
355 356 357 358

  return code;

_err:
H
Hongze Cheng 已提交
359
  tsdbError("vgId:%d, update DFileSet header failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
360 361 362
  return code;
}

H
Hongze Cheng 已提交
363 364 365 366 367
/*
 * Encode every SBlockIdx in `aBlockIdx` with tPutBlockIdx() into the writer's
 * scratch buffer aBuf[0] and append the result to the head file.
 *
 * On success the head file descriptor's `offset` is set to the position the
 * index was written at (the previous recorded size) and its `size` grows by
 * the encoded length.  With an empty array nothing is written and `offset` is
 * simply set to the current size.
 *
 * Returns 0 on success, or the error from tRealloc()/tsdbWriteFile().
 */
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx) {
  int32_t    code = 0;
  SHeadFile *pHeadFile = &pWriter->fHead;
  // Fetched once: the array is not modified here, and an int32_t matches the "%d" in the trace below.
  int32_t    nBlockIdx = (int32_t)taosArrayGetSize(aBlockIdx);
  int64_t    size = 0;
  int64_t    n = 0;

  // check
  if (nBlockIdx == 0) {
    pHeadFile->offset = pHeadFile->size;
    goto _exit;
  }

  // prepare: a NULL destination makes tPutBlockIdx() report the encoded length only
  for (int32_t iBlockIdx = 0; iBlockIdx < nBlockIdx; iBlockIdx++) {
    size += tPutBlockIdx(NULL, taosArrayGet(aBlockIdx, iBlockIdx));
  }

  // alloc
  code = tRealloc(&pWriter->aBuf[0], size);
  if (code) goto _err;

  // build
  for (int32_t iBlockIdx = 0; iBlockIdx < nBlockIdx; iBlockIdx++) {
    n += tPutBlockIdx(pWriter->aBuf[0] + n, taosArrayGet(aBlockIdx, iBlockIdx));
  }
  ASSERT(n == size);

  // write
  code = tsdbWriteFile(pWriter->pHeadFD, pWriter->aBuf[0], size, NULL);
  if (code) goto _err;

  // update
  pHeadFile->offset = pHeadFile->size;
  pHeadFile->size += size;

_exit:
  tsdbTrace("vgId:%d write block idx, offset:%" PRId64 " size:%" PRId64 " nBlockIdx:%d", TD_VID(pWriter->pTsdb->pVnode),
            pHeadFile->offset, size, nBlockIdx);
  return code;

_err:
  tsdbError("vgId:%d, write block idx failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
409 410 411 412 413 414 415
/*
 * Encode `mBlock` with tPutMapData() into the writer's scratch buffer aBuf[0]
 * and append it to the head file.
 *
 * On success pBlockIdx->offset/size record where the map was written and how
 * long it is, and the head file's recorded size grows by the same amount.
 * `mBlock` must contain at least one item.
 *
 * Returns 0 on success, or the error from tRealloc()/tsdbWriteFile().
 */
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, SBlockIdx *pBlockIdx) {
  int32_t    code = 0;
  SHeadFile *pHeadFile = &pWriter->fHead;
  int64_t    size;
  int64_t    n;

  ASSERT(mBlock->nItem > 0);

  // alloc: a NULL destination makes tPutMapData() report the encoded length only
  size = tPutMapData(NULL, mBlock);
  code = tRealloc(&pWriter->aBuf[0], size);
  if (code) goto _err;

  // build: encode from the start of the buffer
  n = tPutMapData(pWriter->aBuf[0], mBlock);
  ASSERT(n == size);

  // write
  code = tsdbWriteFile(pWriter->pHeadFD, pWriter->aBuf[0], size, NULL);
  if (code) goto _err;

  // update
  pBlockIdx->offset = pHeadFile->size;
  pBlockIdx->size = size;
  pHeadFile->size += size;

  tsdbTrace("vgId:%d, write block, file ID:%d commit ID:%d suid:%" PRId64 " uid:%" PRId64 " offset:%" PRId64
            " size:%" PRId64 " nItem:%d",
            TD_VID(pWriter->pTsdb->pVnode), pWriter->wSet.fid, pHeadFile->commitID, pBlockIdx->suid, pBlockIdx->uid,
            pBlockIdx->offset, pBlockIdx->size, mBlock->nItem);
  return code;

_err:
  tsdbError("vgId:%d, write block failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
445
/*
 * Encode every entry of `aSstBlk` with tPutSstBlk() into the writer's scratch
 * buffer aBuf[0] and append the result to the last sst file.
 *
 * On success the sst file descriptor's `offset` is set to the position the
 * entries were written at and its `size` grows by the encoded length.  With
 * an empty array nothing is written and `offset` is set to the current size.
 *
 * Returns 0 on success, or the error from tRealloc()/tsdbWriteFile().
 */
int32_t tsdbWriteSstBlk(SDataFWriter *pWriter, SArray *aSstBlk) {
  int32_t   code = 0;
  SSstFile *pSstFile = &pWriter->fSst[pWriter->wSet.nSstF - 1];
  int64_t   size = 0;

  if (taosArrayGetSize(aSstBlk) != 0) {
    int64_t nEncoded = 0;
    int32_t iSstBlk;

    // size: a NULL destination makes tPutSstBlk() report the encoded length only
    for (iSstBlk = 0; iSstBlk < taosArrayGetSize(aSstBlk); ++iSstBlk) {
      size += tPutSstBlk(NULL, taosArrayGet(aSstBlk, iSstBlk));
    }

    // alloc
    code = tRealloc(&pWriter->aBuf[0], size);
    if (code) goto _err;

    // encode
    for (iSstBlk = 0; iSstBlk < taosArrayGetSize(aSstBlk); ++iSstBlk) {
      nEncoded += tPutSstBlk(pWriter->aBuf[0] + nEncoded, taosArrayGet(aSstBlk, iSstBlk));
    }

    // write
    code = tsdbWriteFile(pWriter->pLastFD, pWriter->aBuf[0], size, NULL);
    if (code) goto _err;

    // update
    pSstFile->offset = pSstFile->size;
    pSstFile->size += size;
  } else {
    // nothing to write
    pSstFile->offset = pSstFile->size;
  }

  tsdbTrace("vgId:%d tsdb write blockl, loffset:%" PRId64 " size:%" PRId64, TD_VID(pWriter->pTsdb->pVnode),
            pSstFile->offset, size);
  return code;

_err:
  tsdbError("vgId:%d tsdb write blockl failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
490
/*
 * Compute and append the per-column aggregates (SMA) of `pBlockData` to the
 * sma file.
 *
 * For each column that has smaOn set and is not a variable-length type,
 * tsdbCalcColDataSMA() fills a SColumnDataAgg which is encoded back-to-back
 * into the writer's scratch buffer aBuf[0].  If anything was encoded it is
 * written to the sma file; pSmaInfo->offset/size then describe the written
 * range and the sma file's recorded size is advanced past it, so the next
 * block gets a distinct offset.  If no column qualifies, pSmaInfo is left at
 * offset 0 / size 0 and nothing is written.
 *
 * Returns 0 on success, or the error from tRealloc()/tsdbWriteFile().
 */
static int32_t tsdbWriteBlockSma(SDataFWriter *pWriter, SBlockData *pBlockData, SSmaInfo *pSmaInfo) {
  int32_t code = 0;

  pSmaInfo->offset = 0;
  pSmaInfo->size = 0;

  // encode
  for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) {
    SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);

    if ((!pColData->smaOn) || IS_VAR_DATA_TYPE(pColData->type)) continue;

    SColumnDataAgg sma;
    tsdbCalcColDataSMA(pColData, &sma);

    // grow the buffer to hold what is already encoded plus this aggregate
    code = tRealloc(&pWriter->aBuf[0], pSmaInfo->size + tPutColumnDataAgg(NULL, &sma));
    if (code) goto _err;
    pSmaInfo->size += tPutColumnDataAgg(pWriter->aBuf[0] + pSmaInfo->size, &sma);
  }

  // write (the buffer was already sized for pSmaInfo->size bytes by the loop above)
  if (pSmaInfo->size) {
    code = tsdbWriteFile(pWriter->pSmaFD, pWriter->aBuf[0], pSmaInfo->size, NULL);
    if (code) goto _err;

    pSmaInfo->offset = pWriter->fSma.size;
    pWriter->fSma.size += pSmaInfo->size;
  }

  return code;

_err:
  tsdbError("vgId:%d tsdb write block sma failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
529 530
/*
 * Compress `pBlockData` with tCmprBlockData() and append it to the last sst
 * file (toLast != 0) or to the data file (toLast == 0).
 *
 * tCmprBlockData() fills the writer's scratch buffers aBuf[0..3] and their
 * lengths; the buffers are written in the order 3, 2, 1, 0.  Buffers 3 and 2
 * are always passed to tsdbWriteFile(), buffers 1 and 0 only when non-empty.
 * pBlkInfo receives the start offset, the total block size (all four
 * lengths) and the key size (lengths 3 + 2), and the target file's recorded
 * size is advanced.  When `pSmaInfo` is non-NULL the block's SMA is written
 * afterwards via tsdbWriteBlockSma().
 *
 * `pBlockData` must contain at least one row.
 * Returns 0 on success or the first error encountered.
 */
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo,
                           int8_t cmprAlg, int8_t toLast) {
  int32_t  code = 0;
  int32_t  aBufN[4] = {0};
  STsdbFD *pFD = NULL;

  ASSERT(pBlockData->nRow > 0);

  if (toLast) {
    pBlkInfo->offset = pWriter->fSst[pWriter->wSet.nSstF - 1].size;
  } else {
    pBlkInfo->offset = pWriter->fData.size;
  }
  pBlkInfo->szBlock = 0;
  pBlkInfo->szKey = 0;

  code = tCmprBlockData(pBlockData, cmprAlg, NULL, NULL, pWriter->aBuf, aBufN);
  if (code) goto _err;

  // write =================
  if (toLast) {
    pFD = pWriter->pLastFD;
  } else {
    pFD = pWriter->pDataFD;
  }

  pBlkInfo->szKey = aBufN[3] + aBufN[2];
  pBlkInfo->szBlock = aBufN[0] + aBufN[1] + aBufN[2] + aBufN[3];

  for (int32_t iBuf = 3; iBuf >= 0; iBuf--) {
    // buffers 1 and 0 are skipped when empty; 3 and 2 are always handed to the writer
    if (iBuf < 2 && aBufN[iBuf] == 0) continue;

    code = tsdbWriteFile(pFD, pWriter->aBuf[iBuf], aBufN[iBuf], NULL);
    if (code) goto _err;
  }

  // update info
  if (toLast) {
    pWriter->fSst[pWriter->wSet.nSstF - 1].size += pBlkInfo->szBlock;
  } else {
    pWriter->fData.size += pBlkInfo->szBlock;
  }

  // ================= SMA ====================
  if (pSmaInfo) {
    code = tsdbWriteBlockSma(pWriter, pBlockData, pSmaInfo);
    if (code) goto _err;
  }

  tsdbTrace("vgId:%d tsdb write block data, suid:%" PRId64 " uid:%" PRId64 " nRow:%d, offset:%" PRId64 " size:%d",
            TD_VID(pWriter->pTsdb->pVnode), pBlockData->suid, pBlockData->uid, pBlockData->nRow, pBlkInfo->offset,
            pBlkInfo->szBlock);
  return code;

_err:
  tsdbError("vgId:%d tsdb write block data failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
  return code;
}
H
Hongze Cheng 已提交
588

H
Hongze Cheng 已提交
589 590 591 592 593 594 595 596
int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
  int32_t   code = 0;
  int64_t   n;
  int64_t   size;
  TdFilePtr pOutFD = NULL;  // TODO
  TdFilePtr PInFD = NULL;   // TODO
  char      fNameFrom[TSDB_FILENAME_LEN];
  char      fNameTo[TSDB_FILENAME_LEN];
H
Hongze Cheng 已提交
597

H
Hongze Cheng 已提交
598 599 600
  // head
  tsdbHeadFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pHeadF, fNameFrom);
  tsdbHeadFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->pHeadF, fNameTo);
H
Hongze Cheng 已提交
601

H
Hongze Cheng 已提交
602 603 604
  pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
  if (pOutFD == NULL) {
    code = TAOS_SYSTEM_ERROR(errno);
H
Hongze Cheng 已提交
605 606 607
    goto _err;
  }

H
Hongze Cheng 已提交
608 609
  PInFD = taosOpenFile(fNameFrom, TD_FILE_READ);
  if (PInFD == NULL) {
H
Hongze Cheng 已提交
610 611 612 613
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

H
Hongze Cheng 已提交
614
  n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->pHeadF->size);
H
Hongze Cheng 已提交
615 616 617 618
  if (n < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }
H
Hongze Cheng 已提交
619 620
  taosCloseFile(&pOutFD);
  taosCloseFile(&PInFD);
H
Hongze Cheng 已提交
621 622

  // data
H
Hongze Cheng 已提交
623 624
  tsdbDataFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pDataF, fNameFrom);
  tsdbDataFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->pDataF, fNameTo);
H
Hongze Cheng 已提交
625

H
Hongze Cheng 已提交
626 627
  pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
  if (pOutFD == NULL) {
H
Hongze Cheng 已提交
628 629 630 631
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

H
Hongze Cheng 已提交
632 633
  PInFD = taosOpenFile(fNameFrom, TD_FILE_READ);
  if (PInFD == NULL) {
H
Hongze Cheng 已提交
634 635 636
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }
H
Hongze Cheng 已提交
637 638

  n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->pDataF->size);
H
Hongze Cheng 已提交
639 640 641 642
  if (n < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }
H
Hongze Cheng 已提交
643 644
  taosCloseFile(&pOutFD);
  taosCloseFile(&PInFD);
H
Hongze Cheng 已提交
645

H
Hongze Cheng 已提交
646 647 648
  // sst
  tsdbSstFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->aSstF[0], fNameFrom);
  tsdbSstFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->aSstF[0], fNameTo);
H
Hongze Cheng 已提交
649

H
Hongze Cheng 已提交
650 651 652 653
  pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
  if (pOutFD == NULL) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
H
Hongze Cheng 已提交
654 655
  }

H
Hongze Cheng 已提交
656 657
  PInFD = taosOpenFile(fNameFrom, TD_FILE_READ);
  if (PInFD == NULL) {
H
Hongze Cheng 已提交
658 659 660 661
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

H
Hongze Cheng 已提交
662 663
  n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->aSstF[0]->size);
  if (n < 0) {
H
Hongze Cheng 已提交
664 665 666
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }
H
Hongze Cheng 已提交
667 668
  taosCloseFile(&pOutFD);
  taosCloseFile(&PInFD);
H
Hongze Cheng 已提交
669

H
Hongze Cheng 已提交
670 671 672 673 674 675
  // sma
  tsdbSmaFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pSmaF, fNameFrom);
  tsdbSmaFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->pSmaF, fNameTo);

  pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
  if (pOutFD == NULL) {
H
Hongze Cheng 已提交
676 677 678 679
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

H
Hongze Cheng 已提交
680 681
  PInFD = taosOpenFile(fNameFrom, TD_FILE_READ);
  if (PInFD == NULL) {
H
Hongze Cheng 已提交
682 683 684 685
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

H
Hongze Cheng 已提交
686 687 688 689
  n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->pSmaF->size);
  if (n < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
H
Hongze Cheng 已提交
690
  }
H
Hongze Cheng 已提交
691 692 693
  taosCloseFile(&pOutFD);
  taosCloseFile(&PInFD);

H
Hongze Cheng 已提交
694 695 696
  return code;

_err:
H
Hongze Cheng 已提交
697
  tsdbError("vgId:%d, tsdb DFileSet copy failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
698 699 700
  return code;
}

H
Hongze Cheng 已提交
701 702 703 704
// SDataFReader ====================================================
// Open a read handle over every file of a data file set.
//
// The head, data and sma files and each of the pSet->nSstF sst files are
// opened read-only through tsdbOpenFile() with TSDB_DEFAULT_PAGE_SIZE.
//
// ppReader: out; receives the new reader on success, NULL on failure.
// pTsdb:    tsdb instance used to build file names and for logging.
// pSet:     file set to open. The reader stores this pointer (not a copy).
//
// Returns 0 on success, otherwise the error code of the step that failed.
// On failure everything acquired so far is released before returning.
int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet) {
  int32_t       code = 0;
  SDataFReader *pReader;
  int32_t       szPage = TSDB_DEFAULT_PAGE_SIZE;
  char          fname[TSDB_FILENAME_LEN];

  // alloc (zero-filled, so every file handle starts out as NULL)
  pReader = (SDataFReader *)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  pReader->pTsdb = pTsdb;
  pReader->pSet = pSet;

  // head
  tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname);
  code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pHeadFD);
  if (code) goto _err;

  // data
  tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname);
  code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pDataFD);
  if (code) goto _err;

  // sma
  tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname);
  code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pSmaFD);
  if (code) goto _err;

  // sst
  for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) {
    tsdbSstFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSstF[iSst], fname);
    code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->aSstFD[iSst]);
    if (code) goto _err;
  }

  *ppReader = pReader;
  return code;

_err:
  tsdbError("vgId:%d, tsdb data file reader open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));

  // Undo the partial open: close only the handles that were actually opened
  // (unopened ones are still NULL from the calloc) and free the reader itself.
  if (pReader) {
    if (pReader->pHeadFD) tsdbCloseFile(&pReader->pHeadFD);
    if (pReader->pDataFD) tsdbCloseFile(&pReader->pDataFD);
    if (pReader->pSmaFD) tsdbCloseFile(&pReader->pSmaFD);
    for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) {
      if (pReader->aSstFD[iSst]) tsdbCloseFile(&pReader->aSstFD[iSst]);
    }
    taosMemoryFree(pReader);
  }

  *ppReader = NULL;
  return code;
}

// Close a data file reader and release everything it owns.
//
// Closes the head, data, sma and all pSet->nSstF sst file handles, frees each
// scratch buffer in aBuf and then the reader itself. A NULL *ppReader is
// accepted and is a no-op. *ppReader is always NULL on return.
//
// Always returns 0: none of the steps below reports a failure.
int32_t tsdbDataFReaderClose(SDataFReader **ppReader) {
  SDataFReader *pReader = *ppReader;

  if (pReader != NULL) {
    // head
    tsdbCloseFile(&pReader->pHeadFD);

    // data
    tsdbCloseFile(&pReader->pDataFD);

    // sma
    tsdbCloseFile(&pReader->pSmaFD);

    // sst
    for (int32_t iSst = 0; iSst < pReader->pSet->nSstF; iSst++) {
      tsdbCloseFile(&pReader->aSstFD[iSst]);
    }

    // scratch buffers
    for (int32_t iBuf = 0; iBuf < sizeof(pReader->aBuf) / sizeof(uint8_t *); iBuf++) {
      tFree(pReader->aBuf[iBuf]);
    }
    taosMemoryFree(pReader);
  }

  *ppReader = NULL;
  return 0;
}

H
Hongze Cheng 已提交
780 781 782 783 784 785
// Load the block-index section of the head file into aBlockIdx.
//
// The section spans from pHeadF->offset to the end of the head file and is
// decoded as: u32 delimiter, a run of encoded SBlockIdx entries, and a
// trailing TSCKSUM-sized checksum. aBlockIdx is cleared first; a zero-sized
// section leaves it empty and returns 0.
//
// Returns 0 on success, or a system error, TSDB_CODE_FILE_CORRUPTED or
// TSDB_CODE_OUT_OF_MEMORY (each failure is logged).
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx) {
  int32_t  code = 0;
  int64_t  offset = pReader->pSet->pHeadF->offset;
  int64_t  size = pReader->pSet->pHeadF->size - offset;
  int64_t  n;
  uint32_t delimiter;

  taosArrayClear(aBlockIdx);
  if (size == 0) return code;

  // make the scratch buffer large enough for the whole section
  code = tRealloc(&pReader->aBuf[0], size);
  if (code) goto _err;

  // read the section in one call
  n = tsdbReadFile(pReader->pHeadFD, offset, pReader->aBuf[0], size);
  if (n < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }
  if (n < size) {
    code = TSDB_CODE_FILE_CORRUPTED;
    goto _err;
  }

  // verify the checksum over the whole section
  if (!taosCheckChecksumWhole(pReader->aBuf[0], size)) {
    code = TSDB_CODE_FILE_CORRUPTED;
    goto _err;
  }

  // decode: delimiter first, then entries up to the checksum
  n = tGetU32(pReader->aBuf[0], &delimiter);
  ASSERT(delimiter == TSDB_FILE_DLMT);

  while (n < size - sizeof(TSCKSUM)) {
    SBlockIdx blockIdx;

    n += tGetBlockIdx(pReader->aBuf[0] + n, &blockIdx);
    if (taosArrayPush(aBlockIdx, &blockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
  }

  ASSERT(n + sizeof(TSCKSUM) == size);
  return code;

_err:
  tsdbError("vgId:%d, read block idx failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
843 844 845 846 847 848
// Load the SSstBlk index section of sst file number iSst into aSstBlk.
//
// The section spans from aSstF[iSst]->offset to the end of that file and is
// decoded as: u32 delimiter, a run of encoded SSstBlk entries, and a trailing
// TSCKSUM-sized checksum. aSstBlk is cleared first; a zero-sized section
// leaves it empty and returns 0.
//
// Returns 0 on success, or a system error, TSDB_CODE_FILE_CORRUPTED or
// TSDB_CODE_OUT_OF_MEMORY (each failure is logged).
int32_t tsdbReadSstBlk(SDataFReader *pReader, int32_t iSst, SArray *aSstBlk) {
  int32_t  code = 0;
  int64_t  offset = pReader->pSet->aSstF[iSst]->offset;
  int64_t  size = pReader->pSet->aSstF[iSst]->size - offset;
  int64_t  n;
  uint32_t delimiter;

  taosArrayClear(aSstBlk);
  if (size == 0) return code;

  // make the scratch buffer large enough for the whole section
  code = tRealloc(&pReader->aBuf[0], size);
  if (code) goto _err;

  // read the section in one call
  n = tsdbReadFile(pReader->aSstFD[iSst], offset, pReader->aBuf[0], size);
  if (n < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }
  if (n < size) {
    code = TSDB_CODE_FILE_CORRUPTED;
    goto _err;
  }

  // verify the checksum over the whole section
  if (!taosCheckChecksumWhole(pReader->aBuf[0], size)) {
    code = TSDB_CODE_FILE_CORRUPTED;
    goto _err;
  }

  // decode: delimiter first, then entries up to the checksum
  n = tGetU32(pReader->aBuf[0], &delimiter);
  ASSERT(delimiter == TSDB_FILE_DLMT);

  while (n < size - sizeof(TSCKSUM)) {
    SSstBlk blockl;

    n += tGetSstBlk(pReader->aBuf[0] + n, &blockl);
    if (taosArrayPush(aSstBlk, &blockl) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
  }

  ASSERT(n + sizeof(TSCKSUM) == size);
  return code;

_err:
  tsdbError("vgId:%d read blockl failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
906 907 908 909 910 911
// Read the block map described by pBlockIdx from the head file into mBlock.
//
// pBlockIdx->offset / pBlockIdx->size locate the region, which is decoded as:
// u32 delimiter, one encoded SMapData, and a trailing TSCKSUM-sized checksum.
//
// Returns 0 on success, or a system error, TSDB_CODE_FILE_CORRUPTED or
// TSDB_CODE_OUT_OF_MEMORY (each failure is logged). A negative result from
// tGetMapData() is reported as TSDB_CODE_OUT_OF_MEMORY.
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBlock) {
  int32_t  code = 0;
  int64_t  offset = pBlockIdx->offset;
  int64_t  size = pBlockIdx->size;
  int64_t  n;
  int64_t  tn;
  uint32_t delimiter;

  // make the scratch buffer large enough for the whole region
  code = tRealloc(&pReader->aBuf[0], size);
  if (code) goto _err;

  // read the region in one call
  n = tsdbReadFile(pReader->pHeadFD, offset, pReader->aBuf[0], size);
  if (n < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }
  if (n < size) {
    code = TSDB_CODE_FILE_CORRUPTED;
    goto _err;
  }

  // verify the checksum over the whole region
  if (!taosCheckChecksumWhole(pReader->aBuf[0], size)) {
    code = TSDB_CODE_FILE_CORRUPTED;
    goto _err;
  }

  // decode: delimiter, then the map data
  n = tGetU32(pReader->aBuf[0], &delimiter);
  ASSERT(delimiter == TSDB_FILE_DLMT);

  tn = tGetMapData(pReader->aBuf[0] + n, mBlock);
  if (tn < 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  n += tn;
  ASSERT(n + sizeof(TSCKSUM) == size);

  return code;

_err:
  tsdbError("vgId:%d, read block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
  return code;
}
H
Hongze Cheng 已提交
960

H
Hongze Cheng 已提交
961 962 963
// Read the per-column aggregates of a data block from the sma file.
//
// pDataBlk->smaInfo gives the offset and payload size; the payload is followed
// on disk by a TSCKSUM-sized checksum which is read and verified together with
// it. aColumnDataAgg is cleared and then filled with one SColumnDataAgg per
// decoded entry. The caller must only pass blocks whose smaInfo.size is > 0.
//
// Returns 0 on success, or a system error, TSDB_CODE_FILE_CORRUPTED or
// TSDB_CODE_OUT_OF_MEMORY (each failure is logged).
int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pDataBlk, SArray *aColumnDataAgg) {
  int32_t   code = 0;
  SSmaInfo *pSmaInfo = &pDataBlk->smaInfo;

  ASSERT(pSmaInfo->size > 0);

  taosArrayClear(aColumnDataAgg);

  // payload plus its checksum
  int32_t size = pSmaInfo->size + sizeof(TSCKSUM);
  code = tRealloc(&pReader->aBuf[0], size);
  if (code) goto _err;

  // read
  int64_t n = tsdbReadFile(pReader->pSmaFD, pSmaInfo->offset, pReader->aBuf[0], size);
  if (n < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }
  if (n < size) {
    code = TSDB_CODE_FILE_CORRUPTED;
    goto _err;
  }

  // check
  if (!taosCheckChecksumWhole(pReader->aBuf[0], size)) {
    code = TSDB_CODE_FILE_CORRUPTED;
    goto _err;
  }

  // decode entries until the payload is consumed
  for (n = 0; n < pSmaInfo->size;) {
    SColumnDataAgg sma;

    n += tGetColumnDataAgg(pReader->aBuf[0] + n, &sma);
    if (taosArrayPush(aColumnDataAgg, &sma) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
  }

  return code;

_err:
  tsdbError("vgId:%d tsdb read block sma failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
  return code;
}
H
Hongze Cheng 已提交
1018

H
Hongze Cheng 已提交
1019 1020
// Read one on-disk block (key part plus the requested columns) into pBlockData.
//
// pBlkInfo:   location of the block: offset and szKey (size of the key part).
// fromLast:   non-zero reads from pReader->aSstFD[0], zero from pReader->pDataFD.
//             (todo) only the first sst file is ever used here.
// pBlockData: destination; cleared first. Its suid/uid must match the header
//             stored in the block, and its aIdx selects the columns to load.
//
// Layout consumed, in order: disk data header, uid / version / TSKEY arrays
// (the "key part", szKey bytes), then the SBlockCol table (hdr.szBlkCol bytes
// plus checksum), then the per-column payloads.
//
// Scratch buffers: aBuf[0] holds the key part and later the SBlockCol table,
// aBuf[1] is decompression scratch and later the raw column payload, aBuf[2]
// is decompression scratch for column data.
//
// Returns 0 on success, or a system error, TSDB_CODE_FILE_CORRUPTED or the
// code of a failing allocation / decompression step (each failure is logged).
static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo, int8_t fromLast,
                                     SBlockData *pBlockData) {
  int32_t code = 0;
  int64_t nRead;

  tBlockDataClear(pBlockData);

  STsdbFD *pFD = fromLast ? pReader->aSstFD[0] : pReader->pDataFD;  // (todo)

  // uid + version + tskey
  // size the buffer before reading into it; it may be NULL or too small
  code = tRealloc(&pReader->aBuf[0], pBlkInfo->szKey);
  if (code) goto _err;

  nRead = tsdbReadFile(pFD, pBlkInfo->offset, pReader->aBuf[0], pBlkInfo->szKey);
  if (nRead < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  } else if (nRead < pBlkInfo->szKey) {
    code = TSDB_CODE_FILE_CORRUPTED;
    goto _err;
  }

  SDiskDataHdr hdr;
  uint8_t     *p = pReader->aBuf[0] + tGetDiskDataHdr(pReader->aBuf[0], &hdr);

  ASSERT(hdr.delimiter == TSDB_FILE_DLMT);
  ASSERT(pBlockData->suid == hdr.suid);
  ASSERT(pBlockData->uid == hdr.uid);

  pBlockData->nRow = hdr.nRow;

  // uid (only stored when the block covers more than one table, i.e. hdr.uid == 0)
  if (hdr.uid == 0) {
    ASSERT(hdr.szUid);
    code = tsdbDecmprData(p, hdr.szUid, TSDB_DATA_TYPE_BIGINT, hdr.cmprAlg, (uint8_t **)&pBlockData->aUid,
                          sizeof(int64_t) * hdr.nRow, &pReader->aBuf[1]);
    if (code) goto _err;
  } else {
    ASSERT(!hdr.szUid);
  }
  p += hdr.szUid;

  // version
  code = tsdbDecmprData(p, hdr.szVer, TSDB_DATA_TYPE_BIGINT, hdr.cmprAlg, (uint8_t **)&pBlockData->aVersion,
                        sizeof(int64_t) * hdr.nRow, &pReader->aBuf[1]);
  if (code) goto _err;
  p += hdr.szVer;

  // TSKEY
  code = tsdbDecmprData(p, hdr.szKey, TSDB_DATA_TYPE_TIMESTAMP, hdr.cmprAlg, (uint8_t **)&pBlockData->aTSKEY,
                        sizeof(TSKEY) * hdr.nRow, &pReader->aBuf[1]);
  if (code) goto _err;
  p += hdr.szKey;

  ASSERT(p - pReader->aBuf[0] == pBlkInfo->szKey - sizeof(TSCKSUM));

  // read and decode columns
  if (taosArrayGetSize(pBlockData->aIdx) == 0) goto _exit;

  if (hdr.szBlkCol > 0) {
    int64_t offset = pBlkInfo->offset + pBlkInfo->szKey;
    int64_t size = hdr.szBlkCol + sizeof(TSCKSUM);

    code = tRealloc(&pReader->aBuf[0], size);
    if (code) goto _err;

    nRead = tsdbReadFile(pFD, offset, pReader->aBuf[0], size);
    if (nRead < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      goto _err;
    } else if (nRead < size) {
      code = TSDB_CODE_FILE_CORRUPTED;
      goto _err;
    }
  }

  SBlockCol  blockCol = {.cid = 0};
  SBlockCol *pBlockCol = &blockCol;
  int32_t    n = 0;

  for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) {
    SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);

    // advance through the SBlockCol table until it catches up with the wanted
    // column id; pBlockCol becomes NULL once the table is exhausted
    while (pBlockCol && pBlockCol->cid < pColData->cid) {
      if (n < hdr.szBlkCol) {
        n += tGetBlockCol(pReader->aBuf[0] + n, pBlockCol);
      } else {
        ASSERT(n == hdr.szBlkCol);
        pBlockCol = NULL;
      }
    }

    if (pBlockCol == NULL || pBlockCol->cid > pColData->cid) {
      // column is not stored in this block: add a lot of NONE
      for (int32_t iRow = 0; iRow < hdr.nRow; iRow++) {
        code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type));
        if (code) goto _err;
      }
    } else {
      ASSERT(pBlockCol->type == pColData->type);
      ASSERT(pBlockCol->flag && pBlockCol->flag != HAS_NONE);

      if (pBlockCol->flag == HAS_NULL) {
        // add a lot of NULL
        for (int32_t iRow = 0; iRow < hdr.nRow; iRow++) {
          code = tColDataAppendValue(pColData, &COL_VAL_NULL(pBlockCol->cid, pBlockCol->type));
          if (code) goto _err;
        }
      } else {
        // decode from binary
        int64_t offset = pBlkInfo->offset + pBlkInfo->szKey + hdr.szBlkCol + sizeof(TSCKSUM) + pBlockCol->offset;
        int32_t size = pBlockCol->szBitmap + pBlockCol->szOffset + pBlockCol->szValue + sizeof(TSCKSUM);

        code = tRealloc(&pReader->aBuf[1], size);
        if (code) goto _err;

        nRead = tsdbReadFile(pFD, offset, pReader->aBuf[1], size);
        if (nRead < 0) {
          code = TAOS_SYSTEM_ERROR(errno);
          goto _err;
        } else if (nRead < size) {
          code = TSDB_CODE_FILE_CORRUPTED;
          goto _err;
        }

        code = tsdbDecmprColData(pReader->aBuf[1], pBlockCol, hdr.cmprAlg, hdr.nRow, pColData, &pReader->aBuf[2]);
        if (code) goto _err;
      }
    }
  }

_exit:
  return code;

_err:
  tsdbError("vgId:%d tsdb read block data impl failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
  return code;
}
H
Hongze Cheng 已提交
1125

H
Hongze Cheng 已提交
1126 1127
// Read a data block, merging all of its sub-blocks, into pBlockData.
//
// Sub-block 0 is read straight into pBlockData. Every further sub-block is
// read into a temporary, the current content of pBlockData is copied to a
// second temporary, and the two are merged back into pBlockData.
//
// Returns 0 on success, otherwise the code of the first failing step (logged).
// Both temporaries are released on every path.
int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData) {
  int32_t code = 0;

  code = tsdbReadBlockDataImpl(pReader, &pDataBlk->aSubBlock[0], 0, pBlockData);
  if (code) goto _err;

  if (pDataBlk->nSubBlock > 1) {
    SBlockData bData1;
    SBlockData bData2;

    // create
    code = tBlockDataCreate(&bData1);
    if (code) goto _err;
    code = tBlockDataCreate(&bData2);
    if (code) {
      // bData1 was created successfully and must not be leaked
      tBlockDataDestroy(&bData1, 1);
      goto _err;
    }

    // init
    tBlockDataInitEx(&bData1, pBlockData);
    tBlockDataInitEx(&bData2, pBlockData);

    for (int32_t iSubBlock = 1; iSubBlock < pDataBlk->nSubBlock; iSubBlock++) {
      code = tsdbReadBlockDataImpl(pReader, &pDataBlk->aSubBlock[iSubBlock], 0, &bData1);
      if (code) break;

      code = tBlockDataCopy(pBlockData, &bData2);
      if (code) break;

      code = tBlockDataMerge(&bData1, &bData2, pBlockData);
      if (code) break;
    }

    // single cleanup point for both the success and the failure path
    tBlockDataDestroy(&bData1, 1);
    tBlockDataDestroy(&bData2, 1);
    if (code) goto _err;
  }

  return code;

_err:
  tsdbError("vgId:%d tsdb read data block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
  return code;
}
H
Hongze Cheng 已提交
1179

H
Hongze Cheng 已提交
1180 1181
// Read the block described by pSstBlk->bInfo into pBlockData.
//
// Thin wrapper around tsdbReadBlockDataImpl() with fromLast = 1. Note that
// iSst is accepted but not used by this function.
//
// Returns 0 on success, otherwise the code from tsdbReadBlockDataImpl()
// (logged here as well).
int32_t tsdbReadSstBlock(SDataFReader *pReader, int32_t iSst, SSstBlk *pSstBlk, SBlockData *pBlockData) {
  int32_t code = tsdbReadBlockDataImpl(pReader, &pSstBlk->bInfo, 1, pBlockData);

  if (code != 0) {
    tsdbError("vgId:%d tsdb read last block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
  }
  return code;
}
H
Hongze Cheng 已提交
1192

H
Hongze Cheng 已提交
1193 1194
// Read a whole sst block in one go and decompress it into pBlockData.
//
// Reads pSstBlk->bInfo.szBlock bytes at pSstBlk->bInfo.offset from sst file
// number iSst into aBuf[0], then hands them to tDecmprBlockData() with aBuf[1]
// as scratch space.
//
// Returns 0 on success, or a system error, TSDB_CODE_FILE_CORRUPTED (short
// read) or the code of the failing allocation / decompression step.
int32_t tsdbReadSstBlockEx(SDataFReader *pReader, int32_t iSst, SSstBlk *pSstBlk, SBlockData *pBlockData) {
  int32_t code = 0;
  int64_t szBlock = pSstBlk->bInfo.szBlock;
  int64_t n;

  // alloc: the buffer may be NULL or smaller than this block
  code = tRealloc(&pReader->aBuf[0], szBlock);
  if (code) goto _exit;

  // read
  n = tsdbReadFile(pReader->aSstFD[iSst], pSstBlk->bInfo.offset, pReader->aBuf[0], szBlock);
  if (n < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _exit;
  } else if (n < szBlock) {
    code = TSDB_CODE_FILE_CORRUPTED;
    goto _exit;
  }

  // decmpr
  code = tDecmprBlockData(pReader->aBuf[0], pSstBlk->bInfo.szBlock, pBlockData, &pReader->aBuf[1]);

_exit:
  return code;
}

// SDelFWriter ====================================================
// Create a writer for a delete file and write its (zeroed) header.
//
// ppWriter: out; receives the new writer on success, NULL on failure.
// pFile:    description of the delete file; copied into the writer.
// pTsdb:    tsdb instance used to build the file name and for logging.
//
// The file is opened with TD_FILE_WRITE | TD_FILE_CREATE and TSDB_FHDR_SIZE
// zero bytes are written as a placeholder header; fDel.size / fDel.offset of
// the writer's copy are reset accordingly.
//
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY or a system error otherwise.
// On failure the partially built writer (memory and file handle) is released.
int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb) {
  int32_t      code = 0;
  char         fname[TSDB_FILENAME_LEN];
  char         hdr[TSDB_FHDR_SIZE] = {0};
  SDelFWriter *pDelFWriter;
  int64_t      n;

  // alloc
  pDelFWriter = (SDelFWriter *)taosMemoryCalloc(1, sizeof(*pDelFWriter));
  if (pDelFWriter == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  pDelFWriter->pTsdb = pTsdb;
  pDelFWriter->fDel = *pFile;

  tsdbDelFileName(pTsdb, pFile, fname);
  pDelFWriter->pWriteH = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE);
  if (pDelFWriter->pWriteH == NULL) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  // update header
  n = taosWriteFile(pDelFWriter->pWriteH, &hdr, TSDB_FHDR_SIZE);
  if (n < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  pDelFWriter->fDel.size = TSDB_FHDR_SIZE;
  pDelFWriter->fDel.offset = 0;

  *ppWriter = pDelFWriter;
  return code;

_err:
  tsdbError("vgId:%d, failed to open del file writer since %s", TD_VID(pTsdb->pVnode), tstrerror(code));

  // release what was acquired before the failure
  if (pDelFWriter) {
    if (pDelFWriter->pWriteH) taosCloseFile(&pDelFWriter->pWriteH);
    taosMemoryFree(pDelFWriter);
  }

  *ppWriter = NULL;
  return code;
}

// Close a delete file writer, optionally fsync-ing the file first.
//
// ppWriter: in/out; a NULL *ppWriter is accepted and is a no-op.
// sync:     non-zero to fsync the file before closing it.
//
// Behavior on failure:
//  - fsync fails: the error is returned and the writer is left untouched
//    (*ppWriter stays valid), so the caller can still close it without sync.
//  - close fails: the error is returned, but the writer and its buffers are
//    freed anyway and *ppWriter is set to NULL, since nothing else in this
//    file could release them afterwards.
//
// Returns 0 on success or a system error code (logged).
int32_t tsdbDelFWriterClose(SDelFWriter **ppWriter, int8_t sync) {
  int32_t      code = 0;
  SDelFWriter *pWriter = *ppWriter;
  STsdb       *pTsdb;

  if (pWriter == NULL) return code;
  pTsdb = pWriter->pTsdb;  // saved now: needed for logging after pWriter is freed

  // sync
  if (sync && taosFsyncFile(pWriter->pWriteH) < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  // close; remember a failure but keep going so memory is not leaked
  if (taosCloseFile(&pWriter->pWriteH) < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
  }

  for (int32_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(uint8_t *); iBuf++) {
    tFree(pWriter->aBuf[iBuf]);
  }
  taosMemoryFree(pWriter);
  *ppWriter = NULL;

  if (code) goto _err;
  return code;

_err:
  tsdbError("vgId:%d, failed to close del file writer since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

// Append one table's delete records to the delete file.
//
// The records in aDelData are encoded as: u32 TSDB_FILE_DLMT, each SDelData in
// array order, and a trailing TSCKSUM-sized checksum. On success pDelIdx
// receives the file offset and size of the region just written and the
// writer's fDel.size grows by that size.
//
// Returns 0 on success, or the tRealloc() error / a system error (logged).
int32_t tsdbWriteDelData(SDelFWriter *pWriter, SArray *aDelData, SDelIdx *pDelIdx) {
  int32_t code = 0;
  int64_t size;
  int64_t n;

  // prepare: the delimiter and checksum are fixed, every record adds its encoded length
  size = sizeof(uint32_t);
  for (int32_t i = 0; i < taosArrayGetSize(aDelData); i++) {
    size += tPutDelData(NULL, taosArrayGet(aDelData, i));
  }
  size += sizeof(TSCKSUM);

  // alloc
  code = tRealloc(&pWriter->aBuf[0], size);
  if (code) goto _err;

  // build
  n = tPutU32(pWriter->aBuf[0], TSDB_FILE_DLMT);
  for (int32_t i = 0; i < taosArrayGetSize(aDelData); i++) {
    n += tPutDelData(pWriter->aBuf[0] + n, taosArrayGet(aDelData, i));
  }
  taosCalcChecksumAppend(0, pWriter->aBuf[0], size);

  ASSERT(n + sizeof(TSCKSUM) == size);

  // write
  n = taosWriteFile(pWriter->pWriteH, pWriter->aBuf[0], size);
  if (n < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  ASSERT(n == size);

  // update: the region starts where the file previously ended
  pDelIdx->offset = pWriter->fDel.size;
  pDelIdx->size = size;
  pWriter->fDel.size += size;

  return code;

_err:
  tsdbError("vgId:%d, failed to write del data since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
  return code;
}

// Append the delete index to the delete file.
//
// The entries in aDelIdx are encoded as: u32 TSDB_FILE_DLMT, each SDelIdx in
// array order, and a trailing TSCKSUM-sized checksum. On success the writer's
// fDel.offset is set to where the index starts and fDel.size grows by the
// number of bytes written.
//
// Returns 0 on success, or the tRealloc() error / a system error (logged).
int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SArray *aDelIdx) {
  int32_t code = 0;
  int64_t size;
  int64_t n;

  // prepare
  size = sizeof(uint32_t);
  for (int32_t iDelIdx = 0; iDelIdx < taosArrayGetSize(aDelIdx); iDelIdx++) {
    size += tPutDelIdx(NULL, taosArrayGet(aDelIdx, iDelIdx));
  }
  size += sizeof(TSCKSUM);

  // alloc
  code = tRealloc(&pWriter->aBuf[0], size);
  if (code) goto _err;

  // build
  n = 0;
  n += tPutU32(pWriter->aBuf[0] + n, TSDB_FILE_DLMT);
  for (int32_t iDelIdx = 0; iDelIdx < taosArrayGetSize(aDelIdx); iDelIdx++) {
    n += tPutDelIdx(pWriter->aBuf[0] + n, taosArrayGet(aDelIdx, iDelIdx));
  }
  taosCalcChecksumAppend(0, pWriter->aBuf[0], size);

  ASSERT(n + sizeof(TSCKSUM) == size);

  // write
  n = taosWriteFile(pWriter->pWriteH, pWriter->aBuf[0], size);
  if (n < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  // update: the index starts where the file previously ended
  pWriter->fDel.offset = pWriter->fDel.size;
  pWriter->fDel.size += size;

  return code;

_err:
  tsdbError("vgId:%d, write del idx failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
  return code;
}

// Rewrite the header at the start of the delete file.
//
// Builds a TSDB_FHDR_SIZE-byte header: zero-filled, the writer's fDel encoded
// at the front, checksum appended. Then seeks to offset 0 and writes it.
//
// Returns 0 on success or a system error code (logged).
int32_t tsdbUpdateDelFileHdr(SDelFWriter *pWriter) {
  int32_t code = 0;
  char    hdr[TSDB_FHDR_SIZE] = {0};

  // build
  tPutDelFile(hdr, &pWriter->fDel);
  taosCalcChecksumAppend(0, hdr, TSDB_FHDR_SIZE);

  // seek to the start of the file, then write the header over the old one
  if (taosLSeekFile(pWriter->pWriteH, 0, SEEK_SET) < 0 ||
      taosWriteFile(pWriter->pWriteH, hdr, TSDB_FHDR_SIZE) < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    tsdbError("vgId:%d, update del file hdr failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
  }

  return code;
}
H
Hongze Cheng 已提交
1402 1403 1404 1405 1406
// SDelFReader ====================================================
// State of an open delete-file reader (see tsdbDelFReaderOpen/Close).
struct SDelFReader {
  STsdb    *pTsdb;   // tsdb instance; used for file naming and logging
  SDelFile  fDel;    // copy of the delete file description passed to open
  TdFilePtr pReadH;  // file handle, opened with TD_FILE_READ

  uint8_t *aBuf[1];  // scratch buffer(s); aBuf[0] holds the bytes read from the file
};
H
Hongze Cheng 已提交
1410

H
Hongze Cheng 已提交
1411 1412 1413 1414 1415
// Open a reader over a delete file.
//
// ppReader: out; receives the new reader on success, NULL on failure.
// pFile:    description of the delete file; copied into the reader.
// pTsdb:    tsdb instance used to build the file name and for logging.
//
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY or a system error otherwise
// (logged). Nothing is leaked on failure.
int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb) {
  int32_t      code = 0;
  char         fname[TSDB_FILENAME_LEN];
  SDelFReader *pDelFReader;

  // alloc
  pDelFReader = (SDelFReader *)taosMemoryCalloc(1, sizeof(*pDelFReader));
  if (pDelFReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  // open impl
  pDelFReader->pTsdb = pTsdb;
  pDelFReader->fDel = *pFile;

  tsdbDelFileName(pTsdb, pFile, fname);
  pDelFReader->pReadH = taosOpenFile(fname, TD_FILE_READ);
  if (pDelFReader->pReadH == NULL) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  *ppReader = pDelFReader;
  return code;

_err:
  tsdbError("vgId:%d, del file reader open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  taosMemoryFree(pDelFReader);  // NULL when the allocation itself failed
  *ppReader = NULL;
  return code;
}

H
Hongze Cheng 已提交
1446 1447 1448
// Close a delete file reader and release everything it owns.
//
// A NULL *ppReader is accepted and is a no-op. The file handle is closed, the
// scratch buffers and the reader are freed, and *ppReader is set to NULL.
// This also happens when closing the file fails, so a close error cannot leak
// the reader.
//
// Returns 0 on success or the system error from closing the file.
int32_t tsdbDelFReaderClose(SDelFReader **ppReader) {
  int32_t      code = 0;
  SDelFReader *pReader = *ppReader;

  if (pReader) {
    // remember a close failure but keep releasing memory
    if (taosCloseFile(&pReader->pReadH) < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
    }
    for (int32_t iBuf = 0; iBuf < sizeof(pReader->aBuf) / sizeof(uint8_t *); iBuf++) {
      tFree(pReader->aBuf[iBuf]);
    }
    taosMemoryFree(pReader);
  }
  *ppReader = NULL;

  return code;
}

H
Hongze Cheng 已提交
1466
// Read one table's delete records from the delete file into aDelData.
//
// pDelIdx->offset / pDelIdx->size locate the region, which is decoded as:
// u32 delimiter, a run of encoded SDelData entries, and a trailing
// TSCKSUM-sized checksum. aDelData is cleared first. The delimiter value is
// read but not checked here.
//
// Returns 0 on success, or a system error, TSDB_CODE_FILE_CORRUPTED or
// TSDB_CODE_OUT_OF_MEMORY (each failure is logged).
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData) {
  int32_t  code = 0;
  int64_t  offset = pDelIdx->offset;
  int64_t  size = pDelIdx->size;
  int64_t  n;
  uint32_t delimiter;

  taosArrayClear(aDelData);

  // seek
  if (taosLSeekFile(pReader->pReadH, offset, SEEK_SET) < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  // alloc
  code = tRealloc(&pReader->aBuf[0], size);
  if (code) goto _err;

  // read
  n = taosReadFile(pReader->pReadH, pReader->aBuf[0], size);
  if (n < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }
  if (n < size) {
    code = TSDB_CODE_FILE_CORRUPTED;
    goto _err;
  }

  // check
  if (!taosCheckChecksumWhole(pReader->aBuf[0], size)) {
    code = TSDB_CODE_FILE_CORRUPTED;
    goto _err;
  }

  // decode: skip the delimiter, then entries up to the checksum
  n = tGetU32(pReader->aBuf[0], &delimiter);
  while (n < size - sizeof(TSCKSUM)) {
    SDelData delData;

    n += tGetDelData(pReader->aBuf[0] + n, &delData);
    if (taosArrayPush(aDelData, &delData) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
  }

  ASSERT(n == size - sizeof(TSCKSUM));

  return code;

_err:
  tsdbError("vgId:%d, read del data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
1524
// Read the delete index from the delete file into aDelIdx.
//
// The index spans from fDel.offset to the end of the file (fDel.size) and is
// decoded as: u32 delimiter, a run of encoded SDelIdx entries, and a trailing
// TSCKSUM-sized checksum. aDelIdx is cleared first.
//
// Returns 0 on success, or a system error, TSDB_CODE_FILE_CORRUPTED or
// TSDB_CODE_OUT_OF_MEMORY (each failure is logged).
int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx) {
  int32_t code = 0;
  int64_t n;  // 64-bit: holds the read result and is compared against the 64-bit size
  int64_t offset = pReader->fDel.offset;
  int64_t size = pReader->fDel.size - offset;

  taosArrayClear(aDelIdx);

  // seek
  if (taosLSeekFile(pReader->pReadH, offset, SEEK_SET) < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  // alloc
  code = tRealloc(&pReader->aBuf[0], size);
  if (code) goto _err;

  // read
  n = taosReadFile(pReader->pReadH, pReader->aBuf[0], size);
  if (n < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  } else if (n < size) {
    code = TSDB_CODE_FILE_CORRUPTED;
    goto _err;
  }

  // check
  if (!taosCheckChecksumWhole(pReader->aBuf[0], size)) {
    code = TSDB_CODE_FILE_CORRUPTED;
    goto _err;
  }

  // decode
  n = 0;
  uint32_t delimiter;
  n += tGetU32(pReader->aBuf[0] + n, &delimiter);
  ASSERT(delimiter == TSDB_FILE_DLMT);

  while (n < size - sizeof(TSCKSUM)) {
    SDelIdx delIdx;

    n += tGetDelIdx(pReader->aBuf[0] + n, &delIdx);

    if (taosArrayPush(aDelIdx, &delIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
  }

  ASSERT(n == size - sizeof(TSCKSUM));

  return code;

_err:
  tsdbError("vgId:%d, read del idx failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
  return code;
}
H
Hongze Cheng 已提交
1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616

// Read `size` bytes at `offset` from pFD into *ppOut, optionally verifying
// the checksum over the bytes read.
//
// *ppOut is grown with tRealloc() as needed. When toCheck is non-zero the
// buffer must pass taosCheckChecksumWhole().
//
// Returns 0 on success, the tRealloc() error, a system error for a failed
// seek/read, or TSDB_CODE_FILE_CORRUPTED for a short read or bad checksum.
static int32_t tsdbReadAndCheck(TdFilePtr pFD, int64_t offset, uint8_t **ppOut, int32_t size, int8_t toCheck) {
  int32_t code;
  int64_t nBytes;

  // alloc
  code = tRealloc(ppOut, size);
  if (code) return code;

  // seek
  if (taosLSeekFile(pFD, offset, SEEK_SET) < 0) {
    return TAOS_SYSTEM_ERROR(errno);
  }

  // read
  nBytes = taosReadFile(pFD, *ppOut, size);
  if (nBytes < 0) {
    return TAOS_SYSTEM_ERROR(errno);
  }
  if (nBytes < size) {
    return TSDB_CODE_FILE_CORRUPTED;
  }

  // check
  if (toCheck && !taosCheckChecksumWhole(*ppOut, size)) {
    return TSDB_CODE_FILE_CORRUPTED;
  }

  return code;
}