walMain.c 13.3 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <errno.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <dirent.h>
#include <fcntl.h>
#include <unistd.h>

#include "os.h"
#include "tlog.h"
26
#include "tchecksum.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
27
#include "tutil.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
28
#include "ttimer.h"
29
#include "taoserror.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
30
#include "twal.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
31
#include "tqueue.h"
32
#include "tfile.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
33 34

#define walPrefix "wal"

// Log helpers: each emits only when its level bit is set in wDebugFlag.
// The do { } while (0) wrapper makes every macro behave as a single statement,
// so "if (c) wError(...); else ..." parses as intended.
#define wFatal(...) do { if (wDebugFlag & DEBUG_FATAL) { taosPrintLog("WAL FATAL ", 255, __VA_ARGS__); }} while (0)
#define wError(...) do { if (wDebugFlag & DEBUG_ERROR) { taosPrintLog("WAL ERROR ", 255, __VA_ARGS__); }} while (0)
#define wWarn(...)  do { if (wDebugFlag & DEBUG_WARN)  { taosPrintLog("WAL WARN  ", 255, __VA_ARGS__); }} while (0)
#define wInfo(...)  do { if (wDebugFlag & DEBUG_INFO)  { taosPrintLog("WAL INFO  ", 255, __VA_ARGS__); }} while (0)
#define wDebug(...) do { if (wDebugFlag & DEBUG_DEBUG) { taosPrintLog("WAL DEBUG ", wDebugFlag, __VA_ARGS__); }} while (0)
#define wTrace(...) do { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL TRACE ", wDebugFlag, __VA_ARGS__); }} while (0)

typedef struct {
J
Jeff Tao 已提交
44
  uint64_t version;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
45
  int      fd;
J
Jeff Tao 已提交
46
  int      keep;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
47
  int      level;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
48 49 50
  int32_t  fsyncPeriod;
  void    *timer;
  void    *signature;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
51
  int      max;  // maximum number of wal files
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
52
  uint32_t id;   // increase continuously
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
53 54
  int      num;  // number of wal files
  char     path[TSDB_FILENAME_LEN];
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
55
  char     name[TSDB_FILENAME_LEN+16];
56
  pthread_mutex_t mutex;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
57 58
} SWal;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
59 60 61
/* Magic value stamped into each SWalHead by walWrite(). */
static uint32_t walSignature = 0xFAFBFDFE;

/* Module-wide state shared by every WAL handle. */
static pthread_once_t walModuleInit = PTHREAD_ONCE_INIT;  // guards walModuleInitFunc()
static void          *walTmrCtrl    = NULL;               // timer controller created by walModuleInitFunc()
static int            tsWalNum      = 0;                  // live handles; the module is cleaned up at zero

/* Internal helpers, defined further below. */
static void walRelease(SWal *pWal);
static void walProcessFsyncTimer(void *param, void *tmrId);
static int  walHandleExistingFiles(const char *path);
static int  walRemoveWalFiles(const char *path);
static int  walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp);

