tsdbSttFileRW.c 25.2 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "inc/tsdbSttFileRW.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18 19
typedef struct {
  int64_t   prevFooter;
H
Hongze Cheng 已提交
20 21 22 23 24
  SFDataPtr sttBlkPtr[1];
  SFDataPtr delBlkPtr[1];
  SFDataPtr statisBlkPtr[1];
  SFDataPtr rsrvd[2];
} SSttFooter;
H
Hongze Cheng 已提交
25

H
Hongze Cheng 已提交
26
// SSttFReader ============================================================
H
Hongze Cheng 已提交
27
struct SSttFileReader {
H
Hongze Cheng 已提交
28
  SSttFileReaderConfig config[1];
H
Hongze Cheng 已提交
29
  TSttSegReaderArray   readerArray[1];
H
Hongze Cheng 已提交
30 31 32 33 34
  STsdbFD             *fd;
};

struct SSttSegReader {
  SSttFileReader *reader;
H
Hongze Cheng 已提交
35
  SSttFooter      footer[1];
H
Hongze Cheng 已提交
36 37 38 39 40
  struct {
    bool sttBlkLoaded;
    bool delBlkLoaded;
    bool statisBlkLoaded;
  } ctx;
H
Hongze Cheng 已提交
41 42 43
  TSttBlkArray    sttBlkArray[1];
  TDelBlkArray    delBlkArray[1];
  TStatisBlkArray statisBlkArray[1];
H
Hongze Cheng 已提交
44 45
};

H
Hongze Cheng 已提交
46
// SSttFileReader
H
Hongze Cheng 已提交
47
static int32_t tsdbSttSegReaderOpen(SSttFileReader *reader, int64_t offset, SSttSegReader **segReader) {
H
Hongze Cheng 已提交
48 49
  ASSERT(offset >= TSDB_FHDR_SIZE);

H
Hongze Cheng 已提交
50
  int32_t code = 0;
H
Hongze Cheng 已提交
51
  int32_t lino = 0;
H
Hongze Cheng 已提交
52
  int32_t vid = TD_VID(reader->config->tsdb->pVnode);
H
Hongze Cheng 已提交
53 54

  segReader[0] = taosMemoryCalloc(1, sizeof(*segReader[0]));
H
Hongze Cheng 已提交
55
  if (!segReader[0]) return TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
56 57

  segReader[0]->reader = reader;
H
Hongze Cheng 已提交
58
  code = tsdbReadFile(reader->fd, offset, (uint8_t *)(segReader[0]->footer), sizeof(SSttFooter));
H
Hongze Cheng 已提交
59 60 61 62 63 64 65 66
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
    taosMemoryFree(segReader[0]);
    segReader[0] = NULL;
  }
H
Hongze Cheng 已提交
67 68 69
  return code;
}

H
Hongze Cheng 已提交
70 71
static int32_t tsdbSttSegReaderClose(SSttSegReader **reader) {
  if (!reader[0]) return 0;
H
Hongze Cheng 已提交
72

H
Hongze Cheng 已提交
73 74
  if (reader[0]->ctx.sttBlkLoaded) {
    TARRAY2_FREE(reader[0]->sttBlkArray);
H
Hongze Cheng 已提交
75
  }
H
Hongze Cheng 已提交
76 77
  if (reader[0]->ctx.delBlkLoaded) {
    TARRAY2_FREE(reader[0]->delBlkArray);
H
Hongze Cheng 已提交
78
  }
H
Hongze Cheng 已提交
79 80
  if (reader[0]->ctx.statisBlkLoaded) {
    TARRAY2_FREE(reader[0]->statisBlkArray);
H
Hongze Cheng 已提交
81
  }
H
Hongze Cheng 已提交
82 83
  taosMemoryFree(reader[0]);
  reader[0] = NULL;
H
Hongze Cheng 已提交
84
  return 0;
H
Hongze Cheng 已提交
85 86
}

