tsdbSttFileRW.c 27.7 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "inc/tsdbSttFileRW.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18 19
typedef struct {
  int64_t   prevFooter;
H
Hongze Cheng 已提交
20 21
  SFDataPtr sttBlkPtr[1];
  SFDataPtr statisBlkPtr[1];
H
Hongze Cheng 已提交
22
  SFDataPtr tombBlkPtr[1];
H
Hongze Cheng 已提交
23 24
  SFDataPtr rsrvd[2];
} SSttFooter;
H
Hongze Cheng 已提交
25

H
Hongze Cheng 已提交
26
// SSttFReader ============================================================
H
Hongze Cheng 已提交
27
struct SSttFileReader {
H
Hongze Cheng 已提交
28
  SSttFileReaderConfig config[1];
H
Hongze Cheng 已提交
29
  TSttSegReaderArray   readerArray[1];
H
Hongze Cheng 已提交
30
  STsdbFD             *fd;
H
Hongze Cheng 已提交
31
  uint8_t             *bufArr[5];
H
Hongze Cheng 已提交
32 33 34 35
};

struct SSttSegReader {
  SSttFileReader *reader;
H
Hongze Cheng 已提交
36
  SSttFooter      footer[1];
H
Hongze Cheng 已提交
37 38 39
  struct {
    bool sttBlkLoaded;
    bool statisBlkLoaded;
H
Hongze Cheng 已提交
40
    bool tombBlkLoaded;
H
Hongze Cheng 已提交
41
  } ctx[1];
H
Hongze Cheng 已提交
42 43
  TSttBlkArray    sttBlkArray[1];
  TStatisBlkArray statisBlkArray[1];
H
Hongze Cheng 已提交
44
  TTombBlkArray   tombBlkArray[1];
H
Hongze Cheng 已提交
45 46
};

H
Hongze Cheng 已提交
47
// SSttFileReader
H
Hongze Cheng 已提交
48
static int32_t tsdbSttSegReaderOpen(SSttFileReader *reader, int64_t offset, SSttSegReader **segReader) {
H
Hongze Cheng 已提交
49 50
  ASSERT(offset >= TSDB_FHDR_SIZE);

H
Hongze Cheng 已提交
51
  int32_t code = 0;
H
Hongze Cheng 已提交
52 53 54
  int32_t lino = 0;

  segReader[0] = taosMemoryCalloc(1, sizeof(*segReader[0]));
H
Hongze Cheng 已提交
55
  if (!segReader[0]) return TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
56 57

  segReader[0]->reader = reader;
H
Hongze Cheng 已提交
58
  code = tsdbReadFile(reader->fd, offset, (uint8_t *)(segReader[0]->footer), sizeof(SSttFooter));
H
Hongze Cheng 已提交
59 60 61 62
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
H
Hongze Cheng 已提交
63
    TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
64 65 66
    taosMemoryFree(segReader[0]);
    segReader[0] = NULL;
  }
H
Hongze Cheng 已提交
67 68 69
  return code;
}

H
Hongze Cheng 已提交
70
static int32_t tsdbSttSegReaderClose(SSttSegReader **reader) {
H
Hongze Cheng 已提交
71
  if (reader[0]) {
H
Hongze Cheng 已提交
72 73
    TARRAY2_DESTROY(reader[0]->tombBlkArray, NULL);
    TARRAY2_DESTROY(reader[0]->statisBlkArray, NULL);
H
Hongze Cheng 已提交
74
    TARRAY2_DESTROY(reader[0]->sttBlkArray, NULL);
H
Hongze Cheng 已提交
75 76
    taosMemoryFree(reader[0]);
    reader[0] = NULL;
H
Hongze Cheng 已提交
77 78
  }
  return 0;
H
Hongze Cheng 已提交
79 80
}

