walMgmt.c 7.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
S
Shengliang Guan 已提交
19
#include "tref.h"
S
TD-1895  
Shengliang Guan 已提交
20
#include "tfile.h"
L
Liu Jicong 已提交
21
#include "compare.h"
22
#include "walInt.h"
S
Shengliang Guan 已提交
23

L
Liu Jicong 已提交
24 25 26 27 28
// Internal WAL file-selection helpers. Only their prototypes appear in this file.
// NOTE(review): the definitions presumably live in another WAL source file — confirm.
int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId);
int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId);
int32_t walGetNewFile(SWal *pWal, int64_t *newFileId);

S
Shengliang Guan 已提交
29
/* Process-wide bookkeeping for the WAL module (single instance: tsWal). */
typedef struct {
  int32_t   refSetId;  // reference-set handle obtained from taosOpenRef() in walInit()
  uint32_t  seq;       // tick counter, incremented by the background thread every WAL_REFRESH_MS
  int8_t    stop;      // set to 1 by walStopThread() to make the background thread exit
  int8_t    inited;    // 1 while the module is initialized; flipped with atomic compare-exchange
  pthread_t thread;    // background thread created by walCreateThread()
} SWalMgmt;

L
Liu Jicong 已提交
37
// Module-wide WAL state: every field starts zeroed except seq, which starts at 1.
static SWalMgmt tsWal = {0, .seq = 1};
S
Shengliang Guan 已提交
38 39 40 41 42
// Forward declarations of file-local helpers that are defined further down in this file.
static int32_t  walCreateThread();
static void     walStopThread();
static int32_t  walInitObj(SWal *pWal);
static void     walFreeObj(void *pWal);

L
Liu Jicong 已提交
43 44 45 46
/*
 * Return the current value of the module tick counter (tsWal.seq),
 * read atomically and widened to 64 bits.
 */
int64_t walGetSeq() {
  int64_t current = (int64_t)atomic_load_32(&tsWal.seq);
  return current;
}

S
Shengliang Guan 已提交
47
int32_t walInit() {
L
Liu Jicong 已提交
48 49
  int8_t old = atomic_val_compare_exchange_8(&tsWal.inited, 0, 1);
  if(old == 1) return 0;
L
Liu Jicong 已提交
50

L
Liu Jicong 已提交
51 52 53 54 55 56
  int code = tfInit();
  if(code != 0) {
    wError("failed to init tfile since %s", tstrerror(code));
    atomic_store_8(&tsWal.inited, 0);
    return code;
  }
L
Liu Jicong 已提交
57
  tsWal.refSetId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj);
S
Shengliang Guan 已提交
58

L
Liu Jicong 已提交
59
  code = walCreateThread();
L
Liu Jicong 已提交
60
  if (code != 0) {
S
TD-1846  
Shengliang Guan 已提交
61
    wError("failed to init wal module since %s", tstrerror(code));
L
Liu Jicong 已提交
62
    atomic_store_8(&tsWal.inited, 0);
S
Shengliang Guan 已提交
63 64 65
    return code;
  }

L
Liu Jicong 已提交
66
  wInfo("wal module is initialized, rsetId:%d", tsWal.refSetId);
L
Liu Jicong 已提交
67
  return 0;
S
Shengliang Guan 已提交
68 69 70
}

void walCleanUp() {
71 72 73 74
  int old = atomic_val_compare_exchange_8(&tsWal.inited, 1, 0);
  if(old == 0) {
    return;
  }
S
Shengliang Guan 已提交
75
  walStopThread();
L
Liu Jicong 已提交
76
  taosCloseRef(tsWal.refSetId);
S
Shengliang Guan 已提交
77 78 79
  wInfo("wal module is cleaned up");
}

L
Liu Jicong 已提交
80
/*
 * Create a WAL object rooted at `path` and configured from `pCfg`.
 *
 * path: directory of the WAL; it is created by walInitObj() if needed.
 * pCfg: configuration, copied into the object (the caller keeps ownership).
 *
 * Returns the new object, or NULL on failure. terrno is set here for invalid
 * arguments, allocation failure, mutex initialization failure and
 * walInitObj() failure. Close the returned object with walClose().
 */