H
Hongze Cheng 已提交
87
int32_t tsdbSttFReaderOpen(const char *fname, const SSttFileReaderConfig *config, SSttFileReader **reader) {
H
Hongze Cheng 已提交
88
  int32_t code = 0;
H
Hongze Cheng 已提交
89 90 91 92 93 94
  int32_t lino = 0;
  int32_t vid = TD_VID(config->tsdb->pVnode);

  reader[0] = taosMemoryCalloc(1, sizeof(*reader[0]));
  if (reader[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;

H
Hongze Cheng 已提交
95
  reader[0]->config[0] = config[0];
H
Hongze Cheng 已提交
96 97 98 99 100 101

  // open file
  code = tsdbOpenFile(fname, config->szPage, TD_FILE_READ, &reader[0]->fd);
  TSDB_CHECK_CODE(code, lino, _exit);

  // open each segment reader
H
Hongze Cheng 已提交
102
  int64_t size = config->file->size;
H
Hongze Cheng 已提交
103
  while (size > 0) {
H
Hongze Cheng 已提交
104
    SSttSegReader *reader1;
H
Hongze Cheng 已提交
105

H
Hongze Cheng 已提交
106
    code = tsdbSttSegReaderOpen(reader[0], size - sizeof(SSttFooter), &reader1);
H
Hongze Cheng 已提交
107 108
    TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
109
    code = TARRAY2_APPEND(reader[0]->readerArray, reader1);
H
Hongze Cheng 已提交
110 111
    TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
112
    size = reader1->footer->prevFooter;
H
Hongze Cheng 已提交
113 114
  }

H
Hongze Cheng 已提交
115
  ASSERT(TARRAY2_SIZE(reader[0]->readerArray) == config->file->stt->nseg);
H
Hongze Cheng 已提交
116 117 118 119 120 121

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
    tsdbSttFReaderClose(reader);
  }
H
Hongze Cheng 已提交
122 123 124
  return code;
}

H
Hongze Cheng 已提交
125 126
int32_t tsdbSttFReaderClose(SSttFileReader **reader) {
  tsdbCloseFile(&reader[0]->fd);
H
Hongze Cheng 已提交
127
  TARRAY2_CLEAR_FREE(reader[0]->readerArray, tsdbSttSegReaderClose);
H
Hongze Cheng 已提交
128 129 130
  taosMemoryFree(reader[0]);
  reader[0] = NULL;
  return 0;
H
Hongze Cheng 已提交
131 132
}

H
Hongze Cheng 已提交
133 134
int32_t tsdbSttFReaderGetSegReader(SSttFileReader *reader, const TSttSegReaderArray **readerArray) {
  readerArray[0] = reader->readerArray;
H
Hongze Cheng 已提交
135
  return 0;
H
Hongze Cheng 已提交
136 137
}

H
Hongze Cheng 已提交
138 139 140
// SSttFSegReader
int32_t tsdbSttFReadStatisBlk(SSttSegReader *reader, const TStatisBlkArray **statisBlkArray) {
  if (!reader->ctx.statisBlkLoaded) {
H
Hongze Cheng 已提交
141 142
    if (reader->footer->statisBlkPtr->size > 0) {
      ASSERT(reader->footer->statisBlkPtr->size % sizeof(STbStatisBlk) == 0);
H
Hongze Cheng 已提交
143

H
Hongze Cheng 已提交
144 145
      int32_t size = reader->footer->statisBlkPtr->size / sizeof(STbStatisBlk);
      void   *data = taosMemoryMalloc(reader->footer->statisBlkPtr->size);
H
Hongze Cheng 已提交
146 147
      if (!data) return TSDB_CODE_OUT_OF_MEMORY;

H
Hongze Cheng 已提交
148 149
      int32_t code = tsdbReadFile(reader->reader->fd, reader->footer->statisBlkPtr->offset, data,
                                  reader->footer->statisBlkPtr->size);
H
Hongze Cheng 已提交
150 151
      if (code) return code;

H
Hongze Cheng 已提交
152
      TARRAY2_INIT_EX(reader->statisBlkArray, size, size, data);
H
Hongze Cheng 已提交
153
    } else {
H
Hongze Cheng 已提交
154
      TARRAY2_INIT(reader->statisBlkArray);
H
Hongze Cheng 已提交
155 156 157 158 159
    }

    reader->ctx.statisBlkLoaded = true;
  }

H
Hongze Cheng 已提交
160
  statisBlkArray[0] = reader->statisBlkArray;
H
Hongze Cheng 已提交
161
  return 0;
H
Hongze Cheng 已提交
162 163
}

H
Hongze Cheng 已提交
164 165
int32_t tsdbSttFReadDelBlk(SSttSegReader *reader, const TDelBlkArray **delBlkArray) {
  if (!reader->ctx.delBlkLoaded) {
H
Hongze Cheng 已提交
166 167
    if (reader->footer->delBlkPtr->size > 0) {
      ASSERT(reader->footer->delBlkPtr->size % sizeof(SDelBlk) == 0);
H
Hongze Cheng 已提交
168

H
Hongze Cheng 已提交
169 170
      int32_t size = reader->footer->delBlkPtr->size / sizeof(SDelBlk);
      void   *data = taosMemoryMalloc(reader->footer->delBlkPtr->size);
H
Hongze Cheng 已提交
171 172
      if (!data) return TSDB_CODE_OUT_OF_MEMORY;

H
Hongze Cheng 已提交
173 174
      int32_t code =
          tsdbReadFile(reader->reader->fd, reader->footer->delBlkPtr->offset, data, reader->footer->delBlkPtr->size);
H
Hongze Cheng 已提交
175 176
      if (code) return code;

H
Hongze Cheng 已提交
177
      TARRAY2_INIT_EX(reader->delBlkArray, size, size, data);
H
Hongze Cheng 已提交
178
    } else {
H
Hongze Cheng 已提交
179
      TARRAY2_INIT(reader->delBlkArray);
H
Hongze Cheng 已提交
180 181 182 183 184
    }

    reader->ctx.delBlkLoaded = true;
  }

H
Hongze Cheng 已提交
185
  delBlkArray[0] = reader->delBlkArray;
H
Hongze Cheng 已提交
186 187 188 189 190
  return 0;
}

int32_t tsdbSttFReadSttBlk(SSttSegReader *reader, const TSttBlkArray **sttBlkArray) {
  if (!reader->ctx.sttBlkLoaded) {
H
Hongze Cheng 已提交
191 192
    if (reader->footer->sttBlkPtr->size > 0) {
      ASSERT(reader->footer->sttBlkPtr->size % sizeof(SSttBlk) == 0);
H
Hongze Cheng 已提交
193

H
Hongze Cheng 已提交
194 195
      int32_t size = reader->footer->sttBlkPtr->size / sizeof(SSttBlk);
      void   *data = taosMemoryMalloc(reader->footer->sttBlkPtr->size);
H
Hongze Cheng 已提交
196 197
      if (!data) return TSDB_CODE_OUT_OF_MEMORY;

H
Hongze Cheng 已提交
198 199
      int32_t code =
          tsdbReadFile(reader->reader->fd, reader->footer->sttBlkPtr->offset, data, reader->footer->sttBlkPtr->size);
H
Hongze Cheng 已提交
200 201
      if (code) return code;

H
Hongze Cheng 已提交
202
      TARRAY2_INIT_EX(reader->sttBlkArray, size, size, data);
H
Hongze Cheng 已提交
203
    } else {
H
Hongze Cheng 已提交
204
      TARRAY2_INIT(reader->sttBlkArray);
H
Hongze Cheng 已提交
205 206 207 208 209
    }

    reader->ctx.sttBlkLoaded = true;
  }

H
Hongze Cheng 已提交
210
  sttBlkArray[0] = reader->sttBlkArray;
H
Hongze Cheng 已提交
211
  return 0;
H
Hongze Cheng 已提交
212 213
}

H
Hongze Cheng 已提交
214
int32_t tsdbSttFReadSttBlock(SSttSegReader *reader, const SSttBlk *sttBlk, SBlockData *bData) {
H
Hongze Cheng 已提交
215 216 217 218 219
  int32_t code = 0;
  // TODO
  return code;
}

H
Hongze Cheng 已提交
220
int32_t tsdbSttFReadDelBlock(SSttSegReader *reader, const SDelBlk *delBlk, SDelBlock *dData) {
H
Hongze Cheng 已提交
221
  int32_t code = 0;
H
Hongze Cheng 已提交
222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245
  int32_t lino = 0;
  int32_t vid = TD_VID(reader->reader->config->tsdb->pVnode);

  tDelBlockClear(dData);
  code = tRealloc(&reader->reader->config->aBuf[0], delBlk->dp->size);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbReadFile(reader->reader->fd, delBlk->dp->offset, reader->reader->config->aBuf[0], delBlk->dp->size);
  if (code) TSDB_CHECK_CODE(code, lino, _exit);

  int64_t size = 0;
  for (int32_t i = 0; i < ARRAY_SIZE(dData->aData); ++i) {
    code = tsdbDecmprData(reader->reader->config->aBuf[0] + size, delBlk->size[i], TSDB_DATA_TYPE_BIGINT,
                          TWO_STAGE_COMP, NULL, 0, NULL);  // TODO
    TSDB_CHECK_CODE(code, lino, _exit);

    size += delBlk->size[i];
  }

  ASSERT(size == delBlk->dp->size);
_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d, reason:%s", vid, __func__, lino, tstrerror(code));
  }
H
Hongze Cheng 已提交
246 247 248
  return code;
}

