tsdbSnapshot.c 6.3 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"

H
Hongze Cheng 已提交
18
// STsdbSnapReader ========================================
H
Hongze Cheng 已提交
19
struct STsdbSnapReader {
H
more  
Hongze Cheng 已提交
20 21 22 23
  STsdb*  pTsdb;
  int64_t sver;
  int64_t ever;
  // for data file
H
Hongze Cheng 已提交
24 25
  int8_t        dataDone;
  int32_t       fid;
H
more  
Hongze Cheng 已提交
26
  SDataFReader* pDataFReader;
H
Hongze Cheng 已提交
27 28
  int32_t       iBlockIdx;
  SArray*       aBlockIdx;  // SArray<SBlockIdx>
H
Hongze Cheng 已提交
29 30
  SMapData      mBlock;     // SMapData<SBlock>
  SBlockData    blkData;
H
more  
Hongze Cheng 已提交
31
  // for del file
H
Hongze Cheng 已提交
32
  int8_t       delDone;
H
more  
Hongze Cheng 已提交
33
  SDelFReader* pDelFReader;
H
Hongze Cheng 已提交
34 35 36
  int32_t      iDelIdx;
  SArray*      aDelIdx;   // SArray<SDelIdx>
  SArray*      aDelData;  // SArray<SDelData>
H
Hongze Cheng 已提交
37 38
};

H
Hongze Cheng 已提交
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
  int32_t code = 0;

  if (pReader->pDataFReader == NULL) {
    code = tsdbDataFReaderOpen(&pReader->pDataFReader, pReader->pTsdb, NULL);
    if (code) goto _err;

    code = tsdbReadBlockIdx(pReader->pDataFReader, pReader->aBlockIdx, NULL);
    if (code) goto _err;

    pReader->iBlockIdx = 0;
  }

  return code;

_err:
  tsdbError("vgId:%d snap read data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
  return code;
}

static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
  int32_t   code = 0;
  STsdb*    pTsdb = pReader->pTsdb;
H
Hongze Cheng 已提交
62
  SDelFile* pDelFile = pTsdb->fs->cState->pDelFile;
H
Hongze Cheng 已提交
63 64 65 66 67 68 69 70 71 72 73

  if (pReader->pDelFReader == NULL) {
    if (pDelFile == NULL) {
      code = TSDB_CODE_VND_READ_END;
      goto _exit;
    }

    // open
    code = tsdbDelFReaderOpen(&pReader->pDelFReader, pDelFile, pTsdb, NULL);
    if (code) goto _err;

H
Hongze Cheng 已提交
74
    // read index
H
Hongze Cheng 已提交
75 76 77 78 79 80 81 82
    code = tsdbReadDelIdx(pReader->pDelFReader, pReader->aDelIdx, NULL);
    if (code) goto _err;

    pReader->iDelIdx = 0;
  }

  while (pReader->iDelIdx < taosArrayGetSize(pReader->aDelIdx)) {
    SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pReader->aDelIdx, pReader->iDelIdx);
H
Hongze Cheng 已提交
83
    int32_t  size = 0;
H
Hongze Cheng 已提交
84 85 86 87 88 89 90 91

    code = tsdbReadDelData(pReader->pDelFReader, pDelIdx, pReader->aDelData, NULL);
    if (code) goto _err;

    for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); iDelData++) {
      SDelData* pDelData = (SDelData*)taosArrayGet(pReader->aDelData, iDelData);

      if (pDelData->version >= pReader->sver && pDelData->version <= pReader->ever) {
H
Hongze Cheng 已提交
92
        size += tPutDelData(NULL, pDelData);
H
Hongze Cheng 已提交
93 94 95
      }
    }

H
Hongze Cheng 已提交
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
    if (size > 0) {
      code = tRealloc(ppData, sizeof(SSnapDataHdr) + size);  // TODO
      if (code) goto _err;

      // encode
      ((SSnapDataHdr*)(*ppData))->type = 1;
      ((SSnapDataHdr*)(*ppData))->size = size;

      for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); iDelData++) {
        SDelData* pDelData = (SDelData*)taosArrayGet(pReader->aDelData, iDelData);

        if (pDelData->version >= pReader->sver && pDelData->version <= pReader->ever) {
          // size += tPutDelData(NULL, pDelData); (todo)
        }
      }

