提交 30321c34 编写于 作者: L lichuang

Merge branch '3.0' into feature/sync-implementation

......@@ -50,8 +50,11 @@ endif(${BUILD_WITH_LEVELDB})
# rocksdb
# To support rocksdb build on ubuntu: sudo apt-get install libgflags-dev
if(${BUILD_WITH_ROCKSDB})
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=maybe-uninitialized")
option(WITH_TESTS "" OFF)
option(WITH_BENCHMARK_TOOLS "" OFF)
option(WITH_TOOLS "" OFF)
option(WITH_LIBURING "" OFF)
option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" OFF)
add_subdirectory(rocksdb)
target_include_directories(
......
......@@ -44,6 +44,21 @@ extern "C" {
#define TQ_SVER 0
//TODO: inplace mode is not implemented
#define TQ_UPDATE_INPLACE 0
#define TQ_UPDATE_APPEND 1
#define TQ_DUP_INTXN_REWRITE 0
#define TQ_DUP_INTXN_REJECT 2
static inline bool TqUpdateAppend(int32_t tqConfigFlag) {
return tqConfigFlag & TQ_UPDATE_APPEND;
}
static inline bool TqDupIntxnReject(int32_t tqConfigFlag) {
return tqConfigFlag & TQ_DUP_INTXN_REJECT;
}
static const int8_t TQ_CONST_DELETE = TQ_ACTION_CONST;
#define TQ_DELETE_TOKEN (void*)&TQ_CONST_DELETE
......@@ -79,6 +94,7 @@ typedef struct TqMetaStore {
int fileFd; //TODO:temporaral use, to be replaced by unified tfile
int idxFd; //TODO:temporaral use, to be replaced by unified tfile
char* dirPath;
int32_t tqConfigFlag;
int (*serializer)(const void* pObj, TqSerializedHead** ppHead);
const void* (*deserializer)(const TqSerializedHead* pHead, void** ppObj);
void (*deleter)(void*);
......@@ -87,7 +103,9 @@ typedef struct TqMetaStore {
TqMetaStore* tqStoreOpen(const char* path,
int serializer(const void* pObj, TqSerializedHead** ppHead),
const void* deserializer(const TqSerializedHead* pHead, void** ppObj),
void deleter(void* pObj));
void deleter(void* pObj),
int32_t tqConfigFlag
);
int32_t tqStoreClose(TqMetaStore*);
//int32_t tqStoreDelete(TqMetaStore*);
//int32_t TqStoreCommitAll(TqMetaStore*);
......@@ -96,6 +114,8 @@ int32_t tqStorePersist(TqMetaStore*);
int32_t tqStoreCompact(TqMetaStore*);
void* tqHandleGet(TqMetaStore*, int64_t key);
//make it unpersist
void* tqHandleTouchGet(TqMetaStore*, int64_t key);
int32_t tqHandleMovePut(TqMetaStore*, int64_t key, void* value);
int32_t tqHandleCopyPut(TqMetaStore*, int64_t key, void* value, size_t vsize);
//delete committed kv pair
......
......@@ -71,7 +71,9 @@ static inline int tqReadLastPage(int fd, TqIdxPageBuf* pBuf) {
TqMetaStore* tqStoreOpen(const char* path,
int serializer(const void* pObj, TqSerializedHead** ppHead),
const void* deserializer(const TqSerializedHead* pHead, void** ppObj),
void deleter(void* pObj)) {
void deleter(void* pObj),
int32_t tqConfigFlag
) {
TqMetaStore* pMeta = malloc(sizeof(TqMetaStore));
if(pMeta == NULL) {
//close
......@@ -128,6 +130,7 @@ TqMetaStore* tqStoreOpen(const char* path,
pMeta->serializer = serializer;
pMeta->deserializer = deserializer;
pMeta->deleter = deleter;
pMeta->tqConfigFlag = tqConfigFlag;
//read idx file and load into memory
TqIdxPageBuf idxBuf;
......@@ -463,56 +466,40 @@ void* tqHandleGet(TqMetaStore* pMeta, int64_t key) {
return NULL;
}
int32_t tqHandleMovePut(TqMetaStore* pMeta, int64_t key, void* value) {
void* tqHandleTouchGet(TqMetaStore* pMeta, int64_t key) {
int64_t bucketKey = key & TQ_BUCKET_MASK;
TqMetaList* pNode = pMeta->bucket[bucketKey];
while(pNode) {
if(pNode->handle.key == key) {
//TODO: think about thread safety
if(pNode->handle.valueInTxn
&& pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
pMeta->deleter(pNode->handle.valueInTxn);
if(pNode->handle.valueInUse != NULL
&& pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
tqLinkUnpersist(pMeta, pNode);
return pNode->handle.valueInUse;
} else {
return NULL;
}
//change pointer ownership
pNode->handle.valueInTxn = value;
tqLinkUnpersist(pMeta, pNode);
return 0;
} else {
pNode = pNode->next;
}
}
TqMetaList *pNewNode = malloc(sizeof(TqMetaList));
if(pNewNode == NULL) {
//TODO: memory error
return -1;
}
memset(pNewNode, 0, sizeof(TqMetaList));
pNewNode->handle.key = key;
pNewNode->handle.valueInTxn = value;
pNewNode->next = pMeta->bucket[bucketKey];
pMeta->bucket[bucketKey] = pNewNode;
tqLinkUnpersist(pMeta, pNewNode);
return 0;
return NULL;
}
int32_t tqHandleCopyPut(TqMetaStore* pMeta, int64_t key, void* value, size_t vsize) {
void *vmem = malloc(vsize);
if(vmem == NULL) {
//TODO: memory error
return -1;
}
memcpy(vmem, value, vsize);
static inline int32_t tqHandlePutImpl(TqMetaStore* pMeta, int64_t key, void* value) {
int64_t bucketKey = key & TQ_BUCKET_MASK;
TqMetaList* pNode = pMeta->bucket[bucketKey];
while(pNode) {
if(pNode->handle.key == key) {
//TODO: think about thread safety
if(pNode->handle.valueInTxn
&& pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
pMeta->deleter(pNode->handle.valueInTxn);
if(pNode->handle.valueInTxn) {
if(TqDupIntxnReject(pMeta->tqConfigFlag)) {
return -2;
}
if(pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
pMeta->deleter(pNode->handle.valueInTxn);
}
}
//change pointer ownership
pNode->handle.valueInTxn = vmem;
pNode->handle.valueInTxn = value;
tqLinkUnpersist(pMeta, pNode);
return 0;
} else {
......@@ -526,13 +513,27 @@ int32_t tqHandleCopyPut(TqMetaStore* pMeta, int64_t key, void* value, size_t vsi
}
memset(pNewNode, 0, sizeof(TqMetaList));
pNewNode->handle.key = key;
pNewNode->handle.valueInTxn = vmem;
pNewNode->handle.valueInTxn = value;
pNewNode->next = pMeta->bucket[bucketKey];
pMeta->bucket[bucketKey] = pNewNode;
tqLinkUnpersist(pMeta, pNewNode);
return 0;
}
int32_t tqHandleMovePut(TqMetaStore* pMeta, int64_t key, void* value) {
return tqHandlePutImpl(pMeta, key, value);
}
int32_t tqHandleCopyPut(TqMetaStore* pMeta, int64_t key, void* value, size_t vsize) {
void *vmem = malloc(vsize);
if(vmem == NULL) {
//TODO: memory error
return -1;
}
memcpy(vmem, value, vsize);
return tqHandlePutImpl(pMeta, key, vmem);
}
static void* tqHandleGetUncommitted(TqMetaStore* pMeta, int64_t key) {
int64_t bucketKey = key & TQ_BUCKET_MASK;
TqMetaList* pNode = pMeta->bucket[bucketKey];
......
......@@ -32,13 +32,15 @@ void FooDeleter(void* pObj) {
free(pObj);
}
class TqMetaTest : public ::testing::Test {
class TqMetaUpdateAppendTest : public ::testing::Test {
protected:
void SetUp() override {
taosRemoveDir(pathName);
pMeta = tqStoreOpen(pathName,
FooSerializer, FooDeserializer, FooDeleter);
FooSerializer, FooDeserializer, FooDeleter,
TQ_UPDATE_APPEND
);
ASSERT(pMeta);
}
......@@ -50,7 +52,7 @@ class TqMetaTest : public ::testing::Test {
const char* pathName = "/tmp/tq_test";
};
TEST_F(TqMetaTest, copyPutTest) {
TEST_F(TqMetaUpdateAppendTest, copyPutTest) {
Foo foo;
foo.a = 3;
tqHandleCopyPut(pMeta, 1, &foo, sizeof(Foo));
......@@ -63,7 +65,7 @@ TEST_F(TqMetaTest, copyPutTest) {
EXPECT_EQ(pFoo->a, 3);
}
TEST_F(TqMetaTest, persistTest) {
TEST_F(TqMetaUpdateAppendTest, persistTest) {
Foo* pFoo = (Foo*)malloc(sizeof(Foo));
pFoo->a = 2;
tqHandleMovePut(pMeta, 1, pFoo);
......@@ -77,7 +79,9 @@ TEST_F(TqMetaTest, persistTest) {
tqStoreClose(pMeta);
pMeta = tqStoreOpen(pathName,
FooSerializer, FooDeserializer, FooDeleter);
FooSerializer, FooDeserializer, FooDeleter,
TQ_UPDATE_APPEND
);
ASSERT(pMeta);
pBar = (Foo*)tqHandleGet(pMeta, 1);
......@@ -88,7 +92,7 @@ TEST_F(TqMetaTest, persistTest) {
EXPECT_EQ(pBar == NULL, true);
}
TEST_F(TqMetaTest, uncommittedTest) {
TEST_F(TqMetaUpdateAppendTest, uncommittedTest) {
Foo* pFoo = (Foo*)malloc(sizeof(Foo));
pFoo->a = 3;
tqHandleMovePut(pMeta, 1, pFoo);
......@@ -97,7 +101,7 @@ TEST_F(TqMetaTest, uncommittedTest) {
EXPECT_EQ(pFoo == NULL, true);
}
TEST_F(TqMetaTest, abortTest) {
TEST_F(TqMetaUpdateAppendTest, abortTest) {
Foo* pFoo = (Foo*)malloc(sizeof(Foo));
pFoo->a = 3;
tqHandleMovePut(pMeta, 1, pFoo);
......@@ -110,7 +114,7 @@ TEST_F(TqMetaTest, abortTest) {
EXPECT_EQ(pFoo == NULL, true);
}
TEST_F(TqMetaTest, deleteTest) {
TEST_F(TqMetaUpdateAppendTest, deleteTest) {
Foo* pFoo = (Foo*)malloc(sizeof(Foo));
pFoo->a = 3;
tqHandleMovePut(pMeta, 1, pFoo);
......@@ -135,14 +139,16 @@ TEST_F(TqMetaTest, deleteTest) {
tqStoreClose(pMeta);
pMeta = tqStoreOpen(pathName,
FooSerializer, FooDeserializer, FooDeleter);
FooSerializer, FooDeserializer, FooDeleter,
TQ_UPDATE_APPEND
);
ASSERT(pMeta);
pFoo = (Foo*) tqHandleGet(pMeta, 1);
EXPECT_EQ(pFoo == NULL, true);
}
TEST_F(TqMetaTest, intxnPersist) {
TEST_F(TqMetaUpdateAppendTest, intxnPersist) {
Foo* pFoo = (Foo*)malloc(sizeof(Foo));
pFoo->a = 3;
tqHandleMovePut(pMeta, 1, pFoo);
......@@ -157,7 +163,9 @@ TEST_F(TqMetaTest, intxnPersist) {
tqStoreClose(pMeta);
pMeta = tqStoreOpen(pathName,
FooSerializer, FooDeserializer, FooDeleter);
FooSerializer, FooDeserializer, FooDeleter,
TQ_UPDATE_APPEND
);
ASSERT(pMeta);
pFoo1 = (Foo*)tqHandleGet(pMeta, 1);
......@@ -170,14 +178,16 @@ TEST_F(TqMetaTest, intxnPersist) {
tqStoreClose(pMeta);
pMeta = tqStoreOpen(pathName,
FooSerializer, FooDeserializer, FooDeleter);
FooSerializer, FooDeserializer, FooDeleter,
TQ_UPDATE_APPEND
);
ASSERT(pMeta);
pFoo1 = (Foo*)tqHandleGet(pMeta, 1);
EXPECT_EQ(pFoo1->a, 4);
}
TEST_F(TqMetaTest, multiplePage) {
TEST_F(TqMetaUpdateAppendTest, multiplePage) {
srand(0);
std::vector<int> v;
for(int i = 0; i < 1000; i++) {
......@@ -195,7 +205,9 @@ TEST_F(TqMetaTest, multiplePage) {
tqStoreClose(pMeta);
pMeta = tqStoreOpen(pathName,
FooSerializer, FooDeserializer, FooDeleter);
FooSerializer, FooDeserializer, FooDeleter,
TQ_UPDATE_APPEND
);
ASSERT(pMeta);
for(int i = 500; i < 1000; i++) {
......@@ -213,7 +225,7 @@ TEST_F(TqMetaTest, multiplePage) {
}
TEST_F(TqMetaTest, multipleRewrite) {
TEST_F(TqMetaUpdateAppendTest, multipleRewrite) {
srand(0);
std::vector<int> v;
for(int i = 0; i < 1000; i++) {
......@@ -244,7 +256,9 @@ TEST_F(TqMetaTest, multipleRewrite) {
tqStoreClose(pMeta);
pMeta = tqStoreOpen(pathName,
FooSerializer, FooDeserializer, FooDeleter);
FooSerializer, FooDeserializer, FooDeleter,
TQ_UPDATE_APPEND
);
ASSERT(pMeta);
for(int i = 500; i < 1000; i++) {
......@@ -263,7 +277,7 @@ TEST_F(TqMetaTest, multipleRewrite) {
}
TEST_F(TqMetaTest, dupCommit) {
TEST_F(TqMetaUpdateAppendTest, dupCommit) {
srand(0);
std::vector<int> v;
for(int i = 0; i < 1000; i++) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册