tsdbCommit.c 44.1 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
typedef struct {
H
Hongze Cheng 已提交
19
  STsdb *pTsdb;
H
Hongze Cheng 已提交
20
  /* commit data */
H
Hongze Cheng 已提交
21
  int64_t commitID;
H
Hongze Cheng 已提交
22 23
  int32_t minutes;
  int8_t  precision;
H
Hongze Cheng 已提交
24 25
  int32_t minRow;
  int32_t maxRow;
H
Hongze Cheng 已提交
26
  int8_t  cmprAlg;
H
Hongze Cheng 已提交
27
  // --------------
H
Hongze Cheng 已提交
28
  TSKEY   nextKey;  // reset by each table commit
H
Hongze Cheng 已提交
29 30 31
  int32_t commitFid;
  TSKEY   minKey;
  TSKEY   maxKey;
H
Hongze Cheng 已提交
32
  // commit file data
H
Hongze Cheng 已提交
33
  SDataFReader *pReader;
H
Hongze Cheng 已提交
34 35 36 37
  SMapData      oBlockIdxMap;  // SMapData<SBlockIdx>, read from reader
  SMapData      oBlockMap;     // SMapData<SBlock>, read from reader
  SBlock        oBlock;
  SBlockData    oBlockData;
H
Hongze Cheng 已提交
38
  SDataFWriter *pWriter;
H
Hongze Cheng 已提交
39 40 41 42
  SMapData      nBlockIdxMap;  // SMapData<SBlockIdx>, build by committer
  SMapData      nBlockMap;     // SMapData<SBlock>
  SBlock        nBlock;
  SBlockData    nBlockData;
H
Hongze Cheng 已提交
43 44 45
  int64_t       suid;
  int64_t       uid;
  STSchema     *pTSchema;
H
Hongze Cheng 已提交
46
  /* commit del */
H
Hongze Cheng 已提交
47
  SDelFReader *pDelFReader;
H
Hongze Cheng 已提交
48 49
  SMapData     oDelIdxMap;   // SMapData<SDelIdx>, old
  SMapData     oDelDataMap;  // SMapData<SDelData>, old
H
Hongze Cheng 已提交
50
  SDelFWriter *pDelFWriter;
H
Hongze Cheng 已提交
51 52
  SMapData     nDelIdxMap;   // SMapData<SDelIdx>, new
  SMapData     nDelDataMap;  // SMapData<SDelData>, new
H
Hongze Cheng 已提交
53
} SCommitter;
H
refact  
Hongze Cheng 已提交
54

H
Hongze Cheng 已提交
55 56 57 58 59
// Forward declarations of the commit stages. tsdbCommit() calls them in this
// order: start -> data -> del -> cache -> end. They are file-local (static),
// so their definitions live elsewhere in this translation unit.
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter);
static int32_t tsdbCommitData(SCommitter *pCommitter);
static int32_t tsdbCommitDel(SCommitter *pCommitter);
static int32_t tsdbCommitCache(SCommitter *pCommitter);
// `eno` is 0 on the success path and the failing error code on the error path.
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno);
H
refact  
Hongze Cheng 已提交
60

H
refact  
Hongze Cheng 已提交
61
/*
 * Create the mutable memtable of `pTsdb` (stored in pTsdb->mem).
 *
 * A NULL `pTsdb` is accepted and treated as a no-op.
 * Returns 0 on success, otherwise the error code of tsdbMemTableCreate().
 */
int32_t tsdbBegin(STsdb *pTsdb) {
  if (pTsdb == NULL) {
    return 0;
  }

  int32_t rc = tsdbMemTableCreate(pTsdb, &pTsdb->mem);
  if (rc != 0) {
    tsdbError("vgId:%d tsdb begin failed since %s", TD_VID(pTsdb->pVnode), tstrerror(rc));
  }

  return rc;
}

H
more  
Hongze Cheng 已提交
76
/*
 * Commit the current memtable of `pTsdb`.
 *
 * A NULL `pTsdb` is a no-op. A memtable holding neither rows nor delete
 * records is detached and destroyed without running any commit stage.
 * Otherwise the stages run in order (start, data, del, cache, end); the
 * first failure stops the chain, tsdbEndCommit() is called with the error
 * code, and that code is returned.
 */
int32_t tsdbCommit(STsdb *pTsdb) {
  if (pTsdb == NULL) return 0;

  SCommitter committer;
  SMemTable *pMem = pTsdb->mem;
  int32_t    rc = 0;

  // nothing to commit: drop the empty memtable
  if (pMem->nRow == 0 && pMem->nDel == 0) {
    // TODO: lock?
    pTsdb->mem = NULL;
    tsdbMemTableDestroy(pMem);
    return rc;
  }

  // each stage only runs when every previous one succeeded
  rc = tsdbStartCommit(pTsdb, &committer);
  if (rc == 0) rc = tsdbCommitData(&committer);
  if (rc == 0) rc = tsdbCommitDel(&committer);
  if (rc == 0) rc = tsdbCommitCache(&committer);
  if (rc == 0) rc = tsdbEndCommit(&committer, 0);
  if (rc == 0) return rc;

  // NOTE(review): this path is also taken when tsdbStartCommit() itself fails
  // (committer may not be fully set up) and when tsdbEndCommit(.., 0) fails
  // (end is then called a second time) -- confirm tsdbEndCommit tolerates both.
  tsdbEndCommit(&committer, rc);
  tsdbError("vgId:%d, failed to commit since %s", TD_VID(pTsdb->pVnode), tstrerror(rc));
  return rc;
}

H
Hongze Cheng 已提交
118
/*
 * Prepare the delete-data commit: clear the old/new SDelIdx maps, load the
 * old index when an old del file exists, and open the writer for the new
 * del file.
 *
 * Both file descriptors are still placeholders (NULL, marked TODO), so the
 * "load old" branch is currently never taken.
 *
 * Returns 0 on success or the first error reported by the reader/writer.
 */
static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
  STsdb    *pTsdb = pCommitter->pTsdb;
  SDelFile *pOldDelFile = NULL;  // TODO
  SDelFile *pNewDelFile = NULL;  // TODO
  int32_t   code = 0;

  tMapDataReset(&pCommitter->oDelIdxMap);
  tMapDataReset(&pCommitter->nDelIdxMap);

  // old del file: open it and read its index
  if (pOldDelFile != NULL) {
    code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pOldDelFile, pTsdb, NULL);
    if (code == 0) {
      code = tsdbReadDelIdx(pCommitter->pDelFReader, &pCommitter->oDelIdxMap, NULL);
    }
  }

  // new del file: open the writer
  if (code == 0) {
    code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, pNewDelFile, pTsdb);
  }

  if (code != 0) {
    tsdbError("vgId:%d commit del start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
    return code;
  }

  tsdbDebug("vgId:%d commit del start", TD_VID(pTsdb->pVnode));
  return code;
}

H
Hongze Cheng 已提交
150
/*
 * Write the merged delete records of one table to the new del file.
 *
 * pTbData : in-memory table data, or NULL; a table whose pHead list is empty
 *           is treated as NULL.
 * pDelIdx : index entry of the table in the old del file, or NULL.
 *
 * Records from the old file are emitted first, followed by the in-memory
 * list. While doing so the key and version ranges of the new SDelIdx are
 * accumulated. The records are then written with tsdbWriteDelData() and the
 * new SDelIdx is appended to pCommitter->nDelIdxMap.
 *
 * Returns 0 on success (including the "nothing to do" case) or the first
 * error code encountered.
 */