static void walModuleInitFunc() {
  walTmrCtrl = taosTmrInit(1000, 100, 300000, "WAL");
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
71 72 73 74
  if (walTmrCtrl == NULL) 
    walModuleInit = PTHREAD_ONCE_INIT;
  else
    wDebug("WAL module is initialized");
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
75
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
76

J
Jeff Tao 已提交
77
void *walOpen(const char *path, const SWalCfg *pCfg) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
78
  SWal *pWal = calloc(sizeof(SWal), 1);
79 80 81 82
  if (pWal == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return NULL;
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
83

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
84 85 86 87 88 89 90 91
  pthread_once(&walModuleInit, walModuleInitFunc);
  if (walTmrCtrl == NULL) {
    free(pWal);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return NULL;
  }

  atomic_add_fetch_32(&tsWalNum, 1);    
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
92
  pWal->fd = -1;
J
Jeff Tao 已提交
93
  pWal->max = pCfg->wals;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
94 95
  pWal->id = 0;
  pWal->num = 0;
H
hjxilinx 已提交
96
  pWal->level = pCfg->walLevel;
J
Jeff Tao 已提交
97
  pWal->keep = pCfg->keep;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
98 99
  pWal->fsyncPeriod = pCfg->fsyncPeriod;
  pWal->signature = pWal;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
100
  tstrncpy(pWal->path, path, sizeof(pWal->path));
101
  pthread_mutex_init(&pWal->mutex, NULL);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
102

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
103 104 105 106 107 108 109 110 111
  if (pWal->fsyncPeriod > 0  && pWal->level == TAOS_WAL_FSYNC) {
    pWal->timer = taosTmrStart(walProcessFsyncTimer, pWal->fsyncPeriod, pWal, walTmrCtrl);
    if (pWal->timer == NULL) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      walRelease(pWal);
      return NULL;
    }
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
112 113 114
  if (tmkdir(path, 0755) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    wError("wal:%s, failed to create directory(%s)", path, strerror(errno));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
115
    walRelease(pWal);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
116
    pWal = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
117 118
  }
     
J
Jeff Tao 已提交
119 120
  if (pCfg->keep == 1) return pWal;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
121 122 123
  if (walHandleExistingFiles(path) == 0) 
    walRenew(pWal);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
124
  if (pWal && pWal->fd <0) {
125
    terrno = TAOS_SYSTEM_ERROR(errno);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
126
    wError("wal:%s, failed to open(%s)", path, strerror(errno));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
127
    walRelease(pWal);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
128
    pWal = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
129
  } 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
130

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
131
  if (pWal) wDebug("wal:%s, it is open, level:%d fsyncPeriod:%d", path, pWal->level, pWal->fsyncPeriod);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
132 133 134 135
  return pWal;
}

void walClose(void *handle) {
S
slguan 已提交
136 137
  if (handle == NULL) return;
  
J
Jeff Tao 已提交
138
  SWal *pWal = handle;  
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
139 140
  tclose(pWal->fd);
  if (pWal->timer) taosTmrStopA(&pWal->timer);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
141

J
Jeff Tao 已提交
142 143 144
  if (pWal->keep == 0) {
    // remove all files in the directory
    for (int i=0; i<pWal->num; ++i) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
145
      snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", pWal->path, walPrefix, pWal->id-i);
J
Jeff Tao 已提交
146 147 148
      if (remove(pWal->name) <0) {
        wError("wal:%s, failed to remove", pWal->name);
      } else {
S
Shengliang Guan 已提交
149
        wDebug("wal:%s, it is removed", pWal->name);
J
Jeff Tao 已提交
150
      }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
151
    }
J
Jeff Tao 已提交
152
  } else {
S
Shengliang Guan 已提交
153
    wDebug("wal:%s, it is closed and kept", pWal->name);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
154
  }
155

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
156
  walRelease(pWal);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
157 158
}

J
Jeff Tao 已提交
159
int walRenew(void *handle) {
160
  if (handle == NULL) return 0;
J
Jeff Tao 已提交
161
  SWal *pWal = handle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
162 163

  terrno = 0;
164

165 166
  pthread_mutex_lock(&pWal->mutex);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
167 168 169
  if (pWal->fd >=0) {
    close(pWal->fd);
    pWal->id++;
S
Shengliang Guan 已提交
170
    wDebug("wal:%s, it is closed", pWal->name);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
171 172
  }

173 174
  pWal->num++;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
175
  snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", pWal->path, walPrefix, pWal->id);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
176
  pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
177

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
178
  if (pWal->fd < 0) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
179 180
    wError("wal:%s, failed to open(%s)", pWal->name, strerror(errno));
    terrno = TAOS_SYSTEM_ERROR(errno);
181
  } else {
S
Shengliang Guan 已提交
182
    wDebug("wal:%s, it is created", pWal->name);
183 184 185

    if (pWal->num > pWal->max) {
      // remove the oldest wal file
sangshuduo's avatar
sangshuduo 已提交
186
      char name[TSDB_FILENAME_LEN * 3];
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
187
      snprintf(name, sizeof(name), "%s/%s%d", pWal->path, walPrefix, pWal->id - pWal->max);
188 189 190
      if (remove(name) <0) {
        wError("wal:%s, failed to remove(%s)", name, strerror(errno));
      } else {
S
Shengliang Guan 已提交
191
        wDebug("wal:%s, it is removed", name);
192
      }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
193

194
      pWal->num--;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
195 196 197
    }
  }  
  
198 199
  pthread_mutex_unlock(&pWal->mutex);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
200
  return terrno;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
201 202 203
}

