tsdbCommit.c 34.6 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
H
Hongze Cheng 已提交
17 18 19 20 21
// One-entry schema cache: pTSchema is the schema last fetched for table (suid, uid).
// It is refreshed by tsdbCommitterUpdateTableSchema / tsdbCommitterUpdateRowSchema
// when suid/uid/version no longer match.
typedef struct {
  int64_t   suid;      // super-table uid of the cached entry (0 for a normal table)
  int64_t   uid;       // table uid of the cached entry
  STSchema *pTSchema;  // cached schema, or NULL when nothing has been fetched yet
} SSkmInfo;
H
Hongze Cheng 已提交
22

H
Hongze Cheng 已提交
23
typedef struct {
H
Hongze Cheng 已提交
24
  STsdb *pTsdb;
H
Hongze Cheng 已提交
25
  /* commit data */
H
Hongze Cheng 已提交
26
  int64_t commitID;
H
Hongze Cheng 已提交
27 28
  int32_t minutes;
  int8_t  precision;
H
Hongze Cheng 已提交
29 30
  int32_t minRow;
  int32_t maxRow;
H
Hongze Cheng 已提交
31
  int8_t  cmprAlg;
H
Hongze Cheng 已提交
32
  STsdbFS fs;
H
Hongze Cheng 已提交
33
  // --------------
H
Hongze Cheng 已提交
34
  TSKEY   nextKey;  // reset by each table commit
H
Hongze Cheng 已提交
35 36 37
  int32_t commitFid;
  TSKEY   minKey;
  TSKEY   maxKey;
H
Hongze Cheng 已提交
38
  // commit file data
H
Hongze Cheng 已提交
39 40 41
  struct {
    SDataFReader *pReader;
    SArray       *aBlockIdx;  // SArray<SBlockIdx>
H
Hongze Cheng 已提交
42
    SArray       *aBlockL;    // SArray<SBlockL>
H
Hongze Cheng 已提交
43 44
    SMapData      mBlock;     // SMapData<SBlock>, read from reader
    SBlockData    bData;
H
Hongze Cheng 已提交
45
    SBlockData    bDatal;
H
Hongze Cheng 已提交
46 47 48 49
  } dReader;
  struct {
    SDataFWriter *pWriter;
    SArray       *aBlockIdx;  // SArray<SBlockIdx>
H
Hongze Cheng 已提交
50
    SArray       *aBlockL;    // SArray<SBlockL>
H
Hongze Cheng 已提交
51 52
    SMapData      mBlock;     // SMapData<SBlock>
    SBlockData    bData;
H
Hongze Cheng 已提交
53
    SBlockData    bDatal;
H
Hongze Cheng 已提交
54 55 56
  } dWriter;
  SSkmInfo skmTable;
  SSkmInfo skmRow;
H
Hongze Cheng 已提交
57
  /* commit del */
H
Hongze Cheng 已提交
58 59
  SDelFReader *pDelFReader;
  SDelFWriter *pDelFWriter;
H
Hongze Cheng 已提交
60 61 62
  SArray      *aDelIdx;   // SArray<SDelIdx>
  SArray      *aDelIdxN;  // SArray<SDelIdx>
  SArray      *aDelData;  // SArray<SDelData>
H
Hongze Cheng 已提交
63
} SCommitter;
H
refact  
Hongze Cheng 已提交
64

H
Hongze Cheng 已提交
65 66 67 68 69
// Forward declarations of the commit stages. tsdbCommit runs them in this order:
// start -> data -> del -> end (end also runs on the failure path, with the error code as eno).
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter);
static int32_t tsdbCommitData(SCommitter *pCommitter);
static int32_t tsdbCommitDel(SCommitter *pCommitter);
// NOTE(review): tsdbCommitCache is not called from tsdbCommit in the visible code — confirm it is still used.
static int32_t tsdbCommitCache(SCommitter *pCommitter);
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno);
H
refact  
Hongze Cheng 已提交
70

H
refact  
Hongze Cheng 已提交
71
/*
 * Create a fresh memtable and publish it as pTsdb->mem under the write lock.
 *
 * A NULL pTsdb is a no-op that returns 0. On failure an error code is returned
 * and logged.
 */
int32_t tsdbBegin(STsdb *pTsdb) {
  int32_t    code = 0;
  SMemTable *pMemTable = NULL;

  if (!pTsdb) return code;

  code = tsdbMemTableCreate(pTsdb, &pMemTable);
  if (code) goto _err;

  // lock
  code = taosThreadRwlockWrlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    // The memtable was never published, so nobody else can release it: drop it here.
    // NOTE(review): relies on a freshly created memtable holding exactly one reference — confirm.
    tsdbUnrefMemTable(pMemTable);
    goto _err;
  }

  pTsdb->mem = pMemTable;

  // unlock
  code = taosThreadRwlockUnlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _err;
  }

  return code;