H
Hongze Cheng 已提交
81
int32_t tsdbSttFileReaderOpen(const char *fname, const SSttFileReaderConfig *config, SSttFileReader **reader) {
H
Hongze Cheng 已提交
82
  int32_t code = 0;
H
Hongze Cheng 已提交
83 84 85 86 87
  int32_t lino = 0;

  reader[0] = taosMemoryCalloc(1, sizeof(*reader[0]));
  if (reader[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;

H
Hongze Cheng 已提交
88
  reader[0]->config[0] = config[0];
H
Hongze Cheng 已提交
89 90 91
  if (reader[0]->config->bufArr == NULL) {
    reader[0]->config->bufArr = reader[0]->bufArr;
  }
H
Hongze Cheng 已提交
92 93

  // open file
H
Hongze Cheng 已提交
94 95 96 97 98 99 100 101 102
  if (fname) {
    code = tsdbOpenFile(fname, config->szPage, TD_FILE_READ, &reader[0]->fd);
    TSDB_CHECK_CODE(code, lino, _exit);
  } else {
    char fname1[TSDB_FILENAME_LEN];
    tsdbTFileName(config->tsdb, config->file, fname1);
    code = tsdbOpenFile(fname1, config->szPage, TD_FILE_READ, &reader[0]->fd);
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
103 104

  // open each segment reader
H
Hongze Cheng 已提交
105
  int64_t size = config->file->size;
H
Hongze Cheng 已提交
106
  while (size > 0) {
H
Hongze Cheng 已提交
107
    SSttSegReader *reader1;
H
Hongze Cheng 已提交
108

H
Hongze Cheng 已提交
109
    code = tsdbSttSegReaderOpen(reader[0], size - sizeof(SSttFooter), &reader1);
H
Hongze Cheng 已提交
110 111
    TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
112
    code = TARRAY2_APPEND(reader[0]->readerArray, reader1);
H
Hongze Cheng 已提交
113 114
    TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
115
    size = reader1->footer->prevFooter;
H
Hongze Cheng 已提交
116 117
  }

H
Hongze Cheng 已提交
118
  ASSERT(TARRAY2_SIZE(reader[0]->readerArray) == config->file->stt->nseg);
H
Hongze Cheng 已提交
119 120 121

_exit:
  if (code) {
H
Hongze Cheng 已提交
122
    TSDB_ERROR_LOG(TD_VID(config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
123
    tsdbSttFileReaderClose(reader);
H
Hongze Cheng 已提交
124
  }
H
Hongze Cheng 已提交
125 126 127
  return code;
}

H
Hongze Cheng 已提交
128
int32_t tsdbSttFileReaderClose(SSttFileReader **reader) {
H
Hongze Cheng 已提交
129
  if (reader[0]) {
H
Hongze Cheng 已提交
130 131 132
    for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->bufArr); ++i) {
      tFree(reader[0]->bufArr[i]);
    }
H
Hongze Cheng 已提交
133
    tsdbCloseFile(&reader[0]->fd);
H
Hongze Cheng 已提交
134
    TARRAY2_DESTROY(reader[0]->readerArray, tsdbSttSegReaderClose);
H
Hongze Cheng 已提交
135 136 137
    taosMemoryFree(reader[0]);
    reader[0] = NULL;
  }
H
Hongze Cheng 已提交
138
  return 0;
H
Hongze Cheng 已提交
139 140
}

H
Hongze Cheng 已提交
141
int32_t tsdbSttFileReaderGetSegReader(SSttFileReader *reader, const TSttSegReaderArray **readerArray) {
H
Hongze Cheng 已提交
142
  readerArray[0] = reader->readerArray;
H
Hongze Cheng 已提交
143
  return 0;
H
Hongze Cheng 已提交
144 145
}

H
Hongze Cheng 已提交
146
// SSttFSegReader
H
Hongze Cheng 已提交
147
int32_t tsdbSttFileReadStatisBlk(SSttSegReader *reader, const TStatisBlkArray **statisBlkArray) {
H
Hongze Cheng 已提交
148
  if (!reader->ctx->statisBlkLoaded) {
H
Hongze Cheng 已提交
149
    if (reader->footer->statisBlkPtr->size > 0) {
H
Hongze Cheng 已提交
150
      ASSERT(reader->footer->statisBlkPtr->size % sizeof(SStatisBlk) == 0);
H
Hongze Cheng 已提交
151

H
Hongze Cheng 已提交
152
      int32_t size = reader->footer->statisBlkPtr->size / sizeof(SStatisBlk);
H
Hongze Cheng 已提交
153
      void   *data = taosMemoryMalloc(reader->footer->statisBlkPtr->size);
H
Hongze Cheng 已提交
154 155
      if (!data) return TSDB_CODE_OUT_OF_MEMORY;

H
Hongze Cheng 已提交
156 157
      int32_t code = tsdbReadFile(reader->reader->fd, reader->footer->statisBlkPtr->offset, data,
                                  reader->footer->statisBlkPtr->size);
H
Hongze Cheng 已提交
158 159 160 161
      if (code) {
        taosMemoryFree(data);
        return code;
      }
H
Hongze Cheng 已提交
162

H
Hongze Cheng 已提交
163
      TARRAY2_INIT_EX(reader->statisBlkArray, size, size, data);
H
Hongze Cheng 已提交
164
    } else {
H
Hongze Cheng 已提交
165
      TARRAY2_INIT(reader->statisBlkArray);
H
Hongze Cheng 已提交
166 167
    }

H
Hongze Cheng 已提交
168
    reader->ctx->statisBlkLoaded = true;
H
Hongze Cheng 已提交
169 170
  }

H
Hongze Cheng 已提交
171
  statisBlkArray[0] = reader->statisBlkArray;
H
Hongze Cheng 已提交
172
  return 0;
H
Hongze Cheng 已提交
173 174
}

H
Hongze Cheng 已提交
175 176 177 178
int32_t tsdbSttFileReadTombBlk(SSttSegReader *reader, const TTombBlkArray **tombBlkArray) {
  if (!reader->ctx->tombBlkLoaded) {
    if (reader->footer->tombBlkPtr->size > 0) {
      ASSERT(reader->footer->tombBlkPtr->size % sizeof(STombBlk) == 0);
H
Hongze Cheng 已提交
179

H
Hongze Cheng 已提交
180 181
      int32_t size = reader->footer->tombBlkPtr->size / sizeof(STombBlk);
      void   *data = taosMemoryMalloc(reader->footer->tombBlkPtr->size);
H
Hongze Cheng 已提交
182 183
      if (!data) return TSDB_CODE_OUT_OF_MEMORY;

H
Hongze Cheng 已提交
184
      int32_t code =
H
Hongze Cheng 已提交
185
          tsdbReadFile(reader->reader->fd, reader->footer->tombBlkPtr->offset, data, reader->footer->tombBlkPtr->size);
H
Hongze Cheng 已提交
186 187 188 189
      if (code) {
        taosMemoryFree(data);
        return code;
      }
H
Hongze Cheng 已提交
190

H
Hongze Cheng 已提交
191
      TARRAY2_INIT_EX(reader->tombBlkArray, size, size, data);
H
Hongze Cheng 已提交
192
    } else {
H
Hongze Cheng 已提交
193
      TARRAY2_INIT(reader->tombBlkArray);
H
Hongze Cheng 已提交
194 195
    }

H
Hongze Cheng 已提交
196
    reader->ctx->tombBlkLoaded = true;
H
Hongze Cheng 已提交
197 198
  }

H
Hongze Cheng 已提交
199
  tombBlkArray[0] = reader->tombBlkArray;
H
Hongze Cheng 已提交
200 201 202
  return 0;
}

H
Hongze Cheng 已提交
203
int32_t tsdbSttFileReadSttBlk(SSttSegReader *reader, const TSttBlkArray **sttBlkArray) {
H
Hongze Cheng 已提交
204
  if (!reader->ctx->sttBlkLoaded) {
H
Hongze Cheng 已提交
205 206
    if (reader->footer->sttBlkPtr->size > 0) {
      ASSERT(reader->footer->sttBlkPtr->size % sizeof(SSttBlk) == 0);
H
Hongze Cheng 已提交
207

H
Hongze Cheng 已提交
208 209
      int32_t size = reader->footer->sttBlkPtr->size / sizeof(SSttBlk);
      void   *data = taosMemoryMalloc(reader->footer->sttBlkPtr->size);
H
Hongze Cheng 已提交
210 211
      if (!data) return TSDB_CODE_OUT_OF_MEMORY;

H
Hongze Cheng 已提交
212 213
      int32_t code =
          tsdbReadFile(reader->reader->fd, reader->footer->sttBlkPtr->offset, data, reader->footer->sttBlkPtr->size);
H
Hongze Cheng 已提交
214 215 216 217
      if (code) {
        taosMemoryFree(data);
        return code;
      }
H
Hongze Cheng 已提交
218

H
Hongze Cheng 已提交
219
      TARRAY2_INIT_EX(reader->sttBlkArray, size, size, data);
H
Hongze Cheng 已提交
220
    } else {
H
Hongze Cheng 已提交
221
      TARRAY2_INIT(reader->sttBlkArray);
H
Hongze Cheng 已提交
222 223
    }

H
Hongze Cheng 已提交
224
    reader->ctx->sttBlkLoaded = true;
H
Hongze Cheng 已提交
225 226
  }

H
Hongze Cheng 已提交
227
  sttBlkArray[0] = reader->sttBlkArray;
H
Hongze Cheng 已提交
228
  return 0;
H
Hongze Cheng 已提交
229 230
}

H
Hongze Cheng 已提交
231
int32_t tsdbSttFileReadDataBlock(SSttSegReader *reader, const SSttBlk *sttBlk, SBlockData *bData) {
H
Hongze Cheng 已提交
232
  int32_t code = 0;
H
Hongze Cheng 已提交
233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249
  int32_t lino = 0;

  code = tRealloc(&reader->reader->config->bufArr[0], sttBlk->bInfo.szBlock);
  TSDB_CHECK_CODE(code, lino, _exit);

  code =
      tsdbReadFile(reader->reader->fd, sttBlk->bInfo.offset, reader->reader->config->bufArr[0], sttBlk->bInfo.szBlock);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tDecmprBlockData(reader->reader->config->bufArr[0], sttBlk->bInfo.szBlock, bData,
                          &reader->reader->config->bufArr[1]);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->reader->config->tsdb->pVnode), lino, code);
  }
H
Hongze Cheng 已提交
250 251 252
  return code;
}

