提交 6b02bd26 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

add API walGetWalFile for synchronization

上级 87bf983d
...@@ -24,12 +24,12 @@ extern "C" { ...@@ -24,12 +24,12 @@ extern "C" {
#define TAOS_WAL_FSYNC 2 #define TAOS_WAL_FSYNC 2
typedef struct { typedef struct {
uint32_t signature;
uint32_t cksum;
int8_t msgType; int8_t msgType;
int8_t reserved[3]; int8_t reserved[3];
int32_t len; int32_t len;
uint64_t version; uint64_t version;
uint32_t signature;
uint32_t cksum;
char cont[]; char cont[];
} SWalHead; } SWalHead;
...@@ -41,6 +41,7 @@ int walRenew(twal_h); ...@@ -41,6 +41,7 @@ int walRenew(twal_h);
int walWrite(twal_h, SWalHead *); int walWrite(twal_h, SWalHead *);
void walFsync(twal_h); void walFsync(twal_h);
int walRestore(twal_h, void *pVnode, int (*writeFp)(void *ahandle, void *pWalHead)); int walRestore(twal_h, void *pVnode, int (*writeFp)(void *ahandle, void *pWalHead));
int walGetWalFile(twal_h, char *name, int32_t *index);
extern int wDebugFlag; extern int wDebugFlag;
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include "os.h" #include "os.h"
#include "tlog.h" #include "tlog.h"
#include "tchecksum.h"
#include "tutil.h" #include "tutil.h"
#include "twal.h" #include "twal.h"
...@@ -36,10 +37,11 @@ typedef struct { ...@@ -36,10 +37,11 @@ typedef struct {
int fd; int fd;
int level; int level;
int max; // maximum number of wal files int max; // maximum number of wal files
uint32_t id; // increase continuously uint64_t id; // increase continuously
int num; // number of wal files int num; // number of wal files
char path[TSDB_FILENAME_LEN]; char path[TSDB_FILENAME_LEN];
char name[TSDB_FILENAME_LEN]; char name[TSDB_FILENAME_LEN];
pthread_mutex_t mutex;
} SWal; } SWal;
int wDebugFlag = 135; int wDebugFlag = 135;
...@@ -59,6 +61,7 @@ void *walOpen(char *path, int max, int level) { ...@@ -59,6 +61,7 @@ void *walOpen(char *path, int max, int level) {
pWal->num = 0; pWal->num = 0;
pWal->level = level; pWal->level = level;
strcpy(pWal->path, path); strcpy(pWal->path, path);
pthread_mutex_init(&pWal->mutex, NULL);
if (access(path, F_OK) != 0) mkdir(path, 0755); if (access(path, F_OK) != 0) mkdir(path, 0755);
...@@ -67,9 +70,10 @@ void *walOpen(char *path, int max, int level) { ...@@ -67,9 +70,10 @@ void *walOpen(char *path, int max, int level) {
if (pWal->fd <0) { if (pWal->fd <0) {
wError("wal:%s, failed to open", path); wError("wal:%s, failed to open", path);
pthread_mutex_destroy(&pWal->mutex);
free(pWal); free(pWal);
pWal = NULL; pWal = NULL;
} }
return pWal; return pWal;
} }
...@@ -82,48 +86,59 @@ void walClose(void *handle) { ...@@ -82,48 +86,59 @@ void walClose(void *handle) {
// remove all files in the directory // remove all files in the directory
for (int i=0; i<pWal->num; ++i) { for (int i=0; i<pWal->num; ++i) {
sprintf(pWal->name, "%s/%s%010d", pWal->path, walPrefix, pWal->id-i); sprintf(pWal->name, "%s/%s%ld", pWal->path, walPrefix, pWal->id-i);
if (remove(pWal->name) <0) { if (remove(pWal->name) <0) {
wError("wal:%s, failed to remove", pWal->name); wError("wal:%s, failed to remove", pWal->name);
} else { } else {
wTrace("wal:%s, it is removed", pWal->name); wTrace("wal:%s, it is removed", pWal->name);
} }
} }
pthread_mutex_destroy(&pWal->mutex);
free(pWal);
} }
int walRenew(twal_h handle) { int walRenew(twal_h handle) {
SWal *pWal = (SWal *)handle; SWal *pWal = (SWal *)handle;
int code = 0;
pthread_mutex_lock(&pWal->mutex);
if (pWal->fd >=0) { if (pWal->fd >=0) {
close(pWal->fd); close(pWal->fd);
pWal->id++; pWal->id++;
wTrace("wal:%s, it is closed", pWal->name); wTrace("wal:%s, it is closed", pWal->name);
} }
sprintf(pWal->name, "%s/%s%010d", pWal->path, walPrefix, pWal->id); pWal->num++;
sprintf(pWal->name, "%s/%s%ld", pWal->path, walPrefix, pWal->id);
pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
if (pWal->fd < 0) { if (pWal->fd < 0) {
wError("wal:%d, failed to open(%s)", pWal->name, strerror(errno)); wError("wal:%d, failed to open(%s)", pWal->name, strerror(errno));
return -1; code = -1;
} } else {
wTrace("wal:%s, it is created", pWal->name);
wTrace("wal:%s, it is open", pWal->name); if (pWal->num > pWal->max) {
// remove the oldest wal file
char name[TSDB_FILENAME_LEN];
sprintf(name, "%s/%s%ld", pWal->path, walPrefix, pWal->id - pWal->max);
if (remove(name) <0) {
wError("wal:%s, failed to remove(%s)", name, strerror(errno));
} else {
wTrace("wal:%s, it is removed", name);
}
pWal->num++; pWal->num--;
if (pWal->num > pWal->max) {
// remove the oldest wal file
char name[TSDB_FILENAME_LEN];
sprintf(name, "%s/%s%010d", pWal->path, walPrefix, pWal->id - pWal->max);
if (remove(name) <0) {
wError("wal:%s, failed to remove(%s)", name, strerror(errno));
} else {
wTrace("wal:%s, it is removed", name);
} }
pWal->num--;
} }
return 0; pthread_mutex_unlock(&pWal->mutex);
return code;
} }
int walWrite(void *handle, SWalHead *pHead) { int walWrite(void *handle, SWalHead *pHead) {
...@@ -134,6 +149,7 @@ int walWrite(void *handle, SWalHead *pHead) { ...@@ -134,6 +149,7 @@ int walWrite(void *handle, SWalHead *pHead) {
if (pWal->level == TAOS_WAL_NOLOG) return 0; if (pWal->level == TAOS_WAL_NOLOG) return 0;
pHead->signature = walSignature; pHead->signature = walSignature;
taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWal));
int contLen = pHead->len + sizeof(SWalHead); int contLen = pHead->len + sizeof(SWalHead);
if(write(pWal->fd, pHead, contLen) != contLen) { if(write(pWal->fd, pHead, contLen) != contLen) {
...@@ -157,7 +173,7 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *)) { ...@@ -157,7 +173,7 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *)) {
int code = 0; int code = 0;
struct dirent *ent; struct dirent *ent;
int count = 0; int count = 0;
uint32_t maxId = 0, minId = -1, index =0; uint64_t maxId = 0, minId = -1, index =0;
int plen = strlen(walPrefix); int plen = strlen(walPrefix);
char opath[TSDB_FILENAME_LEN]; char opath[TSDB_FILENAME_LEN];
...@@ -169,7 +185,7 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *)) { ...@@ -169,7 +185,7 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *)) {
DIR *dir = opendir(opath); DIR *dir = opendir(opath);
while ((ent = readdir(dir))!= NULL) { while ((ent = readdir(dir))!= NULL) {
if ( strncmp(ent->d_name, walPrefix, plen) == 0) { if ( strncmp(ent->d_name, walPrefix, plen) == 0) {
index = atol(ent->d_name + plen); index = atoll(ent->d_name + plen);
if (index > maxId) maxId = index; if (index > maxId) maxId = index;
if (index < minId) minId = index; if (index < minId) minId = index;
count++; count++;
...@@ -183,7 +199,7 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *)) { ...@@ -183,7 +199,7 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *)) {
wTrace("wal:%s, %d files will be restored", opath, count); wTrace("wal:%s, %d files will be restored", opath, count);
for (index = minId; index<=maxId; ++index) { for (index = minId; index<=maxId; ++index) {
sprintf(pWal->name, "%s/old/%s%010d", pWal->path, walPrefix, index); sprintf(pWal->name, "%s/old/%s%ld", pWal->path, walPrefix, index);
code = walRestoreWalFile(pWal->name, pVnode, writeFp); code = walRestoreWalFile(pWal->name, pVnode, writeFp);
if (code < 0) break; if (code < 0) break;
} }
...@@ -204,9 +220,34 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *)) { ...@@ -204,9 +220,34 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *)) {
return code; return code;
} }
int walGetWalFile(void *handle, char *name, int32_t *index) {
SWal *pWal = (SWal *)handle;
int code = 1;
int32_t first = 0;
name[0] = 0;
if (pWal == NULL || pWal->num == 0) return 0;
pthread_mutex_lock(&(pWal->mutex));
first = pWal->id + 1 - pWal->num;
if (*index == 0) *index = first; // set to first one
if (*index < first && *index > pWal->id) {
code = -1; // index out of range
} else {
sprintf(name, "%s/%s%ld", pWal->path, walPrefix, *index);
code = (*index == pWal->id) ? 0:1;
}
pthread_mutex_unlock(&(pWal->mutex));
return code;
}
static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, void *)) { static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, void *)) {
SWalHead walHead; SWalHead walHead;
int code = -1; int code = 0;
int fd = open(name, O_RDONLY); int fd = open(name, O_RDONLY);
if (fd < 0) { if (fd < 0) {
...@@ -221,21 +262,21 @@ static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, vo ...@@ -221,21 +262,21 @@ static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, vo
if ( ret == 0) { code = 0; break;} if ( ret == 0) { code = 0; break;}
if (ret != sizeof(walHead)) { if (ret != sizeof(walHead)) {
wError("wal:%s, failed to read(%s)", name, strerror(errno)); wWarn("wal:%s, failed to read head, skip, ret:%d(%s)", name, ret, strerror(errno));
break; break;
} }
if (walHead.signature != walSignature) { if (taosCheckChecksumWhole((uint8_t *)&walHead, sizeof(walHead))) {
wError("wal:%s, file is messed up, signature:", name, walHead.signature); wWarn("wal:%s, cksum is messed up, skip the rest of file", name);
break; break;
} }
char *buffer = malloc(sizeof(SWalHead) + walHead.len); char *buffer = malloc(sizeof(SWalHead) + walHead.len);
memcpy(buffer, &walHead, sizeof(walHead)); memcpy(buffer, &walHead, sizeof(walHead));
ret = read(fd, buffer+sizeof(walHead), walHead.len); ret = read(fd, buffer+sizeof(walHead), walHead.len);
if ( ret != walHead.len) { if ( ret != walHead.len) {
wError("wal:%s, failed to read(%s)", name, strerror(errno)); wWarn("wal:%s, failed to read body, skip, len:%d ret:%d", name, walHead.len, ret);
break; break;
} }
......
...@@ -96,6 +96,7 @@ int main(int argc, char *argv[]) { ...@@ -96,6 +96,7 @@ int main(int argc, char *argv[]) {
for (int i=0; i<total; ++i) { for (int i=0; i<total; ++i) {
for (int k=0; k<rows; ++k) { for (int k=0; k<rows; ++k) {
pHead->version = ++ver; pHead->version = ++ver;
pHead->len = size;
walWrite(pWal, pHead); walWrite(pWal, pHead);
} }
...@@ -104,6 +105,23 @@ int main(int argc, char *argv[]) { ...@@ -104,6 +105,23 @@ int main(int argc, char *argv[]) {
} }
printf("%d wal files are written\n", total); printf("%d wal files are written\n", total);
int32_t index = 0;
char name[256];
while (1) {
int code = walGetWalFile(pWal, name, &index);
if (code == -1) {
printf("failed to get wal file, index:%d\n", index);
break;
}
printf("index:%d wal:%s\n", index, name);
if (code == 0) break;
index++;
}
getchar(); getchar();
walClose(pWal); walClose(pWal);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册