_err:
  tsdbError("vgId:%d, tsdb begin failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

H
more  
Hongze Cheng 已提交
103
/*
 * Commit the current memtable (pTsdb->mem) to disk.
 *
 * An empty memtable (no rows and no deletes) is just detached from pTsdb and released.
 * Otherwise the commit runs start -> data -> del -> end. If any stage before "end"
 * fails, tsdbEndCommit is invoked once with the error code.
 *
 * Returns 0 on success (or when pTsdb is NULL), otherwise the failing stage's error code.
 */
int32_t tsdbCommit(STsdb *pTsdb) {
  if (!pTsdb) return 0;

  int32_t code = 0;
  // Zero-initialized so the failure path never hands tsdbEndCommit indeterminate fields
  // when tsdbStartCommit fails before filling the committer in.
  SCommitter commith = {0};
  SMemTable *pMemTable = pTsdb->mem;

  // check
  if (pMemTable->nRow == 0 && pMemTable->nDel == 0) {
    taosThreadRwlockWrlock(&pTsdb->rwLock);
    pTsdb->mem = NULL;
    taosThreadRwlockUnlock(&pTsdb->rwLock);

    tsdbUnrefMemTable(pMemTable);
    goto _exit;
  }

  // start commit
  code = tsdbStartCommit(pTsdb, &commith);
  if (code) goto _err;

  // commit impl
  code = tsdbCommitData(&commith);
  if (code) goto _err;

  code = tsdbCommitDel(&commith);
  if (code) goto _err;

  // end commit
  code = tsdbEndCommit(&commith, 0);
  // The end stage has already run; invoking it a second time from _err would repeat its cleanup.
  if (code) goto _log_err;

_exit:
  return code;

_err:
  tsdbEndCommit(&commith, code);
_log_err:
  tsdbError("vgId:%d, failed to commit since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
144
/*
 * Prepare the delete-data part of the commit:
 *  - allocate the working arrays (aDelIdx, aDelData, aDelIdxN),
 *  - load the existing delete index if the file system already has a del file,
 *  - open a writer for the new del file stamped with this commit's ID.
 *
 * Returns 0 on success, otherwise an error code (logged).
 */
static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
  int32_t code = 0;
  STsdb  *pTsdb = pCommitter->pTsdb;

  pCommitter->aDelIdx = taosArrayInit(0, sizeof(SDelIdx));
  if (pCommitter->aDelIdx == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  pCommitter->aDelData = taosArrayInit(0, sizeof(SDelData));
  if (pCommitter->aDelData == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  pCommitter->aDelIdxN = taosArrayInit(0, sizeof(SDelIdx));
  if (pCommitter->aDelIdxN == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  // existing del file (optional): read its index so old deletes are carried forward
  SDelFile *pDelFileR = pCommitter->fs.pDelFile;
  if (pDelFileR) {
    code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR, pTsdb, NULL);
    if (code) goto _err;

    code = tsdbReadDelIdx(pCommitter->pDelFReader, pCommitter->aDelIdx, NULL);
    if (code) goto _err;
  }

  // prepare new
  SDelFile wDelFile = {.commitID = pCommitter->commitID, .size = 0, .offset = 0};
  code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, &wDelFile, pTsdb);
  if (code) goto _err;

  tsdbDebug("vgId:%d, commit del start", TD_VID(pTsdb->pVnode));
  return code;

_err:
  tsdbError("vgId:%d, commit del start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
190
/*
 * Write the merged delete records of one table into the new del file.
 *
 * The delete list is built in aDelData as follows:
 *  - records read from the old del file (when pDelIdx != NULL) come first,
 *  - records from the table's in-memory list (pTbData->pHead) are appended after them.
 * The resulting SDelIdx is appended to aDelIdxN.
 *
 * Nothing is written when the table has neither an on-disk index nor any in-memory
 * delete records. Returns 0 on success, otherwise an error code (logged).
 */
static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDelIdx *pDelIdx) {
  int32_t   code = 0;
  tb_uid_t  suid = 0;
  tb_uid_t  uid = 0;
  SDelData *pMemHead = NULL;  // head of the in-memory delete list, NULL if there is none

  if (pTbData != NULL) {
    suid = pTbData->suid;
    uid = pTbData->uid;
    pMemHead = pTbData->pHead;
  }

  if (pDelIdx == NULL) {
    taosArrayClear(pCommitter->aDelData);
  } else {
    // the on-disk index, when present, supplies the table identity
    suid = pDelIdx->suid;
    uid = pDelIdx->uid;
    code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, pCommitter->aDelData, NULL);
    if (code != 0) goto _err;
  }

  if (pMemHead == NULL && pDelIdx == NULL) return code;

  SDelIdx delIdx = {.suid = suid, .uid = uid};

  // memory
  for (SDelData *pNode = pMemHead; pNode != NULL; pNode = pNode->pNext) {
    if (taosArrayPush(pCommitter->aDelData, pNode) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
  }

  // write
  code = tsdbWriteDelData(pCommitter->pDelFWriter, pCommitter->aDelData, NULL, &delIdx);
  if (code != 0) goto _err;

  // put delIdx
  if (taosArrayPush(pCommitter->aDelIdxN, &delIdx) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  return code;

_err:
  tsdbError("vgId:%d, commit table del failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
246 247
/*
 * Finish the delete-data part of the commit. In order, this function:
 *  - writes the new delete index and updates the del-file header,
 *  - registers the new del file in pCommitter->fs,
 *  - closes the writer and (if open) the reader,
 *  - frees the working arrays.
 *
 * The freed array pointers are reset to NULL so the committer never holds dangling
 * pointers that a later cleanup could free a second time.
 *
 * Returns 0 on success, otherwise an error code (logged).
 */
static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) {
  int32_t code = 0;
  STsdb  *pTsdb = pCommitter->pTsdb;

  code = tsdbWriteDelIdx(pCommitter->pDelFWriter, pCommitter->aDelIdxN, NULL);
  if (code) goto _err;

  code = tsdbUpdateDelFileHdr(pCommitter->pDelFWriter);
  if (code) goto _err;

  code = tsdbFSUpsertDelFile(&pCommitter->fs, &pCommitter->pDelFWriter->fDel);
  if (code) goto _err;

  code = tsdbDelFWriterClose(&pCommitter->pDelFWriter, 1);
  if (code) goto _err;

  if (pCommitter->pDelFReader) {
    code = tsdbDelFReaderClose(&pCommitter->pDelFReader);
    if (code) goto _err;
  }

  taosArrayDestroy(pCommitter->aDelIdx);
  pCommitter->aDelIdx = NULL;
  taosArrayDestroy(pCommitter->aDelData);
  pCommitter->aDelData = NULL;
  taosArrayDestroy(pCommitter->aDelIdxN);
  pCommitter->aDelIdxN = NULL;

  return code;

_err:
  tsdbError("vgId:%d, commit del end failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
278 279 280 281
/*
 * Open the reader and writer for file set pCommitter->commitFid.
 *
 * Reader side:
 *  - if the file set already exists, open it and load its block index and last-block index;
 *  - otherwise leave the reader NULL with empty arrays.
 *
 * Writer side:
 *  - open a writer for the new file set, reusing the existing data/sma files and
 *    starting fresh head/last files;
 *  - for a brand-new file set, allocate a disk, create the directory and start fresh files.
 *
 * Returns 0 on success, otherwise an error code (logged).
 */
static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SDFileSet *pRSet = NULL;

  // memory
  pCommitter->nextKey = TSKEY_MAX;

  // Reader
  pRSet = (SDFileSet *)taosArraySearch(pCommitter->fs.aDFileSet, &(SDFileSet){.fid = pCommitter->commitFid},
                                       tDFileSetCmprFn, TD_EQ);
  if (pRSet) {
    code = tsdbDataFReaderOpen(&pCommitter->dReader.pReader, pTsdb, pRSet);
    if (code) goto _err;

    code = tsdbReadBlockIdx(pCommitter->dReader.pReader, pCommitter->dReader.aBlockIdx, NULL);
    if (code) goto _err;

    code = tsdbReadBlockL(pCommitter->dReader.pReader, pCommitter->dReader.aBlockL, NULL);
    if (code) goto _err;
  } else {
    pCommitter->dReader.pReader = NULL;
    taosArrayClear(pCommitter->dReader.aBlockIdx);
    taosArrayClear(pCommitter->dReader.aBlockL);
  }
  tMapDataReset(&pCommitter->dReader.mBlock);
  tBlockDataReset(&pCommitter->dReader.bData);
  tBlockDataReset(&pCommitter->dReader.bDatal);

  // Writer
  SHeadFile fHead;
  SDataFile fData;
  SLastFile fLast;
  SSmaFile  fSma;
  SDFileSet wSet = {.pHeadF = &fHead, .pDataF = &fData, .pLastF = &fLast, .pSmaF = &fSma};
  if (pRSet) {
    // existing file set: keep data/sma files, rewrite head/last
    wSet.diskId = pRSet->diskId;
    wSet.fid = pCommitter->commitFid;
    fHead = (SHeadFile){.commitID = pCommitter->commitID, .offset = 0, .size = 0, .loffset = 0};
    fData = *pRSet->pDataF;
    fLast = (SLastFile){.commitID = pCommitter->commitID, .size = 0};
    fSma = *pRSet->pSmaF;
  } else {
    SDiskID did = {0};

    // NOTE(review): assumes the tfs helpers return < 0 and set terrno on failure — confirm.
    if (tfsAllocDisk(pTsdb->pVnode->pTfs, 0, &did) < 0) {
      code = terrno;
      goto _err;
    }

    if (tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did) < 0) {
      code = terrno;
      goto _err;
    }

    wSet.diskId = did;
    wSet.fid = pCommitter->commitFid;
    fHead = (SHeadFile){.commitID = pCommitter->commitID, .offset = 0, .size = 0, .loffset = 0};
    fData = (SDataFile){.commitID = pCommitter->commitID, .size = 0};
    fLast = (SLastFile){.commitID = pCommitter->commitID, .size = 0};
    fSma = (SSmaFile){.commitID = pCommitter->commitID, .size = 0};
  }
  code = tsdbDataFWriterOpen(&pCommitter->dWriter.pWriter, pTsdb, &wSet);
  if (code) goto _err;

  taosArrayClear(pCommitter->dWriter.aBlockIdx);
  taosArrayClear(pCommitter->dWriter.aBlockL);
  tMapDataReset(&pCommitter->dWriter.mBlock);
  tBlockDataReset(&pCommitter->dWriter.bData);
  tBlockDataReset(&pCommitter->dWriter.bDatal);

  return code;

_err:
  tsdbError("vgId:%d, commit file data start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367
/*
 * Make pCommitter->skmTable hold the schema (version sver) of table (suid, uid).
 *
 * Cache hit: the cached entry is reused when the suid and version match. For a
 * normal table (suid == 0) the uid must also match; child tables of one super
 * table share a schema, so the uid is not compared for them.
 *
 * Cache miss: the old schema is freed and a new one is fetched from meta.
 * Returns 0 on success, otherwise the error from metaGetTbTSchemaEx.
 */
static int32_t tsdbCommitterUpdateTableSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) {
  SSkmInfo *pSkm = &pCommitter->skmTable;

  if (pSkm->pTSchema && pSkm->suid == suid && sver == pSkm->pTSchema->version && (suid != 0 || pSkm->uid == uid)) {
    return 0;
  }

  pSkm->suid = suid;
  pSkm->uid = uid;
  tTSchemaDestroy(pSkm->pTSchema);
  // Clear the slot before re-fetching. If the fetch fails without assigning its output,
  // the next call must not dereference or re-destroy the schema just freed.
  pSkm->pTSchema = NULL;

  return metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, sver, &pSkm->pTSchema);
}

/*
 * Make pCommitter->skmRow hold the schema (version sver) of table (suid, uid).
 * This is the schema of the memory row currently being appended.
 *
 * Cache hit: the cached entry is reused when the suid and version match. For a
 * normal table (suid == 0) the uid must also match.
 *
 * Cache miss: the old schema is freed and a new one is fetched from meta.
 * Returns 0 on success, otherwise the error from metaGetTbTSchemaEx.
 */
static int32_t tsdbCommitterUpdateRowSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) {
  SSkmInfo *pSkm = &pCommitter->skmRow;

  if (pSkm->pTSchema && pSkm->suid == suid && sver == pSkm->pTSchema->version && (suid != 0 || pSkm->uid == uid)) {
    return 0;
  }

  pSkm->suid = suid;
  pSkm->uid = uid;
  tTSchemaDestroy(pSkm->pTSchema);
  // Clear the slot before re-fetching. If the fetch fails without assigning its output,
  // the next call must not dereference or re-destroy the schema just freed.
  pSkm->pTSchema = NULL;

  return metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, sver, &pSkm->pTSchema);
}

H
Hongze Cheng 已提交
400 401 402 403
/*
 * Write pBlockData out as block pBlock and record pBlock in the writer's block map.
 *
 * For a fresh block (nSubBlock == 0), `last` is set when the block holds fewer than
 * minRow rows and toDataOnly is 0; otherwise it is cleared.
 *
 * Returns 0 on success, otherwise the first failing call's error code.
 */
static int32_t tsdbCommitBlockData(SCommitter *pCommitter, SBlockData *pBlockData, SBlock *pBlock, SBlockIdx *pBlockIdx,
                                   int8_t toDataOnly) {
  if (pBlock->nSubBlock == 0) {
    int32_t isSmall = (!toDataOnly && pBlockData->nRow < pCommitter->minRow);
    pBlock->last = isSmall ? 1 : 0;
  }

  int32_t code =
      tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBlockData, NULL, NULL, pBlockIdx, pBlock, pCommitter->cmprAlg);
  if (code == 0) {
    code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlock, tPutBlock);
  }

  return code;
}

H
Hongze Cheng 已提交
425 426
/*
 * Merge one on-disk block (pBlockMerge) with the table's memory rows that precede toKey.
 *
 * Rows from both sources are interleaved in key order into dWriter.bData. The buffer
 * is flushed through tsdbCommitBlockData whenever it reaches 4/5 of maxRow, and once
 * more at the end if anything remains.
 *
 * Precondition (asserted): the iterator's current row and the block's first row are both < toKey.
 * Returns 0 on success, otherwise an error code (logged).
 */
static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlockMerge, TSDBKEY toKey,
                                  int8_t toDataOnly) {
  int32_t     code = 0;
  SBlockIdx  *pBlockIdx = &(SBlockIdx){.suid = pIter->pTbData->suid, .uid = pIter->pTbData->uid};
  SBlockData *pDiskData = &pCommitter->dReader.bData;  // rows of the block being merged
  SBlockData *pOutData = &pCommitter->dWriter.bData;   // merged output buffer
  SBlock      outBlock;
  TSDBROW    *pMemRow;
  TSDBROW     diskRow;
  TSDBROW    *pDiskRow = &diskRow;

  // read SBlockData
  code = tsdbReadBlockData(pCommitter->dReader.pReader, pBlockIdx, pBlockMerge, pDiskData, NULL, NULL);
  if (code) goto _err;

  code = tBlockDataSetSchema(pOutData, pCommitter->skmTable.pTSchema);
  if (code) goto _err;

  pMemRow = tsdbTbDataIterGet(pIter);
  *pDiskRow = tsdbRowFromBlockData(pDiskData, 0);
  ASSERT(pMemRow && tsdbKeyCmprFn(&TSDBROW_KEY(pMemRow), &toKey) < 0);
  ASSERT(tsdbKeyCmprFn(&TSDBROW_KEY(pDiskRow), &toKey) < 0);
  code = tsdbCommitterUpdateRowSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pMemRow));
  if (code) goto _err;

  tBlockReset(&outBlock);
  tBlockDataClearData(pOutData);

  for (;;) {
    bool flush;

    if (pMemRow == NULL && pDiskRow == NULL) {
      // both sources drained: flush the remainder, or stop once the buffer is empty
      if (pOutData->nRow == 0) break;
      flush = true;
    } else {
      bool takeMem;

      if (pMemRow == NULL) {
        takeMem = false;
      } else if (pDiskRow == NULL) {
        takeMem = true;
      } else {
        int32_t c = tsdbRowCmprFn(pMemRow, pDiskRow);
        if (c == 0) {
          ASSERT(0);
        }
        // if ASSERT is compiled out, an equal key takes the memory row
        takeMem = (c <= 0);
      }

      if (takeMem) {
        code = tBlockDataAppendRow(pOutData, pMemRow, pCommitter->skmRow.pTSchema);
        if (code) goto _err;

        tsdbTbDataIterNext(pIter);
        pMemRow = tsdbTbDataIterGet(pIter);
        if (pMemRow) {
          if (tsdbKeyCmprFn(&TSDBROW_KEY(pMemRow), &toKey) < 0) {
            code = tsdbCommitterUpdateRowSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid,
                                                TSDBROW_SVERSION(pMemRow));
            if (code) goto _err;
          } else {
            pMemRow = NULL;  // reached toKey: memory rows from here on belong elsewhere
          }
        }
      } else {
        code = tBlockDataAppendRow(pOutData, pDiskRow, NULL);
        if (code) goto _err;

        int32_t iNext = pDiskRow->iRow + 1;
        if (iNext < pDiskData->nRow) {
          *pDiskRow = tsdbRowFromBlockData(pDiskData, iNext);
        } else {
          pDiskRow = NULL;
        }
      }

      flush = (pOutData->nRow >= pCommitter->maxRow * 4 / 5);
    }

    if (flush) {
      code = tsdbCommitBlockData(pCommitter, pOutData, &outBlock, pBlockIdx, toDataOnly);
      if (code) goto _err;

      tBlockReset(&outBlock);
      tBlockDataClearData(pOutData);
    }
  }

  return code;

