walMgmt.c 7.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
19
#include "tcompare.h"
L
Liu Jicong 已提交
20
#include "tref.h"
21
#include "walInt.h"
S
Shengliang Guan 已提交
22 23

typedef struct {
24 25 26 27
  int8_t   stop;
  int8_t   inited;
  uint32_t seq;
  int32_t  refSetId;
wafwerar's avatar
wafwerar 已提交
28
  TdThread thread;
S
Shengliang Guan 已提交
29 30
} SWalMgmt;

L
Liu Jicong 已提交
31
// Module-wide WAL state: every field starts at zero except seq, which starts at 1.
static SWalMgmt tsWal = {0, .seq = 1};
S
Shengliang Guan 已提交
32 33 34 35
// Forward declarations of file-local helpers used before their definitions:
// walCreateThread/walStopThread manage the background thread stored in tsWal.thread;
// walFreeObj is the callback handed to taosOpenRef() in walInit().
static int32_t  walCreateThread();
static void     walStopThread();
static void     walFreeObj(void *pWal);

L
Liu Jicong 已提交
36
/* Return the current value of the module-wide sequence counter, widened to 64 bits. */
int64_t walGetSeq() {
  const int64_t current = (int64_t)atomic_load_32(&tsWal.seq);
  return current;
}
L
Liu Jicong 已提交
37

S
Shengliang Guan 已提交
38
int32_t walInit() {
L
Liu Jicong 已提交
39 40 41 42 43
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tsWal.inited, 0, 2);
    if (old != 2) break;
  }
L
Liu Jicong 已提交
44

L
Liu Jicong 已提交
45 46
  if (old == 0) {
    tsWal.refSetId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj);
S
Shengliang Guan 已提交
47

L
Liu Jicong 已提交
48 49 50 51 52 53 54 55 56
    int32_t code = walCreateThread();
    if (code != 0) {
      wError("failed to init wal module since %s", tstrerror(code));
      atomic_store_8(&tsWal.inited, 0);
      return code;
    }

    wInfo("wal module is initialized, rsetId:%d", tsWal.refSetId);
    atomic_store_8(&tsWal.inited, 1);
S
Shengliang Guan 已提交
57 58
  }

L
Liu Jicong 已提交
59
  return 0;
S
Shengliang Guan 已提交
60 61 62
}

/*
 * Tear down the WAL module if it is currently initialized.
 *
 * Only the caller that moves tsWal.inited from 1 to 2 performs the teardown;
 * callers that observe 2 (another transition in progress) spin, and callers
 * that observe any other value return without doing anything.
 */
void walCleanUp() {
  int8_t prev;
  do {
    prev = atomic_val_compare_exchange_8(&tsWal.inited, 1, 2);
  } while (prev == 2);

  if (prev != 1) {
    return;
  }

  walStopThread();
  taosCloseRef(tsWal.refSetId);
  wInfo("wal module is cleaned up");
  atomic_store_8(&tsWal.inited, 0);
}

L
Liu Jicong 已提交
77
/*
 * Create a WAL object for the directory `path` using a copy of `*pCfg`.
 *
 * Steps: copy the config (retentionSize/segSize are multiplied by 1024 when
 * positive), derive fsyncSeq from fsyncPeriod, create the directory, set up the
 * ref hash, file-info array and mutex, register the object in the module's
 * reference set, then load and repair the meta and index files.
 *
 * Returns the new object, or NULL on failure. On every failure path all
 * resources acquired so far are released exactly once.
 *
 * Ownership note: once taosAddRef() succeeded, the object is released through
 * taosRemoveRef(); walFreeObj (registered in walInit) destroys the mutex and
 * frees the memory, so nothing may touch pWal after taosRemoveRef().
 */
