提交 b6f277b1 编写于 作者: H Hongze Cheng

Merge branch '3.0' into feature/vnode

......@@ -55,12 +55,14 @@ typedef struct {
uint32_t signature;
uint32_t cksumHead;
uint32_t cksumBody;
//char cont[];
char cont[];
} SWalHead;
typedef struct {
int32_t vgId;
int32_t fsyncPeriod; // millisecond
int32_t rollPeriod;
int64_t segSize;
EWalType walLevel; // wal level
} SWalCfg;
......@@ -87,10 +89,14 @@ typedef struct SWal {
// cfg
int32_t vgId;
int32_t fsyncPeriod; // millisecond
int32_t fsyncSeq;
int32_t rollPeriod; // second
int64_t segSize;
int64_t rtSize;
EWalType level;
//total size
int64_t totSize;
//fsync seq
int32_t fsyncSeq;
//reference
int64_t refId;
//current tfd
......@@ -98,25 +104,32 @@ typedef struct SWal {
int64_t curIdxTfd;
//current version
int64_t curVersion;
int64_t curLogOffset;
//current file version
int64_t curFileFirstVersion;
int64_t curFileLastVersion;
//wal fileset version
//int64_t curFileFirstVersion;
//int64_t curFileLastVersion;
//wal lifecycle
int64_t firstVersion;
int64_t snapshotVersion;
int64_t commitVersion;
int64_t lastVersion;
int64_t lastFileName;
//last file
//int64_t lastFileName;
//roll status
int64_t lastRollSeq;
int64_t lastFileWriteSize;
//int64_t lastFileWriteSize;
//file set
int32_t fileCursor;
SArray* fileInfoSet;
//ctl
int32_t curStatus;
pthread_mutex_t mutex;
//path
char path[WAL_PATH_LEN];
//file set
SArray* fileSet;
//reusable write head
SWalHead head;
} SWal; // WAL HANDLE
......@@ -133,7 +146,7 @@ int32_t walAlter(SWal *, SWalCfg *pCfg);
void walClose(SWal *);
// write
int64_t walWrite(SWal *, int64_t index, uint8_t msgType, void *body, int32_t bodyLen);
int64_t walWrite(SWal *, int64_t index, uint8_t msgType, const void *body, int32_t bodyLen);
void walFsync(SWal *, bool force);
// apis for lifecycle management
......
......@@ -16,6 +16,8 @@
#ifndef _TD_UTIL_FILE_H
#define _TD_UTIL_FILE_H
#include "os.h"
#ifdef __cplusplus
extern "C" {
#endif
......
......@@ -30,10 +30,10 @@ typedef struct {
uint32_t buf[4]; /* scratch buffer */
uint8_t in[64]; /* input buffer */
uint8_t digest[16]; /* actual digest after MD5Final call */
} MD5_CTX;
} T_MD5_CTX;
void MD5Init(MD5_CTX *mdContext);
void MD5Update(MD5_CTX *mdContext, uint8_t *inBuf, unsigned int inLen);
void MD5Final(MD5_CTX *mdContext);
void tMD5Init(T_MD5_CTX *mdContext);
void tMD5Update(T_MD5_CTX *mdContext, uint8_t *inBuf, unsigned int inLen);
void tMD5Final(T_MD5_CTX *mdContext);
#endif /*_TD_UTIL_MD5_H*/
......@@ -47,10 +47,10 @@ void taosIp2String(uint32_t ip, char *str);
void taosIpPort2String(uint32_t ip, uint16_t port, char *str);
static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) {
MD5_CTX context;
MD5Init(&context);
MD5Update(&context, inBuf, (unsigned int)inLen);
MD5Final(&context);
T_MD5_CTX context;
tMD5Init(&context);
tMD5Update(&context, inBuf, (unsigned int)inLen);
tMD5Final(&context);
memcpy(target, context.digest, TSDB_KEY_LEN);
}
......
......@@ -478,6 +478,7 @@ Output fstStateOutputForAnyTrans(FstState *s, FstNode *node, uint64_t i) {
return 0;
}
FstSlice *slice = &node->data;
uint8_t *data = fstSliceData(slice, NULL);
uint64_t at = node->start
- fstStateNtransLen(s)
- 1 // pack size
......@@ -485,7 +486,6 @@ Output fstStateOutputForAnyTrans(FstState *s, FstNode *node, uint64_t i) {
- (i * oSizes)
- oSizes;
uint8_t *data = fstSliceData(slice, NULL);
return unpackUint64(data + at, oSizes);
}
......@@ -555,6 +555,7 @@ Output fstStateFinalOutput(FstState *s, uint64_t version, FstSlice *slice, Pack
uint64_t at = FST_SLICE_LEN(slice)
- 1
- fstStateNtransLen(s)
- 1 // pack size
- fstStateTotalTransSize(s, version, sizes, nTrans)
- (nTrans * oSizes)
- oSizes;
......@@ -587,7 +588,8 @@ uint64_t fstStateFindInput(FstState *s, FstNode *node, uint8_t b, bool *null) {
FstSlice t = fstSliceCopy(slice, start, end - 1);
int32_t len = 0;
uint8_t *data = fstSliceData(&t, &len);
for(int i = 0; i < len; i++) {
int i = 0;
for(; i < len; i++) {
//uint8_t v = slice->data[slice->start + i];
////slice->data[slice->start + i];
uint8_t v = data[i];
......@@ -595,6 +597,7 @@ uint64_t fstStateFindInput(FstState *s, FstNode *node, uint8_t b, bool *null) {
return node->nTrans - i - 1; // bug
}
}
if (i == len) { *null = true; }
}
}
......@@ -774,7 +777,7 @@ FstBuilder *fstBuilderCreate(void *w, FstType ty) {
if (NULL == b) { return b; }
b->wrt = fstCountingWriterCreate(w, false);
b->wrt = fstCountingWriterCreate(w, false);
b->unfinished = fstUnFinishedNodesCreate();
b->registry = fstRegistryCreate(10000, 2) ;
b->last = fstSliceCreate(NULL, 0);
......@@ -857,6 +860,7 @@ OrderType fstBuilderCheckLastKey(FstBuilder *b, FstSlice bs, bool ckDup) {
return OutOfOrdered;
}
// deep copy or not
fstSliceDestroy(&b->last);
b->last = fstSliceCopy(&bs, input->start, input->end);
}
return Ordered;
......@@ -1007,8 +1011,7 @@ Fst* fstCreate(FstSlice *slice) {
uint64_t fstLen;
len -= sizeof(fstLen);
taosDecodeFixedU64(buf + len, &fstLen);
//TODO(validat root addr)
//
//TODO(validate root addr)
Fst *fst= (Fst *)calloc(1, sizeof(Fst));
if (fst == NULL) { return NULL; }
......@@ -1023,6 +1026,7 @@ Fst* fstCreate(FstSlice *slice) {
fst->meta->len = fstLen;
fst->meta->checkSum = checkSum;
fst->data = slice;
return fst;
FST_CREAT_FAILED:
......
......@@ -2,13 +2,79 @@
#include <string>
#include <iostream>
#include "index.h"
#include "tutil.h"
#include "indexInt.h"
#include "index_fst.h"
#include "index_fst_util.h"
#include "index_fst_counting_writer.h"
class FstWriter {
public:
FstWriter() {
_b = fstBuilderCreate(NULL, 0);
}
bool Put(const std::string &key, uint64_t val) {
FstSlice skey = fstSliceCreate((uint8_t *)key.c_str(), key.size());
bool ok = fstBuilderInsert(_b, skey, val);
fstSliceDestroy(&skey);
return ok;
}
~FstWriter() {
fstBuilderFinish(_b);
fstBuilderDestroy(_b);
}
private:
FstBuilder *_b;
};
class FstReadMemory {
public:
FstReadMemory(size_t size) {
_w = fstCountingWriterCreate(NULL, true);
_size = size;
memset((void *)&_s, 0, sizeof(_s));
}
bool init() {
char *buf = (char *)calloc(1, sizeof(char) * _size);
int nRead = fstCountingWriterRead(_w, (uint8_t *)buf, _size);
if (nRead <= 0) { return false; }
_size = nRead;
_s = fstSliceCreate((uint8_t *)buf, _size);
_fst = fstCreate(&_s);
free(buf);
return _fst != NULL;
}
bool Get(const std::string &key, uint64_t *val) {
FstSlice skey = fstSliceCreate((uint8_t *)key.c_str(), key.size());
bool ok = fstGet(_fst, &skey, val);
fstSliceDestroy(&skey);
return ok;
}
bool GetWithTimeCostUs(const std::string &key, uint64_t *val, uint64_t *elapse) {
int64_t s = taosGetTimestampUs();
bool ok = this->Get(key, val);
int64_t e = taosGetTimestampUs();
*elapse = e - s;
return ok;
}
// add later
bool Search(const std::string &key, std::vector<uint64_t> &result) {
return true;
}
~FstReadMemory() {
fstCountingWriterDestroy(_w);
fstSliceDestroy(&_s);
}
private:
FstCountingWriter *_w;
Fst *_fst;
FstSlice _s;
size_t _size;
};
//TEST(IndexTest, index_create_test) {
// SIndexOpts *opts = indexOptsCreate();
......@@ -62,69 +128,77 @@
// //
//}
int main(int argc, char** argv) {
// test write
FstBuilder *b = fstBuilderCreate(NULL, 0);
{
std::string str("aaa");
FstSlice key = fstSliceCreate((uint8_t *)str.c_str(), str.size());
Output val = 1;
fstBuilderInsert(b, key, val);
void Performance_fstWriteRecords(FstWriter *b) {
std::string str("aa");
for (int i = 0; i < 26; i++) {
str[0] = 'a' + i;
str.resize(2);
for(int j = 0; j < 26; j++) {
str[1] = 'a' + j;
str.resize(2);
for (int k = 0; k < 10; k++) {
str.push_back('a');
b->Put(str, k);
}
}
}
}
void Performance_fstReadRecords(FstReadMemory *m) {
std::string str("a");
for (int i = 0; i < 500; i++) {
//std::string str("aa");
str.push_back('a');
uint64_t out, cost;
bool ok = m->GetWithTimeCostUs(str, &out, &cost);
if (ok == true) {
printf("success to get (%s, %" PRId64"), time cost: %" PRId64")\n", str.c_str(), out, cost);
} else {
printf("failed to get(%s)\n", str.c_str());
}
}
}
//std::string str1("bcd");
//FstSlice key1 = fstSliceCreate((uint8_t *)str1.c_str(), str1.size());
//Output val2 = 10;
int main(int argc, char** argv) {
// test write
//
FstWriter *fw = new FstWriter;
{
for (size_t i = 1; i < 26; i++) {
std::string str("aaa");
str[2] = 'a' + i ;
FstSlice key = fstSliceCreate((uint8_t *)str.c_str(), str.size());
Output val = 0;
fstBuilderInsert(b, key, val);
std::string key("ab");
int64_t val = 100;
for (int i = 0; i < 26; i++) {
key.push_back('a' + i);
fw->Put(key, val++);
}
}
fstBuilderFinish(b);
fstBuilderDestroy(b);
char buf[64 * 1024] = {0};
FstSlice s;
FstCountingWriter *w = fstCountingWriterCreate(NULL, true);
int nRead = fstCountingWriterRead(w, (uint8_t *)buf, sizeof(buf));
assert(nRead <= sizeof(buf));
s = fstSliceCreate((uint8_t *)buf, nRead);
fstCountingWriterDestroy(w);
}
delete fw;
FstReadMemory *m = new FstReadMemory(1024 * 64);
if (m->init() == false) {
std::cout << "init readMemory failed" << std::endl;
}
// test reader
Fst *fst = fstCreate(&s);
{
std::string str("aax");
uint64_t out;
FstSlice key = fstSliceCreate((uint8_t *)str.c_str(), str.size());
bool ok = fstGet(fst, &key, &out);
if (ok == true) {
printf("val = %d\n", out);
//indexInfo("Get key-value success, %s, %d", str.c_str(), out);
} else {
//indexError("Get key-value failed, %s", str.c_str());
std::string key("ab");
uint64_t out;
if (m->Get(key, &out)) {
printf("success to get (%s, %" PRId64")\n", key.c_str(), out);
} else {
printf("failed to get(%s)\n", key.c_str());
}
for (int i = 0; i < 26; i++) {
key.push_back('a' + i);
if (m->Get(key, &out)) {
printf("success to get (%s, %" PRId64")\n", key.c_str(), out);
} else {
printf("failed to get(%s)\n", key.c_str());
}
}
fstSliceDestroy(&s);
}
}
return 1;
}
......
......@@ -1523,14 +1523,14 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) {
}
static int rpcAuthenticateMsg(void *pMsg, int msgLen, void *pAuth, void *pKey) {
MD5_CTX context;
T_MD5_CTX context;
int ret = -1;
MD5Init(&context);
MD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN);
MD5Update(&context, (uint8_t *)pMsg, msgLen);
MD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN);
MD5Final(&context);
tMD5Init(&context);
tMD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN);
tMD5Update(&context, (uint8_t *)pMsg, msgLen);
tMD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN);
tMD5Final(&context);
if (memcmp(context.digest, pAuth, sizeof(context.digest)) == 0) ret = 0;
......@@ -1538,13 +1538,13 @@ static int rpcAuthenticateMsg(void *pMsg, int msgLen, void *pAuth, void *pKey) {
}
static void rpcBuildAuthHead(void *pMsg, int msgLen, void *pAuth, void *pKey) {
MD5_CTX context;
T_MD5_CTX context;
MD5Init(&context);
MD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN);
MD5Update(&context, (uint8_t *)pMsg, msgLen);
MD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN);
MD5Final(&context);
tMD5Init(&context);
tMD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN);
tMD5Update(&context, (uint8_t *)pMsg, msgLen);
tMD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN);
tMD5Final(&context);
memcpy(pAuth, context.digest, sizeof(context.digest));
}
......
......@@ -8,6 +8,11 @@ target_include_directories(
target_link_libraries(
wal
PUBLIC cjson
PUBLIC os
PUBLIC util
)
if(${BUILD_TEST})
add_subdirectory(test)
endif(${BUILD_TEST})
......@@ -23,9 +23,73 @@
extern "C" {
#endif
int walGetFile(SWal* pWal, int32_t version);
//meta section begin
typedef struct WalFileInfo {
int64_t firstVer;
int64_t lastVer;
int64_t createTs;
int64_t closeTs;
int64_t fileSize;
} WalFileInfo;
static inline int32_t compareWalFileInfo(const void* pLeft, const void* pRight) {
WalFileInfo* pInfoLeft = (WalFileInfo*)pLeft;
WalFileInfo* pInfoRight = (WalFileInfo*)pRight;
return compareInt64Val(&pInfoLeft->firstVer, &pInfoRight->firstVer);
}
static inline int64_t walGetLastFileSize(SWal* pWal) {
WalFileInfo* pInfo = (WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet);
return pInfo->fileSize;
}
static inline int64_t walGetLastFileFirstVer(SWal* pWal) {
WalFileInfo* pInfo = (WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet);
return pInfo->firstVer;
}
static inline int64_t walGetCurFileFirstVer(SWal* pWal) {
WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->fileCursor);
return pInfo->firstVer;
}
static inline int64_t walGetCurFileLastVer(SWal* pWal) {
WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->fileCursor);
return pInfo->firstVer;
}
static inline int64_t walGetCurFileOffset(SWal* pWal) {
WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->fileCursor);
return pInfo->fileSize;
}
static inline bool walCurFileClosed(SWal* pWal) {
return taosArrayGetSize(pWal->fileInfoSet) != pWal->fileCursor;
}
static inline WalFileInfo* walGetCurFileInfo(SWal* pWal) {
return (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->fileCursor);
}
static inline int walBuildLogName(SWal*pWal, int64_t fileFirstVer, char* buf) {
return sprintf(buf, "%s/%" PRId64 "." WAL_LOG_SUFFIX, pWal->path, fileFirstVer);
}
static inline int walBuildIdxName(SWal*pWal, int64_t fileFirstVer, char* buf) {
return sprintf(buf, "%s/%" PRId64 "." WAL_INDEX_SUFFIX, pWal->path, fileFirstVer);
}
int walReadMeta(SWal* pWal);
int walWriteMeta(SWal* pWal);
int walRollFileInfo(SWal* pWal);
char* walFileInfoSerialize(SWal* pWal);
SArray* walFileInfoDeserialize(const char* bytes);
//meta section end
int64_t walGetSeq();
int walSeekVer(SWal *pWal, int64_t ver);
int walRoll(SWal *pWal);
#ifdef __cplusplus
}
......
......@@ -27,7 +27,7 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) {
int64_t logTfd = pWal->curLogTfd;
//seek position
int64_t offset = (ver - pWal->curFileFirstVersion) * WAL_IDX_ENTRY_SIZE;
int64_t offset = (ver - walGetCurFileFirstVer(pWal)) * WAL_IDX_ENTRY_SIZE;
code = tfLseek(idxTfd, offset, SEEK_SET);
if(code != 0) {
......@@ -43,7 +43,7 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) {
if (code != 0) {
}
pWal->curLogOffset = readBuf[1];
/*pWal->curLogOffset = readBuf[1];*/
pWal->curVersion = ver;
return code;
}
......@@ -60,27 +60,27 @@ static int walChangeFile(SWal *pWal, int64_t ver) {
if(code != 0) {
//TODO
}
WalFileInfo tmpInfo;
tmpInfo.firstVer = ver;
//bsearch in fileSet
int64_t* pRet = taosArraySearch(pWal->fileSet, &ver, compareInt64Val, TD_LE);
WalFileInfo* pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
ASSERT(pRet != NULL);
int64_t fname = *pRet;
if(fname < pWal->lastFileName) {
int64_t fileFirstVer = pRet->firstVer;
//closed
if(taosArrayGetLast(pWal->fileInfoSet) != pRet) {
pWal->curStatus &= ~WAL_CUR_FILE_WRITABLE;
pWal->curFileLastVersion = pRet[1]-1;
sprintf(fnameStr, "%"PRId64"."WAL_INDEX_SUFFIX, fname);
walBuildIdxName(pWal, fileFirstVer, fnameStr);
idxTfd = tfOpenRead(fnameStr);
sprintf(fnameStr, "%"PRId64"."WAL_LOG_SUFFIX, fname);
walBuildLogName(pWal, fileFirstVer, fnameStr);
logTfd = tfOpenRead(fnameStr);
} else {
pWal->curStatus |= WAL_CUR_FILE_WRITABLE;
pWal->curFileLastVersion = -1;
sprintf(fnameStr, "%"PRId64"."WAL_INDEX_SUFFIX, fname);
walBuildIdxName(pWal, fileFirstVer, fnameStr);
idxTfd = tfOpenReadWrite(fnameStr);
sprintf(fnameStr, "%"PRId64"."WAL_LOG_SUFFIX, fname);
walBuildLogName(pWal, fileFirstVer, fnameStr);
logTfd = tfOpenReadWrite(fnameStr);
}
pWal->curFileFirstVersion = fname;
pWal->curLogTfd = logTfd;
pWal->curIdxTfd = idxTfd;
return code;
......@@ -102,8 +102,7 @@ int walSeekVer(SWal *pWal, int64_t ver) {
if(ver < pWal->snapshotVersion) {
//TODO: seek snapshotted log, invalid in some cases
}
if(ver < pWal->curFileFirstVersion ||
(pWal->curFileLastVersion != -1 && ver > pWal->curFileLastVersion)) {
if(ver < walGetCurFileFirstVer(pWal) || (ver > walGetCurFileLastVer(pWal))) {
walChangeFile(pWal, ver);
}
walSeekFilePos(pWal, ver);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "tref.h"
#include "tfile.h"
#include "cJSON.h"
#include "walInt.h"
#include <libgen.h>
#include <regex.h>
int walRollFileInfo(SWal* pWal) {
int64_t ts = taosGetTimestampSec();
SArray* pArray = pWal->fileInfoSet;
if(taosArrayGetSize(pArray) != 0) {
WalFileInfo *pInfo = taosArrayGetLast(pArray);
pInfo->lastVer = pWal->lastVersion;
pInfo->closeTs = ts;
}
WalFileInfo *pNewInfo = malloc(sizeof(WalFileInfo));
if(pNewInfo == NULL) {
return -1;
}
pNewInfo->firstVer = pWal->lastVersion + 1;
pNewInfo->lastVer = -1;
pNewInfo->createTs = ts;
pNewInfo->closeTs = -1;
pNewInfo->fileSize = 0;
taosArrayPush(pWal->fileInfoSet, pNewInfo);
return 0;
}
char* walFileInfoSerialize(SWal* pWal) {
char buf[30];
if(pWal == NULL || pWal->fileInfoSet == NULL) return 0;
int sz = pWal->fileInfoSet->size;
cJSON* root = cJSON_CreateArray();
cJSON* field;
if(root == NULL) {
//TODO
return NULL;
}
WalFileInfo* pData = pWal->fileInfoSet->pData;
for(int i = 0; i < sz; i++) {
WalFileInfo* pInfo = &pData[i];
cJSON_AddItemToArray(root, field = cJSON_CreateObject());
if(field == NULL) {
cJSON_Delete(root);
return NULL;
}
//cjson only support int32_t or double
//string are used to prohibit the loss of precision
sprintf(buf, "%ld", pInfo->firstVer);
cJSON_AddStringToObject(field, "firstVer", buf);
sprintf(buf, "%ld", pInfo->lastVer);
cJSON_AddStringToObject(field, "lastVer", buf);
sprintf(buf, "%ld", pInfo->createTs);
cJSON_AddStringToObject(field, "createTs", buf);
sprintf(buf, "%ld", pInfo->closeTs);
cJSON_AddStringToObject(field, "closeTs", buf);
sprintf(buf, "%ld", pInfo->fileSize);
cJSON_AddStringToObject(field, "fileSize", buf);
}
return cJSON_Print(root);
}
SArray* walFileInfoDeserialize(const char* bytes) {
cJSON *root, *pInfoJson, *pField;
root = cJSON_Parse(bytes);
int sz = cJSON_GetArraySize(root);
//deserialize
SArray* pArray = taosArrayInit(sz, sizeof(WalFileInfo));
WalFileInfo *pData = pArray->pData;
for(int i = 0; i < sz; i++) {
cJSON* pInfoJson = cJSON_GetArrayItem(root, i);
WalFileInfo* pInfo = &pData[i];
pField = cJSON_GetObjectItem(pInfoJson, "firstVer");
pInfo->firstVer = atoll(cJSON_GetStringValue(pField));
pField = cJSON_GetObjectItem(pInfoJson, "lastVer");
pInfo->lastVer = atoll(cJSON_GetStringValue(pField));
pField = cJSON_GetObjectItem(pInfoJson, "createTs");
pInfo->createTs = atoll(cJSON_GetStringValue(pField));
pField = cJSON_GetObjectItem(pInfoJson, "closeTs");
pInfo->closeTs = atoll(cJSON_GetStringValue(pField));
pField = cJSON_GetObjectItem(pInfoJson, "fileSize");
pInfo->fileSize = atoll(cJSON_GetStringValue(pField));
}
taosArraySetSize(pArray, sz);
return pArray;
}
static inline int walBuildMetaName(SWal* pWal, int metaVer, char* buf) {
return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer);
}
static int walFindCurMetaVer(SWal* pWal) {
const char * pattern = "^meta-ver[0-9]+$";
regex_t walMetaRegexPattern;
regcomp(&walMetaRegexPattern, pattern, REG_EXTENDED);
DIR *dir = opendir(pWal->path);
if(dir == NULL) {
wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno));
return -1;
}
struct dirent* ent;
//find existing meta-ver[x].json
int metaVer = -1;
while((ent = readdir(dir)) != NULL) {
char *name = basename(ent->d_name);
int code = regexec(&walMetaRegexPattern, name, 0, NULL, 0);
if(code == 0) {
sscanf(name, "meta-ver%d", &metaVer);
break;
}
}
return metaVer;
}
int walWriteMeta(SWal* pWal) {
int metaVer = walFindCurMetaVer(pWal);
char fnameStr[WAL_FILE_LEN];
walBuildMetaName(pWal, metaVer+1, fnameStr);
int metaTfd = tfOpenCreateWrite(fnameStr);
if(metaTfd < 0) {
return -1;
}
char* serialized = walFileInfoSerialize(pWal);
int len = strlen(serialized);
if(len != tfWrite(metaTfd, serialized, len)) {
//TODO:clean file
return -1;
}
tfClose(metaTfd);
//delete old file
if(metaVer > -1) {
walBuildMetaName(pWal, metaVer, fnameStr);
remove(fnameStr);
}
return 0;
}
int walReadMeta(SWal* pWal) {
ASSERT(pWal->fileInfoSet->size == 0);
//find existing meta file
int metaVer = walFindCurMetaVer(pWal);
if(metaVer == -1) {
return 0;
}
char fnameStr[WAL_FILE_LEN];
walBuildMetaName(pWal, metaVer, fnameStr);
//read metafile
struct stat statbuf;
stat(fnameStr, &statbuf);
int size = statbuf.st_size;
char* buf = malloc(size + 5);
if(buf == NULL) {
return -1;
}
int tfd = tfOpenRead(fnameStr);
if(tfRead(tfd, buf, size) != size) {
free(buf);
return -1;
}
//load into fileInfoSet
pWal->fileInfoSet = walFileInfoDeserialize(buf);
if(pWal->fileInfoSet == NULL) {
free(buf);
return -1;
}
free(buf);
return 0;
}
......@@ -64,43 +64,31 @@ int32_t walInit() {
void walCleanUp() {
walStopThread();
taosCloseRef(tsWal.refSetId);
atomic_store_8(&tsWal.inited, 0);
wInfo("wal module is cleaned up");
}
static int walLoadFileset(SWal *pWal) {
DIR *dir = opendir(pWal->path);
if (dir == NULL) {
wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno));
return -1;
}
struct dirent* ent;
while ((ent = readdir(dir)) != NULL) {
char *name = ent->d_name;
name[WAL_NOSUFFIX_LEN] = 0;
//validate file name by regex matching
if(1 /* TODO:regex match */) {
int64_t fnameInt64 = atoll(name);
taosArrayPush(pWal->fileSet, &fnameInt64);
}
}
taosArraySort(pWal->fileSet, compareInt64Val);
return 0;
}
SWal *walOpen(const char *path, SWalCfg *pCfg) {
SWal *pWal = malloc(sizeof(SWal));
if (pWal == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
}
pWal->vgId = pCfg->vgId;
pWal->curLogTfd = -1;
pWal->curIdxTfd = -1;
pWal->level = pCfg->walLevel;
//set config
pWal->vgId = pCfg->vgId;
pWal->fsyncPeriod = pCfg->fsyncPeriod;
pWal->rollPeriod = pCfg->rollPeriod;
pWal->segSize = pCfg->segSize;
pWal->level = pCfg->walLevel;
//init status
pWal->lastVersion = -1;
pWal->lastRollSeq = -1;
//init write buffer
memset(&pWal->head, 0, sizeof(SWalHead));
pWal->head.sver = 0;
......@@ -120,7 +108,6 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
walFreeObj(pWal);
return NULL;
}
walLoadFileset(pWal);
wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->vgId, pWal, pWal->level, pWal->fsyncPeriod);
......@@ -153,8 +140,8 @@ void walClose(SWal *pWal) {
pthread_mutex_lock(&pWal->mutex);
tfClose(pWal->curLogTfd);
tfClose(pWal->curIdxTfd);
taosArrayDestroy(pWal->fileSet);
pWal->fileSet = NULL;
/*taosArrayDestroy(pWal->fileInfoSet);*/
/*pWal->fileInfoSet = NULL;*/
pthread_mutex_unlock(&pWal->mutex);
taosRemoveRef(tsWal.refSetId, pWal->refId);
}
......@@ -164,8 +151,8 @@ static int32_t walInitObj(SWal *pWal) {
wError("vgId:%d, path:%s, failed to create directory since %s", pWal->vgId, pWal->path, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
}
pWal->fileSet = taosArrayInit(0, sizeof(int64_t));
if(pWal->fileSet == NULL) {
pWal->fileInfoSet = taosArrayInit(0, sizeof(WalFileInfo));
if(pWal->fileInfoSet == NULL) {
wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->vgId, pWal->path, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
}
......@@ -180,8 +167,10 @@ static void walFreeObj(void *wal) {
tfClose(pWal->curLogTfd);
tfClose(pWal->curIdxTfd);
taosArrayDestroy(pWal->fileSet);
pWal->fileSet = NULL;
taosArrayDestroy(pWal->fileInfoSet);
pWal->fileInfoSet = NULL;
taosArrayDestroy(pWal->fileInfoSet);
pWal->fileInfoSet = NULL;
pthread_mutex_destroy(&pWal->mutex);
tfree(pWal);
}
......@@ -210,7 +199,7 @@ static void walFsyncAll() {
wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, atomic_load_32(&tsWal.seq));
int32_t code = tfFsync(pWal->curLogTfd);
if (code != 0) {
wError("vgId:%d, file:%"PRId64".log, failed to fsync since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(code));
wError("vgId:%d, file:%"PRId64".log, failed to fsync since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(code));
}
}
pWal = taosIterateRef(tsWal.refSetId, pWal->refId);
......
......@@ -13,16 +13,56 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "wal.h"
#include "walInt.h"
#include "tfile.h"
#include "tchecksum.h"
static int walValidateChecksum(SWalHead *pHead, void* body, int64_t bodyLen) {
return taosCheckChecksum((uint8_t*)pHead, sizeof(SWalHead) - sizeof(uint32_t)*2, pHead->cksumHead) &&
taosCheckChecksum(body, bodyLen, pHead->cksumBody);
static inline int walValidHeadCksum(SWalHead* pHead) {
return taosCheckChecksum((uint8_t*)pHead, sizeof(SWalHead) - sizeof(uint32_t)*2, pHead->cksumHead);
}
static inline int walValidBodyCksum(SWalHead* pHead) {
return taosCheckChecksum((uint8_t*)pHead->cont, pHead->len, pHead->cksumBody);
}
static int walValidCksum(SWalHead *pHead, void* body, int64_t bodyLen) {
return walValidHeadCksum(pHead) && walValidBodyCksum(pHead);
}
int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
int code;
code = walSeekVer(pWal, ver);
if(code != 0) {
return code;
}
if(*ppHead == NULL) {
void* ptr = realloc(*ppHead, sizeof(SWalHead));
if(ptr == NULL) {
return -1;
}
*ppHead = ptr;
}
if(tfRead(pWal->curLogTfd, *ppHead, sizeof(SWalHead)) != sizeof(SWalHead)) {
return -1;
}
//TODO: endian compatibility processing after read
if(walValidHeadCksum(*ppHead) != 0) {
return -1;
}
void* ptr = realloc(*ppHead, sizeof(SWalHead) + (*ppHead)->len);
if(ptr == NULL) {
free(*ppHead);
*ppHead = NULL;
return -1;
}
if(tfRead(pWal->curLogTfd, (*ppHead)->cont, (*ppHead)->len) != (*ppHead)->len) {
return -1;
}
//TODO: endian compatibility processing after read
if(walValidBodyCksum(*ppHead) != 0) {
return -1;
}
return 0;
}
......
......@@ -21,29 +21,42 @@
#include "tfile.h"
#include "walInt.h"
static void walFtruncate(SWal *pWal, int64_t ver);
int32_t walCommit(SWal *pWal, int64_t ver) {
ASSERT(pWal->snapshotVersion <= pWal->commitVersion);
ASSERT(pWal->commitVersion <= pWal->lastVersion);
ASSERT(ver >= pWal->commitVersion);
ASSERT(ver <= pWal->lastVersion);
pWal->commitVersion = ver;
return 0;
}
int32_t walRollback(SWal *pWal, int64_t ver) {
//TODO: ftruncate
ASSERT(ver > pWal->commitVersion);
ASSERT(ver <= pWal->lastVersion);
//seek position
walSeekVer(pWal, ver);
walFtruncate(pWal, ver);
return 0;
}
int32_t walTakeSnapshot(SWal *pWal, int64_t ver) {
pWal->snapshotVersion = ver;
WalFileInfo tmp;
tmp.firstVer = ver;
//mark files safe to delete
int64_t* pRet = taosArraySearch(pWal->fileSet, &ver, compareInt64Val, TD_LE);
if(pRet != pWal->fileSet->pData) {
//delete files until less than retention size
//find first file that exceeds retention time
}
WalFileInfo* pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE);
//iterate files, until the searched result
//if totSize > rtSize, delete
//if createTs > retentionTs, delete
//save snapshot ver, commit ver
//make new array, remove files
//delete files living longer than retention limit
//remove file from fileset
return 0;
}
......@@ -138,105 +151,123 @@ void walRemoveAllOldFiles(void *handle) {
}
#endif
static int walRoll(SWal *pWal) {
int walRoll(SWal *pWal) {
int code = 0;
code = tfClose(pWal->curIdxTfd);
if(code != 0) {
return code;
if(pWal->curIdxTfd != -1) {
code = tfClose(pWal->curIdxTfd);
if(code != 0) {
return -1;
}
}
code = tfClose(pWal->curLogTfd);
if(code != 0) {
return code;
if(pWal->curLogTfd != -1) {
code = tfClose(pWal->curLogTfd);
if(code != 0) {
return -1;
}
}
int64_t idxTfd, logTfd;
//create new file
int64_t newFileFirstVersion = pWal->lastVersion + 1;
char fnameStr[WAL_FILE_LEN];
sprintf(fnameStr, "%"PRId64"."WAL_INDEX_SUFFIX, newFileFirstVersion);
walBuildIdxName(pWal, newFileFirstVersion, fnameStr);
idxTfd = tfOpenCreateWrite(fnameStr);
sprintf(fnameStr, "%"PRId64"."WAL_LOG_SUFFIX, newFileFirstVersion);
if(idxTfd < 0) {
ASSERT(0);
return -1;
}
walBuildLogName(pWal, newFileFirstVersion, fnameStr);
logTfd = tfOpenCreateWrite(fnameStr);
if(logTfd < 0) {
ASSERT(0);
return -1;
}
code = walRollFileInfo(pWal);
if(code != 0) {
ASSERT(0);
return -1;
}
taosArrayPush(pWal->fileSet, &newFileFirstVersion);
//switch file
pWal->curIdxTfd = idxTfd;
pWal->curLogTfd = logTfd;
//change status
pWal->curFileLastVersion = -1;
pWal->curFileFirstVersion = newFileFirstVersion;
pWal->curVersion = newFileFirstVersion;
pWal->curLogOffset = 0;
pWal->curStatus = WAL_CUR_FILE_WRITABLE & WAL_CUR_POS_WRITABLE;
pWal->lastFileName = newFileFirstVersion;
pWal->lastFileWriteSize = 0;
pWal->lastRollSeq = walGetSeq();
return 0;
}
int walChangeFileToLast(SWal *pWal) {
int64_t idxTfd, logTfd;
int64_t* pRet = taosArrayGetLast(pWal->fileSet);
WalFileInfo* pRet = taosArrayGetLast(pWal->fileInfoSet);
ASSERT(pRet != NULL);
int64_t fname = *pRet;
int64_t fileFirstVer = pRet->firstVer;
char fnameStr[WAL_FILE_LEN];
sprintf(fnameStr, "%"PRId64"."WAL_INDEX_SUFFIX, fname);
walBuildIdxName(pWal, fileFirstVer, fnameStr);
idxTfd = tfOpenReadWrite(fnameStr);
sprintf(fnameStr, "%"PRId64"."WAL_LOG_SUFFIX, fname);
if(idxTfd < 0) {
return -1;
}
walBuildLogName(pWal, fileFirstVer, fnameStr);
logTfd = tfOpenReadWrite(fnameStr);
if(logTfd < 0) {
return -1;
}
//switch file
pWal->curIdxTfd = idxTfd;
pWal->curLogTfd = logTfd;
//change status
pWal->curFileLastVersion = -1;
pWal->curFileFirstVersion = fname;
pWal->curVersion = fname;
pWal->curLogOffset = 0;
pWal->curVersion = fileFirstVer;
pWal->curStatus = WAL_CUR_FILE_WRITABLE;
return 0;
}
int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
int code = 0;
//get index file
if(!tfValid(pWal->curIdxTfd)) {
code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%"PRId64".idx, failed to open since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno));
WalFileInfo* pInfo = taosArrayGet(pWal->fileInfoSet, pWal->fileCursor);
wError("vgId:%d, file:%"PRId64".idx, failed to open since %s", pWal->vgId, pInfo->firstVer, strerror(errno));
return code;
}
int64_t writeBuf[2] = { ver, offset };
int size = tfWrite(pWal->curIdxTfd, writeBuf, sizeof(writeBuf));
if(size != sizeof(writeBuf)) {
//TODO:
return -1;
}
return 0;
}
int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, void *body, int32_t bodyLen) {
int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, int32_t bodyLen) {
if (pWal == NULL) return -1;
int code = 0;
// no wal
if (pWal->level == TAOS_WAL_NOLOG) return 0;
if (index == pWal->lastVersion + 1) {
int64_t passed = walGetSeq() - pWal->lastRollSeq;
if(passed > pWal->rollPeriod) {
walRoll(pWal);
} else if(pWal->lastFileWriteSize > pWal->segSize) {
walRoll(pWal);
if(taosArrayGetSize(pWal->fileInfoSet) == 0) {
code = walRoll(pWal);
ASSERT(code == 0);
} else {
walChangeFileToLast(pWal);
int64_t passed = walGetSeq() - pWal->lastRollSeq;
if(pWal->rollPeriod != -1 && passed > pWal->rollPeriod) {
walRoll(pWal);
} else if(pWal->segSize != -1 && walGetLastFileSize(pWal) > pWal->segSize) {
walRoll(pWal);
}
}
} else {
//reject skip log or rewrite log
//must truncate explicitly first
return -1;
}
if (!tfValid(pWal->curLogTfd)) return 0;
/*if (!tfValid(pWal->curLogTfd)) return 0;*/
pWal->head.version = index;
int32_t code = 0;
pWal->head.signature = WAL_SIGNATURE;
pWal->head.len = bodyLen;
......@@ -250,19 +281,23 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, void *body, int32_t
if (tfWrite(pWal->curLogTfd, &pWal->head, sizeof(SWalHead)) != sizeof(SWalHead)) {
//ftruncate
code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno));
wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(errno));
}
if (tfWrite(pWal->curLogTfd, &body, bodyLen) != bodyLen) {
//ftruncate
code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno));
wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(errno));
}
code = walWriteIndex(pWal, index, walGetCurFileOffset(pWal));
if(code != 0) {
//TODO
}
walWriteIndex(pWal, index, pWal->curLogOffset);
pWal->curLogOffset += sizeof(SWalHead) + bodyLen;
//set status
pWal->lastVersion = index;
walGetCurFileInfo(pWal)->lastVer = index;
walGetCurFileInfo(pWal)->fileSize += sizeof(SWalHead) + bodyLen;
pthread_mutex_unlock(&pWal->mutex);
......@@ -273,9 +308,9 @@ void walFsync(SWal *pWal, bool forceFsync) {
if (pWal == NULL || !tfValid(pWal->curLogTfd)) return;
if (forceFsync || (pWal->level == TAOS_WAL_FSYNC && pWal->fsyncPeriod == 0)) {
wTrace("vgId:%d, fileId:%"PRId64".log, do fsync", pWal->vgId, pWal->curFileFirstVersion);
wTrace("vgId:%d, fileId:%"PRId64".log, do fsync", pWal->vgId, walGetCurFileFirstVer(pWal));
if (tfFsync(pWal->curLogTfd) < 0) {
wError("vgId:%d, file:%"PRId64".log, fsync failed since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno));
wError("vgId:%d, file:%"PRId64".log, fsync failed since %s", pWal->vgId, walGetCurFileFirstVer(pWal), strerror(errno));
}
}
}
......@@ -348,8 +383,36 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) {
}
#endif
static void walFtruncate(SWal *pWal, int64_t tfd, int64_t offset) {
tfFtruncate(tfd, offset);
static int walValidateOffset(SWal* pWal, int64_t ver) {
int code = 0;
SWalHead *pHead = NULL;
code = (int)walRead(pWal, &pHead, ver);
if(pHead->version != ver) {
return -1;
}
return 0;
}
static int64_t walGetOffset(SWal* pWal, int64_t ver) {
int code = walSeekVer(pWal, ver);
if(code != 0) {
return -1;
}
code = walValidateOffset(pWal, ver);
if(code != 0) {
return -1;
}
return 0;
}
static void walFtruncate(SWal *pWal, int64_t ver) {
int64_t tfd = pWal->curLogTfd;
tfFtruncate(tfd, ver);
tfFsync(tfd);
tfd = pWal->curIdxTfd;
tfFtruncate(tfd, ver * WAL_IDX_ENTRY_SIZE);
tfFsync(tfd);
}
......
add_executable(walTest "")
target_sources(walTest
PRIVATE
"walMetaTest.cpp"
)
target_include_directories(walTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/wal"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries(walTest
wal
gtest_main
)
enable_testing()
add_test(
NAME wal_test
COMMAND walTest
)
#include <gtest/gtest.h>
#include <cstring>
#include <iostream>
#include <queue>
#include "tfile.h"
#include "walInt.h"
class WalCleanEnv : public ::testing::Test {
protected:
static void SetUpTestCase() {
int code = walInit();
ASSERT(code == 0);
code = tfInit();
ASSERT(code == 0);
}
static void TearDownTestCase() {
walCleanUp();
tfCleanup();
}
void SetUp() override {
taosRemoveDir(pathName);
SWalCfg* pCfg = (SWalCfg*)malloc(sizeof(SWal));
memset(pCfg, 0, sizeof(SWalCfg));
pCfg->rollPeriod = -1;
pCfg->segSize = -1;
pCfg->walLevel = TAOS_WAL_FSYNC;
pWal = walOpen(pathName, pCfg);
ASSERT(pWal != NULL);
}
void TearDown() override {
walClose(pWal);
pWal = NULL;
}
SWal* pWal = NULL;
const char* pathName = "/tmp/wal_test";
};
class WalKeepEnv : public ::testing::Test {
protected:
static void SetUpTestCase() {
int code = walInit();
ASSERT(code == 0);
code = tfInit();
ASSERT(code == 0);
}
static void TearDownTestCase() {
walCleanUp();
tfCleanup();
}
void SetUp() override {
SWalCfg* pCfg = (SWalCfg*)malloc(sizeof(SWal));
memset(pCfg, 0, sizeof(SWalCfg));
pCfg->rollPeriod = -1;
pCfg->segSize = -1;
pCfg->walLevel = TAOS_WAL_FSYNC;
pWal = walOpen(pathName, pCfg);
ASSERT(pWal != NULL);
}
void TearDown() override {
walClose(pWal);
pWal = NULL;
}
SWal* pWal = NULL;
const char* pathName = "/tmp/wal_test";
};
TEST_F(WalCleanEnv, createNew) {
walRollFileInfo(pWal);
ASSERT(pWal->fileInfoSet != NULL);
ASSERT_EQ(pWal->fileInfoSet->size, 1);
WalFileInfo* pInfo = (WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet);
ASSERT_EQ(pInfo->firstVer, 0);
ASSERT_EQ(pInfo->lastVer, -1);
ASSERT_EQ(pInfo->closeTs, -1);
ASSERT_EQ(pInfo->fileSize, 0);
}
TEST_F(WalCleanEnv, serialize) {
int code = walRollFileInfo(pWal);
ASSERT(code == 0);
ASSERT(pWal->fileInfoSet != NULL);
code = walRollFileInfo(pWal);
ASSERT(code == 0);
code = walRollFileInfo(pWal);
ASSERT(code == 0);
code = walRollFileInfo(pWal);
ASSERT(code == 0);
code = walRollFileInfo(pWal);
ASSERT(code == 0);
code = walRollFileInfo(pWal);
ASSERT(code == 0);
char*ss = walFileInfoSerialize(pWal);
printf("%s\n", ss);
code = walWriteMeta(pWal);
ASSERT(code == 0);
}
TEST_F(WalCleanEnv, removeOldMeta) {
int code = walRollFileInfo(pWal);
ASSERT(code == 0);
ASSERT(pWal->fileInfoSet != NULL);
code = walWriteMeta(pWal);
ASSERT(code == 0);
code = walRollFileInfo(pWal);
ASSERT(code == 0);
code = walWriteMeta(pWal);
ASSERT(code == 0);
}
TEST_F(WalKeepEnv, readOldMeta) {
int code = walRollFileInfo(pWal);
ASSERT(code == 0);
ASSERT(pWal->fileInfoSet != NULL);
code = walWriteMeta(pWal);
ASSERT(code == 0);
code = walRollFileInfo(pWal);
ASSERT(code == 0);
code = walWriteMeta(pWal);
ASSERT(code == 0);
char*oldss = walFileInfoSerialize(pWal);
TearDown();
SetUp();
code = walReadMeta(pWal);
ASSERT(code == 0);
char* newss = walFileInfoSerialize(pWal);
int len = strlen(oldss);
ASSERT_EQ(len, strlen(newss));
for(int i = 0; i < len; i++) {
EXPECT_EQ(oldss[i], newss[i]);
}
}
TEST_F(WalKeepEnv, write) {
const char* ranStr = "tvapq02tcp";
const int len = strlen(ranStr);
int code;
for(int i = 0; i < 10; i++) {
code = walWrite(pWal, i, i+1, (void*)ranStr, len);
ASSERT_EQ(code, 0);
code = walWrite(pWal, i+2, i, (void*)ranStr, len);
ASSERT_EQ(code, -1);
}
code = walWriteMeta(pWal);
ASSERT_EQ(code, 0);
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
//#define _DEFAULT_SOURCE
#include "os.h"
#include "tutil.h"
#include "tglobal.h"
#include "tlog.h"
#include "twal.h"
#include "tfile.h"
int64_t ver = 0;
void *pWal = NULL;
int writeToQueue(void *pVnode, void *data, int type, void *pMsg) {
// do nothing
SWalHead *pHead = data;
if (pHead->version > ver)
ver = pHead->version;
walWrite(pWal, pHead);
return 0;
}
int main(int argc, char *argv[]) {
char path[128] = "/tmp/wal";
int level = 2;
int total = 5;
int rows = 10000;
int size = 128;
int keep = 0;
for (int i=1; i<argc; ++i) {
if (strcmp(argv[i], "-p")==0 && i < argc-1) {
tstrncpy(path, argv[++i], sizeof(path));
} else if (strcmp(argv[i], "-l")==0 && i < argc-1) {
level = atoi(argv[++i]);
} else if (strcmp(argv[i], "-r")==0 && i < argc-1) {
rows = atoi(argv[++i]);
} else if (strcmp(argv[i], "-k")==0 && i < argc-1) {
keep = atoi(argv[++i]);
} else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
total = atoi(argv[++i]);
} else if (strcmp(argv[i], "-s")==0 && i < argc-1) {
size = atoi(argv[++i]);
} else if (strcmp(argv[i], "-v")==0 && i < argc-1) {
ver = atoll(argv[++i]);
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
dDebugFlag = atoi(argv[++i]);
} else {
printf("\nusage: %s [options] \n", argv[0]);
printf(" [-p path]: wal file path default is:%s\n", path);
printf(" [-l level]: log level, default is:%d\n", level);
printf(" [-t total]: total wal files, default is:%d\n", total);
printf(" [-r rows]: rows of records per wal file, default is:%d\n", rows);
printf(" [-k keep]: keep the wal after closing, default is:%d\n", keep);
printf(" [-v version]: initial version, default is:%" PRId64 "\n", ver);
printf(" [-d debugFlag]: debug flag, default:%d\n", dDebugFlag);
printf(" [-h help]: print out this help\n\n");
exit(0);
}
}
taosInitLog("wal.log", 100000, 10);
tfInit();
walInit();
SWalCfg walCfg = {0};
walCfg.walLevel = level;
walCfg.keep = keep;
pWal = walOpen(path, &walCfg);
if (pWal == NULL) {
printf("failed to open wal\n");
exit(-1);
}
int ret = walRestore(pWal, NULL, writeToQueue);
if (ret <0) {
printf("failed to restore wal\n");
exit(-1);
}
printf("version starts from:%" PRId64 "\n", ver);
int contLen = sizeof(SWalHead) + size;
SWalHead *pHead = (SWalHead *) malloc(contLen);
for (int i=0; i<total; ++i) {
for (int k=0; k<rows; ++k) {
pHead->version = ++ver;
pHead->len = size;
walWrite(pWal, pHead);
}
printf("renew a wal, i:%d\n", i);
walRenew(pWal);
}
printf("%d wal files are written\n", total);
int64_t index = 0;
char name[256];
while (1) {
int code = walGetWalFile(pWal, name, &index);
if (code == -1) {
printf("failed to get wal file, index:%" PRId64 "\n", index);
break;
}
printf("index:%" PRId64 " wal:%s\n", index, name);
if (code == 0) break;
}
getchar();
walClose(pWal);
walCleanUp();
tfCleanup();
return 0;
}
......@@ -84,8 +84,8 @@ static uint8_t PADDING[64] = {0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x
/* The routine MD5Init initializes the message-digest context
mdContext. All fields are set to zero.
*/
void MD5Init(MD5_CTX *mdContext) {
memset(mdContext, 0, sizeof(MD5_CTX));
void tMD5Init(T_MD5_CTX *mdContext) {
memset(mdContext, 0, sizeof(T_MD5_CTX));
/* Load magic initialization constants. */
mdContext->buf[0] = (uint32_t)0x67452301;
......@@ -98,7 +98,7 @@ void MD5Init(MD5_CTX *mdContext) {
account for the presence of each of the characters inBuf[0..inLen-1]
in the message whose digest is being computed.
*/
void MD5Update(MD5_CTX *mdContext, uint8_t *inBuf, unsigned int inLen) {
void tMD5Update(T_MD5_CTX *mdContext, uint8_t *inBuf, unsigned int inLen) {
uint32_t in[16];
int mdi;
unsigned int i, ii;
......@@ -129,7 +129,7 @@ void MD5Update(MD5_CTX *mdContext, uint8_t *inBuf, unsigned int inLen) {
/* The routine MD5Final terminates the message-digest computation and
ends with the desired message digest in mdContext->digest[0...15].
*/
void MD5Final(MD5_CTX *mdContext) {
void tMD5Final(T_MD5_CTX *mdContext) {
uint32_t in[16];
int mdi;
unsigned int i, ii;
......@@ -144,7 +144,7 @@ void MD5Final(MD5_CTX *mdContext) {
/* pad out to 56 mod 64 */
padLen = (mdi < 56) ? (56 - mdi) : (120 - mdi);
MD5Update(mdContext, PADDING, padLen);
tMD5Update(mdContext, PADDING, padLen);
/* append length in bits and transform */
for (i = 0, ii = 0; i < 14; i++, ii += 4)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册