_err:
  tsdbError("vgId:%d, tsdb merge block and mem failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
530
/*
 * Commit a table's memory rows, from the iterator's current row up to (not including) toKey.
 *
 * Rows are buffered in dWriter.bData. The buffer is written through tsdbCommitBlockData
 * each time it reaches 4/5 of maxRow, and once more at the end if anything remains.
 *
 * Precondition (asserted): the iterator's current row exists and is < toKey.
 * Returns 0 on success, otherwise an error code (logged).
 */
static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter, TSDBKEY toKey, int8_t toDataOnly) {
  int32_t     code = 0;
  SBlockData *pBlockData = &pCommitter->dWriter.bData;
  int64_t     suid = pIter->pTbData->suid;
  int64_t     uid = pIter->pTbData->uid;
  SBlock      block;
  TSDBROW    *pRow;

  code = tBlockDataSetSchema(pBlockData, pCommitter->skmTable.pTSchema);
  if (code) goto _err;

  tBlockReset(&block);
  tBlockDataClearData(pBlockData);

  pRow = tsdbTbDataIterGet(pIter);
  ASSERT(pRow && tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &toKey) < 0);

  for (;;) {
    if (pRow != NULL) {
      code = tsdbCommitterUpdateRowSchema(pCommitter, suid, uid, TSDBROW_SVERSION(pRow));
      if (code) goto _err;

      code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema);
      if (code) goto _err;

      tsdbTbDataIterNext(pIter);
      pRow = tsdbTbDataIterGet(pIter);
      if (pRow != NULL) {
        // Compare through a named local copy of the key: per the original author,
        // comparing &TSDBROW_KEY(pRow) in place here crashed on CI.
        TSDBKEY rowKey = TSDBROW_KEY(pRow);
        if (tsdbKeyCmprFn(&rowKey, &toKey) >= 0) pRow = NULL;
      }

      if (pBlockData->nRow < pCommitter->maxRow * 4 / 5) continue;
    } else if (pBlockData->nRow == 0) {
      break;
    }

    // buffer is full enough, or the rows ran out with data still buffered
    code = tsdbCommitBlockData(pCommitter, pBlockData, &block, &(SBlockIdx){.suid = suid, .uid = uid}, toDataOnly);
    if (code) goto _err;

    tBlockReset(&block);
    tBlockDataClearData(pBlockData);
  }

  return code;

