tsdbCommit.c 10.5 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "dev.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
// extern dependencies
H
Hongze Cheng 已提交
19 20 21 22 23 24 25 26 27 28 29
typedef struct {
  STsdb *pTsdb;
  // config
  int32_t minutes;
  int8_t  precision;
  int32_t minRow;
  int32_t maxRow;
  int8_t  cmprAlg;
  int8_t  sttTrigger;
  SArray *aTbDataP;
  // context
H
Hongze Cheng 已提交
30 31 32 33 34 35
  TSKEY   nextKey;
  int32_t fid;
  int32_t expLevel;
  TSKEY   minKey;
  TSKEY   maxKey;
  // writer
H
Hongze Cheng 已提交
36
  struct SSttFWriter *pWriter;
H
Hongze Cheng 已提交
37
} SCommitter;
H
Hongze Cheng 已提交
38

H
Hongze Cheng 已提交
39
static int32_t tsdbCommitOpenWriter(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
40 41 42
  int32_t code;
  int32_t lino;

H
Hongze Cheng 已提交
43 44 45 46 47 48 49 50 51
  struct SSttFWriterConf conf = {
      .pTsdb = pCommitter->pTsdb,
      .pSkmTb = NULL,
      .pSkmRow = NULL,
      .maxRow = pCommitter->maxRow,
      .szPage = pCommitter->pTsdb->pVnode->config.tsdbPageSize,
      .cmprAlg = pCommitter->cmprAlg,
      .aBuf = NULL,
  };
H
Hongze Cheng 已提交
52

H
Hongze Cheng 已提交
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
  // pCommitter->pTsdb->pFS = NULL;
  // taosbsearch(pCommitter->pTsdb->pFS->aFileSet, &pCommitter->fid, tsdbCompareFid, &lino);
  struct SFileSet *pSet = NULL;
  if (pSet == NULL) {
    conf.file = (struct STFile){
        .cid = 1,
        .fid = pCommitter->fid,
        .diskId = (SDiskID){0},
        .type = TSDB_FTYPE_STT,
    };
    tsdbTFileInit(pCommitter->pTsdb, &conf.file);
  } else {
    // TODO
    ASSERT(0);
  }
H
Hongze Cheng 已提交
68 69

  code = tsdbSttFWriterOpen(&conf, &pCommitter->pWriter);
H
Hongze Cheng 已提交
70 71 72 73 74 75 76
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s, fid:%d", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
              tstrerror(code), pCommitter->fid);
  }
H
Hongze Cheng 已提交
77 78 79
  return code;
}

H
Hongze Cheng 已提交
80
static int32_t tsdbCommitWriteTSData(SCommitter *pCommitter, TABLEID *tbid, TSDBROW *pRow) {
H
Hongze Cheng 已提交
81 82 83 84 85 86 87 88
  int32_t code = 0;
  int32_t lino;

  if (pCommitter->pWriter == NULL) {
    code = tsdbCommitOpenWriter(pCommitter);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

H
Hongze Cheng 已提交
89
  code = tsdbSttFWriteTSData(pCommitter->pWriter, tbid, pRow);
H
Hongze Cheng 已提交
90 91 92 93 94 95 96
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), lino, tstrerror(code));
  } else {
    tsdbTrace("vgId:%d %s done, fid:%d suid:%" PRId64 " uid:%" PRId64 " ts:%" PRId64 " version:%" PRId64,
H
Hongze Cheng 已提交
97
              TD_VID(pCommitter->pTsdb->pVnode), __func__, pCommitter->fid, tbid->suid, tbid->uid, TSDBROW_KEY(pRow).ts,
H
Hongze Cheng 已提交
98 99
              TSDBROW_KEY(pRow).version);
  }
H
Hongze Cheng 已提交
100
  return 0;
H
Hongze Cheng 已提交
101 102
}