H
Hongze Cheng 已提交
253
int32_t tsdbSttFileReadTombBlock(SSttSegReader *reader, const STombBlk *tombBlk, STombBlock *dData) {
H
Hongze Cheng 已提交
254
  int32_t code = 0;
H
Hongze Cheng 已提交
255 256
  int32_t lino = 0;

H
Hongze Cheng 已提交
257
  code = tRealloc(&reader->reader->config->bufArr[0], tombBlk->dp->size);
H
Hongze Cheng 已提交
258 259
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
260
  code = tsdbReadFile(reader->reader->fd, tombBlk->dp->offset, reader->reader->config->bufArr[0], tombBlk->dp->size);
H
Hongze Cheng 已提交
261 262 263
  if (code) TSDB_CHECK_CODE(code, lino, _exit);

  int64_t size = 0;
H
Hongze Cheng 已提交
264
  tTombBlockClear(dData);
H
Hongze Cheng 已提交
265
  for (int32_t i = 0; i < ARRAY_SIZE(dData->dataArr); ++i) {
H
Hongze Cheng 已提交
266
    code = tsdbDecmprData(reader->reader->config->bufArr[0] + size, tombBlk->size[i], TSDB_DATA_TYPE_BIGINT,
H
Hongze Cheng 已提交
267
                          tombBlk->cmprAlg, &reader->reader->config->bufArr[1], sizeof(int64_t) * tombBlk->numRec,
H
Hongze Cheng 已提交
268
                          &reader->reader->config->bufArr[2]);
H
Hongze Cheng 已提交
269 270
    TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
271 272
    code = TARRAY2_APPEND_BATCH(&dData->dataArr[i], reader->reader->config->bufArr[1], tombBlk->numRec);
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
273

H
Hongze Cheng 已提交
274
    size += tombBlk->size[i];
H
Hongze Cheng 已提交
275 276
  }

H
Hongze Cheng 已提交
277
  ASSERT(size == tombBlk->dp->size);
H
Hongze Cheng 已提交
278 279
_exit:
  if (code) {
H
Hongze Cheng 已提交
280
    TSDB_ERROR_LOG(TD_VID(reader->reader->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
281
  }
H
Hongze Cheng 已提交
282 283 284
  return code;
}

H
Hongze Cheng 已提交
285
int32_t tsdbSttFileReadStatisBlock(SSttSegReader *reader, const SStatisBlk *statisBlk, STbStatisBlock *sData) {
H
Hongze Cheng 已提交
286
  int32_t code = 0;
H
Hongze Cheng 已提交
287 288 289
  int32_t lino = 0;

  tStatisBlockClear(sData);
H
Hongze Cheng 已提交
290 291

  code = tRealloc(&reader->reader->config->bufArr[0], statisBlk->dp->size);
H
Hongze Cheng 已提交
292 293
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
294 295
  code =
      tsdbReadFile(reader->reader->fd, statisBlk->dp->offset, reader->reader->config->bufArr[0], statisBlk->dp->size);
H
Hongze Cheng 已提交
296 297 298
  if (code) TSDB_CHECK_CODE(code, lino, _exit);

  int64_t size = 0;
H
Hongze Cheng 已提交
299
  for (int32_t i = 0; i < ARRAY_SIZE(sData->dataArr); ++i) {
H
Hongze Cheng 已提交
300
    code = tsdbDecmprData(reader->reader->config->bufArr[0] + size, statisBlk->size[i], TSDB_DATA_TYPE_BIGINT,
H
Hongze Cheng 已提交
301
                          statisBlk->cmprAlg, &reader->reader->config->bufArr[1], sizeof(int64_t) * statisBlk->numRec,
H
Hongze Cheng 已提交
302
                          &reader->reader->config->bufArr[2]);
H
Hongze Cheng 已提交
303 304
    TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
305 306
    code = TARRAY2_APPEND_BATCH(sData->dataArr + i, reader->reader->config->bufArr[1], statisBlk->numRec);
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
307

H
Hongze Cheng 已提交
308 309 310 311 312 313 314
    size += statisBlk->size[i];
  }

  ASSERT(size == statisBlk->dp->size);

_exit:
  if (code) {
H
Hongze Cheng 已提交
315
    TSDB_ERROR_LOG(TD_VID(reader->reader->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
316
  }
H
Hongze Cheng 已提交
317 318 319
  return code;
}

H
Hongze Cheng 已提交
320
// SSttFWriter ============================================================
H
Hongze Cheng 已提交
321
struct SSttFileWriter {
H
Hongze Cheng 已提交
322
  SSttFileWriterConfig config[1];
H
Hongze Cheng 已提交
323
  struct {
H
Hongze Cheng 已提交
324 325
    bool    opened;
    TABLEID tbid[1];
H
Hongze Cheng 已提交
326
  } ctx[1];
H
Hongze Cheng 已提交
327
  // file
H
Hongze Cheng 已提交
328
  STFile file[1];
H
Hongze Cheng 已提交
329
  // data
H
Hongze Cheng 已提交
330 331
  TSttBlkArray    sttBlkArray[1];
  TStatisBlkArray statisBlkArray[1];
H
Hongze Cheng 已提交
332
  TTombBlkArray   tombBlkArray[1];
H
Hongze Cheng 已提交
333
  SSttFooter      footer[1];
H
Hongze Cheng 已提交
334 335
  SBlockData      bData[1];
  STbStatisBlock  sData[1];
H
Hongze Cheng 已提交
336
  STombBlock      tData[1];
H
Hongze Cheng 已提交
337
  // helper data
H
Hongze Cheng 已提交
338 339
  SSkmInfo skmTb[1];
  SSkmInfo skmRow[1];
H
Hongze Cheng 已提交
340
  uint8_t *bufArr[5];
H
Hongze Cheng 已提交
341
  STsdbFD *fd;
H
Hongze Cheng 已提交
342
};
H
Hongze Cheng 已提交
343

H
Hongze Cheng 已提交
344 345 346
static int32_t tsdbSttFileDoWriteTSDataBlock(SSttFileWriter *writer) {
  if (writer->bData->nRow == 0) return 0;

H
Hongze Cheng 已提交
347
  int32_t code = 0;
H
Hongze Cheng 已提交
348
  int32_t lino = 0;
H
Hongze Cheng 已提交
349 350 351 352 353 354 355 356 357 358 359 360

  SSttBlk sttBlk[1] = {{
      .suid = writer->bData->suid,
      .minUid = writer->bData->uid ? writer->bData->uid : writer->bData->aUid[0],
      .maxUid = writer->bData->uid ? writer->bData->uid : writer->bData->aUid[writer->bData->nRow - 1],
      .minKey = writer->bData->aTSKEY[0],
      .maxKey = writer->bData->aTSKEY[0],
      .minVer = writer->bData->aVersion[0],
      .maxVer = writer->bData->aVersion[0],
      .nRow = writer->bData->nRow,
  }};

H
Hongze Cheng 已提交
361 362 363 364 365 366 367
  for (int32_t iRow = 1; iRow < writer->bData->nRow; iRow++) {
    if (sttBlk->minKey > writer->bData->aTSKEY[iRow]) sttBlk->minKey = writer->bData->aTSKEY[iRow];
    if (sttBlk->maxKey < writer->bData->aTSKEY[iRow]) sttBlk->maxKey = writer->bData->aTSKEY[iRow];
    if (sttBlk->minVer > writer->bData->aVersion[iRow]) sttBlk->minVer = writer->bData->aVersion[iRow];
    if (sttBlk->maxVer < writer->bData->aVersion[iRow]) sttBlk->maxVer = writer->bData->aVersion[iRow];
  }

H
Hongze Cheng 已提交
368 369
  int32_t sizeArr[5] = {0};
  code = tCmprBlockData(writer->bData, writer->config->cmprAlg, NULL, NULL, writer->config->bufArr, sizeArr);
H
Hongze Cheng 已提交
370
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
371

H
Hongze Cheng 已提交
372
  sttBlk->bInfo.offset = writer->file->size;
H
Hongze Cheng 已提交
373 374
  sttBlk->bInfo.szKey = sizeArr[2] + sizeArr[3];
  sttBlk->bInfo.szBlock = sizeArr[0] + sizeArr[1] + sttBlk->bInfo.szKey;
H
Hongze Cheng 已提交
375

H
Hongze Cheng 已提交
376
  for (int32_t i = 3; i >= 0; i--) {
H
Hongze Cheng 已提交
377 378
    if (sizeArr[i]) {
      code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->bufArr[i], sizeArr[i]);
H
Hongze Cheng 已提交
379
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
380
      writer->file->size += sizeArr[i];
H
Hongze Cheng 已提交
381 382 383
    }
  }

H
Hongze Cheng 已提交
384
  code = TARRAY2_APPEND_PTR(writer->sttBlkArray, sttBlk);
H
Hongze Cheng 已提交
385
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
386

H
Hongze Cheng 已提交
387 388
  tBlockDataClear(writer->bData);

H
Hongze Cheng 已提交
389 390
_exit:
  if (code) {
H
Hongze Cheng 已提交
391
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
392
  }
H
Hongze Cheng 已提交
393 394 395
  return code;
}

H
Hongze Cheng 已提交
396
static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) {
H
Hongze Cheng 已提交
397
  if (STATIS_BLOCK_SIZE(writer->sData) == 0) return 0;
H
Hongze Cheng 已提交
398

H
Hongze Cheng 已提交
399
  int32_t code = 0;
H
Hongze Cheng 已提交
400
  int32_t lino = 0;
H
Hongze Cheng 已提交
401

H
Hongze Cheng 已提交
402
  SStatisBlk statisBlk[1] = {{
H
Hongze Cheng 已提交
403 404 405 406 407
      .dp[0] =
          {
              .offset = writer->file->size,
              .size = 0,
          },
H
Hongze Cheng 已提交
408 409 410 411 412 413 414 415 416 417
      .minTbid =
          {
              .suid = TARRAY2_FIRST(writer->sData->suid),
              .uid = TARRAY2_FIRST(writer->sData->uid),
          },
      .maxTbid =
          {
              .suid = TARRAY2_LAST(writer->sData->suid),
              .uid = TARRAY2_LAST(writer->sData->uid),
          },
H
Hongze Cheng 已提交
418 419
      .minVer = TARRAY2_FIRST(writer->sData->minVer),
      .maxVer = TARRAY2_FIRST(writer->sData->maxVer),
H
Hongze Cheng 已提交
420 421
      .numRec = STATIS_BLOCK_SIZE(writer->sData),
      .cmprAlg = writer->config->cmprAlg,
H
Hongze Cheng 已提交
422 423
  }};

H
Hongze Cheng 已提交
424 425 426 427
  for (int32_t i = 1; i < STATIS_BLOCK_SIZE(writer->sData); i++) {
    statisBlk->minVer = TMIN(statisBlk->minVer, TARRAY2_GET(writer->sData->minVer, i));
    statisBlk->maxVer = TMAX(statisBlk->maxVer, TARRAY2_GET(writer->sData->maxVer, i));
  }
H
Hongze Cheng 已提交
428

H
Hongze Cheng 已提交
429
  for (int32_t i = 0; i < STATIS_RECORD_NUM_ELEM; i++) {
H
Hongze Cheng 已提交
430
    int32_t size;
H
Hongze Cheng 已提交
431
    code = tsdbCmprData((uint8_t *)TARRAY2_DATA(writer->sData->dataArr + i),
H
Hongze Cheng 已提交
432
                        TARRAY2_DATA_LEN(&writer->sData->dataArr[i]), TSDB_DATA_TYPE_BIGINT, statisBlk->cmprAlg,
H
Hongze Cheng 已提交
433
                        &writer->config->bufArr[0], 0, &size, &writer->config->bufArr[1]);
H
Hongze Cheng 已提交
434
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
435

H
Hongze Cheng 已提交
436
    code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->bufArr[0], size);
H
Hongze Cheng 已提交
437 438
    TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
439 440
    statisBlk->size[i] = size;
    statisBlk->dp->size += size;
H
Hongze Cheng 已提交
441
    writer->file->size += size;
H
Hongze Cheng 已提交
442
  }
H
Hongze Cheng 已提交
443

H
Hongze Cheng 已提交
444
  code = TARRAY2_APPEND_PTR(writer->statisBlkArray, statisBlk);
H
Hongze Cheng 已提交
445
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
446

H
Hongze Cheng 已提交
447 448
  tStatisBlockClear(writer->sData);

H
Hongze Cheng 已提交
449 450
_exit:
  if (code) {
H
Hongze Cheng 已提交
451
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
452
  }
H
Hongze Cheng 已提交
453 454 455
  return code;
}

H
Hongze Cheng 已提交
456
static int32_t tsdbSttFileDoWriteTombBlock(SSttFileWriter *writer) {
H
Hongze Cheng 已提交
457
  if (TOMB_BLOCK_SIZE(writer->tData) == 0) return 0;
H
Hongze Cheng 已提交
458

H
Hongze Cheng 已提交
459
  int32_t code = 0;
H
Hongze Cheng 已提交
460
  int32_t lino = 0;
H
Hongze Cheng 已提交
461

H
Hongze Cheng 已提交
462
  STombBlk tombBlk[1] = {{
H
Hongze Cheng 已提交
463 464 465 466 467
      .dp[0] =
          {
              .offset = writer->file->size,
              .size = 0,
          },
H
Hongze Cheng 已提交
468
      .minTbid =
H
Hongze Cheng 已提交
469
          {
H
Hongze Cheng 已提交
470 471
              .suid = TARRAY2_FIRST(writer->tData->suid),
              .uid = TARRAY2_FIRST(writer->tData->uid),
H
Hongze Cheng 已提交
472
          },
H
Hongze Cheng 已提交
473
      .maxTbid =
H
Hongze Cheng 已提交
474
          {
H
Hongze Cheng 已提交
475 476
              .suid = TARRAY2_LAST(writer->tData->suid),
              .uid = TARRAY2_LAST(writer->tData->uid),
H
Hongze Cheng 已提交
477
          },
H
Hongze Cheng 已提交
478 479
      .minVer = TARRAY2_FIRST(writer->tData->version),
      .maxVer = TARRAY2_FIRST(writer->tData->version),
H
Hongze Cheng 已提交
480 481
      .numRec = TOMB_BLOCK_SIZE(writer->tData),
      .cmprAlg = writer->config->cmprAlg,
H
Hongze Cheng 已提交
482
  }};
H
Hongze Cheng 已提交
483

H
Hongze Cheng 已提交
484 485 486
  for (int32_t i = 1; i < TOMB_BLOCK_SIZE(writer->tData); i++) {
    tombBlk->minVer = TMIN(tombBlk->minVer, TARRAY2_GET(writer->tData->version, i));
    tombBlk->maxVer = TMAX(tombBlk->maxVer, TARRAY2_GET(writer->tData->version, i));
H
Hongze Cheng 已提交
487 488
  }

H
Hongze Cheng 已提交
489
  for (int32_t i = 0; i < ARRAY_SIZE(writer->tData->dataArr); i++) {
H
Hongze Cheng 已提交
490
    int32_t size;
H
Hongze Cheng 已提交
491
    code = tsdbCmprData((uint8_t *)TARRAY2_DATA(&writer->tData->dataArr[i]),
H
Hongze Cheng 已提交
492
                        TARRAY2_DATA_LEN(&writer->tData->dataArr[i]), TSDB_DATA_TYPE_BIGINT, tombBlk->cmprAlg,
H
Hongze Cheng 已提交
493
                        &writer->config->bufArr[0], 0, &size, &writer->config->bufArr[1]);
H
Hongze Cheng 已提交
494
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
495

H
Hongze Cheng 已提交
496
    code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->bufArr[0], size);
H
Hongze Cheng 已提交
497 498
    TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
499
    tombBlk->size[i] = size;
H
Hongze Cheng 已提交
500
    tombBlk->dp->size += size;
H
Hongze Cheng 已提交
501
    writer->file->size += size;
H
Hongze Cheng 已提交
502 503
  }

H
Hongze Cheng 已提交
504
  code = TARRAY2_APPEND_PTR(writer->tombBlkArray, tombBlk);
H
Hongze Cheng 已提交
505
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
506

H
Hongze Cheng 已提交
507
  tTombBlockClear(writer->tData);
H
Hongze Cheng 已提交
508

H
Hongze Cheng 已提交
509 510
_exit:
  if (code) {
H
Hongze Cheng 已提交
511
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
512
  }
H
Hongze Cheng 已提交
513 514 515
  return code;
}

