walMgmt.c 5.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
S
Shengliang Guan 已提交
19
#include "tref.h"
S
TD-1895  
Shengliang Guan 已提交
20
#include "tfile.h"
21
#include "walInt.h"
S
Shengliang Guan 已提交
22 23 24 25 26 27 28 29 30

typedef struct {
  int32_t   refId;
  int32_t   seq;
  int8_t    stop;
  pthread_t thread;
  pthread_mutex_t mutex;
} SWalMgmt;

S
TD-1848  
Shengliang Guan 已提交
31
static SWalMgmt tsWal = {0};
S
Shengliang Guan 已提交
32 33 34 35 36 37
static int32_t  walCreateThread();
static void     walStopThread();
static int32_t  walInitObj(SWal *pWal);
static void     walFreeObj(void *pWal);

int32_t walInit() {
J
Jun Li 已提交
38
  int32_t code = 0;
S
Shengliang Guan 已提交
39 40
  tsWal.refId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj);

J
Jun Li 已提交
41 42 43 44 45 46 47
  code = pthread_mutex_init(&tsWal.mutex, NULL);
  if (code) {
    wError("failed to init wal mutex since %s", tstrerror(code));
    return code;
  }

  code = walCreateThread();
S
Shengliang Guan 已提交
48
  if (code != TSDB_CODE_SUCCESS) {
S
TD-1846  
Shengliang Guan 已提交
49
    wError("failed to init wal module since %s", tstrerror(code));
S
Shengliang Guan 已提交
50 51 52
    return code;
  }

S
TD-1207  
Shengliang Guan 已提交
53
  wInfo("wal module is initialized, rsetId:%d", tsWal.refId);
S
Shengliang Guan 已提交
54 55 56 57 58 59
  return code;
}

void walCleanUp() {
  walStopThread();
  taosCloseRef(tsWal.refId);
J
Jun Li 已提交
60
  pthread_mutex_destroy(&tsWal.mutex);
S
Shengliang Guan 已提交
61 62 63
  wInfo("wal module is cleaned up");
}

L
Liu Jicong 已提交
64 65
SWal *walOpen(char *path, SWalCfg *pCfg) {
  SWal *pWal = malloc(sizeof(SWal));
S
Shengliang Guan 已提交
66 67 68 69 70
  if (pWal == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return NULL;
  }

S
TD-1846  
Shengliang Guan 已提交
71
  pWal->vgId = pCfg->vgId;
S
TD-1895  
Shengliang Guan 已提交
72
  pWal->tfd = -1;
S
TD-1846  
Shengliang Guan 已提交
73
  pWal->fileId = -1;
S
Shengliang Guan 已提交
74
  pWal->level = pCfg->walLevel;
L
Liu Jicong 已提交
75
  /*pWal->keep = pCfg->keep;*/
S
Shengliang Guan 已提交
76
  pWal->fsyncPeriod = pCfg->fsyncPeriod;
S
TD-1846  
Shengliang Guan 已提交
77
  tstrncpy(pWal->path, path, sizeof(pWal->path));
S
Shengliang Guan 已提交
78 79
  pthread_mutex_init(&pWal->mutex, NULL);

S
TD-1847  
Shengliang Guan 已提交
80
  pWal->fsyncSeq = pCfg->fsyncPeriod / 1000;
S
Shengliang Guan 已提交
81 82 83 84 85 86 87
  if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1;

  if (walInitObj(pWal) != TSDB_CODE_SUCCESS) {
    walFreeObj(pWal);
    return NULL;
  }

L
Liu Jicong 已提交
88 89
   pWal->rId = taosAddRef(tsWal.refId, pWal);
   if (pWal->rId < 0) {
S
Shengliang Guan 已提交
90 91 92 93
    walFreeObj(pWal);
    return NULL;
  }

S
TD-1846  
Shengliang Guan 已提交
94
  wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->vgId, pWal, pWal->level, pWal->fsyncPeriod);
S
Shengliang Guan 已提交
95 96 97 98

  return pWal;
}

L
Liu Jicong 已提交
99 100
int32_t walAlter(SWal *pWal, SWalCfg *pCfg) {
  if (pWal == NULL) return TSDB_CODE_WAL_APP_ERROR;
S
Shengliang Guan 已提交
101 102 103 104 105 106 107 108 109 110 111 112

  if (pWal->level == pCfg->walLevel && pWal->fsyncPeriod == pCfg->fsyncPeriod) {
    wDebug("vgId:%d, old walLevel:%d fsync:%d, new walLevel:%d fsync:%d not change", pWal->vgId, pWal->level,
           pWal->fsyncPeriod, pCfg->walLevel, pCfg->fsyncPeriod);
    return TSDB_CODE_SUCCESS;
  }

  wInfo("vgId:%d, change old walLevel:%d fsync:%d, new walLevel:%d fsync:%d", pWal->vgId, pWal->level,
        pWal->fsyncPeriod, pCfg->walLevel, pCfg->fsyncPeriod);

  pWal->level = pCfg->walLevel;
  pWal->fsyncPeriod = pCfg->fsyncPeriod;
113
  pWal->fsyncSeq = pCfg->fsyncPeriod / 1000;
S
TD-1846  
Shengliang Guan 已提交
114
  if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1;
S
Shengliang Guan 已提交
115 116 117 118

  return TSDB_CODE_SUCCESS;
}

S
TD-1894  
Shengliang Guan 已提交
119 120 121 122 123 124 125 126 127 128
void walStop(void *handle) {
  if (handle == NULL) return;
  SWal *pWal = handle;

  pthread_mutex_lock(&pWal->mutex);
  pWal->stop = 1;
  pthread_mutex_unlock(&pWal->mutex);
  wDebug("vgId:%d, stop write wal", pWal->vgId);
}