H
Hongze Cheng 已提交
249
int32_t tsdbSttFReadStatisBlock(SSttSegReader *reader, const STbStatisBlk *statisBlk, STbStatisBlock *sData) {
H
Hongze Cheng 已提交
250
  int32_t code = 0;
H
Hongze Cheng 已提交
251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275
  int32_t lino = 0;
  int32_t vid = TD_VID(reader->reader->config->tsdb->pVnode);

  tStatisBlockClear(sData);
  code = tRealloc(&reader->reader->config->aBuf[0], statisBlk->dp->size);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbReadFile(reader->reader->fd, statisBlk->dp->offset, reader->reader->config->aBuf[0], statisBlk->dp->size);
  if (code) TSDB_CHECK_CODE(code, lino, _exit);

  int64_t size = 0;
  for (int32_t i = 0; i < ARRAY_SIZE(sData->aData); ++i) {
    code = tsdbDecmprData(reader->reader->config->aBuf[0] + size, statisBlk->size[i], TSDB_DATA_TYPE_BIGINT,
                          TWO_STAGE_COMP, NULL, 0, NULL);  // TODO
    TSDB_CHECK_CODE(code, lino, _exit);

    size += statisBlk->size[i];
  }

  ASSERT(size == statisBlk->dp->size);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d, reason:%s", vid, __func__, lino, tstrerror(code));
  }
H
Hongze Cheng 已提交
276 277 278
  return code;
}

H
Hongze Cheng 已提交
279
// SSttFWriter ============================================================
H
Hongze Cheng 已提交
280
struct SSttFileWriter {
H
Hongze Cheng 已提交
281
  SSttFileWriterConfig config[1];
H
Hongze Cheng 已提交
282 283
  struct {
    bool opened;
H
Hongze Cheng 已提交
284
  } ctx[1];
H
Hongze Cheng 已提交
285
  // file
H
Hongze Cheng 已提交
286
  STFile file[1];
H
Hongze Cheng 已提交
287
  // data
H
Hongze Cheng 已提交
288 289 290 291
  TSttBlkArray    sttBlkArray[1];
  TDelBlkArray    delBlkArray[1];
  TStatisBlkArray statisBlkArray[1];
  SSttFooter      footer[1];
H
Hongze Cheng 已提交
292 293 294
  SBlockData      bData[1];
  SDelBlock       dData[1];
  STbStatisBlock  sData[1];
H
Hongze Cheng 已提交
295
  // helper data
H
Hongze Cheng 已提交
296 297
  SSkmInfo skmTb[1];
  SSkmInfo skmRow[1];
H
Hongze Cheng 已提交
298 299
  int32_t  aBufSize[5];
  uint8_t *aBuf[5];
H
Hongze Cheng 已提交
300
  STsdbFD *fd;
H
Hongze Cheng 已提交
301
};
H
Hongze Cheng 已提交
302