H
Hongze Cheng 已提交
516
static int32_t tsdbSttFileDoWriteSttBlk(SSttFileWriter *writer) {
H
Hongze Cheng 已提交
517
  int32_t code = 0;
H
Hongze Cheng 已提交
518 519
  int32_t lino;

H
Hongze Cheng 已提交
520
  writer->footer->sttBlkPtr->size = TARRAY2_DATA_LEN(writer->sttBlkArray);
H
Hongze Cheng 已提交
521
  if (writer->footer->sttBlkPtr->size) {
H
Hongze Cheng 已提交
522
    writer->footer->sttBlkPtr->offset = writer->file->size;
H
Hongze Cheng 已提交
523
    code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)TARRAY2_DATA(writer->sttBlkArray),
H
Hongze Cheng 已提交
524
                         writer->footer->sttBlkPtr->size);
H
Hongze Cheng 已提交
525
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
526
    writer->file->size += writer->footer->sttBlkPtr->size;
H
Hongze Cheng 已提交
527 528 529 530
  }

_exit:
  if (code) {
H
Hongze Cheng 已提交
531
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
532
  }
H
Hongze Cheng 已提交
533 534 535
  return code;
}

H
Hongze Cheng 已提交
536
static int32_t tsdbSttFileDoWriteStatisBlk(SSttFileWriter *writer) {
H
Hongze Cheng 已提交
537
  int32_t code = 0;
H
Hongze Cheng 已提交
538
  int32_t lino;
H
Hongze Cheng 已提交
539

H
Hongze Cheng 已提交
540
  writer->footer->statisBlkPtr->size = TARRAY2_DATA_LEN(writer->statisBlkArray);
H
Hongze Cheng 已提交
541
  if (writer->footer->statisBlkPtr->size) {
H
Hongze Cheng 已提交
542
    writer->footer->statisBlkPtr->offset = writer->file->size;
H
Hongze Cheng 已提交
543
    code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)TARRAY2_DATA(writer->statisBlkArray),
H
Hongze Cheng 已提交
544
                         writer->footer->statisBlkPtr->size);
