You need to sign in or sign up before continuing.
tsdbSnapshot.c 9.6 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"

H
Hongze Cheng 已提交
18
// STsdbSnapReader ========================================
H
Hongze Cheng 已提交
19
struct STsdbSnapReader {
H
more  
Hongze Cheng 已提交
20 21 22 23
  STsdb*  pTsdb;
  int64_t sver;
  int64_t ever;
  // for data file
H
Hongze Cheng 已提交
24 25
  int8_t        dataDone;
  int32_t       fid;
H
more  
Hongze Cheng 已提交
26
  SDataFReader* pDataFReader;
H
Hongze Cheng 已提交
27
  SArray*       aBlockIdx;  // SArray<SBlockIdx>
H
Hongze Cheng 已提交
28 29
  int32_t       iBlockIdx;
  SBlockIdx*    pBlockIdx;
H
Hongze Cheng 已提交
30
  SMapData      mBlock;  // SMapData<SBlock>
H
Hongze Cheng 已提交
31
  int32_t       iBlock;
H
Hongze Cheng 已提交
32
  SBlockData    blkData;
H
more  
Hongze Cheng 已提交
33
  // for del file
H
Hongze Cheng 已提交
34
  int8_t       delDone;
H
more  
Hongze Cheng 已提交
35
  SDelFReader* pDelFReader;
H
Hongze Cheng 已提交
36 37 38
  int32_t      iDelIdx;
  SArray*      aDelIdx;   // SArray<SDelIdx>
  SArray*      aDelData;  // SArray<SDelData>
H
Hongze Cheng 已提交
39 40
};

H
Hongze Cheng 已提交
41 42 43
// Advance the data-file cursor of `pReader` to the next block whose version
// range overlaps [pReader->sver, pReader->ever] and load it into
// pReader->blkData.
//
// Returns 0 when a block was loaded, TSDB_CODE_VND_READ_END when there is no
// further data file set to read, or the error code of the failing tsdb call.
//
// NOTE: serializing the loaded block into *ppData is still TODO; this function
// does not write to ppData yet.
static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
  int32_t code = 0;

  while (true) {
    // No data file is open: select the next file set and load its block index.
    if (pReader->pDataFReader == NULL) {
      SDFileSet* pSet = NULL;

      // search the next data file set to read (todo)
      //
      // Until the search above is implemented pSet is always NULL. Treat
      // "no file set" as end-of-data instead of handing a NULL set to
      // tsdbDataFReaderOpen(); otherwise this loop has no way to terminate.
      if (pSet == NULL) {
        code = TSDB_CODE_VND_READ_END;
        goto _exit;
      }

      // open
      code = tsdbDataFReaderOpen(&pReader->pDataFReader, pReader->pTsdb, pSet);
      if (code) goto _err;

      // SBlockIdx
      code = tsdbReadBlockIdx(pReader->pDataFReader, pReader->aBlockIdx, NULL);
      if (code) goto _err;

      pReader->iBlockIdx = 0;
      pReader->pBlockIdx = NULL;
    }

    while (true) {
      // Need a new SBlockIdx: either fetch the next one or finish this file.
      if (pReader->pBlockIdx == NULL) {
        if (pReader->iBlockIdx >= taosArrayGetSize(pReader->aBlockIdx)) {
          // Every index entry of this file was visited; close it so the outer
          // loop moves on to the next file set.
          tsdbDataFReaderClose(&pReader->pDataFReader);
          break;
        }

        pReader->pBlockIdx = (SBlockIdx*)taosArrayGet(pReader->aBlockIdx, pReader->iBlockIdx);
        pReader->iBlockIdx++;

        // SBlock
        code = tsdbReadBlock(pReader->pDataFReader, pReader->pBlockIdx, &pReader->mBlock, NULL);
        if (code) goto _err;

        pReader->iBlock = 0;
      }

      while (true) {
        SBlock  block;
        SBlock* pBlock = &block;

        if (pReader->iBlock >= pReader->mBlock.nItem) {
          // All blocks under this SBlockIdx were visited.
          pReader->pBlockIdx = NULL;
          break;
        }

        tMapDataGetItemByIdx(&pReader->mBlock, pReader->iBlock, pBlock, tGetBlock);
        pReader->iBlock++;

        // Two closed ranges overlap iff each one starts no later than the
        // other ends. This also selects a block whose version range fully
        // contains [sver, ever], which an endpoint-only test would skip.
        if (pBlock->minVersion <= pReader->ever && pBlock->maxVersion >= pReader->sver) {
          // overlap (todo)

          code = tsdbReadBlockData(pReader->pDataFReader, pReader->pBlockIdx, pBlock, &pReader->blkData, NULL, NULL);
          if (code) goto _err;

          goto _exit;
        }
      }
    }
  }