SWal *walOpen(const char *path, SWalCfg *pCfg) {
  SWal *pWal = taosMemoryCalloc(1, sizeof(SWal));
  if (pWal == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return NULL;
  }

  // set config
  memcpy(&pWal->cfg, pCfg, sizeof(SWalCfg));

  if (pWal->cfg.retentionSize > 0) {
    pWal->cfg.retentionSize *= 1024;
  }

  if (pWal->cfg.segSize > 0) {
    pWal->cfg.segSize *= 1024;
  }

  // fsyncSeq is used as a modulus in walNeedFsync(), so it must be at least 1
  pWal->fsyncSeq = pCfg->fsyncPeriod / 1000;
  if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1;

  tstrncpy(pWal->path, path, sizeof(pWal->path));
  if (taosMkDir(pWal->path) != 0) {
    wError("vgId:%d, path:%s, failed to create directory since %s", pWal->cfg.vgId, pWal->path, strerror(errno));
    goto err_obj;
  }

  // init ref
  pWal->pRefHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
  if (pWal->pRefHash == NULL) {
    goto err_obj;
  }

  // open meta
  walResetVer(&pWal->vers);
  pWal->pLogFile = NULL;
  pWal->pIdxFile = NULL;
  pWal->writeCur = -1;
  pWal->fileInfoSet = taosArrayInit(8, sizeof(SWalFileInfo));
  if (pWal->fileInfoSet == NULL) {
    wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->cfg.vgId, pWal->path, strerror(errno));
    goto err_hash;
  }

  // init status
  pWal->totSize = 0;
  pWal->lastRollSeq = -1;

  // init write buffer
  memset(&pWal->writeHead, 0, sizeof(SWalCkHead));
  pWal->writeHead.head.protoVer = WAL_PROTO_VER;
  pWal->writeHead.magic = WAL_MAGIC;

  // Any non-zero result is treated as failure: pthread-style initializers report
  // errors as positive error numbers, which a "< 0" test would never catch.
  if (taosThreadMutexInit(&pWal->mutex, NULL) != 0) {
    goto err_array;
  }

  pWal->refId = taosAddRef(tsWal.refSetId, pWal);
  if (pWal->refId < 0) {
    goto err_mutex;
  }

  // The result of walLoadMeta is intentionally not checked here (unchanged behavior).
  walLoadMeta(pWal);

  if (walCheckAndRepairMeta(pWal) < 0) {
    wError("vgId:%d cannot open wal since repair meta file failed", pWal->cfg.vgId);
    goto err_ref;
  }

  if (walCheckAndRepairIdx(pWal) < 0) {
    wError("vgId:%d cannot open wal since repair idx file failed", pWal->cfg.vgId);
    goto err_ref;
  }

  wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level,
         pWal->cfg.fsyncPeriod);

  return pWal;

err_ref:
  // The object is registered in the ref set: release the members first and drop
  // the reference last. Dropping it may run walFreeObj, which destroys the mutex
  // and frees pWal, so pWal must not be accessed afterwards.
  taosArrayDestroy(pWal->fileInfoSet);
  taosHashCleanup(pWal->pRefHash);
  taosRemoveRef(tsWal.refSetId, pWal->refId);
  return NULL;

err_mutex:
  taosThreadMutexDestroy(&pWal->mutex);
err_array:
  taosArrayDestroy(pWal->fileInfoSet);
err_hash:
  taosHashCleanup(pWal->pRefHash);
err_obj:
  taosMemoryFree(pWal);
  return NULL;
}

L
Liu Jicong 已提交
176 177
/*
 * Apply a new WAL level / fsync period to an open WAL.
 *
 * Returns TSDB_CODE_WAL_APP_ERROR when pWal is NULL, otherwise 0. When both
 * values already match the current config nothing is modified; otherwise the
 * config is updated and fsyncSeq is recomputed as fsyncPeriod / 1000, clamped
 * to a minimum of 1.
 */
int32_t walAlter(SWal *pWal, SWalCfg *pCfg) {
  if (pWal == NULL) return TSDB_CODE_WAL_APP_ERROR;

  if (pWal->cfg.level != pCfg->level || pWal->cfg.fsyncPeriod != pCfg->fsyncPeriod) {
    wInfo("vgId:%d, change old walLevel:%d fsync:%d, new walLevel:%d fsync:%d", pWal->cfg.vgId, pWal->cfg.level,
          pWal->cfg.fsyncPeriod, pCfg->level, pCfg->fsyncPeriod);

    pWal->cfg.level = pCfg->level;
    pWal->cfg.fsyncPeriod = pCfg->fsyncPeriod;

    pWal->fsyncSeq = pCfg->fsyncPeriod / 1000;
    if (pWal->fsyncSeq <= 0) {
      pWal->fsyncSeq = 1;
    }
  } else {
    wDebug("vgId:%d, old walLevel:%d fsync:%d, new walLevel:%d fsync:%d not change", pWal->cfg.vgId, pWal->cfg.level,
           pWal->cfg.fsyncPeriod, pCfg->level, pCfg->fsyncPeriod);
  }

  return 0;
}

L
Liu Jicong 已提交
196
/*
 * Close an open WAL: close the log and index files, save the meta, release the
 * file-info array and the ref hash (all under pWal->mutex), then drop the
 * object's entry in the module's reference set.
 *
 * A NULL pWal is ignored. pWal must not be used by the caller after this call.
 */
void walClose(SWal *pWal) {
  if (pWal == NULL) return;

  taosThreadMutexLock(&pWal->mutex);
  taosCloseFile(&pWal->pLogFile);
  pWal->pLogFile = NULL;
  taosCloseFile(&pWal->pIdxFile);
  pWal->pIdxFile = NULL;
  walSaveMeta(pWal);
  taosArrayDestroy(pWal->fileInfoSet);
  pWal->fileInfoSet = NULL;
  taosHashCleanup(pWal->pRefHash);
  // Clear the pointer like the other released members so no stale handle is left
  // behind while the object is still reachable through the reference set.
  pWal->pRefHash = NULL;
  taosThreadMutexUnlock(&pWal->mutex);

  taosRemoveRef(tsWal.refSetId, pWal->refId);
}

