tsdbSnapshot.c 7.4 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"

H
Hongze Cheng 已提交
18
// STsdbSnapReader ========================================
H
Hongze Cheng 已提交
19
struct STsdbSnapReader {
H
more  
Hongze Cheng 已提交
20 21 22 23
  STsdb*  pTsdb;
  int64_t sver;
  int64_t ever;
  // for data file
H
Hongze Cheng 已提交
24 25
  int8_t        dataDone;
  int32_t       fid;
H
more  
Hongze Cheng 已提交
26
  SDataFReader* pDataFReader;
H
Hongze Cheng 已提交
27 28
  int32_t       iBlockIdx;
  SArray*       aBlockIdx;  // SArray<SBlockIdx>
H
Hongze Cheng 已提交
29 30
  int32_t       iBlock;
  SMapData      mBlock;  // SMapData<SBlock>
H
Hongze Cheng 已提交
31
  SBlockData    blkData;
H
more  
Hongze Cheng 已提交
32
  // for del file
H
Hongze Cheng 已提交
33
  int8_t       delDone;
H
more  
Hongze Cheng 已提交
34
  SDelFReader* pDelFReader;
H
Hongze Cheng 已提交
35 36 37
  int32_t      iDelIdx;
  SArray*      aDelIdx;   // SArray<SDelIdx>
  SArray*      aDelData;  // SArray<SDelData>
H
Hongze Cheng 已提交
38 39
};

H
Hongze Cheng 已提交
40 41
static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
  int32_t code = 0;
H
Hongze Cheng 已提交
42 43
  SBlock  block;
  SBlock* pBlock = &block;
H
Hongze Cheng 已提交
44 45 46 47 48 49 50 51 52 53 54

  if (pReader->pDataFReader == NULL) {
    code = tsdbDataFReaderOpen(&pReader->pDataFReader, pReader->pTsdb, NULL);
    if (code) goto _err;

    code = tsdbReadBlockIdx(pReader->pDataFReader, pReader->aBlockIdx, NULL);
    if (code) goto _err;

    pReader->iBlockIdx = 0;
  }

H
Hongze Cheng 已提交
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
  while (pReader->iBlockIdx < taosArrayGetSize(pReader->aBlockIdx)) {
    SBlockIdx* pBlockIdx = taosArrayGet(pReader->aBlockIdx, pReader->iBlockIdx);

    pReader->iBlockIdx++;

    code = tsdbReadBlock(pReader->pDataFReader, pBlockIdx, &pReader->mBlock, NULL);
    if (code) goto _err;

    break;
  }

  while (pReader->iBlock < pReader->mBlock.nItem) {
    tMapDataGetItemByIdx(&pReader->mBlock, pReader->iBlock, pBlock, tGetBlock);

    pReader->iBlock++;

    if ((pBlock->minVersion >= pReader->sver && pBlock->minVersion <= pReader->ever) &&
        (pBlock->maxVersion >= pReader->sver && pBlock->maxVersion <= pReader->ever)) {
      // block in range, encode and return the data (todo)
      goto _exit;
    }
  }

_exit:
H
Hongze Cheng 已提交
79 80 81 82 83 84 85 86 87 88
  return code;

_err:
  tsdbError("vgId:%d snap read data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
  return code;
}

static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
  int32_t   code = 0;
  STsdb*    pTsdb = pReader->pTsdb;
H
Hongze Cheng 已提交
89
  SDelFile* pDelFile = pTsdb->fs->cState->pDelFile;
H
Hongze Cheng 已提交
90 91 92 93 94 95 96 97 98 99 100

  if (pReader->pDelFReader == NULL) {
    if (pDelFile == NULL) {
      code = TSDB_CODE_VND_READ_END;
      goto _exit;
    }

    // open
    code = tsdbDelFReaderOpen(&pReader->pDelFReader, pDelFile, pTsdb, NULL);
    if (code) goto _err;

H
Hongze Cheng 已提交
101
    // read index
H
Hongze Cheng 已提交
102 103 104 105 106 107 108 109
    code = tsdbReadDelIdx(pReader->pDelFReader, pReader->aDelIdx, NULL);
    if (code) goto _err;

    pReader->iDelIdx = 0;
  }

  while (pReader->iDelIdx < taosArrayGetSize(pReader->aDelIdx)) {
    SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pReader->aDelIdx, pReader->iDelIdx);
H
Hongze Cheng 已提交
110
    int32_t  size = 0;
H
Hongze Cheng 已提交
111

H
Hongze Cheng 已提交
112 113
    pReader->iDelIdx++;

H
Hongze Cheng 已提交
114 115 116 117 118 119 120
    code = tsdbReadDelData(pReader->pDelFReader, pDelIdx, pReader->aDelData, NULL);
    if (code) goto _err;

    for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); iDelData++) {
      SDelData* pDelData = (SDelData*)taosArrayGet(pReader->aDelData, iDelData);

      if (pDelData->version >= pReader->sver && pDelData->version <= pReader->ever) {
H
Hongze Cheng 已提交
121
        size += tPutDelData(NULL, pDelData);
H
Hongze Cheng 已提交
122 123 124
      }
    }