static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDelIdx *pDelIdx) {
  int32_t  code = 0;
  SDelData diskDel = {0};
  SDelIdx  newIdx;  // TODO

  // no in-memory delete records => no memory side
  if (pTbData != NULL && pTbData->pHead == NULL) pTbData = NULL;
  if (pTbData == NULL && pDelIdx == NULL) return code;

  // identity comes from whichever side is present (memory wins)
  newIdx.suid = pTbData ? pTbData->suid : pDelIdx->suid;
  newIdx.uid = pTbData ? pTbData->uid : pDelIdx->uid;
  // empty ranges; widened by every record emitted below
  newIdx.minKey = TSKEY_MAX;
  newIdx.maxKey = TSKEY_MIN;
  newIdx.minVersion = INT64_MAX;
  newIdx.maxVersion = INT64_MIN;

  tMapDataReset(&pCommitter->oDelDataMap);
  tMapDataReset(&pCommitter->nDelDataMap);

  if (pDelIdx != NULL) {
    code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, &pCommitter->oDelDataMap, NULL);
    if (code != 0) goto _err;
  }

  // records already on disk
  for (int32_t i = 0; i < pCommitter->oDelDataMap.nItem; i++) {
    code = tMapDataGetItemByIdx(&pCommitter->oDelDataMap, i, &diskDel, tGetDelData);
    if (code != 0) goto _err;

    code = tMapDataPutItem(&pCommitter->nDelDataMap, &diskDel, tPutDelData);
    if (code != 0) goto _err;

    newIdx.minKey = TMIN(newIdx.minKey, diskDel.sKey);
    newIdx.maxKey = TMAX(newIdx.maxKey, diskDel.eKey);
    newIdx.minVersion = TMIN(newIdx.minVersion, diskDel.version);
    newIdx.maxVersion = TMAX(newIdx.maxVersion, diskDel.version);
  }

  // records still in memory
  for (SDelData *pMemDel = (pTbData ? pTbData->pHead : NULL); pMemDel != NULL; pMemDel = pMemDel->pNext) {
    code = tMapDataPutItem(&pCommitter->nDelDataMap, pMemDel, tPutDelData);
    if (code != 0) goto _err;

    newIdx.minKey = TMIN(newIdx.minKey, pMemDel->sKey);
    newIdx.maxKey = TMAX(newIdx.maxKey, pMemDel->eKey);
    newIdx.minVersion = TMIN(newIdx.minVersion, pMemDel->version);
    newIdx.maxVersion = TMAX(newIdx.maxVersion, pMemDel->version);
  }

  ASSERT(pCommitter->nDelDataMap.nItem > 0);

  // flush the records, then register the table in the new index
  code = tsdbWriteDelData(pCommitter->pDelFWriter, &pCommitter->nDelDataMap, NULL, &newIdx);
  if (code != 0) goto _err;

  code = tMapDataPutItem(&pCommitter->nDelIdxMap, &newIdx, tPutDelIdx);
  if (code != 0) goto _err;

  return code;