L
Liu Jicong 已提交
129 130
void walClose(SWal *pWal) {
  if (pWal == NULL) return;
S
TD-1846  
Shengliang Guan 已提交
131

132
  pthread_mutex_lock(&pWal->mutex);
S
TD-1895  
Shengliang Guan 已提交
133
  tfClose(pWal->tfd);
134
  pthread_mutex_unlock(&pWal->mutex);
L
Liu Jicong 已提交
135
  taosRemoveRef(tsWal.refId, pWal->rId);
S
Shengliang Guan 已提交
136 137 138
}

static int32_t walInitObj(SWal *pWal) {
139
  if (!taosMkDir(pWal->path)) {
S
Shengliang Guan 已提交
140
    wError("vgId:%d, path:%s, failed to create directory since %s", pWal->vgId, pWal->path, strerror(errno));
S
TD-1846  
Shengliang Guan 已提交
141
    return TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
142 143
  }

S
TD-1887  
Shengliang Guan 已提交
144
  wDebug("vgId:%d, object is initialized", pWal->vgId);
S
Shengliang Guan 已提交
145 146 147 148
  return TSDB_CODE_SUCCESS;
}

static void walFreeObj(void *wal) {
S
TD-1846  
Shengliang Guan 已提交
149 150
  SWal *pWal = wal;
  wDebug("vgId:%d, wal:%p is freed", pWal->vgId, pWal);
S
Shengliang Guan 已提交
151

S
TD-1895  
Shengliang Guan 已提交
152
  tfClose(pWal->tfd);
S
Shengliang Guan 已提交
153 154 155 156
  pthread_mutex_destroy(&pWal->mutex);
  tfree(pWal);
}

S
TD-1846  
Shengliang Guan 已提交
157 158 159 160
static bool walNeedFsync(SWal *pWal) {
  if (pWal->fsyncPeriod <= 0 || pWal->level != TAOS_WAL_FSYNC) {
    return false;
  }
S
Shengliang Guan 已提交
161

S
TD-1846  
Shengliang Guan 已提交
162 163 164
  if (tsWal.seq % pWal->fsyncSeq == 0) {
    return true;
  }
S
Shengliang Guan 已提交
165

S
TD-1846  
Shengliang Guan 已提交
166 167
  return false;
}
S
Shengliang Guan 已提交
168 169

static void walUpdateSeq() {
S
TD-1846  
Shengliang Guan 已提交
170
  taosMsleep(WAL_REFRESH_MS);
S
Shengliang Guan 已提交
171 172 173 174 175 176
  if (++tsWal.seq <= 0) {
    tsWal.seq = 1;
  }
}

static void walFsyncAll() {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
177
  SWal *pWal = taosIterateRef(tsWal.refId, 0);
S
TD-1846  
Shengliang Guan 已提交
178 179 180
  while (pWal) {
    if (walNeedFsync(pWal)) {
      wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, tsWal.seq);
S
TD-1895  
Shengliang Guan 已提交
181
      int32_t code = tfFsync(pWal->tfd);
S
TD-1846  
Shengliang Guan 已提交
182
      if (code != 0) {
S
TD-1846  
Shengliang Guan 已提交
183
        wError("vgId:%d, file:%s, failed to fsync since %s", pWal->vgId, pWal->name, strerror(code));
S
TD-1846  
Shengliang Guan 已提交
184 185
      }
    }
L
Liu Jicong 已提交
186
    pWal = taosIterateRef(tsWal.refId, pWal->rId);
S
TD-1846  
Shengliang Guan 已提交
187
  }
S
Shengliang Guan 已提交
188 189 190
}

static void *walThreadFunc(void *param) {
J
Jun Li 已提交
191
  int stop = 0;
H
Haojun Liao 已提交
192
  setThreadName("wal");
S
Shengliang Guan 已提交
193 194 195
  while (1) {
    walUpdateSeq();
    walFsyncAll();
J
Jun Li 已提交
196 197 198 199 200

    pthread_mutex_lock(&tsWal.mutex);
    stop = tsWal.stop;
    pthread_mutex_unlock(&tsWal.mutex);
    if (stop) break;
S
Shengliang Guan 已提交
201 202 203 204 205 206 207 208 209 210 211
  }

  return NULL;
}

static int32_t walCreateThread() {
  pthread_attr_t thAttr;
  pthread_attr_init(&thAttr);
  pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);

  if (pthread_create(&tsWal.thread, &thAttr, walThreadFunc, NULL) != 0) {
S
TD-1846  
Shengliang Guan 已提交
212
    wError("failed to create wal thread since %s", strerror(errno));
S
Shengliang Guan 已提交
213 214 215 216
    return TAOS_SYSTEM_ERROR(errno);
  }

  pthread_attr_destroy(&thAttr);
S
TD-1207  
Shengliang Guan 已提交
217
  wDebug("wal thread is launched, thread:0x%08" PRIx64, taosGetPthreadId(tsWal.thread));
S
Shengliang Guan 已提交
218 219 220 221 222

  return TSDB_CODE_SUCCESS;
}

static void walStopThread() {
J
Jun Li 已提交
223
  pthread_mutex_lock(&tsWal.mutex);
S
TD-1846  
Shengliang Guan 已提交
224
  tsWal.stop = 1;
J
Jun Li 已提交
225 226
  pthread_mutex_unlock(&tsWal.mutex);

S
TD-1207  
Shengliang Guan 已提交
227
  if (taosCheckPthreadValid(tsWal.thread)) {
S
Shengliang Guan 已提交
228 229 230 231 232
    pthread_join(tsWal.thread, NULL);
  }

  wDebug("wal thread is stopped");
}