H
Hongze Cheng 已提交
545
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
546
    writer->file->size += writer->footer->statisBlkPtr->size;
H
Hongze Cheng 已提交
547 548 549 550
  }

_exit:
  if (code) {
H
Hongze Cheng 已提交
551
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
552
  }
H
Hongze Cheng 已提交
553 554 555
  return code;
}

H
Hongze Cheng 已提交
556
static int32_t tsdbSttFileDoWriteTombBlk(SSttFileWriter *writer) {
H
Hongze Cheng 已提交
557
  int32_t code = 0;
H
Hongze Cheng 已提交
558
  int32_t lino = 0;
H
Hongze Cheng 已提交
559

H
Hongze Cheng 已提交
560 561
  writer->footer->tombBlkPtr->size = TARRAY2_DATA_LEN(writer->tombBlkArray);
  if (writer->footer->tombBlkPtr->size) {
H
Hongze Cheng 已提交
562
    writer->footer->tombBlkPtr->offset = writer->file->size;
H
Hongze Cheng 已提交
563 564
    code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)TARRAY2_DATA(writer->tombBlkArray),
                         writer->footer->tombBlkPtr->size);
H
Hongze Cheng 已提交
565
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
566
    writer->file->size += writer->footer->tombBlkPtr->size;
H
Hongze Cheng 已提交
567 568 569 570
  }