_err:
  tsdbError("vgId:%d, tsdb commit table mem data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
592
/*
 * Carry one existing on-disk block of a table, with no overlapping memory rows, into the new file set.
 *
 *  - A block not flagged `last` is kept as-is: its descriptor is added to dWriter.mBlock.
 *  - A `last` block is read back into dReader.bData and re-committed as a fresh block.
 *
 * Returns 0 on success, otherwise an error code (logged).
 */
static int32_t tsdbCommitTableDiskData(SCommitter *pCommitter, SBlock *pBlock, SBlockIdx *pBlockIdx) {
  int32_t code;

  if (!pBlock->last) {
    code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlock, tPutBlock);
  } else {
    SBlockData *pReadBuf = &pCommitter->dReader.bData;

    code = tsdbReadBlockData(pCommitter->dReader.pReader, pBlockIdx, pBlock, pReadBuf, NULL, NULL);
    if (code == 0) {
      SBlock newBlock;
      tBlockReset(&newBlock);
      code = tsdbCommitBlockData(pCommitter, pReadBuf, &newBlock, pBlockIdx, 0);
    }
  }

  if (code) {
    tsdbError("vgId:%d, tsdb commit table disk data failed since %s", TD_VID(pCommitter->pTsdb->pVnode),
              tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
615 616
/*
 * Finish one table in the current file set.
 *
 * The table's accumulated block map (dWriter.mBlock) is written via tsdbWriteBlock,
 * and the resulting SBlockIdx for (suid, uid) is appended to dWriter.aBlockIdx.
 *
 * Returns 0 on success, otherwise an error code (logged).
 */
static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter, int64_t suid, int64_t uid) {
  SBlockIdx blockIdx = {.suid = suid, .uid = uid};

  int32_t code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, NULL, &blockIdx);
  if (code == 0 && taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  }

  if (code != 0) {
    tsdbError("vgId:%d, commit table data end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
635 636 637 638 639 640 641 642 643
/*
 * Count the memory rows, starting at the iterator's position, whose keys fall inside pBlock.
 *
 * "Inside" means tBlockCmprFn returns 0 for a single-key block built from the row's key.
 * A private copy of the iterator is used, so the caller's pIter is not advanced.
 *
 * A row that sorts before the block violates the caller's precondition. It is asserted,
 * and the count then stops: the original kept looping without advancing, which hangs
 * forever when ASSERT is compiled out.
 */
static int32_t tsdbGetOvlpNRow(STbDataIter *pIter, SBlock *pBlock) {
  int32_t     nRow = 0;
  STbDataIter iter = *pIter;

  // NOTE(review): clearing pRow appears to make the copy re-fetch its current row — confirm.
  iter.pRow = NULL;
  for (TSDBROW *pRow = tsdbTbDataIterGet(&iter); pRow != NULL; pRow = tsdbTbDataIterGet(&iter)) {
    TSDBKEY key = TSDBROW_KEY(pRow);
    int32_t c = tBlockCmprFn(&(SBlock){.maxKey = key, .minKey = key}, pBlock);

    if (c > 0) break;  // past the block: no more overlap
    if (c < 0) {
      ASSERT(0);
      break;
    }

    nRow++;
    tsdbTbDataIterNext(&iter);
  }

  return nRow;
}

/*
 * Write the memory rows that fall inside pBlock's key range as a sub-block of pBlock.
 *
 * Rows are consumed from pIter until one sorts past the block. The collected data is
 * then committed through tsdbCommitBlockData with a copy of pBlock.
 *
 * Precondition (asserted): the iterator has a current row.
 *
 * The row schema is refreshed inside the loop, right before each append. This way the
 * row is always NULL-checked before TSDBROW_SVERSION reads it; the original dereferenced
 * the first row ahead of the loop's NULL check.
 *
 * Returns 0 on success, otherwise an error code (logged).
 */
static int32_t tsdbMergeAsSubBlock(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlock) {
  int32_t     code = 0;
  SBlockData *pBlockData = &pCommitter->dWriter.bData;
  SBlockIdx  *pBlockIdx = &(SBlockIdx){.suid = pIter->pTbData->suid, .uid = pIter->pTbData->uid};
  SBlock      block;
  TSDBROW    *pRow;

  code = tBlockDataSetSchema(pBlockData, pCommitter->skmTable.pTSchema);
  if (code) goto _err;

  pRow = tsdbTbDataIterGet(pIter);
  ASSERT(pRow != NULL);
  while (pRow != NULL) {
    code = tsdbCommitterUpdateRowSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow));
    if (code) goto _err;

    code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema);
    if (code) goto _err;

    tsdbTbDataIterNext(pIter);
    pRow = tsdbTbDataIterGet(pIter);
    if (pRow) {
      TSDBKEY key = TSDBROW_KEY(pRow);
      int32_t c = tBlockCmprFn(&(SBlock){.minKey = key, .maxKey = key}, pBlock);

      if (c > 0) {
        pRow = NULL;  // past the block's key range: stop merging
      } else if (c < 0) {
        ASSERT(0);
      }
    }
  }

  block = *pBlock;
  code = tsdbCommitBlockData(pCommitter, pBlockData, &block, pBlockIdx, 0);
  if (code) goto _err;

  return code;

_err:
  tsdbError("vgId:%d, tsdb merge as subblock failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
710 711
static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *pBlockIdx) {
  int32_t      code = 0;
H
Hongze Cheng 已提交
712 713
  STbDataIter  iter = {0};
  STbDataIter *pIter = &iter;
H
Hongze Cheng 已提交
714
  TSDBROW     *pRow;
H
Hongze Cheng 已提交
715
  int32_t      iBlock;
H
Hongze Cheng 已提交
716
  int32_t      nBlock;
H
Hongze Cheng 已提交
717 718
  int64_t      suid;
  int64_t      uid;
H
Hongze Cheng 已提交
719 720 721 722

  if (pTbData) {
    tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, pIter);
    pRow = tsdbTbDataIterGet(pIter);
H
Hongze Cheng 已提交
723 724
    if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;

H
Hongze Cheng 已提交
725 726
    suid = pTbData->suid;
    uid = pTbData->uid;
H
Hongze Cheng 已提交
727 728 729 730 731 732
  } else {
    pIter = NULL;
    pRow = NULL;
  }

  if (pBlockIdx) {
H
Hongze Cheng 已提交
733
    code = tsdbReadBlock(pCommitter->dReader.pReader, pBlockIdx, &pCommitter->dReader.mBlock, NULL);
H
Hongze Cheng 已提交
734 735
    if (code) goto _err;

H
Hongze Cheng 已提交
736
    nBlock = pCommitter->dReader.mBlock.nItem;
H
Hongze Cheng 已提交
737
    ASSERT(nBlock > 0);
H
Hongze Cheng 已提交
738

H
Hongze Cheng 已提交
739 740
    suid = pBlockIdx->suid;
    uid = pBlockIdx->uid;
H
Hongze Cheng 已提交
741 742 743 744
  } else {
    nBlock = 0;
  }

H
Hongze Cheng 已提交
745
  if (pRow == NULL && nBlock == 0) goto _exit;
H
Hongze Cheng 已提交
746 747

  // start ===========
H
Hongze Cheng 已提交
748
  tMapDataReset(&pCommitter->dWriter.mBlock);
H
Hongze Cheng 已提交
749 750
  SBlock  block;
  SBlock *pBlock = &block;
H
Hongze Cheng 已提交
751

H
Hongze Cheng 已提交
752
  iBlock = 0;
H
Hongze Cheng 已提交
753
  if (iBlock < nBlock) {
H
Hongze Cheng 已提交
754
    tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
H
Hongze Cheng 已提交
755 756 757 758
  } else {
    pBlock = NULL;
  }

H
Hongze Cheng 已提交
759 760 761 762 763
  if (pRow) {
    code = tsdbCommitterUpdateTableSchema(pCommitter, pTbData->suid, pTbData->uid, pTbData->maxSkmVer);
    if (code) goto _err;
  }

H
Hongze Cheng 已提交
764 765
  // merge ===========
  while (true) {
H
Hongze Cheng 已提交
766
    if (pRow == NULL && pBlock == NULL) break;
H
Hongze Cheng 已提交
767

H
Hongze Cheng 已提交
768
    if (pRow && pBlock) {
H
Hongze Cheng 已提交
769
      if (pBlock->last) {
H
Hongze Cheng 已提交
770
        code = tsdbMergeTableData(pCommitter, pIter, pBlock,
H
Hongze Cheng 已提交
771
                                  (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
H
Hongze Cheng 已提交
772 773 774
        if (code) goto _err;

        pRow = tsdbTbDataIterGet(pIter);
H
Hongze Cheng 已提交
775
        if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
H
Hongze Cheng 已提交
776
        iBlock++;
H
Hongze Cheng 已提交
777
        if (iBlock < nBlock) {
H
Hongze Cheng 已提交
778
          tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
H
Hongze Cheng 已提交
779 780 781
        } else {
          pBlock = NULL;
        }
H
Hongze Cheng 已提交
782 783

        ASSERT(pRow == NULL && pBlock == NULL);
H
Hongze Cheng 已提交
784
      } else {
H
Hongze Cheng 已提交
785
        int32_t c = tBlockCmprFn(&(SBlock){.maxKey = TSDBROW_KEY(pRow), .minKey = TSDBROW_KEY(pRow)}, pBlock);
H
Hongze Cheng 已提交
786
        if (c > 0) {
H
Hongze Cheng 已提交
787
          // only disk data
H
Hongze Cheng 已提交
788
          code = tsdbCommitTableDiskData(pCommitter, pBlock, pBlockIdx);
H
Hongze Cheng 已提交
789 790
          if (code) goto _err;

H
Hongze Cheng 已提交
791 792
          iBlock++;
          if (iBlock < nBlock) {
H
Hongze Cheng 已提交
793
            tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
H
Hongze Cheng 已提交
794 795 796
          } else {
            pBlock = NULL;
          }
H
Hongze Cheng 已提交
797
        } else if (c < 0) {
H
Hongze Cheng 已提交
798
          // only memory data
H
Hongze Cheng 已提交
799
          code = tsdbCommitTableMemData(pCommitter, pIter, pBlock->minKey, 1);
H
Hongze Cheng 已提交
800 801 802
          if (code) goto _err;

          pRow = tsdbTbDataIterGet(pIter);
H
Hongze Cheng 已提交
803
          if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
H
Hongze Cheng 已提交
804
        } else {
H
Hongze Cheng 已提交
805
          // merge memory and disk
H
Hongze Cheng 已提交
806 807
          int32_t nOvlp = tsdbGetOvlpNRow(pIter, pBlock);
          ASSERT(nOvlp);
H
Hongze Cheng 已提交
808
          if (pBlock->nRow + nOvlp <= pCommitter->maxRow && pBlock->nSubBlock < TSDB_MAX_SUBBLOCKS) {
H
Hongze Cheng 已提交
809 810
            code = tsdbMergeAsSubBlock(pCommitter, pIter, pBlock);
            if (code) goto _err;
H
Hongze Cheng 已提交
811
          } else {
H
Hongze Cheng 已提交
812 813 814 815 816 817 818 819
            TSDBKEY toKey = {.ts = pCommitter->maxKey + 1, .version = VERSION_MIN};
            int8_t  toDataOnly = 0;

            if (iBlock < nBlock - 1) {
              toDataOnly = 1;

              SBlock nextBlock = {0};
              tBlockReset(&nextBlock);
H
Hongze Cheng 已提交
820
              tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock + 1, &nextBlock, tGetBlock);
H
Hongze Cheng 已提交
821
              toKey = nextBlock.minKey;
H
Hongze Cheng 已提交
822
            }
H
Hongze Cheng 已提交
823 824 825

            code = tsdbMergeTableData(pCommitter, pIter, pBlock, toKey, toDataOnly);
            if (code) goto _err;
H
Hongze Cheng 已提交
826
          }
H
Hongze Cheng 已提交
827 828 829 830 831

          pRow = tsdbTbDataIterGet(pIter);
          if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
          iBlock++;
          if (iBlock < nBlock) {
H
Hongze Cheng 已提交
832
            tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
H
Hongze Cheng 已提交
833 834 835
          } else {
            pBlock = NULL;
          }
H
Hongze Cheng 已提交
836 837 838
        }
      }
    } else if (pBlock) {
H
Hongze Cheng 已提交
839
      code = tsdbCommitTableDiskData(pCommitter, pBlock, pBlockIdx);
H
Hongze Cheng 已提交
840 841
      if (code) goto _err;

H
Hongze Cheng 已提交
842 843
      iBlock++;
      if (iBlock < nBlock) {
H
Hongze Cheng 已提交
844
        tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
H
Hongze Cheng 已提交
845 846 847
      } else {
        pBlock = NULL;
      }
H
Hongze Cheng 已提交
848
    } else {
H
Hongze Cheng 已提交
849 850
      code =
          tsdbCommitTableMemData(pCommitter, pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
H
Hongze Cheng 已提交
851 852 853
      if (code) goto _err;

      pRow = tsdbTbDataIterGet(pIter);
H
Hongze Cheng 已提交
854 855
      if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
      ASSERT(pRow == NULL);
H
Hongze Cheng 已提交
856 857 858
    }
  }

H
Hongze Cheng 已提交
859 860
  // end =====================
  code = tsdbCommitTableDataEnd(pCommitter, suid, uid);
H
Hongze Cheng 已提交
861 862 863 864 865 866 867 868 869 870
  if (code) goto _err;

_exit:
  if (pIter) {
    pRow = tsdbTbDataIterGet(pIter);
    if (pRow) pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
  }
  return code;

_err:
S
Shengliang Guan 已提交
871
  tsdbError("vgId:%d, tsdb commit table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
872 873 874
  return code;
}

H
Hongze Cheng 已提交
875 876 877
/*
 * Finish the data file set for the file currently being committed. In order:
 *   1. write the writer's aBlockL array (tsdbWriteBlockL),
 *   2. write the writer's aBlockIdx array (tsdbWriteBlockIdx),
 *   3. update the file set header,
 *   4. upsert the writer's file set (wSet) into pCommitter->fs,
 *   5. close the writer with flag 1 ("close and sync"),
 *   6. close the reader, if one was opened.
 *
 * On failure the error is logged and returned. Closing any still-open
 * reader/writer is left to the caller.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
  int32_t code = 0;

  // write aBlockL
  code = tsdbWriteBlockL(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockL, NULL);
  if (code) goto _err;

  // write aBlockIdx
  code = tsdbWriteBlockIdx(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockIdx, NULL);
  if (code) goto _err;

  // update file header
  code = tsdbUpdateDFileSetHeader(pCommitter->dWriter.pWriter);
  if (code) goto _err;

  // upsert SDFileSet
  code = tsdbFSUpsertFSet(&pCommitter->fs, &pCommitter->dWriter.pWriter->wSet);
  if (code) goto _err;

  // close and sync
  code = tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 1);
  if (code) goto _err;

  if (pCommitter->dReader.pReader) {
    code = tsdbDataFReaderClose(&pCommitter->dReader.pReader);
    if (code) goto _err;
  }

  // (the former `_exit:` label was never jumped to and only produced -Wunused-label)
  return code;

_err:
  tsdbError("vgId:%d, commit file data end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

/*
 * Commit all tables for the current file (pCommitter->commitFid).
 *
 * After tsdbCommitFileDataStart(), this walks the memtable's table list
 * (pMemTable->aTbData) and the reader's block index (dReader.aBlockIdx) in
 * lockstep, comparing entries with tTABLEIDCmprFn. Each table is committed by
 * tsdbCommitTableData() with its memory side, its disk side, or both.
 * Presumably both lists are sorted by table id; confirm this against
 * tsdbCommitFileDataStart.
 * The file is then finalized by tsdbCommitFileDataEnd().
 *
 * On failure the error is logged and both the reader and the writer are closed
 * (the writer with flag 0).
 *
 * Returns 0 on success, otherwise the error code of the failing callee.
 */
static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;

  code = tsdbCommitFileDataStart(pCommitter);
  if (code) goto _err;

  int32_t nTbData = taosArrayGetSize(pMemTable->aTbData);
  int32_t nBlockIdx = taosArrayGetSize(pCommitter->dReader.aBlockIdx);
  int32_t iTbData = 0;
  int32_t iBlockIdx = 0;

  ASSERT(nTbData > 0);

  STbData   *pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
  SBlockIdx *pBlockIdx =
      (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, iBlockIdx) : NULL;

  while (pTbData != NULL || pBlockIdx != NULL) {
    // side < 0: memory only, side > 0: disk only, side == 0: same table on both
    int32_t side;
    if (pBlockIdx == NULL) {
      side = -1;
    } else if (pTbData == NULL) {
      side = 1;
    } else {
      side = tTABLEIDCmprFn(pTbData, pBlockIdx);
    }

    code = tsdbCommitTableData(pCommitter, (side <= 0) ? pTbData : NULL, (side >= 0) ? pBlockIdx : NULL);
    if (code) goto _err;

    if (side <= 0) {
      iTbData++;
      pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData) : NULL;
    }
    if (side >= 0) {
      iBlockIdx++;
      pBlockIdx =
          (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, iBlockIdx) : NULL;
    }
  }

  code = tsdbCommitFileDataEnd(pCommitter);
  if (code) goto _err;

  return code;

_err:
  tsdbError("vgId:%d, commit file data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  tsdbDataFReaderClose(&pCommitter->dReader.pReader);
  tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 0);
  return code;
}

H
Hongze Cheng 已提交
989 990
// ----------------------------------------------------------------------------
/*
 * Prepare pCommitter for a commit of pTsdb.
 *
 * Zeroes the committer and swaps the active memtable into pTsdb->imem under
 * pTsdb->rwLock, leaving pTsdb->mem NULL. It then copies the commit parameters
 * (commit id, file span in keepCfg.days, precision, min/max rows, compression)
 * from the tsdb/vnode configuration and takes a copy of the file system state
 * into pCommitter->fs.
 *
 * Returns 0 on success, otherwise the error code from tsdbFSCopy().
 */
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
  memset(pCommitter, 0, sizeof(*pCommitter));
  ASSERT(pTsdb->mem && pTsdb->imem == NULL);

  // hand the active memtable over to the immutable slot
  taosThreadRwlockWrlock(&pTsdb->rwLock);
  pTsdb->imem = pTsdb->mem;
  pTsdb->mem = NULL;
  taosThreadRwlockUnlock(&pTsdb->rwLock);

  // commit parameters
  pCommitter->pTsdb = pTsdb;
  pCommitter->commitID = pTsdb->pVnode->state.commitID;
  pCommitter->minutes = pTsdb->keepCfg.days;
  pCommitter->precision = pTsdb->keepCfg.precision;
  pCommitter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
  pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
  pCommitter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;

  int32_t code = tsdbFSCopy(pTsdb, &pCommitter->fs);
  if (code != 0) {
    tsdbError("vgId:%d, tsdb start commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
1019 1020 1021
static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
  int32_t code = 0;

H
Hongze Cheng 已提交
1022
  // Reader
H
Hongze Cheng 已提交
1023 1024
  pCommitter->dReader.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
  if (pCommitter->dReader.aBlockIdx == NULL) {
H
Hongze Cheng 已提交
1025 1026 1027 1028
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

H
Hongze Cheng 已提交
1029 1030
  pCommitter->dReader.aBlockL = taosArrayInit(0, sizeof(SBlockL));
  if (pCommitter->dReader.aBlockL == NULL) {
H
Hongze Cheng 已提交
1031 1032 1033 1034
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

H
Hongze Cheng 已提交
1035
  code = tBlockDataInit(&pCommitter->dReader.bData);
H
Hongze Cheng 已提交
1036
  if (code) goto _exit;
H
Hongze Cheng 已提交
1037

H
Hongze Cheng 已提交
1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053
  code = tBlockDataInit(&pCommitter->dReader.bDatal);
  if (code) goto _exit;

  // Writer
  pCommitter->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
  if (pCommitter->dWriter.aBlockIdx == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  pCommitter->dWriter.aBlockL = taosArrayInit(0, sizeof(SBlockL));
  if (pCommitter->dWriter.aBlockL == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

H
Hongze Cheng 已提交
1054
  code = tBlockDataInit(&pCommitter->dWriter.bData);
H
Hongze Cheng 已提交
1055
  if (code) goto _exit;
H
Hongze Cheng 已提交
1056

H
Hongze Cheng 已提交
1057 1058 1059
  code = tBlockDataInit(&pCommitter->dWriter.bDatal);
  if (code) goto _exit;

H
Hongze Cheng 已提交
1060 1061 1062 1063 1064
_exit:
  return code;
}

/*
 * Release the committer's data-commit buffers: the reader's and writer's
 * aBlockIdx/aBlockL arrays, mBlock maps and bData/bDatal buffers, plus the
 * cached table and row schemas.
 *
 * The SCommitter outlives this call (it is used later by the delete commit and
 * the end-of-commit step). Every pointer freed here is therefore reset to
 * NULL, so that a later destroy or schema refresh cannot touch freed memory.
 */
static void tsdbCommitDataEnd(SCommitter *pCommitter) {
  // Reader
  taosArrayDestroy(pCommitter->dReader.aBlockIdx);
  pCommitter->dReader.aBlockIdx = NULL;
  taosArrayDestroy(pCommitter->dReader.aBlockL);
  pCommitter->dReader.aBlockL = NULL;
  tMapDataClear(&pCommitter->dReader.mBlock);
  tBlockDataClear(&pCommitter->dReader.bData, 1);
  tBlockDataClear(&pCommitter->dReader.bDatal, 1);

  // Writer
  taosArrayDestroy(pCommitter->dWriter.aBlockIdx);
  pCommitter->dWriter.aBlockIdx = NULL;
  taosArrayDestroy(pCommitter->dWriter.aBlockL);
  pCommitter->dWriter.aBlockL = NULL;
  tMapDataClear(&pCommitter->dWriter.mBlock);
  tBlockDataClear(&pCommitter->dWriter.bData, 1);
  tBlockDataClear(&pCommitter->dWriter.bDatal, 1);

  // cached schemas
  tTSchemaDestroy(pCommitter->skmTable.pTSchema);
  pCommitter->skmTable.pTSchema = NULL;
  tTSchemaDestroy(pCommitter->skmRow.pTSchema);
  pCommitter->skmRow.pTSchema = NULL;
}

H
Hongze Cheng 已提交
1082 1083 1084 1085
/*
 * Commit the time-series rows of the immutable memtable (pTsdb->imem).
 *
 * If the memtable holds rows, working buffers are allocated. Data files are
 * then committed one at a time: the file id is derived from
 * pCommitter->nextKey, starting at the memtable's minKey, and the loop
 * continues while nextKey < TSKEY_MAX. The buffers are released on both
 * success and failure.
 *
 * Returns 0 on success (including the empty-memtable case), otherwise the
 * error code of the failing step.
 */
static int32_t tsdbCommitData(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;

  if (pMemTable->nRow != 0) {
    code = tsdbCommitDataStart(pCommitter);

    if (code == 0) {
      for (pCommitter->nextKey = pMemTable->minKey; pCommitter->nextKey < TSKEY_MAX;) {
        pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision);
        tsdbFidKeyRange(pCommitter->commitFid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey,
                        &pCommitter->maxKey);

        code = tsdbCommitFileData(pCommitter);
        if (code) break;
      }
    }

    tsdbCommitDataEnd(pCommitter);

    if (code) {
      tsdbError("vgId:%d, commit data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
      return code;
    }
  }

  tsdbDebug("vgId:%d, commit data done, nRow:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nRow);
  return code;
}
H
Hongze Cheng 已提交
1116

H
Hongze Cheng 已提交
1117 1118 1119 1120
/*
 * Commit the delete records of the immutable memtable (pTsdb->imem).
 *
 * If the memtable records deletions (nDel != 0), this calls
 * tsdbCommitDelStart(). It then walks the memtable's table list and
 * pCommitter->aDelIdx together, comparing entries with tTABLEIDCmprFn, and
 * passes each table's memory side, disk side, or both to tsdbCommitTableDel().
 * Finally it calls tsdbCommitDelEnd().
 *
 * Returns 0 on success (including when there is nothing to delete), otherwise
 * the error code of the failing step.
 */
static int32_t tsdbCommitDel(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;

  if (pMemTable->nDel == 0) goto _exit;

  code = tsdbCommitDelStart(pCommitter);
  if (code) goto _err;

  int32_t nTbData = taosArrayGetSize(pMemTable->aTbData);
  int32_t nDelIdx = taosArrayGetSize(pCommitter->aDelIdx);
  int32_t iTbData = 0;
  int32_t iDelIdx = 0;

  ASSERT(nTbData > 0);

  STbData *pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
  SDelIdx *pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL;

  while (pTbData != NULL || pDelIdx != NULL) {
    // side < 0: memory only, side > 0: disk only, side == 0: same table on both
    int32_t side;
    if (pDelIdx == NULL) {
      side = -1;
    } else if (pTbData == NULL) {
      side = 1;
    } else {
      side = tTABLEIDCmprFn(pTbData, pDelIdx);
    }

    code = tsdbCommitTableDel(pCommitter, (side <= 0) ? pTbData : NULL, (side >= 0) ? pDelIdx : NULL);
    if (code) goto _err;

    if (side <= 0) {
      iTbData++;
      pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData) : NULL;
    }
    if (side >= 0) {
      iDelIdx++;
      pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL;
    }
  }

  code = tsdbCommitDelEnd(pCommitter);
  if (code) goto _err;

_exit:
  tsdbDebug("vgId:%d, commit del done, nDel:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nDel);
  return code;

_err:
  tsdbError("vgId:%d, commit del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
1205
/*
 * Publish the result of a commit.
 *
 * Runs tsdbFSCommit1() on the committer's file-system copy. On success it takes
 * pTsdb->rwLock for writing, runs tsdbFSCommit2(), and, if that also succeeds,
 * clears pTsdb->imem before releasing the lock. The committed memtable is then
 * unreferenced and the committer's file-system copy destroyed.
 *
 * eno must be 0; only the success path is handled here (asserted).
 *
 * Returns 0 on success, otherwise the error code of the failing commit phase.
 * On failure the memtable and file-system copy are left in place.
 */
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
  int32_t    code;

  ASSERT(eno == 0);

  code = tsdbFSCommit1(pTsdb, &pCommitter->fs);
  if (code == 0) {
    taosThreadRwlockWrlock(&pTsdb->rwLock);
    code = tsdbFSCommit2(pTsdb, &pCommitter->fs);
    if (code == 0) {
      pTsdb->imem = NULL;
    }
    taosThreadRwlockUnlock(&pTsdb->rwLock);
  }

  if (code != 0) {
    tsdbError("vgId:%d, tsdb end commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
    return code;
  }

  tsdbUnrefMemTable(pMemTable);
  tsdbFSDestroy(&pCommitter->fs);

  tsdbInfo("vgId:%d, tsdb end commit", TD_VID(pTsdb->pVnode));
  return code;
}