_exit:
  return code;

_err:
  tsdbError("vgId:%d snap read data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
  return code;
}

// Produce the next delete-record packet from the del file.
//
// For each remaining SDelIdx, its SDelData records are loaded and those whose
// version lies in [pReader->sver, pReader->ever] are encoded into *ppData as
//   SSnapDataHdr | TABLEID | encoded SDelData ...
// The first SDelIdx that yields at least one such record ends the call.
//
// Returns 0 when a packet was written to *ppData (resized through tRealloc),
// TSDB_CODE_VND_READ_END when there is no del file or no entry is left (the
// del file reader is closed in the latter case), or the error code of the
// failing call.
static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
  int32_t   code = 0;
  STsdb*    pTsdb = pReader->pTsdb;
  SDelFile* pDelFile = pTsdb->fs->cState->pDelFile;

  // First call: open the del file and load its index.
  if (pReader->pDelFReader == NULL) {
    if (pDelFile == NULL) {
      return TSDB_CODE_VND_READ_END;
    }

    code = tsdbDelFReaderOpen(&pReader->pDelFReader, pDelFile, pTsdb, NULL);
    if (code) goto _err;

    code = tsdbReadDelIdx(pReader->pDelFReader, pReader->aDelIdx, NULL);
    if (code) goto _err;

    pReader->iDelIdx = 0;
  }

  for (;;) {
    if (!(pReader->iDelIdx < taosArrayGetSize(pReader->aDelIdx))) break;

    SDelIdx* pIdx = (SDelIdx*)taosArrayGet(pReader->aDelIdx, pReader->iDelIdx);
    pReader->iDelIdx++;

    code = tsdbReadDelData(pReader->pDelFReader, pIdx, pReader->aDelData, NULL);
    if (code) goto _err;

    // Pass 1: measure the encoded size of the records inside the version range.
    int32_t nBytes = 0;
    for (int32_t i = 0; i < taosArrayGetSize(pReader->aDelData); i++) {
      SDelData* pItem = (SDelData*)taosArrayGet(pReader->aDelData, i);

      if (pItem->version < pReader->sver || pItem->version > pReader->ever) continue;
      nBytes += tPutDelData(NULL, pItem);
    }

    // Nothing in range under this index entry: try the next one.
    if (nBytes <= 0) continue;

    nBytes = nBytes + sizeof(SSnapDataHdr) + sizeof(TABLEID);
    code = tRealloc(ppData, nBytes);
    if (code) goto _err;

    int64_t offset = 0;

    // SSnapDataHdr
    SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData + offset);
    pHdr->type = 1;
    pHdr->size = nBytes;  // TODO: size here may be incorrect (it includes the header itself)
    offset += sizeof(SSnapDataHdr);

    // TABLEID
    TABLEID* pTableId = (TABLEID*)(*ppData + offset);
    pTableId->suid = pIdx->suid;
    pTableId->uid = pIdx->uid;
    offset += sizeof(*pTableId);

    // Pass 2: encode the same records right after the table id.
    for (int32_t i = 0; i < taosArrayGetSize(pReader->aDelData); i++) {
      SDelData* pItem = (SDelData*)taosArrayGet(pReader->aDelData, i);

      if (pItem->version < pReader->sver || pItem->version > pReader->ever) continue;
      offset += tPutDelData(*ppData + offset, pItem);
    }

    return code;
  }

  // Index exhausted: release the del file reader and report end-of-data.
  tsdbDelFReaderClose(&pReader->pDelFReader);
  return TSDB_CODE_VND_READ_END;

