提交 9c3732a1 编写于 作者: H Haojun Liao

Merge remote-tracking branch 'origin/feature/vnode' into feature/3.0_liaohj

......@@ -7,3 +7,4 @@ FROM mcr.microsoft.com/vscode/devcontainers/cpp:0-${VARIANT}
# [Optional] Uncomment this section to install additional packages.
# RUN apt-get update && export DEBIAN_FRONTEND=noninteractive \
# && apt-get -y install --no-install-recommends <your-package-list-here>
RUN apt-get update && apt-get -y install tree vim
......@@ -5,5 +5,4 @@ add_subdirectory(bnode)
add_subdirectory(snode)
add_subdirectory(mnode)
add_subdirectory(vnode)
add_subdirectory(stb)
add_subdirectory(sut)
aux_source_directory(. DSTB_SRC)
add_executable(dnode_test_stb ${DSTB_SRC})
target_link_libraries(
dnode_test_stb
PUBLIC sut
)
add_test(
NAME dnode_test_stb
COMMAND dnode_test_stb
)
/**
* @file db.cpp
* @author slguan (slguan@taosdata.com)
* @brief DNODE module vnode tests
* @version 0.1
* @date 2021-12-20
*
* @copyright Copyright (c) 2021
*
*/
#include "sut.h"
class DndTestVnode : public ::testing::Test {
protected:
static void SetUpTestSuite() { test.Init("/tmp/dnode_test_stb", 9116); }
static void TearDownTestSuite() { test.Cleanup(); }
static Testbase test;
public:
void SetUp() override {}
void TearDown() override {}
};
Testbase DndTestVnode::test;
TEST_F(DndTestVnode, 01_Create_Restart_Drop_Vnode) {
{
for (int i = 0; i < 3; ++i) {
int32_t contLen = sizeof(SCreateVnodeReq);
SCreateVnodeReq* pReq = (SCreateVnodeReq*)rpcMallocCont(contLen);
pReq->vgId = htonl(2);
pReq->dnodeId = htonl(1);
strcpy(pReq->db, "1.d1");
pReq->dbUid = htobe64(9527);
pReq->vgVersion = htonl(1);
pReq->cacheBlockSize = htonl(16);
pReq->totalBlocks = htonl(10);
pReq->daysPerFile = htonl(10);
pReq->daysToKeep0 = htonl(3650);
pReq->daysToKeep1 = htonl(3650);
pReq->daysToKeep2 = htonl(3650);
pReq->minRows = htonl(100);
pReq->minRows = htonl(4096);
pReq->commitTime = htonl(3600);
pReq->fsyncPeriod = htonl(3000);
pReq->walLevel = 1;
pReq->precision = 0;
pReq->compression = 2;
pReq->replica = 1;
pReq->quorum = 1;
pReq->update = 0;
pReq->cacheLastRow = 0;
pReq->selfIndex = 0;
for (int r = 0; r < pReq->replica; ++r) {
SReplica* pReplica = &pReq->replicas[r];
pReplica->id = htonl(1);
pReplica->port = htons(9527);
}
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_VNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
if (i == 0) {
ASSERT_EQ(pRsp->code, 0);
test.Restart();
} else {
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED);
}
}
}
{
for (int i = 0; i < 3; ++i) {
int32_t contLen = sizeof(SAlterVnodeReq);
SAlterVnodeReq* pReq = (SAlterVnodeReq*)rpcMallocCont(contLen);
pReq->vgId = htonl(2);
pReq->dnodeId = htonl(1);
strcpy(pReq->db, "1.d1");
pReq->dbUid = htobe64(9527);
pReq->vgVersion = htonl(2);
pReq->cacheBlockSize = htonl(16);
pReq->totalBlocks = htonl(10);
pReq->daysPerFile = htonl(10);
pReq->daysToKeep0 = htonl(3650);
pReq->daysToKeep1 = htonl(3650);
pReq->daysToKeep2 = htonl(3650);
pReq->minRows = htonl(100);
pReq->minRows = htonl(4096);
pReq->commitTime = htonl(3600);
pReq->fsyncPeriod = htonl(3000);
pReq->walLevel = 1;
pReq->precision = 0;
pReq->compression = 2;
pReq->replica = 1;
pReq->quorum = 1;
pReq->update = 0;
pReq->cacheLastRow = 0;
pReq->selfIndex = 0;
for (int r = 0; r < pReq->replica; ++r) {
SReplica* pReplica = &pReq->replicas[r];
pReplica->id = htonl(1);
pReplica->port = htons(9527);
}
SRpcMsg* pRsp = test.SendReq(TDMT_DND_ALTER_VNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
}
}
{
for (int i = 0; i < 3; ++i) {
int32_t contLen = sizeof(SDropVnodeReq);
SDropVnodeReq* pReq = (SDropVnodeReq*)rpcMallocCont(contLen);
pReq->vgId = htonl(2);
pReq->dnodeId = htonl(1);
strcpy(pReq->db, "1.d1");
pReq->dbUid = htobe64(9527);
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SDropVnodeReq);
rpcMsg.msgType = TDMT_DND_DROP_VNODE;
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_VNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
if (i == 0) {
ASSERT_EQ(pRsp->code, 0);
test.Restart();
} else {
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_VNODE_NOT_DEPLOYED);
}
}
}
}
......@@ -25,115 +25,216 @@ class DndTestVnode : public ::testing::Test {
Testbase DndTestVnode::test;
TEST_F(DndTestVnode, 01_Create_Restart_Drop_Vnode) {
{
for (int i = 0; i < 3; ++i) {
int32_t contLen = sizeof(SCreateVnodeReq);
SCreateVnodeReq* pReq = (SCreateVnodeReq*)rpcMallocCont(contLen);
pReq->vgId = htonl(2);
pReq->dnodeId = htonl(1);
strcpy(pReq->db, "1.d1");
pReq->dbUid = htobe64(9527);
pReq->vgVersion = htonl(1);
pReq->cacheBlockSize = htonl(16);
pReq->totalBlocks = htonl(10);
pReq->daysPerFile = htonl(10);
pReq->daysToKeep0 = htonl(3650);
pReq->daysToKeep1 = htonl(3650);
pReq->daysToKeep2 = htonl(3650);
pReq->minRows = htonl(100);
pReq->minRows = htonl(4096);
pReq->commitTime = htonl(3600);
pReq->fsyncPeriod = htonl(3000);
pReq->walLevel = 1;
pReq->precision = 0;
pReq->compression = 2;
pReq->replica = 1;
pReq->quorum = 1;
pReq->update = 0;
pReq->cacheLastRow = 0;
pReq->selfIndex = 0;
for (int r = 0; r < pReq->replica; ++r) {
SReplica* pReplica = &pReq->replicas[r];
pReplica->id = htonl(1);
pReplica->port = htons(9527);
}
TEST_F(DndTestVnode, 01_Create_Vnode) {
for (int i = 0; i < 3; ++i) {
int32_t contLen = sizeof(SCreateVnodeReq);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_VNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
if (i == 0) {
ASSERT_EQ(pRsp->code, 0);
test.Restart();
} else {
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED);
}
SCreateVnodeReq* pReq = (SCreateVnodeReq*)rpcMallocCont(contLen);
pReq->vgId = htonl(2);
pReq->dnodeId = htonl(1);
strcpy(pReq->db, "1.d1");
pReq->dbUid = htobe64(9527);
pReq->vgVersion = htonl(1);
pReq->cacheBlockSize = htonl(16);
pReq->totalBlocks = htonl(10);
pReq->daysPerFile = htonl(10);
pReq->daysToKeep0 = htonl(3650);
pReq->daysToKeep1 = htonl(3650);
pReq->daysToKeep2 = htonl(3650);
pReq->minRows = htonl(100);
pReq->minRows = htonl(4096);
pReq->commitTime = htonl(3600);
pReq->fsyncPeriod = htonl(3000);
pReq->walLevel = 1;
pReq->precision = 0;
pReq->compression = 2;
pReq->replica = 1;
pReq->quorum = 1;
pReq->update = 0;
pReq->cacheLastRow = 0;
pReq->selfIndex = 0;
for (int r = 0; r < pReq->replica; ++r) {
SReplica* pReplica = &pReq->replicas[r];
pReplica->id = htonl(1);
pReplica->port = htons(9527);
}
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_VNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
if (i == 0) {
ASSERT_EQ(pRsp->code, 0);
test.Restart();
} else {
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED);
}
}
}
TEST_F(DndTestVnode, 02_ALTER_Vnode) {
for (int i = 0; i < 3; ++i) {
int32_t contLen = sizeof(SAlterVnodeReq);
SAlterVnodeReq* pReq = (SAlterVnodeReq*)rpcMallocCont(contLen);
pReq->vgId = htonl(2);
pReq->dnodeId = htonl(1);
strcpy(pReq->db, "1.d1");
pReq->dbUid = htobe64(9527);
pReq->vgVersion = htonl(2);
pReq->cacheBlockSize = htonl(16);
pReq->totalBlocks = htonl(10);
pReq->daysPerFile = htonl(10);
pReq->daysToKeep0 = htonl(3650);
pReq->daysToKeep1 = htonl(3650);
pReq->daysToKeep2 = htonl(3650);
pReq->minRows = htonl(100);
pReq->minRows = htonl(4096);
pReq->commitTime = htonl(3600);
pReq->fsyncPeriod = htonl(3000);
pReq->walLevel = 1;
pReq->precision = 0;
pReq->compression = 2;
pReq->replica = 1;
pReq->quorum = 1;
pReq->update = 0;
pReq->cacheLastRow = 0;
pReq->selfIndex = 0;
for (int r = 0; r < pReq->replica; ++r) {
SReplica* pReplica = &pReq->replicas[r];
pReplica->id = htonl(1);
pReplica->port = htons(9527);
}
SRpcMsg* pRsp = test.SendReq(TDMT_DND_ALTER_VNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
}
}
TEST_F(DndTestVnode, 03_Create_Stb) {
for (int i = 0; i < 1; ++i) {
SVCreateTbReq req = {0};
req.ver = 0;
req.name = (char*)"stb1";
req.ttl = 0;
req.keep = 0;
req.type = TD_SUPER_TABLE;
SSchema schemas[5] = {0};
{
SSchema* pSchema = &schemas[0];
pSchema->bytes = htonl(8);
pSchema->type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema->name, "ts");
}
{
SSchema* pSchema = &schemas[1];
pSchema->bytes = htonl(4);
pSchema->type = TSDB_DATA_TYPE_INT;
strcpy(pSchema->name, "col1");
}
{
SSchema* pSchema = &schemas[2];
pSchema->bytes = htonl(2);
pSchema->type = TSDB_DATA_TYPE_TINYINT;
strcpy(pSchema->name, "tag1");
}
{
SSchema* pSchema = &schemas[3];
pSchema->bytes = htonl(8);
pSchema->type = TSDB_DATA_TYPE_BIGINT;
strcpy(pSchema->name, "tag2");
}
{
SSchema* pSchema = &schemas[4];
pSchema->bytes = htonl(16);
pSchema->type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema->name, "tag3");
}
req.stbCfg.suid = 9527;
req.stbCfg.nCols = 2;
req.stbCfg.pSchema = &schemas[0];
req.stbCfg.nTagCols = 3;
req.stbCfg.pTagSchema = &schemas[2];
int32_t bsize = tSerializeSVCreateTbReq(NULL, &req);
void* buf = rpcMallocCont(sizeof(SMsgHead) + bsize);
SMsgHead* pMsgHead = (SMsgHead*)buf;
pMsgHead->contLen = htonl(sizeof(SMsgHead) + bsize);
pMsgHead->vgId = htonl(2);
void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tSerializeSVCreateTbReq(&pBuf, &req);
int32_t contLen = sizeof(SMsgHead) + bsize;
SRpcMsg* pRsp = test.SendReq(TDMT_VND_CREATE_STB, buf, contLen);
ASSERT_NE(pRsp, nullptr);
if (i == 0) {
ASSERT_EQ(pRsp->code, 0);
test.Restart();
} else {
ASSERT_EQ(pRsp->code, TSDB_CODE_TDB_TABLE_ALREADY_EXIST);
}
}
}
TEST_F(DndTestVnode, 04_ALTER_Stb) {
#if 0
{
for (int i = 0; i < 3; ++i) {
int32_t contLen = sizeof(SAlterVnodeReq);
SAlterVnodeReq* pReq = (SAlterVnodeReq*)rpcMallocCont(contLen);
pReq->vgId = htonl(2);
pReq->dnodeId = htonl(1);
strcpy(pReq->db, "1.d1");
pReq->dbUid = htobe64(9527);
pReq->vgVersion = htonl(2);
pReq->cacheBlockSize = htonl(16);
pReq->totalBlocks = htonl(10);
pReq->daysPerFile = htonl(10);
pReq->daysToKeep0 = htonl(3650);
pReq->daysToKeep1 = htonl(3650);
pReq->daysToKeep2 = htonl(3650);
pReq->minRows = htonl(100);
pReq->minRows = htonl(4096);
pReq->commitTime = htonl(3600);
pReq->fsyncPeriod = htonl(3000);
pReq->walLevel = 1;
pReq->precision = 0;
pReq->compression = 2;
pReq->replica = 1;
pReq->quorum = 1;
pReq->update = 0;
pReq->cacheLastRow = 0;
pReq->selfIndex = 0;
for (int r = 0; r < pReq->replica; ++r) {
SReplica* pReplica = &pReq->replicas[r];
pReplica->id = htonl(1);
pReplica->port = htons(9527);
}
SRpcMsg* pRsp = test.SendReq(TDMT_DND_ALTER_VNODE, pReq, contLen);
SRpcMsg* pRsp = test.SendReq(TDMT_VND_ALTER_STB, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
}
}
#endif
}
TEST_F(DndTestVnode, 05_DROP_Stb) {
#if 0
{
for (int i = 0; i < 3; ++i) {
int32_t contLen = sizeof(SDropVnodeReq);
SDropVnodeReq* pReq = (SDropVnodeReq*)rpcMallocCont(contLen);
pReq->vgId = htonl(2);
pReq->dnodeId = htonl(1);
strcpy(pReq->db, "1.d1");
pReq->dbUid = htobe64(9527);
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SDropVnodeReq);
rpcMsg.msgType = TDMT_DND_DROP_VNODE;
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_VNODE, pReq, contLen);
SRpcMsg* pRsp = test.SendReq(TDMT_VND_DROP_STB, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
if (i == 0) {
ASSERT_EQ(pRsp->code, 0);
test.Restart();
} else {
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_VNODE_NOT_DEPLOYED);
ASSERT_EQ(pRsp->code, TSDB_CODE_TDB_INVALID_TABLE_ID);
}
}
}
#endif
}
TEST_F(DndTestVnode, 06_DROP_Vnode) {
for (int i = 0; i < 3; ++i) {
int32_t contLen = sizeof(SDropVnodeReq);
SDropVnodeReq* pReq = (SDropVnodeReq*)rpcMallocCont(contLen);
pReq->vgId = htonl(2);
pReq->dnodeId = htonl(1);
strcpy(pReq->db, "1.d1");
pReq->dbUid = htobe64(9527);
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SDropVnodeReq);
rpcMsg.msgType = TDMT_DND_DROP_VNODE;
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_VNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
if (i == 0) {
ASSERT_EQ(pRsp->code, 0);
test.Restart();
} else {
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_VNODE_NOT_DEPLOYED);
}
}
}
\ No newline at end of file
......@@ -42,7 +42,8 @@ typedef struct {
SSchema *pSchema;
} SSchemaWrapper;
typedef struct SMTbCursor SMTbCursor;
typedef struct SMTbCursor SMTbCursor;
typedef struct SMCtbCursor SMCtbCursor;
typedef SVCreateTbReq STbCfg;
......@@ -64,6 +65,10 @@ SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
void metaCloseTbCursor(SMTbCursor *pTbCur);
char * metaTbCursorNext(SMTbCursor *pTbCur);
SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid);
void metaCloseCtbCurosr(SMCtbCursor *pCtbCur);
char * metaCtbCursorNext(SMCtbCursor *pCtbCur);
// Options
void metaOptionsInit(SMetaCfg *pMetaCfg);
void metaOptionsClear(SMetaCfg *pMetaCfg);
......
......@@ -621,4 +621,59 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
tdDestroyTSchemaBuilder(&sb);
return pTSchema;
}
struct SMCtbCursor {
DBC * pCur;
tb_uid_t suid;
};
SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) {
SMCtbCursor *pCtbCur = NULL;
SMetaDB * pDB = pMeta->pDB;
int ret;
pCtbCur = (SMCtbCursor *)calloc(1, sizeof(*pCtbCur));
if (pCtbCur == NULL) {
return NULL;
}
pCtbCur->suid = uid;
ret = pDB->pCtbIdx->cursor(pDB->pCtbIdx, NULL, &(pCtbCur->pCur), 0);
if (ret != 0) {
free(pCtbCur);
return NULL;
}
return pCtbCur;
}
void metaCloseCtbCurosr(SMCtbCursor *pCtbCur) {
if (pCtbCur) {
if (pCtbCur->pCur) {
pCtbCur->pCur->close(pCtbCur->pCur);
}
free(pCtbCur);
}
}
char *metaCtbCursorNext(SMCtbCursor *pCtbCur) {
DBT skey = {0};
DBT pkey = {0};
DBT pval = {0};
void * pBuf;
STbCfg tbCfg;
// Set key
skey.data = &(pCtbCur->suid);
skey.size = sizeof(pCtbCur->suid);
if (pCtbCur->pCur->get(pCtbCur->pCur, &skey, &pval, DB_NEXT) == 0) {
pBuf = pval.data;
metaDecodeTbInfo(pBuf, &tbCfg);
return tbCfg.name;
} else {
return NULL;
}
}
\ No newline at end of file
......@@ -44,7 +44,7 @@ typedef struct {
// TDB Operations
TDB_EXTERN int tdbCreateDB(TDB** dbpp, tdb_db_t type);
TDB_EXTERN int tdbOpenDB(TDB* dbp, uint32_t flags);
TDB_EXTERN int tdbOpenDB(TDB* dbp, const char* fname, const char* dbname, uint32_t flags);
TDB_EXTERN int tdbCloseDB(TDB* dbp, uint32_t flags);
#ifdef __cplusplus
......
......@@ -56,9 +56,28 @@ _err:
return 0;
}
TDB_EXTERN int tdbOpenDB(TDB* dbp, uint32_t flags) {
// TODO
return 0;
TDB_EXTERN int tdbOpenDB(TDB* dbp, const char* fname, const char* dbname, uint32_t flags) {
int ret = 0;
if ((dbp->fname = strdup(fname)) == NULL) {
ret = -1;
return ret;
}
// Create the backup file if the file not exists
// Open the file as a sub-db or a master-db
if (dbname) {
if ((dbp->dbname = strdup(dbname)) == NULL) {
ret = -1;
return ret;
}
// TODO: Open the DB as a SUB-DB in this file
} else {
// TODO: Open the DB as a MASTER-DB in this file
}
return ret;
}
TDB_EXTERN int tdbCloseDB(TDB* dbp, uint32_t flags) {
......
......@@ -25,6 +25,14 @@
extern "C" {
#endif
typedef struct {
// TODO
} TDB_MPOOL;
typedef struct {
int fd;
} TDB_FH;
struct TDB {
pgsize_t pageSize;
tdb_db_t type;
......@@ -35,6 +43,9 @@ struct TDB {
TDB_HASH * hash;
TDB_HEAP * heap;
} dbam; // db access method
TDB_FH * fhp; // The backup file handle
TDB_MPOOL *mph; // The memory pool handle
};
#ifdef __cplusplus
......
......@@ -6,9 +6,9 @@ TEST(tdb_api_test, tdb_create_open_close_db_test) {
int ret;
TDB *dbp;
tdbCreateDB(&dbp, TDB_BTREE_T);
// tdbCreateDB(&dbp, TDB_BTREE_T);
tdbOpenDB(dbp, 0);
// tdbOpenDB(dbp, 0);
tdbCloseDB(dbp, 0);
// tdbCloseDB(dbp, 0);
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册