_exit:
  if (code) {
H
Hongze Cheng 已提交
571
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
572 573 574 575
  }
  return code;
}

H
Hongze Cheng 已提交
576
static int32_t tsdbSttFileDoWriteFooter(SSttFileWriter *writer) {
H
Hongze Cheng 已提交
577 578
  writer->footer->prevFooter = writer->config->file.size;
  int32_t code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)writer->footer, sizeof(writer->footer));
H
Hongze Cheng 已提交
579
  if (code) return code;
H
Hongze Cheng 已提交
580
  writer->file->size += sizeof(writer->footer);
H
Hongze Cheng 已提交
581
  return 0;
H
Hongze Cheng 已提交
582 583
}

H
Hongze Cheng 已提交
584
static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) {
H
Hongze Cheng 已提交
585
  int32_t code = 0;
H
Hongze Cheng 已提交
586
  int32_t lino = 0;
H
Hongze Cheng 已提交
587

H
Hongze Cheng 已提交
588
  // set
H
Hongze Cheng 已提交
589 590 591 592
  writer->file[0] = writer->config->file;
  writer->file->stt->nseg++;
  if (!writer->config->skmTb) writer->config->skmTb = writer->skmTb;
  if (!writer->config->skmRow) writer->config->skmRow = writer->skmRow;
H
Hongze Cheng 已提交
593
  if (!writer->config->bufArr) writer->config->bufArr = writer->bufArr;
H
Hongze Cheng 已提交
594

H
Hongze Cheng 已提交
595 596 597
  // open file
  int32_t flag;
  char    fname[TSDB_FILENAME_LEN];
H
Hongze Cheng 已提交
598

H
Hongze Cheng 已提交
599
  if (writer->file->size > 0) {
H
Hongze Cheng 已提交
600 601 602
    flag = TD_FILE_READ | TD_FILE_WRITE;
  } else {
    flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
H
Hongze Cheng 已提交
603
  }
H
Hongze Cheng 已提交
604

H
Hongze Cheng 已提交
605
  tsdbTFileName(writer->config->tsdb, writer->file, fname);
H
Hongze Cheng 已提交
606
  code = tsdbOpenFile(fname, writer->config->szPage, flag, &writer->fd);
H
Hongze Cheng 已提交
607 608
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
609
  if (writer->file->size == 0) {
H
Hongze Cheng 已提交
610 611 612
    uint8_t hdr[TSDB_FHDR_SIZE] = {0};
    code = tsdbWriteFile(writer->fd, 0, hdr, sizeof(hdr));
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
613
    writer->file->size += sizeof(hdr);
H
Hongze Cheng 已提交
614
  }
H
Hongze Cheng 已提交
615

H
Hongze Cheng 已提交
616 617
  writer->ctx->opened = true;

H
Hongze Cheng 已提交
618
_exit:
H
Hongze Cheng 已提交
619
  if (code) {
H
Hongze Cheng 已提交
620
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
621
  }
H
Hongze Cheng 已提交
622
  return code;
H
Hongze Cheng 已提交
623 624
}

H
Hongze Cheng 已提交
625
static void tsdbSttFWriterDoClose(SSttFileWriter *writer) {
H
Hongze Cheng 已提交
626
  ASSERT(writer->fd == NULL);
H
Hongze Cheng 已提交
627

H
Hongze Cheng 已提交
628
  for (int32_t i = 0; i < ARRAY_SIZE(writer->bufArr); ++i) {
H
Hongze Cheng 已提交
629
    tFree(writer->bufArr[i]);
H
Hongze Cheng 已提交
630 631 632
  }
  tDestroyTSchema(writer->skmRow->pTSchema);
  tDestroyTSchema(writer->skmTb->pTSchema);
H
Hongze Cheng 已提交
633
  tTombBlockDestroy(writer->tData);
H
Hongze Cheng 已提交
634
  tStatisBlockDestroy(writer->sData);
H
Hongze Cheng 已提交
635
  tBlockDataDestroy(writer->bData);
H
Hongze Cheng 已提交
636 637 638
  TARRAY2_DESTROY(writer->tombBlkArray, NULL);
  TARRAY2_DESTROY(writer->statisBlkArray, NULL);
  TARRAY2_DESTROY(writer->sttBlkArray, NULL);
H
Hongze Cheng 已提交
639
}
H
Hongze Cheng 已提交
640

H
Hongze Cheng 已提交
641 642 643 644
static int32_t tsdbSttFileDoUpdateHeader(SSttFileWriter *writer) {
  // TODO
  return 0;
}
H
Hongze Cheng 已提交
645