_err:
  tsdbError("vgId:%d snap read del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}
H
more  
Hongze Cheng 已提交
197

H
Hongze Cheng 已提交
198
// Create a snapshot reader over `pTsdb` restricted to versions [sver, ever].
//
// On success *ppReader receives the new reader and 0 is returned; release it
// with tsdbSnapReaderClose(). On failure everything allocated so far is
// released, *ppReader is set to NULL and the error code is returned.
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapReader** ppReader) {
  int32_t          code = 0;
  STsdbSnapReader* pReader = NULL;
  bool             mBlockReady = false;   // tMapDataInit() result stored in pReader->mBlock
  bool             blkDataReady = false;  // tBlockDataInit() succeeded on pReader->blkData

  // alloc
  pReader = (STsdbSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  pReader->pTsdb = pTsdb;
  pReader->sver = sver;
  pReader->ever = ever;

  pReader->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
  if (pReader->aBlockIdx == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  pReader->mBlock = tMapDataInit();
  mBlockReady = true;

  code = tBlockDataInit(&pReader->blkData);
  if (code) goto _err;
  blkDataReady = true;

  pReader->aDelIdx = taosArrayInit(0, sizeof(SDelIdx));
  if (pReader->aDelIdx == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  pReader->aDelData = taosArrayInit(0, sizeof(SDelData));
  if (pReader->aDelData == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  *ppReader = pReader;
  return code;

_err:
  tsdbError("vgId:%d snapshot reader open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  // Undo the partial construction so a failed open does not leak. Only members
  // that were actually set up are torn down (calloc left the rest zeroed).
  // NOTE(review): assumes tBlockDataInit() leaves nothing to release when it
  // fails - confirm against its implementation.
  if (pReader != NULL) {
    if (pReader->aDelData != NULL) taosArrayDestroy(pReader->aDelData);
    if (pReader->aDelIdx != NULL) taosArrayDestroy(pReader->aDelIdx);
    if (blkDataReady) tBlockDataClear(&pReader->blkData);
    if (mBlockReady) tMapDataClear(&pReader->mBlock);
    if (pReader->aBlockIdx != NULL) taosArrayDestroy(pReader->aBlockIdx);
    taosMemoryFree(pReader);
  }
  *ppReader = NULL;
  return code;
}

H
Hongze Cheng 已提交
244
// Destroy the snapshot reader referenced by *ppReader: release its del-file
// state, then its data-file state, free the reader itself and reset *ppReader
// to NULL. Always returns 0.
int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
  STsdbSnapReader* pSnap = *ppReader;

  // del file side
  taosArrayDestroy(pSnap->aDelData);
  taosArrayDestroy(pSnap->aDelIdx);
  if (pSnap->pDelFReader != NULL) {
    tsdbDelFReaderClose(&pSnap->pDelFReader);
  }

  // data file side
  tBlockDataClear(&pSnap->blkData);
  tMapDataClear(&pSnap->mBlock);
  taosArrayDestroy(pSnap->aBlockIdx);
  if (pSnap->pDataFReader != NULL) {
    tsdbDataFReaderClose(&pSnap->pDataFReader);
  }

  taosMemoryFree(pSnap);
  *ppReader = NULL;
  return 0;
}

// Fetch the next piece of the snapshot: data files are drained first, then the
// del file.
//
// Returns 0 when one of the two phases produced something,
// TSDB_CODE_VND_READ_END once both phases are exhausted, or the error code of
// the failing phase (which is also logged).
int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData) {
  int32_t code = 0;

  // phase 1: data files
  if (!pReader->dataDone) {
    code = tsdbSnapReadData(pReader, ppData);
    if (code == 0) return code;
    if (code != TSDB_CODE_VND_READ_END) goto _err;

    // Data files are exhausted; remember it and fall through to the del file.
    pReader->dataDone = 1;
  }

  // phase 2: del file
  if (!pReader->delDone) {
    code = tsdbSnapReadDel(pReader, ppData);
    if (code == 0) return code;
    if (code != TSDB_CODE_VND_READ_END) goto _err;

    pReader->delDone = 1;
  }

  return TSDB_CODE_VND_READ_END;

_err:
  tsdbError("vgId:%d snapshot read failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
306
// STsdbSnapWriter ========================================
H
Hongze Cheng 已提交
307 308 309 310 311 312 313 314 315
// State of a snapshot writer. Only pTsdb, sver and ever are populated by
// tsdbSnapWriterOpen(); the remaining members are not used by the code in this
// file yet.
struct STsdbSnapWriter {
  STsdb*  pTsdb;  // tsdb instance being written
  int64_t sver;   // start version passed to tsdbSnapWriterOpen()
  int64_t ever;   // end version passed to tsdbSnapWriterOpen()

  // ---- data file ----
  int32_t       iDFileSet;
  SDataFWriter* pDataFWriter;

  // ---- del file ----
  SDelFWriter* pDelFWriter;
};

H
Hongze Cheng 已提交
318
// Roll back what the snapshot writer has done so far.
// Not implemented yet: does nothing and returns 0.
static int32_t tsdbSnapRollback(STsdbSnapWriter* pWriter) {
  (void)pWriter;  // unused until the rollback logic exists
  // TODO
  return 0;
}

H
Hongze Cheng 已提交
324
// Commit what the snapshot writer has done so far.
// Not implemented yet: does nothing and returns 0.
static int32_t tsdbSnapCommit(STsdbSnapWriter* pWriter) {
  (void)pWriter;  // unused until the commit logic exists
  // TODO
  return 0;
}

H
Hongze Cheng 已提交
330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352
// Create a snapshot writer for `pTsdb` covering versions sver..ever.
//
// On success *ppWriter receives the new zero-initialized writer with pTsdb,
// sver and ever filled in, and 0 is returned. If the allocation fails the
// failure is logged, *ppWriter is set to NULL and TSDB_CODE_OUT_OF_MEMORY is
// returned.
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter) {
  STsdbSnapWriter* pNew = (STsdbSnapWriter*)taosMemoryCalloc(1, sizeof(*pNew));

  if (pNew == NULL) {
    int32_t code = TSDB_CODE_OUT_OF_MEMORY;

    tsdbError("vgId:%d tsdb snapshot writer open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
    *ppWriter = NULL;
    return code;
  }

  pNew->pTsdb = pTsdb;
  pNew->sver = sver;
  pNew->ever = ever;

  *ppWriter = pNew;
  return 0;
}

H
Hongze Cheng 已提交
353
// Finish a snapshot write: roll back when `rollback` is non-zero, commit
// otherwise.
//
// On success the writer is freed, *ppWriter is reset to NULL and 0 is
// returned. If the rollback/commit step fails, the failure is logged and its
// code returned; the writer is left untouched in that case.
int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
  STsdbSnapWriter* pSnapWriter = *ppWriter;
  int32_t          code = rollback ? tsdbSnapRollback(pSnapWriter) : tsdbSnapCommit(pSnapWriter);

  if (code) {
    tsdbError("vgId:%d tsdb snapshot writer close failed since %s", TD_VID(pSnapWriter->pTsdb->pVnode),
              tstrerror(code));
    return code;
  }

  taosMemoryFree(pSnapWriter);
  *ppWriter = NULL;
  return code;
}

// Feed `nData` bytes at `pData` to the snapshot writer.
// Not implemented yet: the input is ignored and 0 is returned.
int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
  (void)pWriter;
  (void)pData;
  (void)nData;
  // TODO
  return 0;
}