_err:
  tsdbError("vgId:%d commit table del failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
229
/*
 * Merge the delete data of the immutable memtable (pTsdb->imem) with the
 * index of the old del file, one table at a time.
 *
 * Two cursors are advanced in lock step: one over imem->aTbData and one over
 * pCommitter->oDelIdxMap. tTABLEIDCmprFn() decides which side comes first;
 * equal ids are committed together. Every step is delegated to
 * tsdbCommitTableDel().
 *
 * Returns 0 on success or the first error code encountered.
 */
static int32_t tsdbCommitDelImpl(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMem = pTsdb->imem;
  int32_t    nDisk = pCommitter->oDelIdxMap.nItem;
  int32_t    nMem = taosArrayGetSize(pMem->aTbData);
  int32_t    iDisk = 0;
  int32_t    iMem = 0;
  STbData   *pTbData;
  SDelIdx    diskIdx;
  SDelIdx   *pDelIdx = NULL;

  ASSERT(nMem > 0);

  // position both cursors on their first element
  pTbData = (STbData *)taosArrayGetP(pMem->aTbData, iMem);
  if (iDisk < nDisk) {
    code = tMapDataGetItemByIdx(&pCommitter->oDelIdxMap, iDisk, &diskIdx, tGetDelIdx);
    if (code) goto _err;
    pDelIdx = &diskIdx;
  }

  while (pTbData != NULL || pDelIdx != NULL) {
    bool takeMem;
    bool takeDisk;

    if (pTbData != NULL && pDelIdx != NULL) {
      int32_t c = tTABLEIDCmprFn(pTbData, pDelIdx);
      takeMem = (c <= 0);   // memory table comes first, or both match
      takeDisk = (c >= 0);  // disk table comes first, or both match
    } else {
      takeMem = (pTbData != NULL);
      takeDisk = (pDelIdx != NULL);
    }

    code = tsdbCommitTableDel(pCommitter, takeMem ? pTbData : NULL, takeDisk ? pDelIdx : NULL);
    if (code) goto _err;

    // advance whichever cursor(s) were consumed
    if (takeMem) {
      iMem++;
      pTbData = (iMem < nMem) ? (STbData *)taosArrayGetP(pMem->aTbData, iMem) : NULL;
    }
    if (takeDisk) {
      iDisk++;
      if (iDisk < nDisk) {
        code = tMapDataGetItemByIdx(&pCommitter->oDelIdxMap, iDisk, &diskIdx, tGetDelIdx);
        if (code) goto _err;
        pDelIdx = &diskIdx;
      } else {
        pDelIdx = NULL;
      }
    }
  }

  return code;

_err:
  tsdbError("vgId:%d commit del impl failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

/*
 * Finish the delete-data commit: write the new SDelIdx map, update the del
 * file header, close the writer (second argument 1) and, when a reader was
 * opened, close it too.
 *
 * The steps run in that order and stop at the first failure, whose error
 * code is logged and returned. Returns 0 when every step succeeded.
 */
static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) {
  int32_t rc = tsdbWriteDelIdx(pCommitter->pDelFWriter, &pCommitter->nDelIdxMap, NULL);

  if (rc == 0) {
    rc = tsdbUpdateDelFileHdr(pCommitter->pDelFWriter, NULL);
  }
  if (rc == 0) {
    rc = tsdbDelFWriterClose(pCommitter->pDelFWriter, 1);
  }
  if (rc == 0 && pCommitter->pDelFReader != NULL) {
    rc = tsdbDelFReaderClose(pCommitter->pDelFReader);
  }

  if (rc != 0) {
    tsdbError("vgId:%d commit del end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(rc));
  }
  return rc;
}

H
Hongze Cheng 已提交
345 346 347 348 349
/*
 * Set up the committer for the file set pCommitter->commitFid.
 *
 *  - nextKey is reset to TSKEY_MAX;
 *  - the "old" state is cleared and, if the file set already exists in
 *    pTsdb->fs->nState, a reader is opened and its SBlockIdx map loaded;
 *  - the "new" state is cleared and a writer is opened on a file set that
 *    reuses the old data/sma files when there is an old set, or is entirely
 *    fresh (on disk {level 0, id 0}) when there is none.
 *
 * Returns 0 on success or the first error reported by the reader/writer.
 */
static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
  STsdb     *pTsdb = pCommitter->pTsdb;
  SDFileSet *pOldSet;
  SDFileSet  newSet;
  int32_t    code = 0;

  // memory
  pCommitter->nextKey = TSKEY_MAX;

  // old
  tMapDataReset(&pCommitter->oBlockIdxMap);
  tMapDataReset(&pCommitter->oBlockMap);
  tBlockReset(&pCommitter->oBlock);
  tBlockDataReset(&pCommitter->oBlockData);

  pOldSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, pCommitter->commitFid);
  if (pOldSet != NULL) {
    code = tsdbDataFReaderOpen(&pCommitter->pReader, pTsdb, pOldSet);
    if (code != 0) goto _err;

    code = tsdbReadBlockIdx(pCommitter->pReader, &pCommitter->oBlockIdxMap, NULL);
    if (code != 0) goto _err;
  }

  // new
  tMapDataReset(&pCommitter->nBlockIdxMap);
  tMapDataReset(&pCommitter->nBlockMap);
  tBlockReset(&pCommitter->nBlock);
  tBlockDataReset(&pCommitter->nBlockData);

  if (pOldSet == NULL) {
    STfs   *pTfs = pTsdb->pVnode->pTfs;
    SDiskID did = {.level = 0, .id = 0};

    // TODO: alloc a new disk
    // tfsAllocDisk(pTfs, 0, &did);

    // create the directory
    // NOTE(review): the result of tfsMkdirRecurAt() is not checked here.
    tfsMkdirRecurAt(pTfs, pTsdb->path, did);

    // brand new file set: every file belongs to this commit
    newSet = (SDFileSet){.diskId = did,
                         .fid = pCommitter->commitFid,
                         .fHead = {.commitID = pCommitter->commitID, .offset = 0, .size = 0},
                         .fData = {.commitID = pCommitter->commitID, .size = 0},
                         .fLast = {.commitID = pCommitter->commitID, .size = 0},
                         .fSma = {.commitID = pCommitter->commitID, .size = 0}};
  } else {
    // existing file set: new head/last files, data/sma carried over
    newSet = (SDFileSet){.diskId = pOldSet->diskId,
                         .fid = pCommitter->commitFid,
                         .fHead = {.commitID = pCommitter->commitID, .offset = 0, .size = 0},
                         .fData = pOldSet->fData,
                         .fLast = {.commitID = pCommitter->commitID, .size = 0},
                         .fSma = pOldSet->fSma};
  }

  code = tsdbDataFWriterOpen(&pCommitter->pWriter, pTsdb, &newSet);
  if (code != 0) goto _err;

  return code;

_err:
  tsdbError("vgId:%d commit file data start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433
/*
 * Make pCommitter->pTSchema the schema for table (suid, uid) at schema
 * version `sver`.
 *
 * The cached schema is kept when it already matches: same suid and same
 * version, and -- only when suid is 0 -- also the same uid. Otherwise the
 * cached schema is destroyed and a new one is fetched with
 * metaGetTbTSchema().
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the fetch yields NULL.
 */
static int32_t tsdbCommitterUpdateSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) {
  STSchema *pCached = pCommitter->pTSchema;

  // cache hit: nothing to do
  if (pCached != NULL && pCommitter->suid == suid && sver == pCached->version) {
    if (suid != 0 || pCommitter->uid == uid) {
      return 0;
    }
  }

  // cache miss: replace the cached schema
  pCommitter->suid = suid;
  pCommitter->uid = uid;
  tTSchemaDestroy(pCommitter->pTSchema);
  pCommitter->pTSchema = metaGetTbTSchema(pCommitter->pTsdb->pVnode->pMeta, uid, sver);

  return (pCommitter->pTSchema == NULL) ? TSDB_CODE_OUT_OF_MEMORY : 0;
}

H
Hongze Cheng 已提交
434
/*
 * Commit the rows of one in-memory table that fall into
 * [pCommitter->minKey, pCommitter->maxKey] when the table has no data in the
 * old file set.
 *
 * Rows are appended to pCommitter->nBlockData; a block is written whenever
 * it reaches maxRow * 4 / 5 rows or the eligible rows run out. After the
 * last block, the table's SBlock map is written and its SBlockIdx is
 * appended to pCommitter->nBlockIdxMap. If the table has no eligible row,
 * nothing is written.
 *
 * On the non-error exit, pCommitter->nextKey is lowered to the timestamp of
 * the first row left in the iterator, if any.
 *
 * Returns 0 on success or the first error code encountered.
 */
static int32_t tsdbCommitMemoryData(SCommitter *pCommitter, STbData *pTbData) {
  int32_t     code = 0;
  STsdb      *pTsdb = pCommitter->pTsdb;
  STbDataIter iter = {0};
  TSDBKEY     fromKey = {.ts = pCommitter->minKey, .version = VERSION_MIN};
  SBlockIdx   blockIdx;
  SMapData   *pBlockMap = &pCommitter->nBlockMap;
  SBlock     *pBlock = &pCommitter->nBlock;
  SBlockData *pBlockData = &pCommitter->nBlockData;
  TSDBROW     edgeRow;
  TSDBROW    *pRow;
  TSKEY       prevTs = TSKEY_MIN;

  // position the iterator on the first row at or after minKey
  tsdbTbDataIterOpen(pTbData, &fromKey, 0, &iter);
  pRow = tsdbTbDataIterGet(&iter);

  if (pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey) goto _exit;

  blockIdx = (SBlockIdx){.suid = pTbData->suid, .uid = pTbData->uid};
  tBlockIdxReset(&blockIdx);
  tMapDataReset(pBlockMap);
  tBlockReset(pBlock);
  tBlockDataReset(pBlockData);

  for (;;) {
    if (pRow != NULL && TSDBROW_TS(pRow) <= pCommitter->maxKey) {
      // update schema
      code = tsdbCommitterUpdateSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow));
      if (code) goto _err;

      // append
      code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->pTSchema);
      if (code) goto _err;

      // per-block statistics
      pBlock->minVersion = TMIN(pBlock->minVersion, TSDBROW_VERSION(pRow));
      pBlock->maxVersion = TMAX(pBlock->maxVersion, TSDBROW_VERSION(pRow));
      pBlock->nRow++;
      if (TSDBROW_TS(pRow) == prevTs) pBlock->hasDup = 1;
      prevTs = TSDBROW_TS(pRow);

      // next
      tsdbTbDataIterNext(&iter);
      pRow = tsdbTbDataIterGet(&iter);

      // keep filling until the block is "full enough"
      if (pBlockData->nRow < pCommitter->maxRow * 4 / 5) continue;
    } else if (pBlockData->nRow <= 0) {
      // out of eligible rows and nothing buffered
      break;
    }

    // ---- write the buffered block ----
    edgeRow = tBlockDataFirstRow(pBlockData);
    if (tsdbKeyCmprFn(&pBlock->minKey, &TSDBROW_KEY(&edgeRow)) > 0) pBlock->minKey = TSDBROW_KEY(&edgeRow);
    edgeRow = tBlockDataLastRow(pBlockData);
    if (tsdbKeyCmprFn(&pBlock->maxKey, &TSDBROW_KEY(&edgeRow)) < 0) pBlock->maxKey = TSDBROW_KEY(&edgeRow);

    pBlock->last = pBlockData->nRow < pCommitter->minRow ? 1 : 0;
    code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, &blockIdx, pBlock, pCommitter->cmprAlg);
    if (code) goto _err;

    // Design SMA and write SMA to file

    // record the block and widen the table-level ranges
    code = tMapDataPutItem(pBlockMap, pBlock, tPutBlock);
    if (code) goto _err;
    blockIdx.minKey = TMIN(blockIdx.minKey, pBlock->minKey.ts);
    blockIdx.maxKey = TMAX(blockIdx.maxKey, pBlock->maxKey.ts);
    blockIdx.minVersion = TMIN(blockIdx.minVersion, pBlock->minVersion);
    blockIdx.maxVersion = TMAX(blockIdx.maxVersion, pBlock->maxVersion);

    tBlockReset(pBlock);
    tBlockDataReset(pBlockData);
    prevTs = TSKEY_MIN;
  }

  // write the SBlock map of the table, then register its SBlockIdx
  code = tsdbWriteBlock(pCommitter->pWriter, pBlockMap, NULL, &blockIdx);
  if (code) goto _err;

  code = tMapDataPutItem(&pCommitter->nBlockIdxMap, &blockIdx, tPutBlockIdx);
  if (code) goto _err;