H
Hongze Cheng 已提交
103 104 105 106 107 108 109
static int32_t tsdbCommitWriteDelData(SCommitter *pCommitter, int64_t suid, int64_t uid, int64_t version, int64_t sKey,
                                      int64_t eKey) {
  int32_t code = 0;
  // TODO
  return code;
}

H
Hongze Cheng 已提交
110
static int32_t commit_timeseries_data(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
111 112 113
  int32_t code = 0;
  int32_t lino;

H
Hongze Cheng 已提交
114
  int64_t    nRow = 0;
H
Hongze Cheng 已提交
115 116
  SMemTable *pMem = pCommitter->pTsdb->imem;

H
Hongze Cheng 已提交
117 118 119
  if (pMem->nRow == 0) {  // no time-series data to commit
    goto _exit;
  }
H
Hongze Cheng 已提交
120

H
Hongze Cheng 已提交
121
  TSDBKEY from = {.ts = pCommitter->minKey, .version = VERSION_MIN};
H
Hongze Cheng 已提交
122 123
  for (int32_t iTbData = 0; iTbData < taosArrayGetSize(pCommitter->aTbDataP); iTbData++) {
    STbDataIter iter;
H
Hongze Cheng 已提交
124 125
    STbData    *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData);

H
Hongze Cheng 已提交
126 127 128 129 130 131
    tsdbTbDataIterOpen(pTbData, &from, 0, &iter);

    for (TSDBROW *pRow; (pRow = tsdbTbDataIterGet(&iter)) != NULL; tsdbTbDataIterNext(&iter)) {
      TSDBKEY rowKey = TSDBROW_KEY(pRow);

      if (rowKey.ts > pCommitter->maxKey) {
H
Hongze Cheng 已提交
132
        pCommitter->nextKey = TMIN(pCommitter->nextKey, rowKey.ts);
H
Hongze Cheng 已提交
133 134 135
        break;
      }

H
Hongze Cheng 已提交
136
      nRow++;
H
Hongze Cheng 已提交
137

H
Hongze Cheng 已提交
138
      code = tsdbCommitWriteTSData(pCommitter, (TABLEID *)pTbData, pRow);
H
Hongze Cheng 已提交
139 140 141 142 143 144 145 146 147
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s done, fid:%d nRow:%" PRId64, TD_VID(pCommitter->pTsdb->pVnode), __func__, pCommitter->fid,
H
Hongze Cheng 已提交
148
              nRow);
H
Hongze Cheng 已提交
149 150 151 152
  }
  return code;
}

H
Hongze Cheng 已提交
153
static int32_t commit_delete_data(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
154 155 156
  int32_t code = 0;
  int32_t lino;

H
Hongze Cheng 已提交
157 158
  ASSERTS(0, "not implemented yet");

H
Hongze Cheng 已提交
159
  int64_t    nDel = 0;
H
Hongze Cheng 已提交
160 161
  SMemTable *pMem = pCommitter->pTsdb->imem;

H
Hongze Cheng 已提交
162 163 164
  if (pMem->nDel == 0) {  // no del data
    goto _exit;
  }
H
Hongze Cheng 已提交
165 166 167 168 169

  for (int32_t iTbData = 0; iTbData < taosArrayGetSize(pCommitter->aTbDataP); iTbData++) {
    STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData);

    for (SDelData *pDelData = pTbData->pHead; pDelData; pDelData = pDelData->pNext) {
H
Hongze Cheng 已提交
170 171 172 173 174
      if (pDelData->eKey < pCommitter->minKey) continue;
      if (pDelData->sKey > pCommitter->maxKey) {
        pCommitter->nextKey = TMIN(pCommitter->nextKey, pDelData->sKey);
        continue;
      }
H
Hongze Cheng 已提交
175

H
Hongze Cheng 已提交
176 177
      code = tsdbCommitWriteDelData(pCommitter, pTbData->suid, pTbData->uid, pDelData->version,
                                    pDelData->sKey /* TODO */, pDelData->eKey /* TODO */);
H
Hongze Cheng 已提交
178 179 180 181 182 183 184 185 186 187 188 189 190 191
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s done, fid:%d nDel:%" PRId64, TD_VID(pCommitter->pTsdb->pVnode), __func__, pCommitter->fid,
              pMem->nDel);
  }
  return code;
}