H
Hongze Cheng 已提交
303 304 305
static int32_t tsdbSttFileDoWriteTSDataBlock(SSttFileWriter *writer) {
  if (writer->bData->nRow == 0) return 0;

H
Hongze Cheng 已提交
306
  int32_t code = 0;
H
Hongze Cheng 已提交
307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322
  int32_t lino = 0;
  SSttBlk sttBlk[1];

  sttBlk->suid = writer->bData->suid;
  sttBlk->minUid = writer->bData->uid ? writer->bData->uid : writer->bData->aUid[0];
  sttBlk->maxUid = writer->bData->uid ? writer->bData->uid : writer->bData->aUid[writer->bData->nRow - 1];
  sttBlk->minKey = sttBlk->maxKey = writer->bData->aTSKEY[0];
  sttBlk->minVer = sttBlk->maxVer = writer->bData->aVersion[0];
  sttBlk->nRow = writer->bData->nRow;
  for (int32_t iRow = 1; iRow < writer->bData->nRow; iRow++) {
    if (sttBlk->minKey > writer->bData->aTSKEY[iRow]) sttBlk->minKey = writer->bData->aTSKEY[iRow];
    if (sttBlk->maxKey < writer->bData->aTSKEY[iRow]) sttBlk->maxKey = writer->bData->aTSKEY[iRow];
    if (sttBlk->minVer > writer->bData->aVersion[iRow]) sttBlk->minVer = writer->bData->aVersion[iRow];
    if (sttBlk->maxVer < writer->bData->aVersion[iRow]) sttBlk->maxVer = writer->bData->aVersion[iRow];
  }

H
Hongze Cheng 已提交
323
  code = tCmprBlockData(writer->bData, writer->config->cmprAlg, NULL, NULL, writer->config->aBuf, writer->aBufSize);
H
Hongze Cheng 已提交
324
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
325

H
Hongze Cheng 已提交
326
  sttBlk->bInfo.offset = writer->file->size;
H
Hongze Cheng 已提交
327 328
  sttBlk->bInfo.szKey = writer->aBufSize[2] + writer->aBufSize[3];
  sttBlk->bInfo.szBlock = writer->aBufSize[0] + writer->aBufSize[1] + sttBlk->bInfo.szKey;
H
Hongze Cheng 已提交
329

H
Hongze Cheng 已提交
330 331
  for (int32_t i = 3; i >= 0; i--) {
    if (writer->aBufSize[i]) {
H
Hongze Cheng 已提交
332
      code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->aBuf[i], writer->aBufSize[i]);
H
Hongze Cheng 已提交
333
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
334
      writer->file->size += writer->aBufSize[i];
H
Hongze Cheng 已提交
335 336
    }
  }
H
Hongze Cheng 已提交
337
  tBlockDataClear(writer->bData);
H
Hongze Cheng 已提交
338

H
Hongze Cheng 已提交
339
  code = TARRAY2_APPEND_PTR(writer->sttBlkArray, sttBlk);
H
Hongze Cheng 已提交
340
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
341 342 343

_exit:
  if (code) {
H
Hongze Cheng 已提交
344
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, lino,
H
Hongze Cheng 已提交
345
              tstrerror(code));
H
Hongze Cheng 已提交
346
  }
H
Hongze Cheng 已提交
347 348 349
  return code;
}

H
Hongze Cheng 已提交
350
static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) {
H
Hongze Cheng 已提交
351
  if (STATIS_BLOCK_SIZE(writer->sData)) return 0;
H
Hongze Cheng 已提交
352

H
Hongze Cheng 已提交
353
  int32_t code = 0;
H
Hongze Cheng 已提交
354
  int32_t lino = 0;
H
Hongze Cheng 已提交
355

H
Hongze Cheng 已提交
356 357 358 359
  STbStatisBlk statisBlk[1] = {{
      .numRec = STATIS_BLOCK_SIZE(writer->sData),
      .minTid = {.suid = TARRAY2_FIRST(writer->sData->suid), .uid = TARRAY2_FIRST(writer->sData->uid)},
      .maxTid = {.suid = TARRAY2_LAST(writer->sData->suid), .uid = TARRAY2_LAST(writer->sData->uid)},
H
Hongze Cheng 已提交
360 361
      .minVer = TARRAY2_FIRST(writer->sData->minVer),
      .maxVer = TARRAY2_FIRST(writer->sData->maxVer),
H
Hongze Cheng 已提交
362 363
  }};

H
Hongze Cheng 已提交
364 365 366 367
  for (int32_t i = 1; i < STATIS_BLOCK_SIZE(writer->sData); i++) {
    statisBlk->minVer = TMIN(statisBlk->minVer, TARRAY2_GET(writer->sData->minVer, i));
    statisBlk->maxVer = TMAX(statisBlk->maxVer, TARRAY2_GET(writer->sData->maxVer, i));
  }
H
Hongze Cheng 已提交
368

H
Hongze Cheng 已提交
369 370
  statisBlk->dp->offset = writer->file->size;
  statisBlk->dp->size = 0;
H
Hongze Cheng 已提交
371

H
Hongze Cheng 已提交
372 373 374 375 376 377 378 379 380
  for (int32_t i = 0; i < ARRAY_SIZE(writer->sData->aData); i++) {
    int32_t size;
    code = tsdbCmprData((uint8_t *)TARRAY2_DATA(&writer->sData->aData[i]), TARRAY2_DATA_LEN(&writer->sData->aData[i]),
                        TSDB_DATA_TYPE_BIGINT, TWO_STAGE_COMP, &writer->config->aBuf[0], 0, &size,
                        &writer->config->aBuf[1]);
    TSDB_CHECK_CODE(code, lino, _exit);
    statisBlk->size[i] = size;
    statisBlk->dp->size += size;
  }
H
Hongze Cheng 已提交
381 382

  tStatisBlockClear(writer->sData);
H
Hongze Cheng 已提交
383

H
Hongze Cheng 已提交
384
  code = TARRAY2_APPEND_PTR(writer->statisBlkArray, statisBlk);
H
Hongze Cheng 已提交
385
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
386

H
Hongze Cheng 已提交
387 388
_exit:
  if (code) {
H
Hongze Cheng 已提交
389
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, lino,
H
Hongze Cheng 已提交
390
              tstrerror(code));
H
Hongze Cheng 已提交
391
  }
H
Hongze Cheng 已提交
392 393 394
  return code;
}

