提交 c78f3e35 编写于 作者: H Hongze Cheng

refact vnode code

上级 e990ff5a
add_subdirectory(meta) aux_source_directory(src/meta META_SRC)
add_subdirectory(tq) aux_source_directory(src/tq TQ_SRC)
add_subdirectory(tsdb) aux_source_directory(src/tsdb TSDB_SRC)
add_subdirectory(impl) aux_source_directory(src/vnd VND_SRC)
\ No newline at end of file list(APPEND
VNODE_SRC
${META_SRC}
${TQ_SRC}
${TSDB_SRC}
${VND_SRC}
)
add_library(vnode STATIC ${VNODE_SRC})
target_include_directories(
vnode
PUBLIC inc
PRIVATE src/inc
)
target_link_libraries(
vnode
PUBLIC os
PUBLIC util
PUBLIC common
PUBLIC transport
PUBLIC bdb
PUBLIC tfs
PUBLIC wal
PUBLIC qworker
)
if(${BUILD_TEST})
# add_subdirectory(test)
endif(${BUILD_TEST})
# Vnode API test
add_executable(vnodeApiTests "")
target_sources(vnodeApiTests
PRIVATE
"vnodeApiTests.cpp"
)
target_link_libraries(vnodeApiTests vnode gtest gtest_main)
add_test(
NAME vnode_api_tests
COMMAND ${CMAKE_CURRENT_BINARY_DIR}/vnodeApiTests
)
\ No newline at end of file
// https://stackoverflow.com/questions/8565666/benchmarking-with-googletest
// https://github.com/google/benchmark
\ No newline at end of file
/**
* @file vnodeApiTests.cpp
* @author hzcheng (hzcheng@taosdata.com)
* @brief VNODE module API tests
* @version 0.1
* @date 2021-12-13
*
* @copyright Copyright (c) 2021
*
*/
#include <gtest/gtest.h>
#include <iostream>
#include "vnode.h"
static STSchema *vtCreateBasicSchema() {
STSchemaBuilder sb;
STSchema * pSchema = NULL;
tdInitTSchemaBuilder(&sb, 0);
tdAddColToSchema(&sb, TSDB_DATA_TYPE_TIMESTAMP, 0, 0);
for (int i = 1; i < 10; i++) {
tdAddColToSchema(&sb, TSDB_DATA_TYPE_INT, i, 0);
}
pSchema = tdGetSchemaFromBuilder(&sb);
tdDestroyTSchemaBuilder(&sb);
return pSchema;
}
static STSchema *vtCreateBasicTagSchema() {
STSchemaBuilder sb;
STSchema * pSchema = NULL;
tdInitTSchemaBuilder(&sb, 0);
tdAddColToSchema(&sb, TSDB_DATA_TYPE_TIMESTAMP, 0, 0);
for (int i = 10; i < 12; i++) {
tdAddColToSchema(&sb, TSDB_DATA_TYPE_BINARY, i, 20);
}
pSchema = tdGetSchemaFromBuilder(&sb);
tdDestroyTSchemaBuilder(&sb);
return pSchema;
}
static SKVRow vtCreateBasicTag() {
SKVRowBuilder rb;
SKVRow pTag;
tdInitKVRowBuilder(&rb);
for (int i = 0; i < 2; i++) {
void *pVal = malloc(sizeof(VarDataLenT) + strlen("foo"));
varDataLen(pVal) = strlen("foo");
memcpy(varDataVal(pVal), "foo", strlen("foo"));
tdAddColToKVRow(&rb, i, TSDB_DATA_TYPE_BINARY, pVal);
free(pVal);
}
pTag = tdGetKVRowFromBuilder(&rb);
tdDestroyKVRowBuilder(&rb);
return pTag;
}
static void vtBuildCreateStbReq(tb_uid_t suid, char *tbname, SRpcMsg **ppMsg) {
SRpcMsg * pMsg;
STSchema *pSchema;
STSchema *pTagSchema;
int zs;
void * pBuf;
pSchema = vtCreateBasicSchema();
pTagSchema = vtCreateBasicTagSchema();
SVnodeReq vCreateSTbReq;
vnodeSetCreateStbReq(&vCreateSTbReq, tbname, UINT32_MAX, UINT32_MAX, suid, pSchema, pTagSchema);
zs = vnodeBuildReq(NULL, &vCreateSTbReq, TDMT_VND_CREATE_STB);
pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg) + zs);
pMsg->msgType = TDMT_VND_CREATE_STB;
pMsg->contLen = zs;
pMsg->pCont = POINTER_SHIFT(pMsg, sizeof(SRpcMsg));
pBuf = pMsg->pCont;
vnodeBuildReq(&pBuf, &vCreateSTbReq, TDMT_VND_CREATE_STB);
META_CLEAR_TB_CFG(&vCreateSTbReq);
tdFreeSchema(pSchema);
tdFreeSchema(pTagSchema);
*ppMsg = pMsg;
}
static void vtBuildCreateCtbReq(tb_uid_t suid, char *tbname, SRpcMsg **ppMsg) {
SRpcMsg *pMsg;
int tz;
SKVRow pTag = vtCreateBasicTag();
SVnodeReq vCreateCTbReq;
vnodeSetCreateCtbReq(&vCreateCTbReq, tbname, UINT32_MAX, UINT32_MAX, suid, pTag);
tz = vnodeBuildReq(NULL, &vCreateCTbReq, TDMT_VND_CREATE_TABLE);
pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg) + tz);
pMsg->msgType = TDMT_VND_CREATE_TABLE;
pMsg->contLen = tz;
pMsg->pCont = POINTER_SHIFT(pMsg, sizeof(*pMsg));
void *pBuf = pMsg->pCont;
vnodeBuildReq(&pBuf, &vCreateCTbReq, TDMT_VND_CREATE_TABLE);
META_CLEAR_TB_CFG(&vCreateCTbReq);
free(pTag);
*ppMsg = pMsg;
}
static void vtBuildCreateNtbReq(char *tbname, SRpcMsg **ppMsg) {
// TODO
}
static void vtBuildSubmitReq(SRpcMsg **ppMsg) {
SRpcMsg * pMsg;
SSubmitMsg *pSubmitMsg;
SSubmitBlk *pSubmitBlk;
int tz = 1024; // TODO
pMsg = (SRpcMsg *)malloc(sizeof(*pMsg) + tz);
pMsg->msgType = TDMT_VND_SUBMIT;
pMsg->contLen = tz;
pMsg->pCont = POINTER_SHIFT(pMsg, sizeof(*pMsg));
// For submit msg header
pSubmitMsg = (SSubmitMsg *)(pMsg->pCont);
// pSubmitMsg->header.contLen = 0;
// pSubmitMsg->header.vgId = 0;
// pSubmitMsg->length = 0;
pSubmitMsg->numOfBlocks = 1;
// For submit blk
pSubmitBlk = (SSubmitBlk *)(pSubmitMsg->blocks);
pSubmitBlk->uid = 0;
pSubmitBlk->tid = 0;
pSubmitBlk->padding = 0;
pSubmitBlk->sversion = 0;
pSubmitBlk->dataLen = 0;
pSubmitBlk->numOfRows = 0;
// For row batch
*ppMsg = pMsg;
}
static void vtClearMsgBatch(SArray *pMsgArr) {
SRpcMsg *pMsg;
for (size_t i = 0; i < taosArrayGetSize(pMsgArr); i++) {
pMsg = *(SRpcMsg **)taosArrayGet(pMsgArr, i);
free(pMsg);
}
taosArrayClear(pMsgArr);
}
static void vtProcessAndApplyReqs(SVnode *pVnode, SArray *pMsgArr) {
int rcode;
SRpcMsg *pReq;
SRpcMsg *pRsp;
rcode = vnodeProcessWMsgs(pVnode, pMsgArr);
GTEST_ASSERT_EQ(rcode, 0);
for (size_t i = 0; i < taosArrayGetSize(pMsgArr); i++) {
pReq = *(SRpcMsg **)taosArrayGet(pMsgArr, i);
rcode = vnodeApplyWMsg(pVnode, pReq, NULL);
GTEST_ASSERT_EQ(rcode, 0);
}
}
TEST(vnodeApiTest, vnode_simple_create_table_test) {
tb_uid_t suid = 1638166374163;
SRpcMsg *pMsg;
SArray * pMsgArr = NULL;
SVnode * pVnode;
int rcode;
int ntables = 1000000;
int batch = 10;
char tbname[128];
pMsgArr = (SArray *)taosArrayInit(batch, sizeof(pMsg));
vnodeDestroy("vnode1");
GTEST_ASSERT_GE(vnodeInit(2), 0);
// CREATE AND OPEN A VNODE
pVnode = vnodeOpen("vnode1", NULL);
ASSERT_NE(pVnode, nullptr);
// CREATE A SUPER TABLE
sprintf(tbname, "st");
vtBuildCreateStbReq(suid, tbname, &pMsg);
taosArrayPush(pMsgArr, &pMsg);
vtProcessAndApplyReqs(pVnode, pMsgArr);
vtClearMsgBatch(pMsgArr);
// CREATE A LOT OF CHILD TABLES
for (int i = 0; i < ntables / batch; i++) {
// Build request batch
for (int j = 0; j < batch; j++) {
sprintf(tbname, "ct%d", i * batch + j + 1);
vtBuildCreateCtbReq(suid, tbname, &pMsg);
taosArrayPush(pMsgArr, &pMsg);
}
// Process request batch
vtProcessAndApplyReqs(pVnode, pMsgArr);
// Clear request batch
vtClearMsgBatch(pMsgArr);
}
// CLOSE THE VNODE
vnodeClose(pVnode);
vnodeCleanup();
taosArrayDestroy(pMsgArr);
}
TEST(vnodeApiTest, vnode_simple_insert_test) {
const char *vname = "vnode2";
char tbname[128];
tb_uid_t suid = 1638166374163;
SRpcMsg * pMsg;
SArray * pMsgArr;
int rcode;
SVnode * pVnode;
int batch = 1;
int loop = 1000000;
pMsgArr = (SArray *)taosArrayInit(0, sizeof(pMsg));
vnodeDestroy(vname);
GTEST_ASSERT_GE(vnodeInit(2), 0);
// Open a vnode
pVnode = vnodeOpen(vname, NULL);
GTEST_ASSERT_NE(pVnode, nullptr);
// 1. CREATE A SUPER TABLE
sprintf(tbname, "st");
vtBuildCreateStbReq(suid, tbname, &pMsg);
taosArrayPush(pMsgArr, &pMsg);
vtProcessAndApplyReqs(pVnode, pMsgArr);
vtClearMsgBatch(pMsgArr);
// 2. CREATE A CHILD TABLE
sprintf(tbname, "t0");
vtBuildCreateCtbReq(suid, tbname, &pMsg);
taosArrayPush(pMsgArr, &pMsg);
vtProcessAndApplyReqs(pVnode, pMsgArr);
vtClearMsgBatch(pMsgArr);
// 3. WRITE A LOT OF TIME-SERIES DATA
for (int j = 0; j < loop; j++) {
for (int i = 0; i < batch; i++) {
vtBuildSubmitReq(&pMsg);
taosArrayPush(pMsgArr, &pMsg);
}
vtProcessAndApplyReqs(pVnode, pMsgArr);
vtClearMsgBatch(pMsgArr);
}
// Close the vnode
vnodeClose(pVnode);
vnodeCleanup();
taosArrayDestroy(pMsgArr);
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "metaDef.h"
#include "sqlite3.h"
struct SMetaDB {
sqlite3 *pDB;
};
int metaOpenDB(SMeta *pMeta) {
char dir[128];
int rc;
char *err = NULL;
pMeta->pDB = (SMetaDB *)calloc(1, sizeof(SMetaDB));
if (pMeta->pDB == NULL) {
// TODO: handle error
return -1;
}
sprintf(dir, "%s/meta.db", pMeta->path);
rc = sqlite3_open(dir, &(pMeta->pDB->pDB));
if (rc != SQLITE_OK) {
// TODO: handle error
printf("failed to open meta.db\n");
}
// For all tables
rc = sqlite3_exec(pMeta->pDB->pDB,
"CREATE TABLE IF NOT EXISTS tb ("
" tbname VARCHAR(256) NOT NULL UNIQUE,"
" tb_uid INTEGER NOT NULL UNIQUE "
");",
NULL, NULL, &err);
if (rc != SQLITE_OK) {
// TODO: handle error
printf("failed to create meta table tb since %s\n", err);
}
// For super tables
rc = sqlite3_exec(pMeta->pDB->pDB,
"CREATE TABLE IF NOT EXISTS stb ("
" tb_uid INTEGER NOT NULL UNIQUE,"
" tbname VARCHAR(256) NOT NULL UNIQUE,"
" tb_schema BLOB NOT NULL,"
" tag_schema BLOB NOT NULL"
");",
NULL, NULL, &err);
if (rc != SQLITE_OK) {
// TODO: handle error
printf("failed to create meta table stb since %s\n", err);
}
// For normal tables
rc = sqlite3_exec(pMeta->pDB->pDB,
"CREATE TABLE IF NOT EXISTS ntb ("
" tb_uid INTEGER NOT NULL UNIQUE,"
" tbname VARCHAR(256) NOT NULL,"
" tb_schema BLOB NOT NULL"
");",
NULL, NULL, &err);
if (rc != SQLITE_OK) {
// TODO: handle error
printf("failed to create meta table ntb since %s\n", err);
}
sqlite3_exec(pMeta->pDB->pDB, "BEGIN;", NULL, NULL, &err);
tfree(err);
return 0;
}
void metaCloseDB(SMeta *pMeta) {
if (pMeta->pDB) {
sqlite3_exec(pMeta->pDB->pDB, "COMMIT;", NULL, NULL, NULL);
sqlite3_close(pMeta->pDB->pDB);
free(pMeta->pDB);
pMeta->pDB = NULL;
}
// TODO
}
int metaSaveTableToDB(SMeta *pMeta, const STbCfg *pTbCfg) {
char sql[256];
char * err = NULL;
int rc;
tb_uid_t uid;
sqlite3_stmt *stmt;
char buf[256];
void * pBuf;
switch (pTbCfg->type) {
case META_SUPER_TABLE:
uid = pTbCfg->stbCfg.suid;
sprintf(sql,
"INSERT INTO tb VALUES (\'%s\', %" PRIu64
");"
"CREATE TABLE IF NOT EXISTS stb_%" PRIu64
" ("
" tb_uid INTEGER NOT NULL UNIQUE,"
" tbname VARCHAR(256),"
" tag1 INTEGER);",
pTbCfg->name, uid, uid);
rc = sqlite3_exec(pMeta->pDB->pDB, sql, NULL, NULL, &err);
if (rc != SQLITE_OK) {
printf("failed to create normal table since %s\n", err);
}
sprintf(sql, "INSERT INTO stb VALUES (%" PRIu64 ", %s, ?, ?)", uid, pTbCfg->name);
sqlite3_prepare_v2(pMeta->pDB->pDB, sql, -1, &stmt, NULL);
pBuf = buf;
tdEncodeSchema(&pBuf, pTbCfg->stbCfg.pSchema);
sqlite3_bind_blob(stmt, 1, buf, POINTER_DISTANCE(pBuf, buf), NULL);
pBuf = buf;
tdEncodeSchema(&pBuf, pTbCfg->stbCfg.pTagSchema);
sqlite3_bind_blob(stmt, 2, buf, POINTER_DISTANCE(pBuf, buf), NULL);
sqlite3_step(stmt);
sqlite3_finalize(stmt);
#if 0
sprintf(sql,
"INSERT INTO tb VALUES (?, ?);"
// "INSERT INTO stb VALUES (?, ?, ?, ?);"
// "CREATE TABLE IF NOT EXISTS stb_%" PRIu64
// " ("
// " tb_uid INTEGER NOT NULL UNIQUE,"
// " tbname VARCHAR(256),"
// " tag1 INTEGER);"
,
uid);
rc = sqlite3_prepare_v2(pMeta->pDB->pDB, sql, -1, &stmt, NULL);
if (rc != SQLITE_OK) {
return -1;
}
sqlite3_bind_text(stmt, 1, pTbCfg->name, -1, SQLITE_TRANSIENT);
sqlite3_bind_int64(stmt, 2, uid);
sqlite3_step(stmt);
sqlite3_finalize(stmt);
// sqlite3_bind_int64(stmt, 3, uid);
// sqlite3_bind_text(stmt, 4, pTbCfg->name, -1, SQLITE_TRANSIENT);
// pBuf = buf;
// tdEncodeSchema(&pBuf, pTbCfg->stbCfg.pSchema);
// sqlite3_bind_blob(stmt, 5, buf, POINTER_DISTANCE(pBuf, buf), NULL);
// pBuf = buf;
// tdEncodeSchema(&pBuf, pTbCfg->stbCfg.pTagSchema);
// sqlite3_bind_blob(stmt, 6, buf, POINTER_DISTANCE(pBuf, buf), NULL);
rc = sqliteVjj3_step(stmt);
if (rc != SQLITE_OK) {
printf("failed to create normal table since %s\n", sqlite3_errmsg(pMeta->pDB->pDB));
}
sqlite3_finalize(stmt);
#endif
break;
case META_NORMAL_TABLE:
// uid = metaGenerateUid(pMeta);
// sprintf(sql,
// "INSERT INTO tb VALUES (\'%s\', %" PRIu64
// ");"
// "INSERT INTO ntb VALUES (%" PRIu64 ", \'%s\', );",
// pTbCfg->name, uid, uid, pTbCfg->name, );
// rc = sqlite3_exec(pMeta->pDB->pDB, sql, NULL, NULL, &err);
// if (rc != SQLITE_OK) {
// printf("failed to create normal table since %s\n", err);
// }
break;
case META_CHILD_TABLE:
#if 0
uid = metaGenerateUid(pMeta);
// sprintf(sql, "INSERT INTO tb VALUES (\'%s\', %" PRIu64
// ");"
// "INSERT INTO stb_%" PRIu64 " VALUES (%" PRIu64 ", \'%s\', );");
rc = sqlite3_exec(pMeta->pDB->pDB, sql, NULL, NULL, &err);
if (rc != SQLITE_OK) {
printf("failed to create child table since %s\n", err);
}
#endif
break;
default:
break;
}
tfree(err);
return 0;
}
int metaRemoveTableFromDb(SMeta *pMeta, tb_uid_t uid) {
/* TODO */
return 0;
}
\ No newline at end of file
# add_executable(metaTest "")
# target_sources(metaTest
# PRIVATE
# "../src/metaMain.c"
# "../src/metaUid.c"
# "metaTests.cpp"
# )
# target_include_directories(metaTest
# PUBLIC
# "${CMAKE_SOURCE_DIR}/include/server/vnode/meta"
# "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
# )
# target_link_libraries(metaTest
# os
# util
# common
# gtest_main
# tkv
# )
# enable_testing()
# add_test(
# NAME meta_test
# COMMAND metaTest
# )
#if 0
#include <gtest/gtest.h>
#include <string.h>
#include <iostream>
#include "meta.h"
static STSchema *metaGetSimpleSchema() {
STSchema * pSchema = NULL;
STSchemaBuilder sb = {0};
tdInitTSchemaBuilder(&sb, 0);
tdAddColToSchema(&sb, TSDB_DATA_TYPE_TIMESTAMP, 0, 8);
tdAddColToSchema(&sb, TSDB_DATA_TYPE_INT, 1, 4);
pSchema = tdGetSchemaFromBuilder(&sb);
tdDestroyTSchemaBuilder(&sb);
return pSchema;
}
static SKVRow metaGetSimpleTags() {
SKVRowBuilder kvrb = {0};
SKVRow row;
tdInitKVRowBuilder(&kvrb);
int64_t ts = 1634287978000;
int32_t a = 10;
tdAddColToKVRow(&kvrb, 0, TSDB_DATA_TYPE_TIMESTAMP, (void *)(&ts));
tdAddColToKVRow(&kvrb, 0, TSDB_DATA_TYPE_INT, (void *)(&a));
row = tdGetKVRowFromBuilder(&kvrb);
tdDestroyKVRowBuilder(&kvrb);
return row;
}
TEST(MetaTest, DISABLED_meta_create_1m_normal_tables_test) {
// Open Meta
SMeta *meta = metaOpen(NULL, NULL);
std::cout << "Meta is opened!" << std::endl;
// Create 1000000 normal tables
META_TABLE_OPTS_DECLARE(tbOpts);
STSchema *pSchema = metaGetSimpleSchema();
char tbname[128];
for (size_t i = 0; i < 1000000; i++) {
sprintf(tbname, "ntb%ld", i);
metaNormalTableOptsInit(&tbOpts, tbname, pSchema);
metaCreateTable(meta, &tbOpts);
metaTableOptsClear(&tbOpts);
}
tdFreeSchema(pSchema);
// Close Meta
metaClose(meta);
std::cout << "Meta is closed!" << std::endl;
// Destroy Meta
metaDestroy("meta");
std::cout << "Meta is destroyed!" << std::endl;
}
TEST(MetaTest, meta_create_1m_child_tables_test) {
// Open Meta
SMeta *meta = metaOpen(NULL);
std::cout << "Meta is opened!" << std::endl;
// Create a super tables
tb_uid_t uid = 477529885843758ul;
META_TABLE_OPTS_DECLARE(tbOpts);
STSchema *pSchema = metaGetSimpleSchema();
STSchema *pTagSchema = metaGetSimpleSchema();
metaSuperTableOptsInit(&tbOpts, "st", uid, pSchema, pTagSchema);
metaCreateTable(meta, &tbOpts);
metaTableOptsClear(&tbOpts);
tdFreeSchema(pSchema);
tdFreeSchema(pTagSchema);
// Create 1000000 child tables
char name[128];
SKVRow row = metaGetSimpleTags();
for (size_t i = 0; i < 1000000; i++) {
sprintf(name, "ctb%ld", i);
metaChildTableOptsInit(&tbOpts, name, uid, row);
metaCreateTable(meta, &tbOpts);
metaTableOptsClear(&tbOpts);
}
kvRowFree(row);
// Close Meta
metaClose(meta);
std::cout << "Meta is closed!" << std::endl;
// Destroy Meta
metaDestroy("meta");
std::cout << "Meta is destroyed!" << std::endl;
}
#endif
\ No newline at end of file
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
#define _TD_VNODE_DEF_H_ #define _TD_VNODE_DEF_H_
#include "mallocator.h" #include "mallocator.h"
#include "sync.h" // #include "sync.h"
#include "tcoding.h" #include "tcoding.h"
#include "tlist.h" #include "tlist.h"
#include "tlockfree.h" #include "tlockfree.h"
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
#include "vnode.h" #include "vnode.h"
#include "meta.h" #include "meta.h"
#include "sync.h" // #include "sync.h"
#include "tlog.h" #include "tlog.h"
#include "tq.h" #include "tq.h"
#include "tsdb.h" #include "tsdb.h"
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
#ifndef _TD_VNODE_SYNC_H_ #ifndef _TD_VNODE_SYNC_H_
#define _TD_VNODE_SYNC_H_ #define _TD_VNODE_SYNC_H_
#include "sync.h" // #include "sync.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
......
...@@ -13,7 +13,9 @@ ...@@ -13,7 +13,9 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifdef USE_INVERTED_INDEX
#include "index.h" #include "index.h"
#endif
#include "metaDef.h" #include "metaDef.h"
struct SMetaIdx { struct SMetaIdx {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// #include "os.h"
// #include "tmsg.h"
// #include "tarray.h"
// #include "query.h"
// #include "tglobal.h"
// #include "tlist.h"
// #include "tsdbint.h"
// #include "tsdbBuffer.h"
// #include "tsdbLog.h"
// #include "tsdbHealth.h"
// #include "ttimer.h"
// #include "tthread.h"
// // return malloc new block count
// int32_t tsdbInsertNewBlock(STsdbRepo * pRepo) {
// STsdbBufPool *pPool = pRepo->pPool;
// int32_t cnt = 0;
// if(tsdbAllowNewBlock(pRepo)) {
// STsdbBufBlock *pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize);
// if (pBufBlock) {
// if (tdListAppend(pPool->bufBlockList, (void *)(&pBufBlock)) < 0) {
// // append error
// tsdbFreeBufBlock(pBufBlock);
// } else {
// pPool->nElasticBlocks ++;
// cnt ++ ;
// }
// }
// }
// return cnt;
// }
// // switch anther thread to run
// void* cbKillQueryFree(void* param) {
// STsdbRepo* pRepo = (STsdbRepo*)param;
// // vnode
// if(pRepo->appH.notifyStatus) {
// pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_NOBLOCK, TSDB_CODE_SUCCESS);
// }
// // free
// if(pRepo->pthread){
// void* p = pRepo->pthread;
// pRepo->pthread = NULL;
// free(p);
// }
// return NULL;
// }
// // return true do free , false do nothing
// bool tsdbUrgeQueryFree(STsdbRepo * pRepo) {
// // check previous running
// if(pRepo->pthread && taosThreadRunning(pRepo->pthread)) {
// tsdbWarn("vgId:%d pre urge thread is runing. nBlocks=%d nElasticBlocks=%d", REPO_ID(pRepo), pRepo->pPool->nBufBlocks, pRepo->pPool->nElasticBlocks);
// return false;
// }
// // create new
// pRepo->pthread = taosCreateThread(cbKillQueryFree, pRepo);
// if(pRepo->pthread == NULL) {
// tsdbError("vgId:%d create urge thread error.", REPO_ID(pRepo));
// return false;
// }
// return true;
// }
// bool tsdbAllowNewBlock(STsdbRepo* pRepo) {
// int32_t nMaxElastic = pRepo->config.totalBlocks/3;
// STsdbBufPool* pPool = pRepo->pPool;
// if(pPool->nElasticBlocks >= nMaxElastic) {
// tsdbWarn("vgId:%d tsdbAllowNewBlock return fasle. nElasticBlock(%d) >= MaxElasticBlocks(%d)", REPO_ID(pRepo), pPool->nElasticBlocks, nMaxElastic);
// return false;
// }
// return true;
// }
// bool tsdbNoProblem(STsdbRepo* pRepo) {
// if(listNEles(pRepo->pPool->bufBlockList) == 0)
// return false;
// return true;
// }
\ No newline at end of file
...@@ -28,7 +28,7 @@ ...@@ -28,7 +28,7 @@
#include "taosdef.h" #include "taosdef.h"
#include "tlosertree.h" #include "tlosertree.h"
#include "tsdbint.h" #include "tsdbDef.h"
#include "tmsg.h" #include "tmsg.h"
#define EXTRA_BYTES 2 #define EXTRA_BYTES 2
......
...@@ -13,18 +13,18 @@ ...@@ -13,18 +13,18 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "tsdbRowMergeBuf.h" // #include "tsdbRowMergeBuf.h"
#include "tdataformat.h" // #include "tdataformat.h"
// row1 has higher priority // // row1 has higher priority
SMemRow tsdbMergeTwoRows(SMergeBuf *pBuf, SMemRow row1, SMemRow row2, STSchema *pSchema1, STSchema *pSchema2) { // SMemRow tsdbMergeTwoRows(SMergeBuf *pBuf, SMemRow row1, SMemRow row2, STSchema *pSchema1, STSchema *pSchema2) {
if(row2 == NULL) return row1; // if(row2 == NULL) return row1;
if(row1 == NULL) return row2; // if(row1 == NULL) return row2;
ASSERT(pSchema1->version == memRowVersion(row1)); // ASSERT(pSchema1->version == memRowVersion(row1));
ASSERT(pSchema2->version == memRowVersion(row2)); // ASSERT(pSchema2->version == memRowVersion(row2));
if(tsdbMergeBufMakeSureRoom(pBuf, pSchema1, pSchema2) < 0) { // if(tsdbMergeBufMakeSureRoom(pBuf, pSchema1, pSchema2) < 0) {
return NULL; // return NULL;
} // }
return mergeTwoMemRows(*pBuf, row1, row2, pSchema1, pSchema2); // return mergeTwoMemRows(*pBuf, row1, row2, pSchema1, pSchema2);
} // }
...@@ -13,9 +13,9 @@ ...@@ -13,9 +13,9 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "tsdbint.h"
#if 0 #if 0
#include "tsdbint.h"
#ifndef _TSDB_PLUGINS #ifndef _TSDB_PLUGINS
int tsdbScanFGroup(STsdbScanHandle* pScanHandle, char* rootDir, int fid) { return 0; } int tsdbScanFGroup(STsdbScanHandle* pScanHandle, char* rootDir, int fid) { return 0; }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_INT_H_
#define _TD_TSDB_INT_H_
#if 0
// // TODO: remove the include
// #include <errno.h>
// #include <fcntl.h>
// #include <limits.h>
// #include <inttypes.h>
// #include <sys/stat.h>
// #include <sys/types.h>
// #include <semaphore.h>
// #include <dirent.h>
#include "hash.h"
#include "os.h"
#include "taosdef.h"
#include "taoserror.h"
#include "tarray.h"
#include "tchecksum.h"
#include "tcoding.h"
#include "tcompression.h"
#include "tdataformat.h"
#include "tfs.h"
#include "tlist.h"
#include "tlockfree.h"
#include "tlog.h"
#include "tskiplist.h"
#include "tsocket.h"
#include "tsdb.h"
#ifdef __cplusplus
extern "C" {
#endif
// Log
#include "tsdbLog.h"
// Meta
#include "tsdbMeta.h"
// Buffer
#include "tsdbBuffer.h"
// MemTable
#include "tsdbMemTable.h"
// File
#include "tsdbFile.h"
// FS
#include "tsdbFS.h"
// ReadImpl
#include "tsdbReadImpl.h"
// Commit
#include "tsdbCommit.h"
// Compact
#include "tsdbCompact.h"
// Commit Queue
#include "tsdbCommitQueue.h"
#include "tsdbRowMergeBuf.h"
// Main definitions
struct STsdbRepo {
uint8_t state;
STsdbCfg config;
STsdbCfg save_config; // save apply config
bool config_changed; // config changed flag
pthread_mutex_t save_mutex; // protect save config
uint8_t hasCachedLastColumn;
STsdbAppH appH;
STsdbStat stat;
STsdbMeta* tsdbMeta;
STsdbBufPool* pPool;
SMemTable* mem;
SMemTable* imem;
STsdbFS* fs;
SRtn rtn;
tsem_t readyToCommit;
pthread_mutex_t mutex;
bool repoLocked;
int32_t code; // Commit code
SMergeBuf mergeBuf; //used when update=2
int8_t compactState; // compact state: inCompact/noCompact/waitingCompact?
pthread_t* pthread;
};
#define REPO_ID(r) (r)->config.tsdbId
#define REPO_CFG(r) (&((r)->config))
#define REPO_FS(r) ((r)->fs)
#define IS_REPO_LOCKED(r) (r)->repoLocked
#define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg)
int tsdbLockRepo(STsdbRepo* pRepo);
int tsdbUnlockRepo(STsdbRepo* pRepo);
STsdbMeta* tsdbGetMeta(STsdbRepo* pRepo);
int tsdbCheckCommit(STsdbRepo* pRepo);
int tsdbRestoreInfo(STsdbRepo* pRepo);
int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg);
void tsdbGetRootDir(int repoid, char dirName[]);
void tsdbGetDataDir(int repoid, char dirName[]);
static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) {
ASSERT(pRepo != NULL);
if (pRepo->mem == NULL) return NULL;
SListNode* pNode = listTail(pRepo->mem->bufBlockList);
if (pNode == NULL) return NULL;
STsdbBufBlock* pBufBlock = NULL;
tdListNodeGetData(pRepo->mem->bufBlockList, pNode, (void*)(&pBufBlock));
return pBufBlock;
}
static FORCE_INLINE int tsdbGetNextMaxTables(int tid) {
ASSERT(tid >= 1 && tid <= TSDB_MAX_TABLES);
int maxTables = TSDB_INIT_NTABLES;
while (true) {
maxTables = MIN(maxTables, TSDB_MAX_TABLES);
if (tid <= maxTables) break;
maxTables *= 2;
}
return maxTables + 1;
}
#ifdef __cplusplus
}
#endif
#endif
#endif /* _TD_TSDB_INT_H_ */
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tmsg.h"
#include "tarray.h"
#include "query.h"
#include "tglobal.h"
#include "tlist.h"
#include "tsdbint.h"
#include "tsdbBuffer.h"
#include "tsdbLog.h"
#include "tsdbHealth.h"
#include "ttimer.h"
#include "tthread.h"
// return malloc new block count
int32_t tsdbInsertNewBlock(STsdbRepo * pRepo) {
STsdbBufPool *pPool = pRepo->pPool;
int32_t cnt = 0;
if(tsdbAllowNewBlock(pRepo)) {
STsdbBufBlock *pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize);
if (pBufBlock) {
if (tdListAppend(pPool->bufBlockList, (void *)(&pBufBlock)) < 0) {
// append error
tsdbFreeBufBlock(pBufBlock);
} else {
pPool->nElasticBlocks ++;
cnt ++ ;
}
}
}
return cnt;
}
// switch anther thread to run
void* cbKillQueryFree(void* param) {
STsdbRepo* pRepo = (STsdbRepo*)param;
// vnode
if(pRepo->appH.notifyStatus) {
pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_NOBLOCK, TSDB_CODE_SUCCESS);
}
// free
if(pRepo->pthread){
void* p = pRepo->pthread;
pRepo->pthread = NULL;
free(p);
}
return NULL;
}
// return true do free , false do nothing
bool tsdbUrgeQueryFree(STsdbRepo * pRepo) {
// check previous running
if(pRepo->pthread && taosThreadRunning(pRepo->pthread)) {
tsdbWarn("vgId:%d pre urge thread is runing. nBlocks=%d nElasticBlocks=%d", REPO_ID(pRepo), pRepo->pPool->nBufBlocks, pRepo->pPool->nElasticBlocks);
return false;
}
// create new
pRepo->pthread = taosCreateThread(cbKillQueryFree, pRepo);
if(pRepo->pthread == NULL) {
tsdbError("vgId:%d create urge thread error.", REPO_ID(pRepo));
return false;
}
return true;
}
bool tsdbAllowNewBlock(STsdbRepo* pRepo) {
int32_t nMaxElastic = pRepo->config.totalBlocks/3;
STsdbBufPool* pPool = pRepo->pPool;
if(pPool->nElasticBlocks >= nMaxElastic) {
tsdbWarn("vgId:%d tsdbAllowNewBlock return fasle. nElasticBlock(%d) >= MaxElasticBlocks(%d)", REPO_ID(pRepo), pPool->nElasticBlocks, nMaxElastic);
return false;
}
return true;
}
bool tsdbNoProblem(STsdbRepo* pRepo) {
if(listNEles(pRepo->pPool->bufBlockList) == 0)
return false;
return true;
}
\ No newline at end of file
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册