提交 ea6bd5f2 编写于 作者: H Hongze Cheng

Merge branch '3.0' into feature/vnode

...@@ -50,8 +50,11 @@ endif(${BUILD_WITH_LEVELDB}) ...@@ -50,8 +50,11 @@ endif(${BUILD_WITH_LEVELDB})
# rocksdb # rocksdb
# To support rocksdb build on ubuntu: sudo apt-get install libgflags-dev # To support rocksdb build on ubuntu: sudo apt-get install libgflags-dev
if(${BUILD_WITH_ROCKSDB}) if(${BUILD_WITH_ROCKSDB})
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=maybe-uninitialized")
option(WITH_TESTS "" OFF) option(WITH_TESTS "" OFF)
option(WITH_BENCHMARK_TOOLS "" OFF) option(WITH_BENCHMARK_TOOLS "" OFF)
option(WITH_TOOLS "" OFF)
option(WITH_LIBURING "" OFF)
option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" OFF) option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" OFF)
add_subdirectory(rocksdb) add_subdirectory(rocksdb)
target_include_directories( target_include_directories(
......
...@@ -44,6 +44,21 @@ extern "C" { ...@@ -44,6 +44,21 @@ extern "C" {
#define TQ_SVER 0 #define TQ_SVER 0
//TODO: inplace mode is not implemented
#define TQ_UPDATE_INPLACE 0
#define TQ_UPDATE_APPEND 1
#define TQ_DUP_INTXN_REWRITE 0
#define TQ_DUP_INTXN_REJECT 2
static inline bool TqUpdateAppend(int32_t tqConfigFlag) {
return tqConfigFlag & TQ_UPDATE_APPEND;
}
static inline bool TqDupIntxnReject(int32_t tqConfigFlag) {
return tqConfigFlag & TQ_DUP_INTXN_REJECT;
}
static const int8_t TQ_CONST_DELETE = TQ_ACTION_CONST; static const int8_t TQ_CONST_DELETE = TQ_ACTION_CONST;
#define TQ_DELETE_TOKEN (void*)&TQ_CONST_DELETE #define TQ_DELETE_TOKEN (void*)&TQ_CONST_DELETE
...@@ -79,6 +94,7 @@ typedef struct TqMetaStore { ...@@ -79,6 +94,7 @@ typedef struct TqMetaStore {
int fileFd; //TODO:temporaral use, to be replaced by unified tfile int fileFd; //TODO:temporaral use, to be replaced by unified tfile
int idxFd; //TODO:temporaral use, to be replaced by unified tfile int idxFd; //TODO:temporaral use, to be replaced by unified tfile
char* dirPath; char* dirPath;
int32_t tqConfigFlag;
int (*serializer)(const void* pObj, TqSerializedHead** ppHead); int (*serializer)(const void* pObj, TqSerializedHead** ppHead);
const void* (*deserializer)(const TqSerializedHead* pHead, void** ppObj); const void* (*deserializer)(const TqSerializedHead* pHead, void** ppObj);
void (*deleter)(void*); void (*deleter)(void*);
...@@ -87,7 +103,9 @@ typedef struct TqMetaStore { ...@@ -87,7 +103,9 @@ typedef struct TqMetaStore {
TqMetaStore* tqStoreOpen(const char* path, TqMetaStore* tqStoreOpen(const char* path,
int serializer(const void* pObj, TqSerializedHead** ppHead), int serializer(const void* pObj, TqSerializedHead** ppHead),
const void* deserializer(const TqSerializedHead* pHead, void** ppObj), const void* deserializer(const TqSerializedHead* pHead, void** ppObj),
void deleter(void* pObj)); void deleter(void* pObj),
int32_t tqConfigFlag
);
int32_t tqStoreClose(TqMetaStore*); int32_t tqStoreClose(TqMetaStore*);
//int32_t tqStoreDelete(TqMetaStore*); //int32_t tqStoreDelete(TqMetaStore*);
//int32_t TqStoreCommitAll(TqMetaStore*); //int32_t TqStoreCommitAll(TqMetaStore*);
...@@ -96,6 +114,8 @@ int32_t tqStorePersist(TqMetaStore*); ...@@ -96,6 +114,8 @@ int32_t tqStorePersist(TqMetaStore*);
int32_t tqStoreCompact(TqMetaStore*); int32_t tqStoreCompact(TqMetaStore*);
void* tqHandleGet(TqMetaStore*, int64_t key); void* tqHandleGet(TqMetaStore*, int64_t key);
//make it unpersist
void* tqHandleTouchGet(TqMetaStore*, int64_t key);
int32_t tqHandleMovePut(TqMetaStore*, int64_t key, void* value); int32_t tqHandleMovePut(TqMetaStore*, int64_t key, void* value);
int32_t tqHandleCopyPut(TqMetaStore*, int64_t key, void* value, size_t vsize); int32_t tqHandleCopyPut(TqMetaStore*, int64_t key, void* value, size_t vsize);
//delete committed kv pair //delete committed kv pair
......
...@@ -71,7 +71,9 @@ static inline int tqReadLastPage(int fd, TqIdxPageBuf* pBuf) { ...@@ -71,7 +71,9 @@ static inline int tqReadLastPage(int fd, TqIdxPageBuf* pBuf) {
TqMetaStore* tqStoreOpen(const char* path, TqMetaStore* tqStoreOpen(const char* path,
int serializer(const void* pObj, TqSerializedHead** ppHead), int serializer(const void* pObj, TqSerializedHead** ppHead),
const void* deserializer(const TqSerializedHead* pHead, void** ppObj), const void* deserializer(const TqSerializedHead* pHead, void** ppObj),
void deleter(void* pObj)) { void deleter(void* pObj),
int32_t tqConfigFlag
) {
TqMetaStore* pMeta = malloc(sizeof(TqMetaStore)); TqMetaStore* pMeta = malloc(sizeof(TqMetaStore));
if(pMeta == NULL) { if(pMeta == NULL) {
//close //close
...@@ -128,6 +130,7 @@ TqMetaStore* tqStoreOpen(const char* path, ...@@ -128,6 +130,7 @@ TqMetaStore* tqStoreOpen(const char* path,
pMeta->serializer = serializer; pMeta->serializer = serializer;
pMeta->deserializer = deserializer; pMeta->deserializer = deserializer;
pMeta->deleter = deleter; pMeta->deleter = deleter;
pMeta->tqConfigFlag = tqConfigFlag;
//read idx file and load into memory //read idx file and load into memory
TqIdxPageBuf idxBuf; TqIdxPageBuf idxBuf;
...@@ -463,56 +466,40 @@ void* tqHandleGet(TqMetaStore* pMeta, int64_t key) { ...@@ -463,56 +466,40 @@ void* tqHandleGet(TqMetaStore* pMeta, int64_t key) {
return NULL; return NULL;
} }
int32_t tqHandleMovePut(TqMetaStore* pMeta, int64_t key, void* value) { void* tqHandleTouchGet(TqMetaStore* pMeta, int64_t key) {
int64_t bucketKey = key & TQ_BUCKET_MASK; int64_t bucketKey = key & TQ_BUCKET_MASK;
TqMetaList* pNode = pMeta->bucket[bucketKey]; TqMetaList* pNode = pMeta->bucket[bucketKey];
while(pNode) { while(pNode) {
if(pNode->handle.key == key) { if(pNode->handle.key == key) {
//TODO: think about thread safety if(pNode->handle.valueInUse != NULL
if(pNode->handle.valueInTxn && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
&& pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
pMeta->deleter(pNode->handle.valueInTxn);
}
//change pointer ownership
pNode->handle.valueInTxn = value;
tqLinkUnpersist(pMeta, pNode); tqLinkUnpersist(pMeta, pNode);
return 0; return pNode->handle.valueInUse;
} else { } else {
pNode = pNode->next; return NULL;
} }
} else {
pNode = pNode->next;
} }
TqMetaList *pNewNode = malloc(sizeof(TqMetaList));
if(pNewNode == NULL) {
//TODO: memory error
return -1;
} }
memset(pNewNode, 0, sizeof(TqMetaList)); return NULL;
pNewNode->handle.key = key;
pNewNode->handle.valueInTxn = value;
pNewNode->next = pMeta->bucket[bucketKey];
pMeta->bucket[bucketKey] = pNewNode;
tqLinkUnpersist(pMeta, pNewNode);
return 0;
} }
int32_t tqHandleCopyPut(TqMetaStore* pMeta, int64_t key, void* value, size_t vsize) { static inline int32_t tqHandlePutImpl(TqMetaStore* pMeta, int64_t key, void* value) {
void *vmem = malloc(vsize);
if(vmem == NULL) {
//TODO: memory error
return -1;
}
memcpy(vmem, value, vsize);
int64_t bucketKey = key & TQ_BUCKET_MASK; int64_t bucketKey = key & TQ_BUCKET_MASK;
TqMetaList* pNode = pMeta->bucket[bucketKey]; TqMetaList* pNode = pMeta->bucket[bucketKey];
while(pNode) { while(pNode) {
if(pNode->handle.key == key) { if(pNode->handle.key == key) {
//TODO: think about thread safety //TODO: think about thread safety
if(pNode->handle.valueInTxn if(pNode->handle.valueInTxn) {
&& pNode->handle.valueInTxn != TQ_DELETE_TOKEN) { if(TqDupIntxnReject(pMeta->tqConfigFlag)) {
return -2;
}
if(pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
pMeta->deleter(pNode->handle.valueInTxn); pMeta->deleter(pNode->handle.valueInTxn);
} }
//change pointer ownership }
pNode->handle.valueInTxn = vmem; pNode->handle.valueInTxn = value;
tqLinkUnpersist(pMeta, pNode); tqLinkUnpersist(pMeta, pNode);
return 0; return 0;
} else { } else {
...@@ -526,13 +513,27 @@ int32_t tqHandleCopyPut(TqMetaStore* pMeta, int64_t key, void* value, size_t vsi ...@@ -526,13 +513,27 @@ int32_t tqHandleCopyPut(TqMetaStore* pMeta, int64_t key, void* value, size_t vsi
} }
memset(pNewNode, 0, sizeof(TqMetaList)); memset(pNewNode, 0, sizeof(TqMetaList));
pNewNode->handle.key = key; pNewNode->handle.key = key;
pNewNode->handle.valueInTxn = vmem; pNewNode->handle.valueInTxn = value;
pNewNode->next = pMeta->bucket[bucketKey]; pNewNode->next = pMeta->bucket[bucketKey];
pMeta->bucket[bucketKey] = pNewNode; pMeta->bucket[bucketKey] = pNewNode;
tqLinkUnpersist(pMeta, pNewNode); tqLinkUnpersist(pMeta, pNewNode);
return 0; return 0;
} }
int32_t tqHandleMovePut(TqMetaStore* pMeta, int64_t key, void* value) {
return tqHandlePutImpl(pMeta, key, value);
}
int32_t tqHandleCopyPut(TqMetaStore* pMeta, int64_t key, void* value, size_t vsize) {
void *vmem = malloc(vsize);
if(vmem == NULL) {
//TODO: memory error
return -1;
}
memcpy(vmem, value, vsize);
return tqHandlePutImpl(pMeta, key, vmem);
}
static void* tqHandleGetUncommitted(TqMetaStore* pMeta, int64_t key) { static void* tqHandleGetUncommitted(TqMetaStore* pMeta, int64_t key) {
int64_t bucketKey = key & TQ_BUCKET_MASK; int64_t bucketKey = key & TQ_BUCKET_MASK;
TqMetaList* pNode = pMeta->bucket[bucketKey]; TqMetaList* pNode = pMeta->bucket[bucketKey];
......
...@@ -32,13 +32,15 @@ void FooDeleter(void* pObj) { ...@@ -32,13 +32,15 @@ void FooDeleter(void* pObj) {
free(pObj); free(pObj);
} }
class TqMetaTest : public ::testing::Test { class TqMetaUpdateAppendTest : public ::testing::Test {
protected: protected:
void SetUp() override { void SetUp() override {
taosRemoveDir(pathName); taosRemoveDir(pathName);
pMeta = tqStoreOpen(pathName, pMeta = tqStoreOpen(pathName,
FooSerializer, FooDeserializer, FooDeleter); FooSerializer, FooDeserializer, FooDeleter,
TQ_UPDATE_APPEND
);
ASSERT(pMeta); ASSERT(pMeta);
} }
...@@ -50,7 +52,7 @@ class TqMetaTest : public ::testing::Test { ...@@ -50,7 +52,7 @@ class TqMetaTest : public ::testing::Test {
const char* pathName = "/tmp/tq_test"; const char* pathName = "/tmp/tq_test";
}; };
TEST_F(TqMetaTest, copyPutTest) { TEST_F(TqMetaUpdateAppendTest, copyPutTest) {
Foo foo; Foo foo;
foo.a = 3; foo.a = 3;
tqHandleCopyPut(pMeta, 1, &foo, sizeof(Foo)); tqHandleCopyPut(pMeta, 1, &foo, sizeof(Foo));
...@@ -63,7 +65,7 @@ TEST_F(TqMetaTest, copyPutTest) { ...@@ -63,7 +65,7 @@ TEST_F(TqMetaTest, copyPutTest) {
EXPECT_EQ(pFoo->a, 3); EXPECT_EQ(pFoo->a, 3);
} }
TEST_F(TqMetaTest, persistTest) { TEST_F(TqMetaUpdateAppendTest, persistTest) {
Foo* pFoo = (Foo*)malloc(sizeof(Foo)); Foo* pFoo = (Foo*)malloc(sizeof(Foo));
pFoo->a = 2; pFoo->a = 2;
tqHandleMovePut(pMeta, 1, pFoo); tqHandleMovePut(pMeta, 1, pFoo);
...@@ -77,7 +79,9 @@ TEST_F(TqMetaTest, persistTest) { ...@@ -77,7 +79,9 @@ TEST_F(TqMetaTest, persistTest) {
tqStoreClose(pMeta); tqStoreClose(pMeta);
pMeta = tqStoreOpen(pathName, pMeta = tqStoreOpen(pathName,
FooSerializer, FooDeserializer, FooDeleter); FooSerializer, FooDeserializer, FooDeleter,
TQ_UPDATE_APPEND
);
ASSERT(pMeta); ASSERT(pMeta);
pBar = (Foo*)tqHandleGet(pMeta, 1); pBar = (Foo*)tqHandleGet(pMeta, 1);
...@@ -88,7 +92,7 @@ TEST_F(TqMetaTest, persistTest) { ...@@ -88,7 +92,7 @@ TEST_F(TqMetaTest, persistTest) {
EXPECT_EQ(pBar == NULL, true); EXPECT_EQ(pBar == NULL, true);
} }
TEST_F(TqMetaTest, uncommittedTest) { TEST_F(TqMetaUpdateAppendTest, uncommittedTest) {
Foo* pFoo = (Foo*)malloc(sizeof(Foo)); Foo* pFoo = (Foo*)malloc(sizeof(Foo));
pFoo->a = 3; pFoo->a = 3;
tqHandleMovePut(pMeta, 1, pFoo); tqHandleMovePut(pMeta, 1, pFoo);
...@@ -97,7 +101,7 @@ TEST_F(TqMetaTest, uncommittedTest) { ...@@ -97,7 +101,7 @@ TEST_F(TqMetaTest, uncommittedTest) {
EXPECT_EQ(pFoo == NULL, true); EXPECT_EQ(pFoo == NULL, true);
} }
TEST_F(TqMetaTest, abortTest) { TEST_F(TqMetaUpdateAppendTest, abortTest) {
Foo* pFoo = (Foo*)malloc(sizeof(Foo)); Foo* pFoo = (Foo*)malloc(sizeof(Foo));
pFoo->a = 3; pFoo->a = 3;
tqHandleMovePut(pMeta, 1, pFoo); tqHandleMovePut(pMeta, 1, pFoo);
...@@ -110,7 +114,7 @@ TEST_F(TqMetaTest, abortTest) { ...@@ -110,7 +114,7 @@ TEST_F(TqMetaTest, abortTest) {
EXPECT_EQ(pFoo == NULL, true); EXPECT_EQ(pFoo == NULL, true);
} }
TEST_F(TqMetaTest, deleteTest) { TEST_F(TqMetaUpdateAppendTest, deleteTest) {
Foo* pFoo = (Foo*)malloc(sizeof(Foo)); Foo* pFoo = (Foo*)malloc(sizeof(Foo));
pFoo->a = 3; pFoo->a = 3;
tqHandleMovePut(pMeta, 1, pFoo); tqHandleMovePut(pMeta, 1, pFoo);
...@@ -135,14 +139,16 @@ TEST_F(TqMetaTest, deleteTest) { ...@@ -135,14 +139,16 @@ TEST_F(TqMetaTest, deleteTest) {
tqStoreClose(pMeta); tqStoreClose(pMeta);
pMeta = tqStoreOpen(pathName, pMeta = tqStoreOpen(pathName,
FooSerializer, FooDeserializer, FooDeleter); FooSerializer, FooDeserializer, FooDeleter,
TQ_UPDATE_APPEND
);
ASSERT(pMeta); ASSERT(pMeta);
pFoo = (Foo*) tqHandleGet(pMeta, 1); pFoo = (Foo*) tqHandleGet(pMeta, 1);
EXPECT_EQ(pFoo == NULL, true); EXPECT_EQ(pFoo == NULL, true);
} }
TEST_F(TqMetaTest, intxnPersist) { TEST_F(TqMetaUpdateAppendTest, intxnPersist) {
Foo* pFoo = (Foo*)malloc(sizeof(Foo)); Foo* pFoo = (Foo*)malloc(sizeof(Foo));
pFoo->a = 3; pFoo->a = 3;
tqHandleMovePut(pMeta, 1, pFoo); tqHandleMovePut(pMeta, 1, pFoo);
...@@ -157,7 +163,9 @@ TEST_F(TqMetaTest, intxnPersist) { ...@@ -157,7 +163,9 @@ TEST_F(TqMetaTest, intxnPersist) {
tqStoreClose(pMeta); tqStoreClose(pMeta);
pMeta = tqStoreOpen(pathName, pMeta = tqStoreOpen(pathName,
FooSerializer, FooDeserializer, FooDeleter); FooSerializer, FooDeserializer, FooDeleter,
TQ_UPDATE_APPEND
);
ASSERT(pMeta); ASSERT(pMeta);
pFoo1 = (Foo*)tqHandleGet(pMeta, 1); pFoo1 = (Foo*)tqHandleGet(pMeta, 1);
...@@ -170,14 +178,16 @@ TEST_F(TqMetaTest, intxnPersist) { ...@@ -170,14 +178,16 @@ TEST_F(TqMetaTest, intxnPersist) {
tqStoreClose(pMeta); tqStoreClose(pMeta);
pMeta = tqStoreOpen(pathName, pMeta = tqStoreOpen(pathName,
FooSerializer, FooDeserializer, FooDeleter); FooSerializer, FooDeserializer, FooDeleter,
TQ_UPDATE_APPEND
);
ASSERT(pMeta); ASSERT(pMeta);
pFoo1 = (Foo*)tqHandleGet(pMeta, 1); pFoo1 = (Foo*)tqHandleGet(pMeta, 1);
EXPECT_EQ(pFoo1->a, 4); EXPECT_EQ(pFoo1->a, 4);
} }
TEST_F(TqMetaTest, multiplePage) { TEST_F(TqMetaUpdateAppendTest, multiplePage) {
srand(0); srand(0);
std::vector<int> v; std::vector<int> v;
for(int i = 0; i < 1000; i++) { for(int i = 0; i < 1000; i++) {
...@@ -195,7 +205,9 @@ TEST_F(TqMetaTest, multiplePage) { ...@@ -195,7 +205,9 @@ TEST_F(TqMetaTest, multiplePage) {
tqStoreClose(pMeta); tqStoreClose(pMeta);
pMeta = tqStoreOpen(pathName, pMeta = tqStoreOpen(pathName,
FooSerializer, FooDeserializer, FooDeleter); FooSerializer, FooDeserializer, FooDeleter,
TQ_UPDATE_APPEND
);
ASSERT(pMeta); ASSERT(pMeta);
for(int i = 500; i < 1000; i++) { for(int i = 500; i < 1000; i++) {
...@@ -213,7 +225,7 @@ TEST_F(TqMetaTest, multiplePage) { ...@@ -213,7 +225,7 @@ TEST_F(TqMetaTest, multiplePage) {
} }
TEST_F(TqMetaTest, multipleRewrite) { TEST_F(TqMetaUpdateAppendTest, multipleRewrite) {
srand(0); srand(0);
std::vector<int> v; std::vector<int> v;
for(int i = 0; i < 1000; i++) { for(int i = 0; i < 1000; i++) {
...@@ -244,7 +256,9 @@ TEST_F(TqMetaTest, multipleRewrite) { ...@@ -244,7 +256,9 @@ TEST_F(TqMetaTest, multipleRewrite) {
tqStoreClose(pMeta); tqStoreClose(pMeta);
pMeta = tqStoreOpen(pathName, pMeta = tqStoreOpen(pathName,
FooSerializer, FooDeserializer, FooDeleter); FooSerializer, FooDeserializer, FooDeleter,
TQ_UPDATE_APPEND
);
ASSERT(pMeta); ASSERT(pMeta);
for(int i = 500; i < 1000; i++) { for(int i = 500; i < 1000; i++) {
...@@ -263,7 +277,7 @@ TEST_F(TqMetaTest, multipleRewrite) { ...@@ -263,7 +277,7 @@ TEST_F(TqMetaTest, multipleRewrite) {
} }
TEST_F(TqMetaTest, dupCommit) { TEST_F(TqMetaUpdateAppendTest, dupCommit) {
srand(0); srand(0);
std::vector<int> v; std::vector<int> v;
for(int i = 0; i < 1000; i++) { for(int i = 0; i < 1000; i++) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册