H
Hongze Cheng 已提交
646
static int32_t tsdbSttFWriterCloseCommit(SSttFileWriter *writer, TFileOpArray *opArray) {
H
Hongze Cheng 已提交
647 648
  int32_t lino;
  int32_t code;
H
Hongze Cheng 已提交
649

H
Hongze Cheng 已提交
650 651
  code = tsdbSttFileDoWriteTSDataBlock(writer);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
652

H
Hongze Cheng 已提交
653
  code = tsdbSttFileDoWriteStatisBlock(writer);
H
Hongze Cheng 已提交
654 655
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
656
  code = tsdbSttFileDoWriteTombBlock(writer);
H
Hongze Cheng 已提交
657
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
658

H
Hongze Cheng 已提交
659 660
  code = tsdbSttFileDoWriteSttBlk(writer);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
661

H
Hongze Cheng 已提交
662 663
  code = tsdbSttFileDoWriteStatisBlk(writer);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
664

H
Hongze Cheng 已提交
665
  code = tsdbSttFileDoWriteTombBlk(writer);
H
Hongze Cheng 已提交
666
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
667

H
Hongze Cheng 已提交
668 669
  code = tsdbSttFileDoWriteFooter(writer);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
670

H
Hongze Cheng 已提交
671
  code = tsdbSttFileDoUpdateHeader(writer);
H
Hongze Cheng 已提交
672 673
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
674
  code = tsdbFsyncFile(writer->fd);
H
Hongze Cheng 已提交
675 676
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
677
  tsdbCloseFile(&writer->fd);
H
Hongze Cheng 已提交
678

H
Hongze Cheng 已提交
679
  ASSERT(writer->config->file.size < writer->file->size);
H
Hongze Cheng 已提交
680 681 682 683 684 685 686 687 688 689 690 691 692 693 694
  STFileOp op;
  if (writer->config->file.size == 0) {
    op = (STFileOp){
        .optype = TSDB_FOP_CREATE,
        .fid = writer->config->file.fid,
        .nf = writer->file[0],
    };
  } else {
    op = (STFileOp){
        .optype = TSDB_FOP_MODIFY,
        .fid = writer->config->file.fid,
        .of = writer->config->file,
        .nf = writer->file[0],
    };
  }
H
Hongze Cheng 已提交
695 696 697

  code = TARRAY2_APPEND(opArray, op);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
698

H
Hongze Cheng 已提交
699 700
_exit:
  if (code) {
H
Hongze Cheng 已提交
701
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
702
  }
H
Hongze Cheng 已提交
703
  return code;
H
Hongze Cheng 已提交
704
}
H
Hongze Cheng 已提交
705

H
Hongze Cheng 已提交
706
static int32_t tsdbSttFWriterCloseAbort(SSttFileWriter *writer) {
H
Hongze Cheng 已提交
707
  if (writer->config->file.size) {  // truncate the file to the original size
H
Hongze Cheng 已提交
708 709
    ASSERT(writer->config->file.size <= writer->file->size);
    if (writer->config->file.size < writer->file->size) {
H
Hongze Cheng 已提交
710
      taosFtruncateFile(writer->fd->pFD, writer->config->file.size);
H
Hongze Cheng 已提交
711 712 713
      tsdbCloseFile(&writer->fd);
    }
  } else {  // remove the file
H
Hongze Cheng 已提交
714 715
    char fname[TSDB_FILENAME_LEN];
    tsdbTFileName(writer->config->tsdb, &writer->config->file, fname);
H
Hongze Cheng 已提交
716 717 718
    tsdbCloseFile(&writer->fd);
    taosRemoveFile(fname);
  }
H
Hongze Cheng 已提交
719 720
  return 0;
}
H
Hongze Cheng 已提交
721