H
Hongze Cheng 已提交
112 113 114 115 116 117 118 119 120 121 122 123 124
      goto _exit;
    }
  }

  code = TSDB_CODE_VND_READ_END;

_exit:
  return code;

_err:
  tsdbError("vgId:%d snap read del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}
H
more  
Hongze Cheng 已提交
125

H
Hongze Cheng 已提交
126
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapReader** ppReader) {
H
Hongze Cheng 已提交
127 128
  int32_t          code = 0;
  STsdbSnapReader* pReader = NULL;
H
Hongze Cheng 已提交
129

H
more  
Hongze Cheng 已提交
130
  // alloc
H
Hongze Cheng 已提交
131
  pReader = (STsdbSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
H
more  
Hongze Cheng 已提交
132 133 134 135 136 137 138 139
  if (pReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  pReader->pTsdb = pTsdb;
  pReader->sver = sver;
  pReader->ever = ever;

H
Hongze Cheng 已提交
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
  pReader->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
  if (pReader->aBlockIdx == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  pReader->mBlock = tMapDataInit();

  code = tBlockDataInit(&pReader->blkData);
  if (code) goto _err;

  pReader->aDelIdx = taosArrayInit(0, sizeof(SDelIdx));
  if (pReader->aDelIdx == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  pReader->aDelData = taosArrayInit(0, sizeof(SDelData));
  if (pReader->aDelData == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

H
more  
Hongze Cheng 已提交
163 164 165 166 167 168 169
  *ppReader = pReader;
  return code;

_err:
  tsdbError("vgId:%d snapshot reader open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  *ppReader = NULL;
  return code;
H
Hongze Cheng 已提交
170 171
}

H
Hongze Cheng 已提交
172
int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
H
Hongze Cheng 已提交
173 174 175 176 177 178 179 180 181 182
  int32_t          code = 0;
  STsdbSnapReader* pReader = *ppReader;

  taosArrayDestroy(pReader->aDelData);
  taosArrayDestroy(pReader->aDelIdx);
  if (pReader->pDelFReader) {
    tsdbDelFReaderClose(&pReader->pDelFReader);
  }
  tBlockDataClear(&pReader->blkData);
  tMapDataClear(&pReader->mBlock);
H
Hongze Cheng 已提交
183
  taosArrayDestroy(pReader->aBlockIdx);
H
Hongze Cheng 已提交
184 185 186 187 188 189
  if (pReader->pDataFReader) {
    tsdbDataFReaderClose(&pReader->pDataFReader);
  }
  taosMemoryFree(pReader);
  *ppReader = NULL;

H
Hongze Cheng 已提交
190 191 192 193
  return code;
}

int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData) {
H
more  
Hongze Cheng 已提交
194
  int32_t code = 0;
H
Hongze Cheng 已提交
195 196

  // read data file
H
Hongze Cheng 已提交
197 198 199 200 201 202 203 204 205 206 207 208
  if (!pReader->dataDone) {
    code = tsdbSnapReadData(pReader, ppData);
    if (code) {
      if (code == TSDB_CODE_VND_READ_END) {
        pReader->dataDone = 1;
      } else {
        goto _err;
      }
    } else {
      goto _exit;
    }
  }
H
Hongze Cheng 已提交
209 210

  // read del file
H
Hongze Cheng 已提交
211 212 213 214 215 216 217 218 219 220 221 222
  if (!pReader->delDone) {
    code = tsdbSnapReadDel(pReader, ppData);
    if (code) {
      if (code == TSDB_CODE_VND_READ_END) {
        pReader->delDone = 1;
      } else {
        goto _err;
      }
    } else {
      goto _exit;
    }
  }
H
Hongze Cheng 已提交
223

H
Hongze Cheng 已提交
224 225 226
  code = TSDB_CODE_VND_READ_END;

_exit:
H
more  
Hongze Cheng 已提交
227 228 229 230 231 232 233
  return code;

_err:
  tsdbError("vgId:%d snapshot read failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
234 235 236 237 238 239 240 241 242 243 244
// STsdbSnapReader ========================================
struct STsdbSnapWriter {
  STsdb*  pTsdb;
  int64_t sver;
  int64_t ever;
  // for data file
  int32_t       iDFileSet;
  SDataFWriter* pDataFWriter;
  // for del file
  SDelFWriter* pDelFWriter;
};