H
Hongze Cheng 已提交
395
static int32_t tsdbSttFileDoWriteDelBlock(SSttFileWriter *writer) {
H
Hongze Cheng 已提交
396
#if 0
H
Hongze Cheng 已提交
397 398
  if (writer->dData->nRow == 0) return 0;

H
Hongze Cheng 已提交
399
  int32_t code = 0;
H
Hongze Cheng 已提交
400 401
  int32_t lino;

H
Hongze Cheng 已提交
402
  SDelBlk delBlk[1];
H
Hongze Cheng 已提交
403

H
Hongze Cheng 已提交
404 405 406 407 408 409 410 411 412
  delBlk->nRow = writer->sData->nRow;
  delBlk->minTid.suid = writer->sData->aData[0][0];
  delBlk->minTid.uid = writer->sData->aData[1][0];
  delBlk->maxTid.suid = writer->sData->aData[0][writer->sData->nRow - 1];
  delBlk->maxTid.uid = writer->sData->aData[1][writer->sData->nRow - 1];
  delBlk->minVer = delBlk->maxVer = delBlk->maxVer = writer->sData->aData[2][0];
  for (int32_t iRow = 1; iRow < writer->sData->nRow; iRow++) {
    if (delBlk->minVer > writer->sData->aData[2][iRow]) delBlk->minVer = writer->sData->aData[2][iRow];
    if (delBlk->maxVer < writer->sData->aData[2][iRow]) delBlk->maxVer = writer->sData->aData[2][iRow];
H
Hongze Cheng 已提交
413 414
  }

H
Hongze Cheng 已提交
415
  delBlk->dp.offset = writer->file->size;
H
Hongze Cheng 已提交
416
  delBlk->dp.size = 0;  // TODO
H
Hongze Cheng 已提交
417

H
Hongze Cheng 已提交
418 419
  int64_t tsize = sizeof(int64_t) * writer->dData->nRow;
  for (int32_t i = 0; i < ARRAY_SIZE(writer->dData->aData); i++) {
H
Hongze Cheng 已提交
420
    code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)writer->dData->aData[i], tsize);
H
Hongze Cheng 已提交
421 422
    TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
423
    delBlk->dp.size += tsize;
H
Hongze Cheng 已提交
424
    writer->file->size += tsize;
H
Hongze Cheng 已提交
425
  }
H
Hongze Cheng 已提交
426
  tDelBlockDestroy(writer->dData);
H
Hongze Cheng 已提交
427

H
Hongze Cheng 已提交
428
  code = TARRAY2_APPEND_PTR(writer->delBlkArray, delBlk);
H
Hongze Cheng 已提交
429
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
430 431 432

_exit:
  if (code) {
H
Hongze Cheng 已提交
433
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, lino,
H
Hongze Cheng 已提交
434 435 436 437
              tstrerror(code));
  } else {
    // tsdbTrace();
  }
H
Hongze Cheng 已提交
438
  return code;
H
Hongze Cheng 已提交
439 440
#endif
  return 0;
H
Hongze Cheng 已提交
441 442
}

H
Hongze Cheng 已提交
443
static int32_t tsdbSttFileDoWriteSttBlk(SSttFileWriter *writer) {
H
Hongze Cheng 已提交
444
  int32_t code = 0;
H
Hongze Cheng 已提交
445 446
  int32_t lino;

H
Hongze Cheng 已提交
447
  writer->footer->sttBlkPtr->offset = writer->file->size;
H
Hongze Cheng 已提交
448
  writer->footer->sttBlkPtr->size = TARRAY2_DATA_LEN(writer->sttBlkArray);
H
Hongze Cheng 已提交
449

H
Hongze Cheng 已提交
450
  if (writer->footer->sttBlkPtr->size) {
H
Hongze Cheng 已提交
451
    code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)TARRAY2_DATA(writer->sttBlkArray),
H
Hongze Cheng 已提交
452
                         writer->footer->sttBlkPtr->size);
H
Hongze Cheng 已提交
453
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
454
    writer->file->size += writer->footer->sttBlkPtr->size;
H
Hongze Cheng 已提交
455 456 457 458
  }

_exit:
  if (code) {
H
Hongze Cheng 已提交
459
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, lino,
H
Hongze Cheng 已提交
460
              tstrerror(code));
H
Hongze Cheng 已提交
461
  }
H
Hongze Cheng 已提交
462 463 464
  return code;
}

H
Hongze Cheng 已提交
465
static int32_t tsdbSttFileDoWriteStatisBlk(SSttFileWriter *writer) {
H
Hongze Cheng 已提交
466
  int32_t code = 0;
H
Hongze Cheng 已提交
467
  int32_t lino;
H
Hongze Cheng 已提交
468

H
Hongze Cheng 已提交
469
  writer->footer->statisBlkPtr->offset = writer->file->size;
H
Hongze Cheng 已提交
470
  writer->footer->statisBlkPtr->size = TARRAY2_DATA_LEN(writer->statisBlkArray);
H
Hongze Cheng 已提交
471

H
Hongze Cheng 已提交
472
  if (writer->footer->statisBlkPtr->size) {
H
Hongze Cheng 已提交
473
    code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)TARRAY2_DATA(writer->statisBlkArray),
H
Hongze Cheng 已提交
474
                         writer->footer->statisBlkPtr->size);
H
Hongze Cheng 已提交
475
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
476
    writer->file->size += writer->footer->statisBlkPtr->size;
H
Hongze Cheng 已提交
477 478 479 480
  }

_exit:
  if (code) {
H
Hongze Cheng 已提交
481
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, lino,
H
Hongze Cheng 已提交
482
              tstrerror(code));
H
Hongze Cheng 已提交
483
  }
H
Hongze Cheng 已提交
484 485 486
  return code;
}

H
Hongze Cheng 已提交
487
static int32_t tsdbSttFileDoWriteDelBlk(SSttFileWriter *writer) {
H
Hongze Cheng 已提交
488
  int32_t code = 0;
H
Hongze Cheng 已提交
489 490
  int32_t lino;

H
Hongze Cheng 已提交
491
  writer->footer->delBlkPtr->offset = writer->file->size;
H
Hongze Cheng 已提交
492
  writer->footer->delBlkPtr->size = TARRAY2_DATA_LEN(writer->delBlkArray);
H
Hongze Cheng 已提交
493

H
Hongze Cheng 已提交
494
  if (writer->footer->delBlkPtr->size) {
H
Hongze Cheng 已提交
495
    code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)TARRAY2_DATA(writer->delBlkArray),
H
Hongze Cheng 已提交
496
                         writer->footer->delBlkPtr->size);