_exit:
  if (pRow) pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
  return code;

_err:
  tsdbError("vgId:%d tsdb commit memory data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
532 533 534 535
/*
 * Carry one table that exists only in the old file set over to the new one.
 *
 * oBlockIdx : index entry of the table in the old file set.
 *
 * Every SBlock of the table is copied into pCommitter->nBlockMap. A block
 * flagged `last` (asserted to be the final block of the table) is read and
 * written again through the new writer; all other blocks are re-registered
 * unchanged. Finally the SBlock map is written and the SBlockIdx is appended
 * to pCommitter->nBlockIdxMap.
 *
 * Returns 0 on success or the first error code encountered.
 */
static int32_t tsdbCommitDiskData(SCommitter *pCommitter, SBlockIdx *oBlockIdx) {
  int32_t     code = 0;
  SMapData   *mBlockO = &pCommitter->oBlockMap;
  SBlock     *pBlockO = &pCommitter->oBlock;
  SMapData   *mBlockN = &pCommitter->nBlockMap;
  SBlock     *pBlockN = &pCommitter->nBlock;
  SBlockIdx  *pBlockIdx = &(SBlockIdx){0};
  SBlockData *pBlockDataO = &pCommitter->oBlockData;

  // Identify the table before any block is written. Previously the index was
  // still all-zero when handed to tsdbWriteBlockData() below, unlike every
  // other caller in this file, which passes an index carrying suid/uid.
  *pBlockIdx = *oBlockIdx;

  // read
  code = tsdbReadBlock(pCommitter->pReader, oBlockIdx, mBlockO, NULL);
  if (code) goto _err;

  // loop to add to new
  tMapDataReset(mBlockN);
  for (int32_t iBlock = 0; iBlock < mBlockO->nItem; iBlock++) {
    // a failed decode must not be silently treated as a valid block
    code = tMapDataGetItemByIdx(mBlockO, iBlock, pBlockO, tGetBlock);
    if (code) goto _err;

    if (pBlockO->last) {
      ASSERT(iBlock == mBlockO->nItem - 1);

      // the `last` block is rewritten through the new writer
      code = tsdbReadBlockData(pCommitter->pReader, oBlockIdx, pBlockO, pBlockDataO, NULL, NULL);
      if (code) goto _err;

      tBlockReset(pBlockN);
      pBlockN->minKey = pBlockO->minKey;
      pBlockN->maxKey = pBlockO->maxKey;
      pBlockN->minVersion = pBlockO->minVersion;
      pBlockN->maxVersion = pBlockO->maxVersion;
      pBlockN->nRow = pBlockO->nRow;
      pBlockN->last = pBlockO->last;
      pBlockN->hasDup = pBlockO->hasDup;
      code = tsdbWriteBlockData(pCommitter->pWriter, pBlockDataO, NULL, NULL, pBlockIdx, pBlockN, pCommitter->cmprAlg);
      if (code) goto _err;

      code = tMapDataPutItem(mBlockN, pBlockN, tPutBlock);
      if (code) goto _err;
    } else {
      // other blocks are re-registered as they are
      code = tMapDataPutItem(mBlockN, pBlockO, tPutBlock);
      if (code) goto _err;
    }
  }

  // SBlock
  // Start again from the on-disk index so that tsdbWriteBlock() receives
  // exactly the value it received before the fix above was added.
  *pBlockIdx = *oBlockIdx;
  code = tsdbWriteBlock(pCommitter->pWriter, mBlockN, NULL, pBlockIdx);
  if (code) goto _err;

  // SBlockIdx
  code = tMapDataPutItem(&pCommitter->nBlockIdxMap, pBlockIdx, tPutBlockIdx);
  if (code) goto _err;

  return code;

_err:
  tsdbError("vgId:%d tsdb commit disk data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
590 591
/*
 * Merge the in-memory rows of a table with one on-disk block and write the
 * result as one or more new blocks.
 *
 * pIter       : memory iterator, positioned on a row whose key is < toKey.
 * pBlockMerge : the on-disk block to merge with; its first row is asserted
 *               to be < toKey.
 * toKey       : exclusive upper bound for the memory rows taken.
 * toDataOnly  : when non-zero, no written block is flagged `last`, however
 *               few rows it holds.
 *
 * Rows are taken in ascending tsdbRowCmprFn() order from both sources; the
 * two sources are asserted never to hold an equal row. A block is written
 * once it reaches maxRow * 4 / 5 rows or both sources are exhausted, and is
 * appended to pCommitter->nBlockMap.
 *
 * Returns 0 on success or the first error code encountered.
 */
static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlockMerge, TSDBKEY toKey,
                                  int8_t toDataOnly) {
  int32_t     code = 0;
  SBlockIdx  *pBlockIdx = &(SBlockIdx){.suid = pIter->pTbData->suid, .uid = pIter->pTbData->uid};
  SBlockData *pBlockDataMerge = &pCommitter->oBlockData;
  SBlockData *pBlockData = &pCommitter->nBlockData;
  SBlock     *pBlock = &pCommitter->nBlock;
  TSDBROW    *pRow1;          // current memory row, NULL when exhausted
  TSDBROW     row2;
  TSDBROW    *pRow2 = &row2;  // current disk row, NULL when exhausted
  TSDBROW     row;
  TSDBROW    *pRow = &row;    // row selected for this iteration
  int32_t     c = 0;
  TSKEY       lastKey;

  // read SBlockData
  code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlockMerge, pBlockDataMerge, NULL, NULL);
  if (code) goto _err;

  // loop to merge
  pRow1 = tsdbTbDataIterGet(pIter);
  *pRow2 = tsdbRowFromBlockData(pBlockDataMerge, 0);
  ASSERT(pRow1 && tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0);
  ASSERT(tsdbKeyCmprFn(&TSDBROW_KEY(pRow2), &toKey) < 0);
  code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow1));
  if (code) goto _err;

  lastKey = TSKEY_MIN;
  tBlockReset(pBlock);
  tBlockDataReset(pBlockData);
  while (true) {
    if (pRow1 == NULL && pRow2 == NULL) {
      if (pBlockData->nRow == 0) {
        break;
      } else {
        goto _write_block;
      }
    }

    // decide which source provides the next row (compare only once)
    if (pRow1 && pRow2) {
      c = tsdbRowCmprFn(pRow1, pRow2);
      // Equal rows in both sources are not expected. If assertions are
      // compiled out, the disk row is taken so the loop still makes progress.
      ASSERT(c != 0);
    } else {
      c = pRow1 ? -1 : 1;
    }

    if (c < 0) {
      // take the memory row and advance the iterator
      *pRow = *pRow1;

      tsdbTbDataIterNext(pIter);
      pRow1 = tsdbTbDataIterGet(pIter);

      if (pRow1) {
        if (tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0) {
          // The schema is selected by the row's *schema* version (as done for
          // the first row above), not by the row's data version.
          // NOTE(review): the schema is refreshed for the NEXT memory row
          // before the current row is appended below -- confirm intended.
          code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow1));
          if (code) goto _err;
        } else {
          // remaining memory rows are beyond toKey
          pRow1 = NULL;
        }
      }
    } else {
      // take the disk row and step to the next row of the block, if any
      *pRow = *pRow2;

      if (pRow2->iRow + 1 < pBlockDataMerge->nRow) {
        *pRow2 = tsdbRowFromBlockData(pBlockDataMerge, pRow2->iRow + 1);
      } else {
        pRow2 = NULL;
      }
    }

    code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->pTSchema);
    if (code) goto _err;

    pBlock->minVersion = TMIN(pBlock->minVersion, TSDBROW_VERSION(pRow));
    pBlock->maxVersion = TMAX(pBlock->maxVersion, TSDBROW_VERSION(pRow));
    pBlock->nRow++;
    if (lastKey == TSDBROW_TS(pRow)) {
      pBlock->hasDup = 1;
    } else {
      lastKey = TSDBROW_TS(pRow);
    }

    if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) goto _write_block;
    continue;

  _write_block:
    if (!toDataOnly && pBlockData->nRow < pCommitter->minRow) {
      pBlock->last = 1;
    } else {
      pBlock->last = 0;
    }

    code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, pBlockIdx, pBlock, pCommitter->cmprAlg);
    if (code) goto _err;

    code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
    if (code) goto _err;

    lastKey = TSKEY_MIN;
    tBlockReset(pBlock);
    tBlockDataReset(pBlockData);
  }

  return code;