/*
 * Append one record (head + pHead->len body bytes) to the current wal file.
 *
 * The head is stamped with walSignature and its checksum before writing.
 * Nothing is written when the WAL level is TAOS_WAL_NOLOG or when the record's
 * version is not newer than the last one written. Returns -1 for a NULL handle,
 * otherwise terrno (0 on success).
 */
int walWrite(void *handle, SWalHead *pHead) {
  SWal *pWal = handle;
  if (pWal == NULL) return -1;

  terrno = 0;

  // WAL disabled, or a record that is not newer than what is already on disk
  if (pWal->level == TAOS_WAL_NOLOG || pHead->version <= pWal->version) return 0;

  pHead->signature = walSignature;
  taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWalHead));

  int contLen = pHead->len + sizeof(SWalHead);
  if (twrite(pWal->fd, pHead, contLen) == contLen) {
    pWal->version = pHead->version;
  } else {
    wError("wal:%s, failed to write(%s)", pWal->name, strerror(errno));
    terrno = TAOS_SYSTEM_ERROR(errno);
  }

  return terrno;
}

void walFsync(void *handle) {

J
Jeff Tao 已提交
229
  SWal *pWal = handle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
230
  if (pWal == NULL || pWal->level != TAOS_WAL_FSYNC || pWal->fd < 0) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
231

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
232
  if (pWal->fsyncPeriod == 0) {
233 234 235 236
    if (fsync(pWal->fd) < 0) {
      wError("wal:%s, fsync failed(%s)", pWal->name, strerror(errno));
    }
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
237 238
}

J
Jeff Tao 已提交
239
int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) {
J
Jeff Tao 已提交
240
  SWal    *pWal = handle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
241 242
  struct   dirent *ent;
  int      count = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
243
  uint32_t maxId = 0, minId = -1, index =0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
244

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
245
  terrno = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
246
  int   plen = strlen(walPrefix);
sangshuduo's avatar
sangshuduo 已提交
247
  char  opath[TSDB_FILENAME_LEN+5];
J
Jeff Tao 已提交
248
   
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
249
  int slen = snprintf(opath, sizeof(opath), "%s", pWal->path);
J
Jeff Tao 已提交
250 251
  if ( pWal->keep == 0) 
    strcpy(opath+slen, "/old");
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
252 253

  DIR *dir = opendir(opath);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
254 255 256 257 258 259
  if (dir == NULL && errno == ENOENT) return 0;
  if (dir == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return terrno;
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
260 261
  while ((ent = readdir(dir))!= NULL) {
    if ( strncmp(ent->d_name, walPrefix, plen) == 0) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
262
      index = atol(ent->d_name + plen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
263 264 265 266 267 268
      if (index > maxId) maxId = index;
      if (index < minId) minId = index;
      count++;
    }
  }

269 270
  closedir(dir);

271
  if (count == 0) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
272 273
    if (pWal->keep) terrno = walRenew(pWal);
    return terrno;
274
  }
J
Jeff Tao 已提交
275

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
276
  if ( count != (maxId-minId+1) ) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
277
    wError("wal:%s, messed up, count:%d max:%d min:%d", opath, count, maxId, minId);
S
Shengliang Guan 已提交
278
    terrno = TSDB_CODE_WAL_APP_ERROR;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
279
  } else {
S
Shengliang Guan 已提交
280
    wDebug("wal:%s, %d files will be restored", opath, count);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
281 282

    for (index = minId; index<=maxId; ++index) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
283
      snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", opath, walPrefix, index);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
284 285
      terrno = walRestoreWalFile(pWal, pVnode, writeFp);
      if (terrno < 0) break;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
286 287 288
    }
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
289
  if (terrno == 0) {
J
Jeff Tao 已提交
290
    if (pWal->keep == 0) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
291 292
      terrno = walRemoveWalFiles(opath);
      if (terrno == 0) {
J
Jeff Tao 已提交
293 294
        if (remove(opath) < 0) {
          wError("wal:%s, failed to remove directory(%s)", opath, strerror(errno));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
295
          terrno = TAOS_SYSTEM_ERROR(errno);
J
Jeff Tao 已提交
296 297 298 299 300 301
        }
      }
    } else { 
      // open the existing WAL file in append mode
      pWal->num = count;
      pWal->id = maxId;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
302
      snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", opath, walPrefix, maxId);
J
Jeff Tao 已提交
303 304 305
      pWal->fd = open(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO);
      if (pWal->fd < 0) {
        wError("wal:%s, failed to open file(%s)", pWal->name, strerror(errno));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
306
        terrno = TAOS_SYSTEM_ERROR(errno);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
307 308 309 310
      }
    }
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
311
  return terrno;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