H
Hongze Cheng 已提交
497
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
498
    writer->file->size += writer->footer->delBlkPtr->size;
H
Hongze Cheng 已提交
499 500 501 502
  }

_exit:
  if (code) {
H
Hongze Cheng 已提交
503
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, lino,
H
Hongze Cheng 已提交
504
              tstrerror(code));
H
Hongze Cheng 已提交
505 506 507 508
  }
  return code;
}

H
Hongze Cheng 已提交
509
static int32_t tsdbSttFileDoWriteFooter(SSttFileWriter *writer) {
H
Hongze Cheng 已提交
510 511 512
  int32_t code =
      tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)&writer->footer, sizeof(writer->footer));
  writer->file->size += sizeof(writer->footer);
H
Hongze Cheng 已提交
513 514 515
  return code;
}

H
Hongze Cheng 已提交
516
static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) {
H
Hongze Cheng 已提交
517
  int32_t code = 0;
H
Hongze Cheng 已提交
518
  int32_t lino = 0;
H
Hongze Cheng 已提交
519
  int32_t vid = TD_VID(writer->config->tsdb->pVnode);
H
Hongze Cheng 已提交
520

H
Hongze Cheng 已提交
521
  // set
H
Hongze Cheng 已提交
522 523 524 525
  writer->file[0] = writer->config->file;
  writer->file->stt->nseg++;
  if (!writer->config->skmTb) writer->config->skmTb = writer->skmTb;
  if (!writer->config->skmRow) writer->config->skmRow = writer->skmRow;
H
Hongze Cheng 已提交
526
  if (!writer->config->aBuf) writer->config->aBuf = writer->aBuf;
H
Hongze Cheng 已提交
527

H
Hongze Cheng 已提交
528 529 530
  // open file
  int32_t flag;
  char    fname[TSDB_FILENAME_LEN];
H
Hongze Cheng 已提交
531

H
Hongze Cheng 已提交
532
  if (writer->file->size) {
H
Hongze Cheng 已提交
533 534 535
    flag = TD_FILE_READ | TD_FILE_WRITE;
  } else {
    flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
H
Hongze Cheng 已提交
536
  }
H
Hongze Cheng 已提交
537

H
Hongze Cheng 已提交
538
  tsdbTFileName(writer->config->tsdb, writer->file, fname);
H
Hongze Cheng 已提交
539
  code = tsdbOpenFile(fname, writer->config->szPage, flag, &writer->fd);
H
Hongze Cheng 已提交
540 541
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
542
  if (!writer->file->size) {
H
Hongze Cheng 已提交
543 544 545 546
    uint8_t hdr[TSDB_FHDR_SIZE] = {0};

    code = tsdbWriteFile(writer->fd, 0, hdr, sizeof(hdr));
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
547
    writer->file->size += sizeof(hdr);
H
Hongze Cheng 已提交
548
  }
H
Hongze Cheng 已提交
549

H
Hongze Cheng 已提交
550
_exit:
H
Hongze Cheng 已提交
551 552 553
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
  } else {
H
Hongze Cheng 已提交
554
    writer->ctx->opened = true;
H
Hongze Cheng 已提交
555
  }
H
Hongze Cheng 已提交
556
  return 0;
H
Hongze Cheng 已提交
557 558
}

H
Hongze Cheng 已提交
559 560
static void tsdbSttFWriterDoClose(SSttFileWriter *writer) {
  ASSERT(!writer->fd);
H
Hongze Cheng 已提交
561

H
Hongze Cheng 已提交
562 563 564 565 566 567 568 569 570 571 572
  for (int32_t i = 0; i < ARRAY_SIZE(writer->aBufSize); ++i) {
    tFree(writer->aBuf[i]);
  }
  tDestroyTSchema(writer->skmRow->pTSchema);
  tDestroyTSchema(writer->skmTb->pTSchema);
  tStatisBlockFree(writer->sData);
  tDelBlockFree(writer->dData);
  tBlockDataDestroy(writer->bData);
  TARRAY2_FREE(writer->statisBlkArray);
  TARRAY2_FREE(writer->delBlkArray);
  TARRAY2_FREE(writer->sttBlkArray);
H
Hongze Cheng 已提交
573
}
H
Hongze Cheng 已提交
574

H
Hongze Cheng 已提交
575 576 577 578
static int32_t tsdbSttFileDoUpdateHeader(SSttFileWriter *writer) {
  // TODO
  return 0;
}
H
Hongze Cheng 已提交
579

