tsdbCommit2.c 5.1 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"

H
Hongze Cheng 已提交
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
/*
 * Working state for a single commit pass.
 *
 * tsdbCommitterOpen() zero-fills the whole structure and then sets pTsdb; the
 * remaining fields are not populated anywhere in this file yet. The members are
 * grouped into "config" and "context" sections as marked below.
 */
typedef struct {
  STsdb *pTsdb;  // TSDB instance this committer works on (assigned in tsdbCommitterOpen)
  // config
  int64_t commitID;
  int32_t minutes;
  int8_t  precision;
  int32_t minRow;
  int32_t maxRow;
  int8_t  cmprAlg;
  int8_t  sttTrigger;
  SArray *aTbDataP;
  // context
  TSKEY      nextKey;  // reset by each table commit
  int32_t    fid;
  int32_t    expLevel;
  TSKEY      minKey;
  TSKEY      maxKey;
  SSkmInfo   skmTable;
  SSkmInfo   skmRow;
  SBlockData bData;
  SColData   aColData[4];  // <suid, uid, ts, version>
  SArray    *aSttBlk;      // SArray<SSttBlk>
  SArray    *aDelBlk;      // SArray<SDelBlk>
} SCommitter;
H
Hongze Cheng 已提交
42

H
Hongze Cheng 已提交
43 44 45 46
/*
 * Placeholder, not implemented yet.
 *
 * Every call hits ASSERT(0). If execution continues past the assertion the
 * function returns 0. Neither argument is examined.
 */
static int32_t tsdbRowIsDeleted(SCommitter *pCommitter, TSDBROW *pRow) {
  int32_t isDeleted = 0;

  // TODO
  ASSERT(0);

  return isDeleted;
}

H
Hongze Cheng 已提交
49
/*
 * Commit step for time-series data. The body is still a TODO, so the function
 * currently does nothing and returns 0.
 *
 * pCommitter: committer state; only pCommitter->pTsdb->pVnode is read, and only
 *             when an error is logged.
 *
 * Returns 0 on success, otherwise the error code that is also logged.
 */