312 313
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
314
int walGetWalFile(void *handle, char *name, uint32_t *index) {
J
Jeff Tao 已提交
315
  SWal   *pWal = handle;
316 317 318 319 320 321 322 323 324 325 326 327 328 329
  int     code = 1;
  int32_t first = 0; 

  name[0] = 0;
  if (pWal == NULL || pWal->num == 0) return 0;

  pthread_mutex_lock(&(pWal->mutex));

  first = pWal->id + 1 - pWal->num;
  if (*index == 0) *index = first;  // set to first one

  if (*index < first && *index > pWal->id) {
    code = -1;  // index out of range
  } else { 
330
    sprintf(name, "wal/%s%d", walPrefix, *index);
331 332 333 334 335 336 337 338
    code = (*index == pWal->id) ? 0:1;
  }

  pthread_mutex_unlock(&(pWal->mutex));

  return code;
}  

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
339 340 341 342 343 344 345 346 347 348
/*
 * Free a handle and drop it from the live-handle count. When the last handle
 * goes away, the shared timer controller is cleaned up and the once-control reset.
 */
static void walRelease(SWal *pWal) {
  pthread_mutex_destroy(&pWal->mutex);
  pWal->signature = NULL;  // walProcessFsyncTimer() compares this field against the handle
  free(pWal);

  if (atomic_sub_fetch_32(&tsWalNum, 1) != 0) return;

  // last handle released: tear down module-wide state
  if (walTmrCtrl) taosTmrCleanUp(walTmrCtrl);
  walTmrCtrl = NULL;
  walModuleInit = PTHREAD_ONCE_INIT;
  wDebug("WAL module is cleaned up");
}

/*
 * Replay every record of the wal file named in pWal->name through writeFp.
 *
 * Records are read one at a time into a heap buffer (head followed by body).
 * Reading stops at end of file, or at the first short read, bad head checksum
 * or invalid body length; those cases set terrno. When pWal->keep is set, the
 * handle's version follows the replayed records.
 *
 * Returns terrno (0 on success).
 */
static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
  enum { WAL_RESTORE_BUF_SIZE = 1024000 };  // size for one record (head + body)
  char *name = pWal->name;

  terrno = 0;

  char *buffer = malloc(WAL_RESTORE_BUF_SIZE);
  if (buffer == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return terrno;
  }

  SWalHead *pHead = (SWalHead *)buffer;

  int fd = open(name, O_RDONLY);
  if (fd < 0) {
    int err = errno;
    wError("wal:%s, failed to open for restore(%s)", name, strerror(err));
    terrno = TAOS_SYSTEM_ERROR(err);
    free(buffer);
    return terrno;
  }

  wDebug("wal:%s, start to restore", name);

  // NOTE(review): despite the "skip" wording below, setting terrno makes walRestore()
  // stop and fail; confirm that is intended for a torn tail after a crash.
  while (1) {
    int ret = tread(fd, pHead, sizeof(SWalHead));
    if (ret == 0) break;

    if (ret != sizeof(SWalHead)) {
      int err = errno;
      wWarn("wal:%s, failed to read head, skip, ret:%d(%s)", name, ret, strerror(err));
      terrno = TAOS_SYSTEM_ERROR(err);
      break;
    }

    if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
      wWarn("wal:%s, cksum is messed up, skip the rest of file", name);
      terrno = TAOS_SYSTEM_ERROR(errno);
      break;
    }

    // The body is read into the same fixed-size buffer right after the head:
    // reject lengths that would overflow it instead of corrupting the heap.
    int64_t bodyLen = pHead->len;
    if (bodyLen < 0 || bodyLen > (int64_t)(WAL_RESTORE_BUF_SIZE - sizeof(SWalHead))) {
      wError("wal:%s, invalid body length:%d, skip the rest of file", name, pHead->len);
      terrno = TSDB_CODE_WAL_APP_ERROR;
      break;
    }

    ret = tread(fd, pHead->cont, pHead->len);
    if (ret != pHead->len) {
      int err = errno;
      wWarn("wal:%s, failed to read body, skip, len:%d ret:%d", name, pHead->len, ret);
      terrno = TAOS_SYSTEM_ERROR(err);
      break;
    }

    if (pWal->keep) pWal->version = pHead->version;
    (*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL);
  }

  close(fd);
  free(buffer);

  return terrno;
}