H
Hongze Cheng 已提交
580 581 582
static int32_t tsdbSttFWriterCloseCommit(SSttFileWriter *writer, STFileOp *op) {
  int32_t lino;
  int32_t code;
H
Hongze Cheng 已提交
583
  int32_t vid = TD_VID(writer->config->tsdb->pVnode);
H
Hongze Cheng 已提交
584

H
Hongze Cheng 已提交
585 586
  code = tsdbSttFileDoWriteTSDataBlock(writer);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
587

H
Hongze Cheng 已提交
588
  code = tsdbSttFileDoWriteStatisBlock(writer);
H
Hongze Cheng 已提交
589 590
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
591 592
  code = tsdbSttFileDoWriteDelBlock(writer);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
593

H
Hongze Cheng 已提交
594 595
  code = tsdbSttFileDoWriteSttBlk(writer);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
596

H
Hongze Cheng 已提交
597 598
  code = tsdbSttFileDoWriteStatisBlk(writer);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
599

H
Hongze Cheng 已提交
600 601
  code = tsdbSttFileDoWriteDelBlk(writer);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
602

H
Hongze Cheng 已提交
603 604
  code = tsdbSttFileDoWriteFooter(writer);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
605

H
Hongze Cheng 已提交
606
  code = tsdbSttFileDoUpdateHeader(writer);
H
Hongze Cheng 已提交
607 608
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
609
  code = tsdbFsyncFile(writer->fd);
H
Hongze Cheng 已提交
610 611
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
612
  tsdbCloseFile(&writer->fd);
H
Hongze Cheng 已提交
613

H
Hongze Cheng 已提交
614
  ASSERT(writer->config->file.size > writer->file->size);
H
Hongze Cheng 已提交
615 616 617
  op->optype = writer->config->file.size ? TSDB_FOP_MODIFY : TSDB_FOP_CREATE;
  op->fid = writer->config->file.fid;
  op->of = writer->config->file;
H
Hongze Cheng 已提交
618
  op->nf = writer->file[0];
H
Hongze Cheng 已提交
619

H
Hongze Cheng 已提交
620 621
_exit:
  if (code) {
H
Hongze Cheng 已提交
622
    tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
623
  }
H
Hongze Cheng 已提交
624
  return code;
H
Hongze Cheng 已提交
625
}
H
Hongze Cheng 已提交
626

H
Hongze Cheng 已提交
627
static int32_t tsdbSttFWriterCloseAbort(SSttFileWriter *writer) {
H
Hongze Cheng 已提交
628
  if (writer->config->file.size) {  // truncate the file to the original size
H
Hongze Cheng 已提交
629 630
    ASSERT(writer->config->file.size <= writer->file->size);
    if (writer->config->file.size < writer->file->size) {
H
Hongze Cheng 已提交
631
      taosFtruncateFile(writer->fd->pFD, writer->config->file.size);
H
Hongze Cheng 已提交
632 633 634
      tsdbCloseFile(&writer->fd);
    }
  } else {  // remove the file
H
Hongze Cheng 已提交
635 636
    char fname[TSDB_FILENAME_LEN];
    tsdbTFileName(writer->config->tsdb, &writer->config->file, fname);
H
Hongze Cheng 已提交
637 638 639 640
    tsdbCloseFile(&writer->fd);
    taosRemoveFile(fname);
  }

H
Hongze Cheng 已提交
641 642
  return 0;
}
H
Hongze Cheng 已提交
643