H
Hongze Cheng 已提交
722
int32_t tsdbSttFileWriterOpen(const SSttFileWriterConfig *config, SSttFileWriter **writer) {
H
Hongze Cheng 已提交
723
  writer[0] = taosMemoryCalloc(1, sizeof(*writer[0]));
H
Hongze Cheng 已提交
724 725
  if (writer[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;

H
Hongze Cheng 已提交
726
  writer[0]->config[0] = config[0];
H
Hongze Cheng 已提交
727
  writer[0]->ctx->opened = false;
H
Hongze Cheng 已提交
728 729 730
  return 0;
}

H
Hongze Cheng 已提交
731
int32_t tsdbSttFileWriterClose(SSttFileWriter **writer, int8_t abort, TFileOpArray *opArray) {
H
Hongze Cheng 已提交
732
  int32_t code = 0;
H
Hongze Cheng 已提交
733
  int32_t lino = 0;
H
Hongze Cheng 已提交
734

H
Hongze Cheng 已提交
735
  if (writer[0]->ctx->opened) {
H
Hongze Cheng 已提交
736 737
    if (abort) {
      code = tsdbSttFWriterCloseAbort(writer[0]);
H
Hongze Cheng 已提交
738
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
739
    } else {
H
Hongze Cheng 已提交
740
      code = tsdbSttFWriterCloseCommit(writer[0], opArray);
H
Hongze Cheng 已提交
741
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
742
    }
H
Hongze Cheng 已提交
743
    tsdbSttFWriterDoClose(writer[0]);
H
Hongze Cheng 已提交
744
  }
H
Hongze Cheng 已提交
745 746
  taosMemoryFree(writer[0]);
  writer[0] = NULL;
H
Hongze Cheng 已提交
747 748 749

_exit:
  if (code) {
H
Hongze Cheng 已提交
750
    TSDB_ERROR_LOG(TD_VID(writer[0]->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
751
  }
H
Hongze Cheng 已提交
752
  return code;
H
Hongze Cheng 已提交
753 754
}

H
Hongze Cheng 已提交
755
int32_t tsdbSttFileWriteTSData(SSttFileWriter *writer, SRowInfo *row) {
H
Hongze Cheng 已提交
756
  int32_t code = 0;
H
Hongze Cheng 已提交
757
  int32_t lino = 0;
H
Hongze Cheng 已提交
758

H
Hongze Cheng 已提交
759
  if (!writer->ctx->opened) {
H
Hongze Cheng 已提交
760 761 762
    code = tsdbSttFWriterDoOpen(writer);
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
763

H
Hongze Cheng 已提交
764 765
  TSDBKEY key[1] = {TSDBROW_KEY(&row->row)};
  if (!TABLE_SAME_SCHEMA(row->suid, row->uid, writer->ctx->tbid->suid, writer->ctx->tbid->uid)) {
H
Hongze Cheng 已提交
766 767
    code = tsdbSttFileDoWriteTSDataBlock(writer);
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
768

H
Hongze Cheng 已提交
769 770 771 772 773 774 775 776 777 778 779 780
    code = tsdbUpdateSkmTb(writer->config->tsdb, (TABLEID *)row, writer->config->skmTb);
    TSDB_CHECK_CODE(code, lino, _exit);

    TABLEID id = {.suid = row->suid, .uid = row->suid ? 0 : row->uid};
    code = tBlockDataInit(writer->bData, &id, writer->config->skmTb->pTSchema, NULL, 0);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (writer->ctx->tbid->uid != row->uid) {
    writer->ctx->tbid->suid = row->suid;
    writer->ctx->tbid->uid = row->uid;

H
Hongze Cheng 已提交
781
    if (STATIS_BLOCK_SIZE(writer->sData) >= writer->config->maxRow) {
H
Hongze Cheng 已提交
782
      code = tsdbSttFileDoWriteStatisBlock(writer);
H
Hongze Cheng 已提交
783
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
784 785
    }

H
Hongze Cheng 已提交
786
    STbStatisRecord record = {
H
Hongze Cheng 已提交
787 788
        .suid = row->suid,
        .uid = row->uid,
H
Hongze Cheng 已提交
789 790 791 792
        .firstKey = key->ts,
        .lastKey = key->ts,
        .minVer = key->version,
        .maxVer = key->version,
H
Hongze Cheng 已提交
793
        .count = 1,
H
Hongze Cheng 已提交
794 795
    };
    code = tStatisBlockPut(writer->sData, &record);
H
Hongze Cheng 已提交
796
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
797
  } else {
H
Hongze Cheng 已提交
798 799
    ASSERT(key->ts >= TARRAY2_LAST(writer->sData->lastKey));

H
Hongze Cheng 已提交
800 801 802 803 804 805
    TARRAY2_LAST(writer->sData->minVer) = TMIN(TARRAY2_LAST(writer->sData->minVer), key->version);
    TARRAY2_LAST(writer->sData->maxVer) = TMAX(TARRAY2_LAST(writer->sData->maxVer), key->version);
    if (key->ts > TARRAY2_LAST(writer->sData->lastKey)) {
      TARRAY2_LAST(writer->sData->count)++;
      TARRAY2_LAST(writer->sData->lastKey) = key->ts;
    }
H
Hongze Cheng 已提交
806 807
  }

H
Hongze Cheng 已提交
808
  if (row->row.type == TSDBROW_ROW_FMT) {
H
Hongze Cheng 已提交
809 810
    code = tsdbUpdateSkmRow(writer->config->tsdb, writer->ctx->tbid,  //
                            TSDBROW_SVERSION(&row->row), writer->config->skmRow);
H
Hongze Cheng 已提交
811
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
812 813
  }

H
Hongze Cheng 已提交
814
  // row to col conversion
H
Hongze Cheng 已提交
815 816 817 818 819 820 821 822 823
  if (key->version <= writer->config->compactVersion                       //
      && writer->bData->nRow > 0                                           //
      && (writer->bData->uid                                               //
              ? writer->bData->uid                                         //
              : writer->bData->aUid[writer->bData->nRow - 1]) == row->uid  //
      && writer->bData->aTSKEY[writer->bData->nRow - 1] == key->ts         //
  ) {
    code = tBlockDataUpdateRow(writer->bData, &row->row, writer->config->skmRow->pTSchema);
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
824 825 826 827 828
  } else {
    if (writer->bData->nRow >= writer->config->maxRow) {
      code = tsdbSttFileDoWriteTSDataBlock(writer);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
H
Hongze Cheng 已提交
829

H
Hongze Cheng 已提交
830
    code = tBlockDataAppendRow(writer->bData, &row->row, writer->config->skmRow->pTSchema, row->uid);
H
Hongze Cheng 已提交
831
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
832 833 834 835
  }

_exit:
  if (code) {
H
Hongze Cheng 已提交
836
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
837
  }
H
Hongze Cheng 已提交
838
  return code;
H
Hongze Cheng 已提交
839 840
}

H
Hongze Cheng 已提交
841
int32_t tsdbSttFileWriteTSDataBlock(SSttFileWriter *writer, SBlockData *bdata) {
H
Hongze Cheng 已提交
842 843 844
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
845
  // TODO: optimize here
H
Hongze Cheng 已提交
846 847
  SRowInfo row[1];
  row->suid = bdata->suid;
H
Hongze Cheng 已提交
848
  for (int32_t i = 0; i < bdata->nRow; i++) {
H
Hongze Cheng 已提交
849 850
    row->uid = bdata->uid ? bdata->uid : bdata->aUid[i];
    row->row = tsdbRowFromBlockData(bdata, i);
H
Hongze Cheng 已提交
851

H
Hongze Cheng 已提交
852
    code = tsdbSttFileWriteTSData(writer, row);
H
Hongze Cheng 已提交
853 854 855 856 857
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
H
Hongze Cheng 已提交
858
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
859
  }
H
Hongze Cheng 已提交
860
  return code;
H
Hongze Cheng 已提交
861 862
}

H
Hongze Cheng 已提交
863
int32_t tsdbSttFileWriteTombRecord(SSttFileWriter *writer, const STombRecord *record) {
H
Hongze Cheng 已提交
864
  int32_t code;
H
Hongze Cheng 已提交
865 866
  int32_t lino;

H
Hongze Cheng 已提交
867
  if (!writer->ctx->opened) {
H
Hongze Cheng 已提交
868 869
    code = tsdbSttFWriterDoOpen(writer);
    return code;
H
Hongze Cheng 已提交
870 871 872 873 874
  } else {
    if (writer->bData->nRow > 0) {
      code = tsdbSttFileDoWriteTSDataBlock(writer);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
H
Hongze Cheng 已提交
875

H
Hongze Cheng 已提交
876 877 878 879
    if (STATIS_BLOCK_SIZE(writer->sData) > 0) {
      code = tsdbSttFileDoWriteStatisBlock(writer);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
H
Hongze Cheng 已提交
880
  }
H
Hongze Cheng 已提交
881

H
Hongze Cheng 已提交
882
  // write SDelRecord
H
Hongze Cheng 已提交
883
  code = tTombBlockPut(writer->tData, record);
H
Hongze Cheng 已提交
884
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
885

H
Hongze Cheng 已提交
886
  // write SDelBlock if need
H
Hongze Cheng 已提交
887
  if (TOMB_BLOCK_SIZE(writer->tData) >= writer->config->maxRow) {
H
Hongze Cheng 已提交
888
    code = tsdbSttFileDoWriteTombBlock(writer);
H
Hongze Cheng 已提交
889
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
890 891 892 893
  }

_exit:
  if (code) {
H
Hongze Cheng 已提交
894
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
895 896
  }
  return code;
H
Hongze Cheng 已提交
897 898 899
}

bool tsdbSttFileWriterIsOpened(SSttFileWriter *writer) { return writer->ctx->opened; }