提交 43a2bcdc 编写于 作者: L Liu Jicong

fix wal read handle

上级 e70989dd
......@@ -32,6 +32,23 @@ extern int32_t wDebugFlag;
#define wDebug(...) { if (wDebugFlag & DEBUG_DEBUG) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }}
#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }}
#define WAL_PREFIX "wal"
#define WAL_PREFIX_LEN 3
#define WAL_NOSUFFIX_LEN 20
#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN+1)
#define WAL_LOG_SUFFIX "log"
#define WAL_INDEX_SUFFIX "idx"
#define WAL_REFRESH_MS 1000
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16)
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
#define WAL_IDX_ENTRY_SIZE (sizeof(int64_t)*2)
#define WAL_CUR_POS_WRITABLE 1
#define WAL_CUR_FILE_WRITABLE 2
#define WAL_CUR_FAILED 4
#pragma pack(push,1)
typedef enum {
TAOS_WAL_NOLOG = 0,
TAOS_WAL_WRITE = 1,
......@@ -43,6 +60,7 @@ typedef struct SWalReadHead {
uint8_t msgType;
int8_t reserved[2];
int32_t len;
//int64_t ingestTs; //not implemented
int64_t version;
char body[];
} SWalReadHead;
......@@ -71,25 +89,6 @@ typedef struct {
SWalReadHead head;
} SWalHead;
#define WAL_PREFIX "wal"
#define WAL_PREFIX_LEN 3
#define WAL_NOSUFFIX_LEN 20
#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN+1)
#define WAL_LOG_SUFFIX "log"
#define WAL_INDEX_SUFFIX "idx"
#define WAL_REFRESH_MS 1000
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16)
#define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFEUL))
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
//#define WAL_FILE_NUM 1 // 3
#define WAL_FILESET_MAX 128
#define WAL_IDX_ENTRY_SIZE (sizeof(int64_t)*2)
#define WAL_CUR_POS_WRITABLE 1
#define WAL_CUR_FILE_WRITABLE 2
#define WAL_CUR_FAILED 4
typedef struct SWalVer {
int64_t firstVer;
int64_t verInSnapshotting;
......@@ -101,24 +100,18 @@ typedef struct SWalVer {
typedef struct SWal {
// cfg
SWalCfg cfg;
//total size
int64_t totSize;
//fsync seq
int32_t fsyncSeq;
//reference
int64_t refId;
//write tfd
int64_t writeLogTfd;
int64_t writeIdxTfd;
//wal lifecycle
SWalVer vers;
//roll status
int64_t lastRollSeq;
//file set
int32_t writeCur;
int64_t writeLogTfd;
int64_t writeIdxTfd;
SArray* fileInfoSet;
//ctl
int32_t curStatus;
int32_t fsyncSeq;
int64_t totSize;
int64_t refId;
int64_t lastRollSeq;
pthread_mutex_t mutex;
//path
char path[WAL_PATH_LEN];
......@@ -134,8 +127,9 @@ typedef struct SWalReadHandle {
int64_t curVersion;
int64_t capacity;
int64_t status; //if cursor valid
SWalHead head;
SWalHead* pHead;
} SWalReadHandle;
#pragma pack(pop)
typedef int32_t (*FWalWrite)(void *ahandle, void *pHead);
......
......@@ -33,10 +33,12 @@ typedef struct WalFileInfo {
int64_t fileSize;
} WalFileInfo;
#pragma pack(push,1)
typedef struct WalIdxEntry {
int64_t ver;
int64_t offset;
} WalIdxEntry;
#pragma pack(pop)
static inline int32_t compareWalFileInfo(const void* pLeft, const void* pRight) {
WalFileInfo* pInfoLeft = (WalFileInfo*)pLeft;
......@@ -78,11 +80,11 @@ static inline WalFileInfo* walGetCurFileInfo(SWal* pWal) {
}
static inline int walBuildLogName(SWal*pWal, int64_t fileFirstVer, char* buf) {
return sprintf(buf, "%s/%" PRId64 "." WAL_LOG_SUFFIX, pWal->path, fileFirstVer);
return sprintf(buf, "%s/%020" PRId64 "." WAL_LOG_SUFFIX, pWal->path, fileFirstVer);
}
static inline int walBuildIdxName(SWal*pWal, int64_t fileFirstVer, char* buf) {
return sprintf(buf, "%s/%" PRId64 "." WAL_INDEX_SUFFIX, pWal->path, fileFirstVer);
return sprintf(buf, "%s/%020" PRId64 "." WAL_INDEX_SUFFIX, pWal->path, fileFirstVer);
}
static inline int walValidHeadCksum(SWalHead* pHead) {
......
......@@ -255,9 +255,8 @@ static int32_t walCreateThread() {
static void walStopThread() {
atomic_store_8(&tsWal.stop, 1);
if (tsWal.thread != NULL && taosCheckPthreadValid(tsWal.thread)) {
if (taosCheckPthreadValid(tsWal.thread)) {
pthread_join(tsWal.thread, NULL);
tsWal.thread = NULL;
}
wDebug("wal thread is stopped");
......
......@@ -21,16 +21,25 @@ SWalReadHandle* walOpenReadHandle(SWal* pWal) {
if(pRead == NULL) {
return NULL;
}
memset(pRead, 0, sizeof(SWalReadHandle));
pRead->pWal = pWal;
pRead->readIdxTfd = -1;
pRead->readLogTfd = -1;
pRead->curVersion = -1;
pRead->curFileFirstVer = -1;
pRead->capacity = 0;
pRead->status = 0;
pRead->pHead = malloc(sizeof(SWalHead));
if(pRead->pHead == NULL) {
free(pRead);
return NULL;
}
return pRead;
}
void walCloseReadHandle(SWalReadHandle *pRead) {
tfClose(pRead->readIdxTfd);
tfClose(pRead->readLogTfd);
tfree(pRead->pHead);
free(pRead);
}
......@@ -47,18 +56,17 @@ static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, i
//seek position
int64_t offset = (ver - fileFirstVer) * WAL_IDX_ENTRY_SIZE;
code = tfLseek(idxTfd, offset, SEEK_SET);
if(code != 0) {
if(code < 0) {
return -1;
}
WalIdxEntry entry;
code = tfRead(idxTfd, &entry, sizeof(WalIdxEntry));
if(code != 0) {
if(tfRead(idxTfd, &entry, sizeof(WalIdxEntry)) != sizeof(WalIdxEntry)) {
return -1;
}
//TODO:deserialize
ASSERT(entry.ver == ver);
code = tfLseek(logTfd, entry.offset, SEEK_SET);
if (code != 0) {
if (code < 0) {
return -1;
}
return code;
......@@ -71,13 +79,13 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
tfClose(pRead->readLogTfd);
walBuildLogName(pRead->pWal, fileFirstVer, fnameStr);
int logTfd = tfOpenRead(fnameStr);
int64_t logTfd = tfOpenRead(fnameStr);
if(logTfd < 0) {
return -1;
}
walBuildIdxName(pRead->pWal, fileFirstVer, fnameStr);
int idxTfd = tfOpenRead(fnameStr);
int64_t idxTfd = tfOpenRead(fnameStr);
if(idxTfd < 0) {
return -1;
}
......@@ -90,7 +98,7 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
int code;
SWal *pWal = pRead->pWal;
if(ver == pWal->vers.lastVer) {
if(ver == pRead->curVersion) {
return 0;
}
if(ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
......@@ -126,33 +134,41 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
int code;
//TODO: check wal life
if(pRead->curVersion != ver) {
walReadSeekVer(pRead, ver);
code = walReadSeekVer(pRead, ver);
if(code != 0) {
return -1;
}
}
if(!tfValid(pRead->readLogTfd)) return -1;
if(sizeof(SWalHead) != tfRead(pRead->readLogTfd, &pRead->head, sizeof(SWalHead))) {
code = tfRead(pRead->readLogTfd, pRead->pHead, sizeof(SWalHead));
if(code != sizeof(SWalHead)) {
return -1;
}
code = walValidHeadCksum(&pRead->head);
code = walValidHeadCksum(pRead->pHead);
if(code != 0) {
return -1;
}
if(pRead->capacity < pRead->head.head.len) {
void* ptr = realloc(pRead, pRead->head.head.len);
if(pRead->capacity < pRead->pHead->head.len) {
void* ptr = realloc(pRead->pHead, sizeof(SWalHead) + pRead->pHead->head.len);
if(ptr == NULL) {
return -1;
}
pRead = ptr;
pRead->capacity = pRead->head.head.len;
pRead->pHead = ptr;
pRead->capacity = pRead->pHead->head.len;
}
if(pRead->head.head.len != tfRead(pRead->readLogTfd, &pRead->head.head.body, pRead->head.head.len)) {
if(pRead->pHead->head.len != tfRead(pRead->readLogTfd, pRead->pHead->head.body, pRead->pHead->head.len)) {
return -1;
}
code = walValidBodyCksum(&pRead->head);
/*code = walValidBodyCksum(pRead->pHead);*/
ASSERT(pRead->pHead->head.version == ver);
if(code != 0) {
return -1;
}
pRead->curVersion++;
return 0;
}
......
......@@ -377,11 +377,12 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
//must truncate explicitly first
return -1;
}
/*if (!tfValid(pWal->curLogTfd)) return 0;*/
/*if (!tfValid(pWal->writeLogTfd)) return -1;*/
pthread_mutex_lock(&pWal->mutex);
pWal->writeHead.head.version = index;
int64_t offset = walGetCurFileOffset(pWal);
pWal->writeHead.head.len = bodyLen;
pWal->writeHead.head.msgType = msgType;
pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead);
......@@ -393,12 +394,12 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno));
}
if (tfWrite(pWal->writeLogTfd, &body, bodyLen) != bodyLen) {
if (tfWrite(pWal->writeLogTfd, (char*)body, bodyLen) != bodyLen) {
//ftruncate
code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno));
}
code = walWriteIndex(pWal, index, walGetCurFileOffset(pWal));
code = walWriteIndex(pWal, index, offset);
if(code != 0) {
//TODO
return -1;
......
......@@ -5,6 +5,9 @@
#include "walInt.h"
const char* ranStr = "tvapq02tcp";
const int ranStrLen = strlen(ranStr);
class WalCleanEnv : public ::testing::Test {
protected:
static void SetUpTestCase() {
......@@ -157,15 +160,13 @@ TEST_F(WalCleanEnv, removeOldMeta) {
TEST_F(WalKeepEnv, readOldMeta) {
walResetEnv();
const char* ranStr = "tvapq02tcp";
int len = strlen(ranStr);
int code;
for(int i = 0; i < 10; i++) {
code = walWrite(pWal, i, i+1, (void*)ranStr, len);
code = walWrite(pWal, i, i+1, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, i);
code = walWrite(pWal, i+2, i, (void*)ranStr, len);
code = walWrite(pWal, i+2, i, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, -1);
ASSERT_EQ(pWal->vers.lastVer, i);
}
......@@ -179,7 +180,7 @@ TEST_F(WalKeepEnv, readOldMeta) {
char* newss = walMetaSerialize(pWal);
len = strlen(oldss);
int len = strlen(oldss);
ASSERT_EQ(len, strlen(newss));
for(int i = 0; i < len; i++) {
EXPECT_EQ(oldss[i], newss[i]);
......@@ -189,14 +190,12 @@ TEST_F(WalKeepEnv, readOldMeta) {
}
TEST_F(WalCleanEnv, write) {
const char* ranStr = "tvapq02tcp";
const int len = strlen(ranStr);
int code;
for(int i = 0; i < 10; i++) {
code = walWrite(pWal, i, i+1, (void*)ranStr, len);
code = walWrite(pWal, i, i+1, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, i);
code = walWrite(pWal, i+2, i, (void*)ranStr, len);
code = walWrite(pWal, i+2, i, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, -1);
ASSERT_EQ(pWal->vers.lastVer, i);
}
......@@ -205,11 +204,9 @@ TEST_F(WalCleanEnv, write) {
}
TEST_F(WalCleanEnv, rollback) {
const char* ranStr = "tvapq02tcp";
const int len = strlen(ranStr);
int code;
for(int i = 0; i < 10; i++) {
code = walWrite(pWal, i, i+1, (void*)ranStr, len);
code = walWrite(pWal, i, i+1, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, i);
}
......@@ -224,12 +221,10 @@ TEST_F(WalCleanEnv, rollback) {
}
TEST_F(WalCleanDeleteEnv, roll) {
const char* ranStr = "tvapq02tcp";
const int len = strlen(ranStr);
int code;
int i;
for(i = 0; i < 100; i++) {
code = walWrite(pWal, i, 0, (void*)ranStr, len);
code = walWrite(pWal, i, 0, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, i);
code = walCommit(pWal, i);
......@@ -242,19 +237,55 @@ TEST_F(WalCleanDeleteEnv, roll) {
ASSERT_EQ(pWal->vers.snapshotVer, i-1);
ASSERT_EQ(pWal->vers.verInSnapshotting, -1);
code = walWrite(pWal, 5, 0, (void*)ranStr, len);
code = walWrite(pWal, 5, 0, (void*)ranStr, ranStrLen);
ASSERT_NE(code, 0);
for(; i < 200; i++) {
code = walWrite(pWal, i, 0, (void*)ranStr, len);
code = walWrite(pWal, i, 0, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, 0);
code = walCommit(pWal, i);
ASSERT_EQ(pWal->vers.commitVer, i);
}
//code = walWriteMeta(pWal);
code = walBeginTakeSnapshot(pWal, i - 1);
ASSERT_EQ(code, 0);
code = walEndTakeSnapshot(pWal);
ASSERT_EQ(code, 0);
}
TEST_F(WalKeepEnv, readHandleRead) {
walResetEnv();
int code;
SWalReadHandle* pRead = walOpenReadHandle(pWal);
ASSERT(pRead != NULL);
int i ;
for(i = 0; i < 100; i++) {
char newStr[100];
sprintf(newStr, "%s-%d", ranStr, i);
int len = strlen(newStr);
code = walWrite(pWal, i, 0, newStr, len);
ASSERT_EQ(code, 0);
}
for(int i = 0; i < 1000; i++) {
int ver = rand() % 100;
code = walReadWithHandle(pRead, ver);
ASSERT_EQ(code, 0);
//printf("rrbody: \n");
//for(int i = 0; i < pRead->pHead->head.len; i++) {
//printf("%d ", pRead->pHead->head.body[i]);
//}
//printf("\n");
ASSERT_EQ(pRead->pHead->head.version, ver);
ASSERT_EQ(pRead->curVersion, ver+1);
char newStr[100];
sprintf(newStr, "%s-%d", ranStr, ver);
int len = strlen(newStr);
ASSERT_EQ(pRead->pHead->head.len, len);
for(int j = 0; j < len; j++) {
EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册