H
Hongze Cheng 已提交
125
    if (size > 0) {
H
Hongze Cheng 已提交
126 127 128 129
      int64_t n = 0;

      size = size + sizeof(SSnapDataHdr) + sizeof(TABLEID);
      code = tRealloc(ppData, size);
H
Hongze Cheng 已提交
130 131
      if (code) goto _err;

H
Hongze Cheng 已提交
132 133 134 135 136 137 138 139 140 141 142
      // SSnapDataHdr
      SSnapDataHdr* pSnapDataHdr = (SSnapDataHdr*)(*ppData + n);
      pSnapDataHdr->type = 1;
      pSnapDataHdr->size = size;  // TODO: size here may incorrect
      n += sizeof(SSnapDataHdr);

      // TABLEID
      TABLEID* pId = (TABLEID*)(*ppData + n);
      pId->suid = pDelIdx->suid;
      pId->uid = pDelIdx->uid;
      n += sizeof(*pId);
H
Hongze Cheng 已提交
143

H
Hongze Cheng 已提交
144
      // DATA
H
Hongze Cheng 已提交
145 146 147 148
      for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); iDelData++) {
        SDelData* pDelData = (SDelData*)taosArrayGet(pReader->aDelData, iDelData);

        if (pDelData->version >= pReader->sver && pDelData->version <= pReader->ever) {
H
Hongze Cheng 已提交
149
          n += tPutDelData(*ppData + n, pDelData);
H
Hongze Cheng 已提交
150 151 152
        }
      }

H
Hongze Cheng 已提交
153 154 155 156 157 158 159 160 161 162 163 164 165
      goto _exit;
    }
  }

  code = TSDB_CODE_VND_READ_END;

_exit:
  return code;

_err:
  tsdbError("vgId:%d snap read del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}
H
more  
Hongze Cheng 已提交
166

H
Hongze Cheng 已提交
167
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapReader** ppReader) {
H
Hongze Cheng 已提交
168 169
  int32_t          code = 0;
  STsdbSnapReader* pReader = NULL;
H
Hongze Cheng 已提交
170

H
more  
Hongze Cheng 已提交
171
  // alloc
H
Hongze Cheng 已提交
172
  pReader = (STsdbSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
H
more  
Hongze Cheng 已提交
173 174 175 176 177 178 179 180
  if (pReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  pReader->pTsdb = pTsdb;
  pReader->sver = sver;
  pReader->ever = ever;

H
Hongze Cheng 已提交
181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203
  pReader->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
  if (pReader->aBlockIdx == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  pReader->mBlock = tMapDataInit();

  code = tBlockDataInit(&pReader->blkData);
  if (code) goto _err;

  pReader->aDelIdx = taosArrayInit(0, sizeof(SDelIdx));
  if (pReader->aDelIdx == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  pReader->aDelData = taosArrayInit(0, sizeof(SDelData));
  if (pReader->aDelData == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

H
more  
Hongze Cheng 已提交
204 205 206 207 208 209 210
  *ppReader = pReader;
  return code;

_err:
  tsdbError("vgId:%d snapshot reader open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  *ppReader = NULL;
  return code;
H
Hongze Cheng 已提交
211 212
}

H
Hongze Cheng 已提交
213
int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
H
Hongze Cheng 已提交
214 215 216 217 218 219 220 221 222 223
  int32_t          code = 0;
  STsdbSnapReader* pReader = *ppReader;

  taosArrayDestroy(pReader->aDelData);
  taosArrayDestroy(pReader->aDelIdx);
  if (pReader->pDelFReader) {
    tsdbDelFReaderClose(&pReader->pDelFReader);
  }
  tBlockDataClear(&pReader->blkData);
  tMapDataClear(&pReader->mBlock);
H
Hongze Cheng 已提交
224
  taosArrayDestroy(pReader->aBlockIdx);
H
Hongze Cheng 已提交
225 226 227 228 229 230
  if (pReader->pDataFReader) {
    tsdbDataFReaderClose(&pReader->pDataFReader);
  }
  taosMemoryFree(pReader);
  *ppReader = NULL;

H
Hongze Cheng 已提交
231 232 233 234
  return code;
}

int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData) {
H
more  
Hongze Cheng 已提交
235
  int32_t code = 0;
H
Hongze Cheng 已提交
236 237

  // read data file
H
Hongze Cheng 已提交
238 239 240 241 242 243 244 245 246 247 248 249
  if (!pReader->dataDone) {
    code = tsdbSnapReadData(pReader, ppData);
    if (code) {
      if (code == TSDB_CODE_VND_READ_END) {
        pReader->dataDone = 1;
      } else {
        goto _err;
      }
    } else {
      goto _exit;
    }
  }
H
Hongze Cheng 已提交
250 251

  // read del file
H
Hongze Cheng 已提交
252 253 254 255 256 257 258 259 260 261 262 263
  if (!pReader->delDone) {
    code = tsdbSnapReadDel(pReader, ppData);
    if (code) {
      if (code == TSDB_CODE_VND_READ_END) {
        pReader->delDone = 1;
      } else {
        goto _err;
      }
    } else {
      goto _exit;
    }
  }
H
Hongze Cheng 已提交
264

H
Hongze Cheng 已提交
265 266 267
  code = TSDB_CODE_VND_READ_END;

_exit:
H
more  
Hongze Cheng 已提交
268 269 270 271 272 273 274
  return code;

_err:
  tsdbError("vgId:%d snapshot read failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
275 276 277 278 279 280 281 282 283 284 285
// STsdbSnapReader ========================================
struct STsdbSnapWriter {
  STsdb*  pTsdb;
  int64_t sver;
  int64_t ever;
  // for data file
  int32_t       iDFileSet;
  SDataFWriter* pDataFWriter;
  // for del file
  SDelFWriter* pDelFWriter;
};