static void walFreeObj(void *wal) {
S
TD-1846  
Shengliang Guan 已提交
212
  SWal *pWal = wal;
L
Liu Jicong 已提交
213
  wDebug("vgId:%d, wal:%p is freed", pWal->cfg.vgId, pWal);
S
Shengliang Guan 已提交
214

wafwerar's avatar
wafwerar 已提交
215
  taosThreadMutexDestroy(&pWal->mutex);
wafwerar's avatar
wafwerar 已提交
216
  taosMemoryFreeClear(pWal);
S
Shengliang Guan 已提交
217 218
}

S
TD-1846  
Shengliang Guan 已提交
219
/*
 * Decide whether this WAL should be fsync'ed on the current tick.
 *
 * True only when the fsync period is positive, the level is TAOS_WAL_FSYNC and
 * the module-wide sequence counter is a multiple of the WAL's fsyncSeq.
 */
static bool walNeedFsync(SWal *pWal) {
  return pWal->cfg.fsyncPeriod > 0 && pWal->cfg.level == TAOS_WAL_FSYNC &&
         atomic_load_32(&tsWal.seq) % pWal->fsyncSeq == 0;
}
S
Shengliang Guan 已提交
230 231

static void walUpdateSeq() {
S
TD-1846  
Shengliang Guan 已提交
232
  taosMsleep(WAL_REFRESH_MS);
L
Liu Jicong 已提交
233
  atomic_add_fetch_32(&tsWal.seq, 1);
S
Shengliang Guan 已提交
234 235 236
}

static void walFsyncAll() {
L
Liu Jicong 已提交
237
  SWal *pWal = taosIterateRef(tsWal.refSetId, 0);
S
TD-1846  
Shengliang Guan 已提交
238 239
  while (pWal) {
    if (walNeedFsync(pWal)) {
L
Liu Jicong 已提交
240 241
      wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->cfg.vgId, pWal->cfg.level, pWal->fsyncSeq,
             atomic_load_32(&tsWal.seq));
L
Liu Jicong 已提交
242
      int32_t code = taosFsyncFile(pWal->pLogFile);
S
TD-1846  
Shengliang Guan 已提交
243
      if (code != 0) {
L
Liu Jicong 已提交
244 245
        wError("vgId:%d, file:%" PRId64 ".log, failed to fsync since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
               strerror(code));
S
TD-1846  
Shengliang Guan 已提交
246 247
      }
    }
L
Liu Jicong 已提交
248
    pWal = taosIterateRef(tsWal.refSetId, pWal->refId);
S
TD-1846  
Shengliang Guan 已提交
249
  }
S
Shengliang Guan 已提交
250 251 252
}

static void *walThreadFunc(void *param) {
H
Haojun Liao 已提交
253
  setThreadName("wal");
S
Shengliang Guan 已提交
254 255 256
  while (1) {
    walUpdateSeq();
    walFsyncAll();
J
Jun Li 已提交
257

L
Liu Jicong 已提交
258
    if (atomic_load_8(&tsWal.stop)) break;
S
Shengliang Guan 已提交
259 260 261 262 263 264
  }

  return NULL;
}

static int32_t walCreateThread() {
wafwerar's avatar
wafwerar 已提交
265 266 267
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
S
Shengliang Guan 已提交
268

wafwerar's avatar
wafwerar 已提交
269
  if (taosThreadCreate(&tsWal.thread, &thAttr, walThreadFunc, NULL) != 0) {
S
TD-1846  
Shengliang Guan 已提交
270
    wError("failed to create wal thread since %s", strerror(errno));
L
Liu Jicong 已提交
271 272
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
S
Shengliang Guan 已提交
273 274
  }

wafwerar's avatar
wafwerar 已提交
275
  taosThreadAttrDestroy(&thAttr);
S
TD-1207  
Shengliang Guan 已提交
276
  wDebug("wal thread is launched, thread:0x%08" PRIx64, taosGetPthreadId(tsWal.thread));
S
Shengliang Guan 已提交
277

L
Liu Jicong 已提交
278
  return 0;
S
Shengliang Guan 已提交
279 280 281
}

static void walStopThread() {
L
Liu Jicong 已提交
282
  atomic_store_8(&tsWal.stop, 1);
J
Jun Li 已提交
283

L
Liu Jicong 已提交
284
  if (taosCheckPthreadValid(tsWal.thread)) {
wafwerar's avatar
wafwerar 已提交
285
    taosThreadJoin(tsWal.thread, NULL);
286
    taosThreadClear(&tsWal.thread);
S
Shengliang Guan 已提交
287 288 289 290
  }

  wDebug("wal thread is stopped");
}