_err:
  tsdbError("vgId:%d tsdb merge block and mem failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
718
/*
 * Write the in-memory rows of one table, from the iterator's current
 * position up to (but excluding) toKey, as one or more new blocks.
 *
 * pIter      : memory iterator; its current row is asserted to be < toKey.
 * toKey      : exclusive upper bound of the rows taken.
 * toDataOnly : when non-zero, no written block is flagged `last`, however
 *              few rows it holds.
 *
 * A block is written once it holds maxRow * 4 / 5 rows or the eligible rows
 * run out, and is appended to pCommitter->nBlockMap.
 *
 * Returns 0 on success or the first error code encountered.
 */
static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter, TSDBKEY toKey, int8_t toDataOnly) {
  int32_t     code = 0;
  TSDBROW    *pRow;
  SBlock     *pBlock = &pCommitter->nBlock;
  SBlockData *pBlockData = &pCommitter->nBlockData;
  TSKEY       lastKey = TSKEY_MIN;
  int64_t     suid = pIter->pTbData->suid;
  int64_t     uid = pIter->pTbData->uid;

  tBlockReset(pBlock);
  tBlockDataReset(pBlockData);
  pRow = tsdbTbDataIterGet(pIter);
  ASSERT(pRow && tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &toKey) < 0);
  while (true) {
    if (pRow == NULL) {
      if (pBlockData->nRow > 0) {
        goto _write_block;
      } else {
        break;
      }
    }

    // update schema
    code = tsdbCommitterUpdateSchema(pCommitter, suid, uid, TSDBROW_SVERSION(pRow));
    if (code) goto _err;

    // append
    code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->pTSchema);
    if (code) goto _err;

    // update
    pBlock->minVersion = TMIN(pBlock->minVersion, TSDBROW_VERSION(pRow));
    // the upper bound must grow with TMAX (TMIN was used here before)
    pBlock->maxVersion = TMAX(pBlock->maxVersion, TSDBROW_VERSION(pRow));
    pBlock->nRow++;
    if (TSDBROW_TS(pRow) == lastKey) {
      pBlock->hasDup = 1;
    } else {
      lastKey = TSDBROW_TS(pRow);
    }

    // next row; rows at or beyond toKey end the scan
    tsdbTbDataIterNext(pIter);
    pRow = tsdbTbDataIterGet(pIter);
    if (pRow && tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &toKey) >= 0) pRow = NULL;

    if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) goto _write_block;
    continue;

  _write_block:
    // The `last` decision compares the row count with the minimum row count
    // (minRow), as tsdbMergeTableData() does. It was previously compared with
    // minKey, a timestamp.
    if (!toDataOnly && pBlockData->nRow < pCommitter->minRow) {
      pBlock->last = 1;
    } else {
      pBlock->last = 0;
    }

    code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, &(SBlockIdx){.suid = suid, .uid = uid},
                              pBlock, pCommitter->cmprAlg);
    if (code) goto _err;

    code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
    if (code) goto _err;

    tBlockReset(pBlock);
    tBlockDataReset(pBlockData);
    lastKey = TSKEY_MIN;
  }

  return code;