static int32_t tsdbCommitTimeSeriesData(SCommitter *pCommitter) {
  int32_t code = 0;
  int32_t lino = 0;

  // TODO

_exit:
  if (code) {
    // Include the function name, matching the other routines in this file, so the
    // failure can be told apart from the identical message in the sibling commit step.
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
62
/*
 * Commit step for delete data. The body is still a TODO, so the function
 * currently does nothing and returns 0.
 *
 * pCommitter: committer state; only pCommitter->pTsdb->pVnode is read, and only
 *             when an error is logged.
 *
 * Returns 0 on success, otherwise the error code that is also logged.
 */
static int32_t tsdbCommitDelData(SCommitter *pCommitter) {
  int32_t code = 0;
  int32_t lino = 0;

  // TODO

_exit:
  if (code) {
    // Include the function name, matching the other routines in this file, so the
    // failure can be told apart from the identical message in the sibling commit step.
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
75
/*
 * Commit one file set.
 *
 * At present this only calls tsdbCommitTimeSeriesData(); the "start" and "end"
 * phases around it are still TODO.
 *
 * pCommitter: committer state.
 * done:       output flag for the caller's loop. It is not written by the current
 *             implementation.
 *
 * Returns 0 on success, otherwise the error code from the failing step (logged).
 */
static int32_t tsdbCommitNextFSet(SCommitter *pCommitter, int8_t *done) {
  STsdb  *tsdb = pCommitter->pTsdb;
  int32_t lino = 0;

  // fset commit start (TODO)

  // commit fset
  int32_t code = tsdbCommitTimeSeriesData(pCommitter);
  TSDB_CHECK_CODE(code, lino, _exit);

  // fset commit end (TODO)

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(tsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
96
/*
 * Prepare a committer for use.
 *
 * Clears *pCommitter to all-zero bytes and records pTsdb in it. Further setup
 * is still TODO; pInfo is not used yet.
 *
 * Returns 0 on success (logged at debug level), otherwise an error code
 * (logged at error level).
 */
static int32_t tsdbCommitterOpen(STsdb *pTsdb, SCommitInfo *pInfo, SCommitter *pCommitter) {
  int32_t lino = 0;
  int32_t code = 0;

  // start from a fully zeroed state, then bind the owning TSDB
  memset(pCommitter, 0, sizeof(*pCommitter));
  pCommitter->pTsdb = pTsdb;

  // TODO

_exit:
  if (code == 0) {
    tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
  } else {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
114
/*
 * Finish a commit pass.
 *
 * Not implemented yet: neither pCommiter nor eno (the error code from the
 * preceding commit steps, as passed by tsdbCommitBegin) is looked at, and the
 * function always returns 0.
 */
static int32_t tsdbCommitterClose(SCommitter *pCommiter, int32_t eno) {
  // TODO
  return 0;
}

/*
 * Hand the current memtable over for committing.
 *
 * While holding pTsdb->rwLock for writing, moves pTsdb->mem into pTsdb->imem
 * and clears pTsdb->mem. Asserts that no memtable is already parked in imem.
 *
 * Always returns 0; the return values of the lock calls are not checked.
 */
int32_t tsdbPreCommit(STsdb *pTsdb) {
  taosThreadRwlockWrlock(&pTsdb->rwLock);
  // a previous commit must have cleared imem before a new one starts
  ASSERT(pTsdb->imem == NULL);
  pTsdb->imem = pTsdb->mem;
  pTsdb->mem = NULL;
  taosThreadRwlockUnlock(&pTsdb->rwLock);
  return 0;
}

/*
 * Start committing the memtable parked in pTsdb->imem.
 *
 * - pTsdb == NULL: nothing to do, returns 0.
 * - Memtable has no rows and no deletes: detaches it from pTsdb->imem under the
 *   write lock and drops the reference to it.
 * - Otherwise: opens a committer, runs the file-set commit step, and closes the
 *   committer, passing it the code from the commit step.
 *
 * pTsdb: TSDB instance; pTsdb->imem is dereferenced, so it is expected to be
 *        non-NULL here (NOTE(review): confirm callers guarantee this).
 * pInfo: commit info forwarded to tsdbCommitterOpen().
 *
 * Returns 0 on success, otherwise the error code that is also logged.
 */
int32_t tsdbCommitBegin(STsdb *pTsdb, SCommitInfo *pInfo) {
  if (!pTsdb) return 0;

  int32_t    code = 0;
  int32_t    lino = 0;
  SMemTable *pMem = pTsdb->imem;

  // Snapshot the counters up front: the empty-memtable branch drops our
  // reference to pMem, so it must not be read again when logging at _exit.
  int64_t nRow = pMem->nRow;
  int64_t nDel = pMem->nDel;

  if (nRow == 0 && nDel == 0) {
    taosThreadRwlockWrlock(&pTsdb->rwLock);
    pTsdb->imem = NULL;
    taosThreadRwlockUnlock(&pTsdb->rwLock);
    tsdbUnrefMemTable(pMem, NULL, true);
  } else {
    SCommitter committer;
    int8_t     done = 0;

    code = tsdbCommitterOpen(pTsdb, pInfo, &committer);
    TSDB_CHECK_CODE(code, lino, _exit);

    // NOTE(review): this condition keeps looping only while tsdbCommitNextFSet()
    // returns non-zero, i.e. it stops after the first successful call and would
    // retry on failure. That looks inverted. It is left as-is because
    // tsdbCommitNextFSet() does not set *done yet, so looping on success would
    // never terminate; revisit together with that function.
    while (!done && (code = tsdbCommitNextFSet(&committer, &done))) {
    }

    code = tsdbCommitterClose(&committer, code);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbInfo("vgId:%d %s done, nRow:%" PRId64 " nDel:%" PRId64, TD_VID(pTsdb->pVnode), __func__, nRow, nDel);
  }
  return code;
}

H
Hongze Cheng 已提交
165
#if 0
H
Hongze Cheng 已提交
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210
/*
 * Finalize a commit (currently compiled out by the surrounding #if 0).
 *
 * Under the write lock, calls tsdbFSCommit() and, if that succeeds, clears
 * pTsdb->imem. After the lock is released, the memtable that was in imem on
 * entry (if any) is unreferenced. On tsdbFSCommit() failure imem is left
 * untouched and the memtable is not unreferenced.
 *
 * Returns 0 on success, otherwise the error code from tsdbFSCommit() (logged).
 */
int32_t tsdbCommitCommit(STsdb *pTsdb) {
  int32_t    lino = 0;
  SMemTable *pCommitted = pTsdb->imem;

  // lock
  taosThreadRwlockWrlock(&pTsdb->rwLock);

  int32_t code = tsdbFSCommit(pTsdb);
  if (code == 0) {
    pTsdb->imem = NULL;
  }

  // unlock (on both the success and the failure path)
  taosThreadRwlockUnlock(&pTsdb->rwLock);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (pCommitted != NULL) {
    tsdbUnrefMemTable(pCommitted, NULL, true);
  }

_exit:
  if (code == 0) {
    tsdbInfo("vgId:%d, tsdb finish commit", TD_VID(pTsdb->pVnode));
  } else {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

/*
 * Roll back a commit (currently compiled out by the surrounding #if 0).
 *
 * Delegates to tsdbFSRollback() and logs the outcome.
 *
 * Returns 0 on success, otherwise the error code from tsdbFSRollback().
 */
int32_t tsdbCommitRollback(STsdb *pTsdb) {
  int32_t lino = 0;
  int32_t code = tsdbFSRollback(pTsdb);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code == 0) {
    tsdbInfo("vgId:%d, tsdb rollback commit", TD_VID(pTsdb->pVnode));
  } else {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}
#endif