int walHandleExistingFiles(const char *path) {
sangshuduo's avatar
sangshuduo 已提交
409 410
  char   oname[TSDB_FILENAME_LEN * 3];
  char   nname[TSDB_FILENAME_LEN * 3];
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
411 412
  char   opath[TSDB_FILENAME_LEN];

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
413
  snprintf(opath, sizeof(opath), "%s/old", path);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
414 415 416 417

  struct dirent *ent;
  DIR   *dir = opendir(path);
  int    plen = strlen(walPrefix);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
418
  terrno = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
419 420 421 422 423 424 425 426 427 428

  if (access(opath, F_OK) == 0) {
    // old directory is there, it means restore process is not finished
    walRemoveWalFiles(path);

  } else {
    // move all files to old directory
    int count = 0;
    while ((ent = readdir(dir))!= NULL) {  
      if ( strncmp(ent->d_name, walPrefix, plen) == 0) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
429 430
        snprintf(oname, sizeof(oname), "%s/%s", path, ent->d_name);
        snprintf(nname, sizeof(nname), "%s/old/%s", path, ent->d_name);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
431 432 433 434
        if (tmkdir(opath, 0755) != 0) {
          wError("wal:%s, failed to create directory:%s(%s)", oname, opath, strerror(errno));
          terrno = TAOS_SYSTEM_ERROR(errno);
          break; 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
435 436
        }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
437 438
        if (rename(oname, nname) < 0) {
          wError("wal:%s, failed to move to new:%s", oname, nname);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
439
          terrno = TAOS_SYSTEM_ERROR(errno);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
440 441 442 443 444 445 446
          break;
        } 

        count++;
      }
    }

S
Shengliang Guan 已提交
447
    wDebug("wal:%s, %d files are moved for restoration", path, count);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
448 449 450
  }
  
  closedir(dir);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
451
  return terrno;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
452 453
}

J
Jeff Tao 已提交
454
static int walRemoveWalFiles(const char *path) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
455
  int    plen = strlen(walPrefix);
sangshuduo's avatar
sangshuduo 已提交
456
  char   name[TSDB_FILENAME_LEN * 3];
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
457 458
 
  terrno = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
459 460 461

  struct dirent *ent;
  DIR   *dir = opendir(path);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
462 463 464 465 466
  if (dir == NULL && errno == ENOENT) return 0;
  if (dir == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return terrno;
  }  
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
467 468 469

  while ((ent = readdir(dir))!= NULL) {
    if ( strncmp(ent->d_name, walPrefix, plen) == 0) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
470
      snprintf(name, sizeof(name), "%s/%s", path, ent->d_name);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
471 472
      if (remove(name) <0) {
        wError("wal:%s, failed to remove(%s)", name, strerror(errno));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
473
        terrno = TAOS_SYSTEM_ERROR(errno);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
474 475 476 477 478 479
      }
    }
  } 

  closedir(dir);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
480
  return terrno;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
481 482
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
483 484 485 486 487 488 489 490 491 492 493 494
/*
 * Periodic fsync timer callback, armed by walOpen() for TAOS_WAL_FSYNC handles
 * with fsyncPeriod > 0. Flushes the current wal file and re-arms itself.
 */
static void walProcessFsyncTimer(void *param, void *tmrId) {
  SWal *pWal = param;
  (void)tmrId;

  // NOTE(review): walRelease() clears signature right before freeing the handle;
  // reading it after the free is itself unsafe, so this only guards a narrow window.
  if (pWal->signature != pWal) return;

  // No file may be open yet (e.g. a keep == 1 handle before walRestore() opens one,
  // or after a failed walRenew()). Skip the fsync this round but keep the timer armed;
  // returning without re-arming would disable periodic fsync for the handle's lifetime.
  if (pWal->fd >= 0 && fsync(pWal->fd) < 0) {
    wError("wal:%s, fsync failed(%s)", pWal->name, strerror(errno));
  }

  pWal->timer = taosTmrStart(walProcessFsyncTimer, pWal->fsyncPeriod, pWal, walTmrCtrl);
}