_err:
  tsdbError("vgId:%d tsdb commit table mem data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
791
// Carry one block of the old file set over into the file set being written.
//
// A block flagged `last` is read through pCommitter->pReader and written again through
// pCommitter->pWriter under a freshly reset descriptor (pCommitter->nBlock); any other block only
// has its descriptor appended to pCommitter->nBlockMap.
//
//   pCommitter - committer state (reader, writer, scratch block data, new block map)
//   pBlock     - descriptor of the block taken from the old block map
//   pBlockIdx  - index entry of the table the block belongs to
//
// Returns 0 on success, otherwise the error code of the failing call.
static int32_t tsdbCommitTableDiskData(SCommitter *pCommitter, SBlock *pBlock, SBlockIdx *pBlockIdx) {
  int32_t code = 0;

  if (pBlock->last) {
    code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlock, &pCommitter->oBlockData, NULL, NULL);
    if (code) goto _err;

    // Rebuild the descriptor from a reset state and copy the summary fields of the old block.
    tBlockReset(&pCommitter->nBlock);
    pCommitter->nBlock.minKey = pBlock->minKey;
    pCommitter->nBlock.maxKey = pBlock->maxKey;
    pCommitter->nBlock.minVersion = pBlock->minVersion;
    // maxVersion has to travel with minVersion; otherwise the rewritten block keeps the reset value.
    pCommitter->nBlock.maxVersion = pBlock->maxVersion;
    pCommitter->nBlock.nRow = pBlock->nRow;
    pCommitter->nBlock.last = pBlock->last;
    pCommitter->nBlock.hasDup = pBlock->hasDup;

    code = tsdbWriteBlockData(pCommitter->pWriter, &pCommitter->oBlockData, NULL, NULL, pBlockIdx, &pCommitter->nBlock,
                              pCommitter->cmprAlg);
    if (code) goto _err;

    code = tMapDataPutItem(&pCommitter->nBlockMap, &pCommitter->nBlock, tPutBlock);
    if (code) goto _err;
  } else {
    code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
    if (code) goto _err;
  }

  return code;

_err:
  tsdbError("vgId:%d tsdb commit table disk data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
823
// Placeholder for merging a table's in-memory data with its on-disk blocks.
//
// The function currently performs no work: it touches none of its arguments and always reports
// success (0). The earlier draft of the merge loop that lived here as commented-out code has been
// dropped from the body.
// NOTE(review): tsdbCommitTableData in this file looks like the implementation of that merge -
// confirm whether callers of this stub should be switched over to it.
static int32_t tsdbMergeMemDisk(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *oBlockIdx) {
  return 0;
}

H
Hongze Cheng 已提交
940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956
// Finish the commit of one table: write the accumulated block map (pCommitter->nBlockMap) through
// the writer, then append the table's SBlockIdx entry to pCommitter->nBlockIdxMap.
//
//   suid, uid - identifiers stored in the SBlockIdx entry
//
// Returns 0 on success, otherwise the error code of the failing call (which is also logged).
static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter, int64_t suid, int64_t uid) {
  SBlockIdx blockIdx = {.suid = suid, .uid = uid};
  int32_t   code;

  code = tsdbWriteBlock(pCommitter->pWriter, &pCommitter->nBlockMap, NULL, &blockIdx);
  if (code == 0) {
    code = tMapDataPutItem(&pCommitter->nBlockIdxMap, &blockIdx, tPutBlockIdx);
  }

  if (code != 0) {
    tsdbError("vgId:%d commit table data end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032
// Count how many rows, starting at the iterator's current position, compare equal to pBlock under
// tBlockCmprFn (i.e. overlap the block's key range).
//
// The scan runs on a private copy of the iterator, so the caller's iterator is left where it was.
// Counting stops at the end of the data or at the first row that compares greater than the block.
// A row comparing less than the block is not expected here and trips an ASSERT.
//
//   pIter  - iterator positioned on the first candidate row (not advanced by this function)
//   pBlock - on-disk block to test the rows against
//
// Returns the number of overlapping rows.
static int32_t tsdbGetOvlpNRow(STbDataIter *pIter, SBlock *pBlock) {
  int32_t     nRow = 0;
  TSDBROW    *pRow;
  TSDBKEY     key;
  int32_t     c = 0;
  STbDataIter iter = *pIter;

  // presumably clears the row cached in the copy so that it is resolved again through `iter`
  iter.pRow = NULL;
  while (true) {
    // read through the copy, never through pIter
    pRow = tsdbTbDataIterGet(&iter);

    if (pRow == NULL) break;
    key = TSDBROW_KEY(pRow);

    c = tBlockCmprFn(&(SBlock){.maxKey = key, .minKey = key}, pBlock);
    if (c == 0) {
      nRow++;
      // step to the next row; without this the same row would be counted forever
      tsdbTbDataIterNext(&iter);
    } else if (c > 0) {
      break;
    } else {
      ASSERT(0);
      break;  // do not spin on the same row when assertions are compiled out
    }
  }

  return nRow;
}

// Write the in-memory rows that overlap pBlock as an additional block attached to it.
//
// Rows are taken from pIter (which IS advanced) and appended to pCommitter->nBlockData for as long
// as they compare equal to pBlock under tBlockCmprFn; the first row comparing greater ends the run.
// The collected rows are then written through the writer under a copy of pBlock kept in
// pCommitter->nBlock, and that copy is appended to pCommitter->nBlockMap.
//
// The iterator is expected to sit on a valid row on entry; the caller in this file asserts that at
// least one overlapping row exists before calling.
//
// Returns 0 on success, otherwise the error code of the failing call.
static int32_t tsdbMergeAsSubBlock(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlock) {
  int32_t     code = 0;
  SBlockData *pBlockData = &pCommitter->nBlockData;
  SBlockIdx  *pBlockIdx = &(SBlockIdx){.suid = pIter->pTbData->suid, .uid = pIter->pTbData->uid};
  TSDBROW    *pRow;

  tBlockDataReset(pBlockData);
  pRow = tsdbTbDataIterGet(pIter);
  code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow));
  if (code) goto _err;
  while (true) {
    // stop once there is no further overlapping row (the test used to be inverted)
    if (pRow == NULL) break;
    code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->pTSchema);
    if (code) goto _err;

    tsdbTbDataIterNext(pIter);
    pRow = tsdbTbDataIterGet(pIter);
    if (pRow) {
      // compare the row's own key with the block, not an empty block descriptor
      TSDBKEY key = TSDBROW_KEY(pRow);
      int32_t c = tBlockCmprFn(&(SBlock){.maxKey = key, .minKey = key}, pBlock);

      if (c == 0) {
        code = tsdbCommitterUpdateSchema(pCommitter, pIter->pTbData->suid, pIter->pTbData->uid, TSDBROW_SVERSION(pRow));
        if (code) goto _err;
      } else if (c > 0) {
        pRow = NULL;
      } else {
        ASSERT(0);
      }
    }
  }

  // write as a subblock
  code = tBlockCopy(pBlock, &pCommitter->nBlock);
  if (code) goto _err;

  code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, pBlockIdx, &pCommitter->nBlock,
                            pCommitter->cmprAlg);
  if (code) goto _err;

  code = tMapDataPutItem(&pCommitter->nBlockMap, &pCommitter->nBlock, tPutBlock);
  if (code) goto _err;

  return code;

_err:
  tsdbError("vgId:%d tsdb merge as subblock failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
1033 1034 1035 1036
// Commit one table for the file currently being written by combining its in-memory rows with the
// blocks it already has in the old file set.
//
//   pTbData   - in-memory data of the table, or NULL when there is none
//   pBlockIdx - index entry of the table in the old file set, or NULL when there is none
//
// Rows whose timestamp is greater than pCommitter->maxKey are treated as absent. When neither rows
// nor blocks exist nothing is written. On the way out pCommitter->nextKey is lowered to the
// timestamp of the first row still available on the iterator, if any.
// Returns 0 on success, otherwise the error code of the failing step.
static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *pBlockIdx) {
  int32_t      code = 0;
  STbDataIter  memIter = {0};
  STbDataIter *pIter = NULL;
  TSDBROW     *pRow = NULL;
  SBlock      *pBlock = NULL;
  int32_t      iBlock = 0;
  int32_t      nBlock = 0;
  int64_t      suid = 0;
  int64_t      uid = 0;

  // memory side: open an iterator at the first key of this file's range
  if (pTbData != NULL) {
    pIter = &memIter;
    tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, pIter);
    pRow = tsdbTbDataIterGet(pIter);
    if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;

    suid = pTbData->suid;
    uid = pTbData->uid;
  }

  // disk side: load the table's old block map
  if (pBlockIdx != NULL) {
    code = tsdbReadBlock(pCommitter->pReader, pBlockIdx, &pCommitter->oBlockMap, NULL);
    if (code) goto _err;

    nBlock = pCommitter->oBlockMap.nItem;
    ASSERT(nBlock > 0);

    suid = pBlockIdx->suid;
    uid = pBlockIdx->uid;
  }

  if (pRow == NULL && nBlock == 0) goto _exit;

  // start ===========
  tMapDataReset(&pCommitter->nBlockMap);
  if (iBlock < nBlock) {
    pBlock = &pCommitter->oBlock;
    tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
  }

  // merge ===========
  while (pRow != NULL || pBlock != NULL) {
    if (pBlock == NULL) {
      // no block left: commit the remaining memory rows of this file's range
      code =
          tsdbCommitTableMemData(pCommitter, pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
      if (code) goto _err;

      pRow = tsdbTbDataIterGet(pIter);
      if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
      ASSERT(pRow == NULL);
      continue;
    }

    if (pRow == NULL) {
      // no row left: carry the block over as it is
      code = tsdbCommitTableDiskData(pCommitter, pBlock, pBlockIdx);
      if (code) goto _err;

      iBlock++;
      if (iBlock < nBlock) {
        tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
      } else {
        pBlock = NULL;
      }
      continue;
    }

    if (pBlock->last) {
      // a block flagged `last` is merged with all remaining rows of the range
      code = tsdbMergeTableData(pCommitter, pIter, pBlock,
                                (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
      if (code) goto _err;

      pRow = tsdbTbDataIterGet(pIter);
      if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
      iBlock++;
      if (iBlock < nBlock) {
        tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
      } else {
        pBlock = NULL;
      }

      ASSERT(pRow == NULL && pBlock == NULL);
      continue;
    }

    // both a row and a regular block are pending: position the row relative to the block
    TSDBKEY rowKey = TSDBROW_KEY(pRow);
    int32_t c = tBlockCmprFn(&(SBlock){.maxKey = rowKey, .minKey = rowKey}, pBlock);

    if (c > 0) {
      // only disk data
      code = tsdbCommitTableDiskData(pCommitter, pBlock, pBlockIdx);
      if (code) goto _err;

      iBlock++;
      if (iBlock < nBlock) {
        tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
      } else {
        pBlock = NULL;
      }
      continue;
    }

    if (c < 0) {
      // only memory data, up to (not including) the block's first key
      code = tsdbCommitTableMemData(pCommitter, pIter, pBlock->minKey, 1);
      if (code) goto _err;

      pRow = tsdbTbDataIterGet(pIter);
      if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
      continue;
    }

    // merge memory and disk
    int32_t nOvlp = tsdbGetOvlpNRow(pIter, pBlock);
    ASSERT(nOvlp);
    if (pBlock->nRow + nOvlp <= pCommitter->maxRow && pBlock->nSubBlock < TSDB_MAX_SUBBLOCKS) {
      // small enough: attach the overlapping rows to the block
      code = tsdbMergeAsSubBlock(pCommitter, pIter, pBlock);
      if (code) goto _err;
    } else {
      TSDBKEY toKey = {.ts = pCommitter->maxKey + 1, .version = VERSION_MIN};
      int8_t  toDataOnly = 0;

      if (iBlock < nBlock - 1) {
        // not the table's final block: merge only up to the first key of the following block
        SBlock nextBlock = {0};

        toDataOnly = 1;
        tBlockReset(&nextBlock);
        tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock + 1, &nextBlock, tGetBlock);
        toKey = nextBlock.minKey;
      }

      code = tsdbMergeTableData(pCommitter, pIter, pBlock, toKey, toDataOnly);
      if (code) goto _err;
    }

    pRow = tsdbTbDataIterGet(pIter);
    if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
    iBlock++;
    if (iBlock < nBlock) {
      tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
    } else {
      pBlock = NULL;
    }
  }

  // end =====================
  code = tsdbCommitTableDataEnd(pCommitter, suid, uid);
  if (code) goto _err;

_exit:
  if (pIter) {
    pRow = tsdbTbDataIterGet(pIter);
    if (pRow) pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
  }
  return code;

_err:
  tsdbError("vgId:%d tsdb commit table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
1191 1192
// Walk the immutable memtable's table list and the old file's SBlockIdx list side by side and
// commit every table exactly once.
//
// The two lists are compared entry by entry with tTABLEIDCmprFn:
//   equal          -> tsdbMergeMemDisk  (table present in memory and on disk)
//   memory first   -> tsdbCommitMemoryData
//   disk first     -> tsdbCommitDiskData
// Once one list is exhausted the remaining entries of the other are committed on their own.
// NOTE(review): tsdbMergeMemDisk is a stub in this file as it stands - confirm the intended callee.
//
// Returns 0 on success, otherwise the error code of the failing step.
static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
  int32_t    nTbData = taosArrayGetSize(pMemTable->aTbData);
  int32_t    nBlockIdx = pCommitter->oBlockIdxMap.nItem;
  int32_t    iTbData = 0;
  int32_t    iBlockIdx = 0;
  SBlockIdx  blockIdx = {0};
  SBlockIdx *pBlockIdx = NULL;
  STbData   *pTbData;

  ASSERT(nTbData > 0);

  // position both cursors on their first entry
  pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
  if (iBlockIdx < nBlockIdx) {
    pBlockIdx = &blockIdx;
    tMapDataGetItemByIdx(&pCommitter->oBlockIdxMap, iBlockIdx, pBlockIdx, tGetBlockIdx);
  }

  while (pTbData != NULL || pBlockIdx != NULL) {
    bool    stepMem = false;
    bool    stepDisk = false;
    int32_t c;

    // an exhausted side always loses the comparison
    if (pTbData == NULL) {
      c = 1;
    } else if (pBlockIdx == NULL) {
      c = -1;
    } else {
      c = tTABLEIDCmprFn(pTbData, pBlockIdx);
    }

    if (c == 0) {
      // merge commit
      code = tsdbMergeMemDisk(pCommitter, pTbData, pBlockIdx);
      stepMem = true;
      stepDisk = true;
    } else if (c < 0) {
      // commit memory data
      code = tsdbCommitMemoryData(pCommitter, pTbData);
      stepMem = true;
    } else {
      // commit disk data
      code = tsdbCommitDiskData(pCommitter, pBlockIdx);
      stepDisk = true;
    }
    if (code) goto _err;

    if (stepMem) {
      iTbData++;
      pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData) : NULL;
    }
    if (stepDisk) {
      iBlockIdx++;
      if (iBlockIdx < nBlockIdx) {
        tMapDataGetItemByIdx(&pCommitter->oBlockIdxMap, iBlockIdx, pBlockIdx, tGetBlockIdx);
      } else {
        pBlockIdx = NULL;
      }
    }
  }

  return code;

_err:
  tsdbError("vgId:%d commit file data impl failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

// Finish the commit of one data file set.
//
// Steps, in order: write the new SBlockIdx map, update the file set header, upsert the written file
// set into the new file-system state, close the writer (second argument 1, marked "close and sync"
// in the original comment), and close the reader when one is open.
//
// Returns 0 on success; on the first failing step the error is logged and its code returned.
static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
  int32_t code = 0;

  // write blockIdx
  code = tsdbWriteBlockIdx(pCommitter->pWriter, &pCommitter->nBlockIdxMap, NULL);
  if (code) goto _err;

  // update file header
  code = tsdbUpdateDFileSetHeader(pCommitter->pWriter);
  if (code) goto _err;

  // upsert SDFileSet
  code = tsdbFSStateUpsertDFileSet(pCommitter->pTsdb->fs->nState, tsdbDataFWriterGetWSet(pCommitter->pWriter));
  if (code) goto _err;

  // close and sync
  code = tsdbDataFWriterClose(&pCommitter->pWriter, 1);
  if (code) goto _err;

  if (pCommitter->pReader) {
    code = tsdbDataFReaderClose(&pCommitter->pReader);
    // only a failed close is an error; a successful close must not fall into the error log
    if (code) goto _err;
  }

  return code;

_err:
  tsdbError("vgId:%d commit file data end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
1325
// Commit the data of one file: run the start, impl and end phases in that order and stop at the
// first phase that reports an error.
//
// Returns 0 when all three phases succeed, otherwise the failing phase's error code (also logged).
static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
  int32_t code;

  code = tsdbCommitFileDataStart(pCommitter);
  if (code == 0) {
    code = tsdbCommitFileDataImpl(pCommitter);
  }
  if (code == 0) {
    code = tsdbCommitFileDataEnd(pCommitter);
  }

  if (code != 0) {
    tsdbError("vgId:%d commit file data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
1347 1348
// ----------------------------------------------------------------------------
// Begin a commit: zero the committer, turn the active memtable into the immutable one, copy the
// configuration values the commit needs into the committer, and open a file-system transaction
// with tsdbFSBegin.
//
// Requires pTsdb->mem to be set and pTsdb->imem to be NULL (asserted).
// Returns 0 on success, otherwise the error code of tsdbFSBegin (also logged).
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
  int32_t code;

  memset(pCommitter, 0, sizeof(*pCommitter));
  ASSERT(pTsdb->mem && pTsdb->imem == NULL);

  // lock();
  pTsdb->imem = pTsdb->mem;
  pTsdb->mem = NULL;
  // unlock();

  pCommitter->pTsdb = pTsdb;

  // values taken from the keep configuration
  pCommitter->minutes = pTsdb->keepCfg.days;
  pCommitter->precision = pTsdb->keepCfg.precision;

  // values taken from the owning vnode
  pCommitter->commitID = pTsdb->pVnode->state.commitID;
  pCommitter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
  pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
  pCommitter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;

  code = tsdbFSBegin(pTsdb->fs);
  if (code != 0) {
    tsdbError("vgId:%d tsdb start commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400
// Put the committer's per-commit working state into its initial condition: no reader or writer,
// freshly initialised old/new maps and block descriptors, and two initialised block data buffers.
//
// Returns 0 on success. When the second buffer cannot be initialised the first one is cleared
// again before the error code is returned.
static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
  int32_t code;

  // state read from the old file set
  pCommitter->pReader = NULL;
  pCommitter->oBlockIdxMap = tMapDataInit();
  pCommitter->oBlockMap = tMapDataInit();
  pCommitter->oBlock = tBlockInit();

  // state built for the new file set
  pCommitter->pWriter = NULL;
  pCommitter->nBlockIdxMap = tMapDataInit();
  pCommitter->nBlockMap = tMapDataInit();
  pCommitter->nBlock = tBlockInit();

  code = tBlockDataInit(&pCommitter->oBlockData);
  if (code != 0) {
    return code;
  }

  code = tBlockDataInit(&pCommitter->nBlockData);
  if (code != 0) {
    tBlockDataClear(&pCommitter->oBlockData);
  }
  return code;
}

// Counterpart of tsdbCommitDataStart. Currently a no-op: every teardown call is disabled.
//
// Disabled teardown, kept here for reference:
//   tMapDataClear   on oBlockIdxMap, oBlockMap, nBlockIdxMap, nBlockMap
//   tBlockClear     on oBlock, nBlock
//   tBlockDataClear on oBlockData, nBlockData
// NOTE(review): confirm why the teardown is switched off before re-enabling it.
static void tsdbCommitDataEnd(SCommitter *pCommitter) {
}

H
Hongze Cheng 已提交
1411 1412 1413 1414
// Commit the row data of the immutable memtable, one file at a time.
//
// Nothing is done when the memtable holds no rows. Otherwise the working state is set up, and while
// pCommitter->nextKey stays below TSKEY_MAX the file id and key range covering nextKey are derived
// and that file is committed. tsdbCommitDataEnd runs on both the success and the failure path.
// NOTE(review): the loop relies on the per-file commit updating pCommitter->nextKey - presumably it
// is reset there; confirm against tsdbCommitFileDataStart.
//
// Returns 0 on success, otherwise the error code of the failing step (also logged).
static int32_t tsdbCommitData(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;

  if (pMemTable->nRow != 0) {
    // start ====================
    code = tsdbCommitDataStart(pCommitter);

    // impl ====================
    if (code == 0) {
      for (pCommitter->nextKey = pMemTable->minKey; code == 0 && pCommitter->nextKey < TSKEY_MAX;) {
        pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision);
        tsdbFidKeyRange(pCommitter->commitFid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey,
                        &pCommitter->maxKey);
        code = tsdbCommitFileData(pCommitter);
      }
    }

    // end ====================
    tsdbCommitDataEnd(pCommitter);

    if (code != 0) {
      tsdbError("vgId:%d commit data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
      return code;
    }
  }

  tsdbDebug("vgId:%d commit data done, nRow:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nRow);
  return code;
}
H
Hongze Cheng 已提交
1445

H
Hongze Cheng 已提交
1446 1447 1448 1449
// Commit the delete records of the immutable memtable.
//
// Skipped entirely when the memtable holds no delete records; otherwise the start, impl and end
// phases run in order and the first failing phase aborts the sequence.
//
// Returns 0 on success, otherwise the failing phase's error code (also logged).
static int32_t tsdbCommitDel(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;

  if (pMemTable->nDel != 0) {
    code = tsdbCommitDelStart(pCommitter);
    if (code == 0) {
      code = tsdbCommitDelImpl(pCommitter);
    }
    if (code == 0) {
      code = tsdbCommitDelEnd(pCommitter);
    }

    if (code != 0) {
      tsdbError("vgId:%d commit del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
      return code;
    }
  }

  tsdbDebug("vgId:%d commit del done, nDel:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nDel);
  return code;
}

// Cache commit step. Not implemented yet: does nothing and always returns 0.
static int32_t tsdbCommitCache(SCommitter *pCommitter) {
  // TODO
  return 0;
}
H
Hongze Cheng 已提交
1487 1488

// Finish a commit.
//
//   eno - outcome of the preceding commit steps: 0 commits the file-system transaction, any other
//         value rolls it back
//
// The immutable memtable is destroyed and pTsdb->imem cleared in every case, including when the
// file-system commit/rollback itself fails, so the memtable is released whatever the outcome.
//
// Returns the result of tsdbFSCommit / tsdbFSRollback; a non-zero result is logged as an error.
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;

  if (eno == 0) {
    code = tsdbFSCommit(pTsdb->fs);
  } else {
    code = tsdbFSRollback(pTsdb->fs);
  }

  // release the memtable before looking at the result so that the error path does not skip it
  tsdbMemTableDestroy(pMemTable);
  pTsdb->imem = NULL;

  // the error label used to be unreachable, so failures were reported as a normal end of commit
  if (code) goto _err;

  tsdbInfo("vgId:%d tsdb end commit", TD_VID(pTsdb->pVnode));
  return code;

_err:
  tsdbError("vgId:%d tsdb end commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}