SWal *walOpen(const char *path, SWalCfg *pCfg) {
  if (path == NULL || pCfg == NULL) {
    terrno = TSDB_CODE_WAL_APP_ERROR;
    return NULL;
  }

  // calloc zero-fills the object, so totSize and the write head (including
  // writeHead.head.sver) start at 0 without being assigned explicitly.
  SWal *pWal = calloc(1, sizeof(SWal));
  if (pWal == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return NULL;
  }

  // no file is open yet
  pWal->writeLogTfd = -1;
  pWal->writeIdxTfd = -1;
  pWal->writeCur = -1;

  // set config
  memcpy(&pWal->cfg, pCfg, sizeof(SWalCfg));

  // init version info
  pWal->vers.firstVer = -1;
  pWal->vers.commitVer = -1;
  pWal->vers.snapshotVer = -1;
  pWal->vers.lastVer = -1;
  pWal->vers.verInSnapshotting = -1;

  // init status
  pWal->lastRollSeq = -1;

  tstrncpy(pWal->path, path, sizeof(pWal->path));

  int ret = pthread_mutex_init(&pWal->mutex, NULL);
  if (ret != 0) {
    // walFreeObj() would destroy a mutex that was never initialized, so release directly.
    free(pWal);
    terrno = TAOS_SYSTEM_ERROR(ret);
    return NULL;
  }

  // The fsync period is divided by 1000 and clamped to at least 1 so it can be
  // used as a modulus in walNeedFsync().
  pWal->fsyncSeq = pCfg->fsyncPeriod / 1000;
  if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1;

  int32_t code = walInitObj(pWal);
  if (code != 0) {
    walFreeObj(pWal);
    terrno = code;  // set after cleanup so the cleanup path cannot overwrite it
    return NULL;
  }

  pWal->refId = taosAddRef(tsWal.refSetId, pWal);
  if (pWal->refId < 0) {
    walFreeObj(pWal);
    return NULL;
  }

  // NOTE(review): the result of walReadMeta() is not checked, same as before — confirm that is intended.
  walReadMeta(pWal);

  wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level, pWal->cfg.fsyncPeriod);

  return pWal;
}

L
Liu Jicong 已提交
134 135
/*
 * Apply a new WAL level and fsync period to an open WAL object.
 *
 * Returns TSDB_CODE_WAL_APP_ERROR when pWal or pCfg is NULL, otherwise 0
 * (also when the requested values equal the current ones and nothing changes).
 */
int32_t walAlter(SWal *pWal, SWalCfg *pCfg) {
  if (pWal == NULL || pCfg == NULL) return TSDB_CODE_WAL_APP_ERROR;

  if (pWal->cfg.level == pCfg->level && pWal->cfg.fsyncPeriod == pCfg->fsyncPeriod) {
    wDebug("vgId:%d, old walLevel:%d fsync:%d, new walLevel:%d fsync:%d not change", pWal->cfg.vgId, pWal->cfg.level,
           pWal->cfg.fsyncPeriod, pCfg->level, pCfg->fsyncPeriod);
    return 0;
  }

  wInfo("vgId:%d, change old walLevel:%d fsync:%d, new walLevel:%d fsync:%d", pWal->cfg.vgId, pWal->cfg.level,
        pWal->cfg.fsyncPeriod, pCfg->level, pCfg->fsyncPeriod);

  pWal->cfg.level = pCfg->level;
  pWal->cfg.fsyncPeriod = pCfg->fsyncPeriod;

  // Same derivation as in walOpen(): period / 1000, clamped to at least 1.
  pWal->fsyncSeq = pCfg->fsyncPeriod / 1000;
  if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1;

  return 0;
}

L
Liu Jicong 已提交
154 155
/*
 * Close an open WAL object. Under the object's mutex: close the log and index
 * files, write the meta, and destroy the file-info array. Afterwards the
 * object's reference is removed from the module reference set.
 * A NULL pWal is ignored.
 */
void walClose(SWal *pWal) {
  if (!pWal) {
    return;
  }

  pthread_mutex_lock(&pWal->mutex);

  // close both write files and mark them as not open
  tfClose(pWal->writeLogTfd);
  pWal->writeLogTfd = -1;
  tfClose(pWal->writeIdxTfd);
  pWal->writeIdxTfd = -1;

  // the meta is written while fileInfoSet still exists
  walWriteMeta(pWal);

  taosArrayDestroy(pWal->fileInfoSet);
  pWal->fileInfoSet = NULL;

  pthread_mutex_unlock(&pWal->mutex);

  // NOTE(review): walFreeObj() is the free callback registered for this reference set in
  // walInit(); presumably it runs once the last reference is gone — confirm.
  taosRemoveRef(tsWal.refSetId, pWal->refId);
}

/*
 * Prepare the on-disk directory and the in-memory file-info array of a WAL object.
 *
 * Returns 0 on success, or TAOS_SYSTEM_ERROR(errno) when the directory cannot
 * be created or the array cannot be allocated.
 */
static int32_t walInitObj(SWal *pWal) {
  if (taosMkDir(pWal->path) != 0) {
    // Capture errno first: the logging call below may overwrite it.
    int err = errno;
    wError("vgId:%d, path:%s, failed to create directory since %s", pWal->cfg.vgId, pWal->path, strerror(err));
    return TAOS_SYSTEM_ERROR(err);
  }

  pWal->fileInfoSet = taosArrayInit(8, sizeof(WalFileInfo));
  if (pWal->fileInfoSet == NULL) {
    int err = errno;
    wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->cfg.vgId, pWal->path, strerror(err));
    return TAOS_SYSTEM_ERROR(err);
  }

  wDebug("vgId:%d, object is initialized", pWal->cfg.vgId);
  return 0;
}