H
Hongze Cheng 已提交
644
int32_t tsdbSttFileWriterOpen(const SSttFileWriterConfig *config, SSttFileWriter **writer) {
H
Hongze Cheng 已提交
645
  writer[0] = taosMemoryCalloc(1, sizeof(*writer[0]));
H
Hongze Cheng 已提交
646 647
  if (writer[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;

H
Hongze Cheng 已提交
648
  writer[0]->config[0] = config[0];
H
Hongze Cheng 已提交
649
  writer[0]->ctx->opened = false;
H
Hongze Cheng 已提交
650 651 652
  return 0;
}

H
Hongze Cheng 已提交
653
int32_t tsdbSttFileWriterClose(SSttFileWriter **writer, int8_t abort, STFileOp *op) {
H
Hongze Cheng 已提交
654
  int32_t code = 0;
H
Hongze Cheng 已提交
655
  int32_t lino = 0;
H
Hongze Cheng 已提交
656
  int32_t vid = TD_VID(writer[0]->config->tsdb->pVnode);
H
Hongze Cheng 已提交
657

H
Hongze Cheng 已提交
658
  if (!writer[0]->ctx->opened) {
H
Hongze Cheng 已提交
659 660
    op->optype = TSDB_FOP_NONE;
  } else {
H
Hongze Cheng 已提交
661 662
    if (abort) {
      code = tsdbSttFWriterCloseAbort(writer[0]);
H
Hongze Cheng 已提交
663
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
664
    } else {
H
Hongze Cheng 已提交
665
      code = tsdbSttFWriterCloseCommit(writer[0], op);
H
Hongze Cheng 已提交
666
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
667
    }
H
Hongze Cheng 已提交
668
    tsdbSttFWriterDoClose(writer[0]);
H
Hongze Cheng 已提交
669
  }
H
Hongze Cheng 已提交
670 671
  taosMemoryFree(writer[0]);
  writer[0] = NULL;
H
Hongze Cheng 已提交
672 673 674

_exit:
  if (code) {
H
Hongze Cheng 已提交
675
    tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
676
  }
H
Hongze Cheng 已提交
677
  return code;
H
Hongze Cheng 已提交
678 679
}

H
Hongze Cheng 已提交
680
int32_t tsdbSttFileWriteTSData(SSttFileWriter *writer, SRowInfo *row) {
H
Hongze Cheng 已提交
681
  int32_t code = 0;
H
Hongze Cheng 已提交
682
  int32_t lino = 0;
H
Hongze Cheng 已提交
683

H
Hongze Cheng 已提交
684
  if (!writer->ctx->opened) {
H
Hongze Cheng 已提交
685 686 687
    code = tsdbSttFWriterDoOpen(writer);
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
688

H
Hongze Cheng 已提交
689 690
  TSDBROW *pRow = &row->row;
  TSDBKEY  key = TSDBROW_KEY(pRow);
H
Hongze Cheng 已提交
691 692 693
  if (!TABLE_SAME_SCHEMA(writer->bData->suid, writer->bData->uid, row->suid, row->uid)) {
    code = tsdbSttFileDoWriteTSDataBlock(writer);
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
694

H
Hongze Cheng 已提交
695
    if (STATIS_BLOCK_SIZE(writer->sData) >= writer->config->maxRow) {
H
Hongze Cheng 已提交
696
      code = tsdbSttFileDoWriteStatisBlock(writer);
H
Hongze Cheng 已提交
697
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
698 699
    }

H
Hongze Cheng 已提交
700 701 702 703 704 705 706 707 708 709 710 711 712
    STbStatisRecord record[1] = {{
        .suid = row->suid,
        .uid = row->uid,
        .firstKey = key.ts,
        .firstVer = key.version,
        .lastKey = key.ts,
        .lastVer = key.version,
        .minVer = key.version,
        .maxVer = key.version,
        .count = 1,
    }};
    code = tStatisBlockPut(writer->sData, record);
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
713

H
Hongze Cheng 已提交
714
    code = tsdbUpdateSkmTb(writer->config->tsdb, (TABLEID *)row, writer->config->skmTb);
H
Hongze Cheng 已提交
715
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
716

H
Hongze Cheng 已提交
717 718
    TABLEID id = {.suid = row->suid, .uid = row->suid ? 0 : row->uid};
    code = tBlockDataInit(writer->bData, &id, writer->config->skmTb->pTSchema, NULL, 0);
H
Hongze Cheng 已提交
719
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
720 721
  }

H
Hongze Cheng 已提交
722
  if (row->row.type == TSDBROW_ROW_FMT) {
H
Hongze Cheng 已提交
723
    code = tsdbUpdateSkmRow(writer->config->tsdb, (TABLEID *)row, TSDBROW_SVERSION(pRow), writer->config->skmRow);
H
Hongze Cheng 已提交
724
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
725 726
  }

H
Hongze Cheng 已提交
727
  code = tBlockDataAppendRow(writer->bData, pRow, writer->config->skmRow->pTSchema, row->uid);
H
Hongze Cheng 已提交
728
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
729

H
Hongze Cheng 已提交
730
  if (writer->bData->nRow >= writer->config->maxRow) {
H
Hongze Cheng 已提交
731
    code = tsdbSttFileDoWriteTSDataBlock(writer);
H
Hongze Cheng 已提交
732
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
733 734
  }

H
Hongze Cheng 已提交
735 736 737 738 739 740 741 742
  TARRAY2_LAST(writer->sData->minVer) = TMIN(TARRAY2_LAST(writer->sData->minVer), key.version);
  TARRAY2_LAST(writer->sData->maxVer) = TMAX(TARRAY2_LAST(writer->sData->maxVer), key.version);
  if (key.ts > TARRAY2_LAST(writer->sData->lastKey)) {
    TARRAY2_LAST(writer->sData->lastKey) = key.ts;
    TARRAY2_LAST(writer->sData->lastVer) = key.version;
    TARRAY2_LAST(writer->sData->aCount)++;
  } else if (key.ts == TARRAY2_LAST(writer->sData->lastKey)) {
    TARRAY2_LAST(writer->sData->lastVer) = key.version;
H
Hongze Cheng 已提交
743 744 745 746
  } else {
    ASSERTS(0, "timestamp should be in ascending order");
  }

H
Hongze Cheng 已提交
747 748
_exit:
  if (code) {
H
Hongze Cheng 已提交
749
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, lino,
H
Hongze Cheng 已提交
750
              tstrerror(code));
H
Hongze Cheng 已提交
751
  }
H
Hongze Cheng 已提交
752
  return code;
H
Hongze Cheng 已提交
753 754
}

H
Hongze Cheng 已提交
755
int32_t tsdbSttFileWriteTSDataBlock(SSttFileWriter *writer, SBlockData *bdata) {
H
Hongze Cheng 已提交
756 757 758
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
759 760
  SRowInfo row[1];
  row->suid = bdata->suid;
H
Hongze Cheng 已提交
761
  for (int32_t i = 0; i < bdata->nRow; i++) {
H
Hongze Cheng 已提交
762 763
    row->uid = bdata->uid ? bdata->uid : bdata->aUid[i];
    row->row = tsdbRowFromBlockData(bdata, i);
H
Hongze Cheng 已提交
764

H
Hongze Cheng 已提交
765
    code = tsdbSttFileWriteTSData(writer, row);
H
Hongze Cheng 已提交
766 767 768 769 770
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
H
Hongze Cheng 已提交
771
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, lino,
H
Hongze Cheng 已提交
772 773
              tstrerror(code));
  }
H
Hongze Cheng 已提交
774 775 776
  return 0;
}

H
Hongze Cheng 已提交
777
int32_t tsdbSttFileWriteDLData(SSttFileWriter *writer, TABLEID *tbid, SDelData *pDelData) {
H
Hongze Cheng 已提交
778 779
  ASSERTS(0, "TODO: Not implemented yet");

H
Hongze Cheng 已提交
780
  int32_t code;
H
Hongze Cheng 已提交
781
  if (!writer->ctx->opened) {
H
Hongze Cheng 已提交
782 783 784 785
    code = tsdbSttFWriterDoOpen(writer);
    return code;
  }

H
Hongze Cheng 已提交
786 787 788 789 790 791
  // writer->dData[0].aData[0][writer->dData[0].nRow] = tbid->suid;         // suid
  // writer->dData[0].aData[1][writer->dData[0].nRow] = tbid->uid;          // uid
  // writer->dData[0].aData[2][writer->dData[0].nRow] = pDelData->version;  // version
  // writer->dData[0].aData[3][writer->dData[0].nRow] = pDelData->sKey;     // skey
  // writer->dData[0].aData[4][writer->dData[0].nRow] = pDelData->eKey;     // ekey
  // writer->dData[0].nRow++;
H
Hongze Cheng 已提交
792

H
Hongze Cheng 已提交
793 794 795 796 797 798
  // if (writer->dData[0].nRow >= writer->config->maxRow) {
  //   return tsdbSttFileDoWriteDelBlock(writer);
  // } else {
  //   return 0;
  // }
  return 0;
H
Hongze Cheng 已提交
799
}