H
Hongze Cheng 已提交
192
static int32_t start_commit_file_set(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
193 194 195 196
  pCommitter->fid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision);
  tsdbFidKeyRange(pCommitter->fid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey,
                  &pCommitter->maxKey);
  pCommitter->expLevel = tsdbFidLevel(pCommitter->fid, &pCommitter->pTsdb->keepCfg, taosGetTimestampSec());
H
Hongze Cheng 已提交
197
  pCommitter->nextKey = TSKEY_MAX;
H
Hongze Cheng 已提交
198

H
Hongze Cheng 已提交
199 200 201 202
  tsdbDebug("vgId:%d %s done, fid:%d minKey:%" PRId64 " maxKey:%" PRId64 " expLevel:%d",
            TD_VID(pCommitter->pTsdb->pVnode), __func__, pCommitter->fid, pCommitter->minKey, pCommitter->maxKey,
            pCommitter->expLevel);
  return 0;
H
Hongze Cheng 已提交
203 204
}

H
Hongze Cheng 已提交
205
static int32_t end_commit_file_set(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
206 207 208 209 210 211 212 213 214 215 216 217
  int32_t code = 0;
  int32_t lino = 0;

  // TODO

_exit:
  if (code) {
    tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
218
static int32_t commit_next_file_set(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
219 220 221
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
222
  // fset commit start
H
Hongze Cheng 已提交
223
  code = start_commit_file_set(pCommitter);
H
Hongze Cheng 已提交
224
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
225

H
Hongze Cheng 已提交
226
  // commit fset
H
Hongze Cheng 已提交
227
  code = commit_timeseries_data(pCommitter);
H
Hongze Cheng 已提交
228 229
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
230
  code = commit_delete_data(pCommitter);
H
Hongze Cheng 已提交
231 232 233
  TSDB_CHECK_CODE(code, lino, _exit);

  // fset commit end
H
Hongze Cheng 已提交
234
  code = end_commit_file_set(pCommitter);
H
Hongze Cheng 已提交
235
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
236 237 238

_exit:
  if (code) {
H
Hongze Cheng 已提交
239 240
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
H
Hongze Cheng 已提交
241 242 243 244
  }
  return code;
}

H
Hongze Cheng 已提交
245
static int32_t open_committer(STsdb *pTsdb, SCommitInfo *pInfo, SCommitter *pCommitter) {
H
Hongze Cheng 已提交
246
  int32_t code = 0;
H
Hongze Cheng 已提交
247
  int32_t lino;
H
Hongze Cheng 已提交
248

H
Hongze Cheng 已提交
249
  // set config
H
Hongze Cheng 已提交
250 251
  memset(pCommitter, 0, sizeof(SCommitter));
  pCommitter->pTsdb = pTsdb;
H
Hongze Cheng 已提交
252 253 254 255 256 257 258 259 260 261 262 263
  pCommitter->minutes = pTsdb->keepCfg.days;
  pCommitter->precision = pTsdb->keepCfg.precision;
  pCommitter->minRow = pInfo->info.config.tsdbCfg.minRows;
  pCommitter->maxRow = pInfo->info.config.tsdbCfg.maxRows;
  pCommitter->cmprAlg = pInfo->info.config.tsdbCfg.compression;
  pCommitter->sttTrigger = 0;  // TODO

  pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem);
  if (pCommitter->aTbDataP == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
264

H
Hongze Cheng 已提交
265 266
  // start loop
  pCommitter->nextKey = pTsdb->imem->minKey;  // TODO
H
Hongze Cheng 已提交
267 268 269

_exit:
  if (code) {
H
Hongze Cheng 已提交
270
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
271
  } else {
H
Hongze Cheng 已提交
272
    tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
H
Hongze Cheng 已提交
273 274 275 276
  }
  return code;
}

H
Hongze Cheng 已提交
277
static int32_t close_committer(SCommitter *pCommiter, int32_t eno) {
H
Hongze Cheng 已提交
278
  int32_t code = 0;
H
Hongze Cheng 已提交
279
  // TODO
H
Hongze Cheng 已提交
280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296
  return code;
}

int32_t tsdbPreCommit(STsdb *pTsdb) {
  taosThreadRwlockWrlock(&pTsdb->rwLock);
  ASSERT(pTsdb->imem == NULL);
  pTsdb->imem = pTsdb->mem;
  pTsdb->mem = NULL;
  taosThreadRwlockUnlock(&pTsdb->rwLock);
  return 0;
}

int32_t tsdbCommitBegin(STsdb *pTsdb, SCommitInfo *pInfo) {
  if (!pTsdb) return 0;

  int32_t    code = 0;
  int32_t    lino = 0;
H
Hongze Cheng 已提交
297
  SMemTable *pMem = pTsdb->imem;
H
Hongze Cheng 已提交
298

H
Hongze Cheng 已提交
299
  if (pMem->nRow == 0 && pMem->nDel == 0) {
H
Hongze Cheng 已提交
300 301 302
    taosThreadRwlockWrlock(&pTsdb->rwLock);
    pTsdb->imem = NULL;
    taosThreadRwlockUnlock(&pTsdb->rwLock);
H
Hongze Cheng 已提交
303 304 305
    tsdbUnrefMemTable(pMem, NULL, true);
  } else {
    SCommitter committer;
H
Hongze Cheng 已提交
306

H
Hongze Cheng 已提交
307
    code = open_committer(pTsdb, pInfo, &committer);
H
Hongze Cheng 已提交
308
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
309

H
Hongze Cheng 已提交
310
    while (committer.nextKey != TSKEY_MAX) {
H
Hongze Cheng 已提交
311
      code = commit_next_file_set(&committer);
H
Hongze Cheng 已提交
312
      if (code) break;
H
Hongze Cheng 已提交
313
    }
H
Hongze Cheng 已提交
314

H
Hongze Cheng 已提交
315
    code = close_committer(&committer, code);
H
Hongze Cheng 已提交
316 317
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
318 319 320

_exit:
  if (code) {
H
Hongze Cheng 已提交
321 322 323 324
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbInfo("vgId:%d %s done, nRow:%" PRId64 " nDel:%" PRId64, TD_VID(pTsdb->pVnode), __func__, pMem->nRow,
             pMem->nDel);
H
Hongze Cheng 已提交
325 326 327 328
  }
  return code;
}

H
Hongze Cheng 已提交
329
#if 0
H
Hongze Cheng 已提交
330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374
int32_t tsdbCommitCommit(STsdb *pTsdb) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SMemTable *pMemTable = pTsdb->imem;

  // lock
  taosThreadRwlockWrlock(&pTsdb->rwLock);

  code = tsdbFSCommit(pTsdb);
  if (code) {
    taosThreadRwlockUnlock(&pTsdb->rwLock);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  pTsdb->imem = NULL;

  // unlock
  taosThreadRwlockUnlock(&pTsdb->rwLock);
  if (pMemTable) {
    tsdbUnrefMemTable(pMemTable, NULL, true);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbInfo("vgId:%d, tsdb finish commit", TD_VID(pTsdb->pVnode));
  }
  return code;
}

int32_t tsdbCommitRollback(STsdb *pTsdb) {
  int32_t code = 0;
  int32_t lino = 0;

  code = tsdbFSRollback(pTsdb);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbInfo("vgId:%d, tsdb rollback commit", TD_VID(pTsdb->pVnode));
  }
  return code;
H
Hongze Cheng 已提交
375 376
}
#endif