static void walFreeObj(void *wal) {
S
TD-1846  
Shengliang Guan 已提交
185
  SWal *pWal = wal;
L
Liu Jicong 已提交
186
  wDebug("vgId:%d, wal:%p is freed", pWal->cfg.vgId, pWal);
S
Shengliang Guan 已提交
187

L
Liu Jicong 已提交
188 189
  tfClose(pWal->writeLogTfd);
  tfClose(pWal->writeIdxTfd);
L
Liu Jicong 已提交
190 191
  taosArrayDestroy(pWal->fileInfoSet);
  pWal->fileInfoSet = NULL;
S
Shengliang Guan 已提交
192 193 194 195
  pthread_mutex_destroy(&pWal->mutex);
  tfree(pWal);
}

S
TD-1846  
Shengliang Guan 已提交
196
/*
 * Decide whether this WAL should be fsynced on the current tick.
 * True only when the fsync period is positive, the level is TAOS_WAL_FSYNC,
 * and the module tick counter is a multiple of the object's fsyncSeq.
 */
static bool walNeedFsync(SWal *pWal) {
  bool fsyncEnabled = (pWal->cfg.fsyncPeriod > 0) && (pWal->cfg.level == TAOS_WAL_FSYNC);
  if (!fsyncEnabled) {
    return false;
  }

  return (atomic_load_32(&tsWal.seq) % pWal->fsyncSeq) == 0;
}
S
Shengliang Guan 已提交
207 208

static void walUpdateSeq() {
S
TD-1846  
Shengliang Guan 已提交
209
  taosMsleep(WAL_REFRESH_MS);
L
Liu Jicong 已提交
210
  atomic_add_fetch_32(&tsWal.seq, 1);
S
Shengliang Guan 已提交
211 212 213
}

static void walFsyncAll() {
L
Liu Jicong 已提交
214
  SWal *pWal = taosIterateRef(tsWal.refSetId, 0);
S
TD-1846  
Shengliang Guan 已提交
215 216
  while (pWal) {
    if (walNeedFsync(pWal)) {
L
Liu Jicong 已提交
217
      wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->cfg.vgId, pWal->cfg.level, pWal->fsyncSeq, atomic_load_32(&tsWal.seq));
L
Liu Jicong 已提交
218
      int32_t code = tfFsync(pWal->writeLogTfd);
S
TD-1846  
Shengliang Guan 已提交
219
      if (code != 0) {
L
Liu Jicong 已提交
220
        wError("vgId:%d, file:%"PRId64".log, failed to fsync since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(code));
S
TD-1846  
Shengliang Guan 已提交
221 222
      }
    }
L
Liu Jicong 已提交
223
    pWal = taosIterateRef(tsWal.refSetId, pWal->refId);
S
TD-1846  
Shengliang Guan 已提交
224
  }
S
Shengliang Guan 已提交
225 226 227
}

static void *walThreadFunc(void *param) {
H
Haojun Liao 已提交
228
  setThreadName("wal");
S
Shengliang Guan 已提交
229 230 231
  while (1) {
    walUpdateSeq();
    walFsyncAll();
J
Jun Li 已提交
232

L
Liu Jicong 已提交
233
    if (atomic_load_8(&tsWal.stop)) break;
S
Shengliang Guan 已提交
234 235 236 237 238 239 240 241 242 243 244
  }

  return NULL;
}

/*
 * Start the joinable WAL background thread (walThreadFunc) and store its
 * handle in tsWal.thread.
 *
 * Returns 0 on success, or TAOS_SYSTEM_ERROR(err) where err is the error
 * number returned by pthread_create().
 */
static int32_t walCreateThread() {
  pthread_attr_t thAttr;
  pthread_attr_init(&thAttr);
  pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);

  // pthread_create() reports failure through its return value, not through errno.
  int ret = pthread_create(&tsWal.thread, &thAttr, walThreadFunc, NULL);

  // The attribute object is no longer needed whether or not creation succeeded.
  pthread_attr_destroy(&thAttr);

  if (ret != 0) {
    wError("failed to create wal thread since %s", strerror(ret));
    return TAOS_SYSTEM_ERROR(ret);
  }

  wDebug("wal thread is launched, thread:0x%08" PRIx64, taosGetPthreadId(tsWal.thread));

  return 0;
}

static void walStopThread() {
L
Liu Jicong 已提交
256
  atomic_store_8(&tsWal.stop, 1);
J
Jun Li 已提交
257

L
Liu Jicong 已提交
258
  if (taosCheckPthreadValid(tsWal.thread)) {
S
Shengliang Guan 已提交
259 260 261 262 263
    pthread_join(tsWal.thread, NULL);
  }

  wDebug("wal thread is stopped");
}