未验证 提交 13ecd2b7 编写于 作者: C Cary Xu 提交者: GitHub

Feature/td 11463 3.0 (#10562)

* Block-wise SMA extraction

* refactor the SBlock

* add method tsdbLoadBlockOffset

* set method tsdbLoadBlockOffset static

* refactor

* trigger CI

* minor change

* trigger CI

* add STSma defintion

* add STSma schema encode/decode

* restore

* code optimization
上级 a070f953
......@@ -1842,6 +1842,149 @@ static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW)
}
return buf;
}
typedef enum {
TD_TIME_UNIT_UNKNOWN = -1,
TD_TIME_UNIT_YEAR = 0,
TD_TIME_UNIT_SEASON = 1,
TD_TIME_UNIT_MONTH = 2,
TD_TIME_UNIT_WEEK = 3,
TD_TIME_UNIT_DAY = 4,
TD_TIME_UNIT_HOUR = 5,
TD_TIME_UNIT_MINUTE = 6,
TD_TIME_UNIT_SEC = 7,
TD_TIME_UNIT_MILLISEC = 8,
TD_TIME_UNIT_MICROSEC = 9,
TD_TIME_UNIT_NANOSEC = 10
} ETDTimeUnit;
typedef struct {
uint8_t version; // for compatibility
uint8_t intervalUnit;
uint8_t slidingUnit;
char indexName[TSDB_INDEX_NAME_LEN + 1];
col_id_t numOfColIds;
uint16_t numOfFuncIds;
uint64_t tableUid; // super/common table uid
int64_t interval;
int64_t sliding;
col_id_t* colIds; // N.B. sorted column ids
uint16_t* funcIds; // N.B. sorted sma function ids
} STSma; // Time-range-wise SMA
typedef struct {
uint32_t number;
STSma* tSma;
} STSmaWrapper;
static FORCE_INLINE void tdDestroyTSma(STSma* pSma, bool releaseSelf) {
if (pSma) {
tfree(pSma->colIds);
tfree(pSma->funcIds);
if (releaseSelf) {
free(pSma);
}
}
}
static FORCE_INLINE void tdDestroyWrapper(STSmaWrapper* pSW) {
if (pSW && pSW->tSma) {
for (uint32_t i = 0; i < pSW->number; ++i) {
tdDestroyTSma(pSW->tSma + i, false);
}
tfree(pSW->tSma);
}
}
static FORCE_INLINE int32_t tEncodeTSma(void** buf, const STSma* pSma) {
int32_t tlen = 0;
tlen += taosEncodeFixedU8(buf, pSma->version);
tlen += taosEncodeFixedU8(buf, pSma->intervalUnit);
tlen += taosEncodeFixedU8(buf, pSma->slidingUnit);
tlen += taosEncodeString(buf, pSma->indexName);
tlen += taosEncodeFixedU16(buf, pSma->numOfColIds);
tlen += taosEncodeFixedU16(buf, pSma->numOfFuncIds);
tlen += taosEncodeFixedU64(buf, pSma->tableUid);
tlen += taosEncodeFixedI64(buf, pSma->interval);
tlen += taosEncodeFixedI64(buf, pSma->sliding);
for (col_id_t i = 0; i < pSma->numOfColIds; ++i) {
tlen += taosEncodeFixedU16(buf, *(pSma->colIds + i));
}
for (uint16_t i = 0; i < pSma->numOfFuncIds; ++i) {
tlen += taosEncodeFixedU16(buf, *(pSma->funcIds + i));
}
return tlen;
}
static FORCE_INLINE int32_t tEncodeTSmaWrapper(void** buf, const STSmaWrapper* pSW) {
int32_t tlen = 0;
tlen += taosEncodeFixedU32(buf, pSW->number);
for (uint32_t i = 0; i < pSW->number; ++i) {
tlen += tEncodeTSma(buf, pSW->tSma + i);
}
return tlen;
}
static FORCE_INLINE void* tDecodeTSma(void* buf, STSma* pSma) {
buf = taosDecodeFixedU8(buf, &pSma->version);
buf = taosDecodeFixedU8(buf, &pSma->intervalUnit);
buf = taosDecodeFixedU8(buf, &pSma->slidingUnit);
buf = taosDecodeStringTo(buf, pSma->indexName);
buf = taosDecodeFixedU16(buf, &pSma->numOfColIds);
buf = taosDecodeFixedU16(buf, &pSma->numOfFuncIds);
buf = taosDecodeFixedU64(buf, &pSma->tableUid);
buf = taosDecodeFixedI64(buf, &pSma->interval);
buf = taosDecodeFixedI64(buf, &pSma->sliding);
if (pSma->numOfColIds > 0) {
pSma->colIds = (col_id_t*)calloc(pSma->numOfColIds, sizeof(STSma));
if (pSma->colIds == NULL) {
return NULL;
}
for (uint16_t i = 0; i < pSma->numOfColIds; ++i) {
buf = taosDecodeFixedU16(buf, pSma->colIds + i);
}
} else {
pSma->colIds = NULL;
}
if (pSma->numOfFuncIds > 0) {
pSma->funcIds = (uint16_t*)calloc(pSma->numOfFuncIds, sizeof(STSma));
if (pSma->funcIds == NULL) {
return NULL;
}
for (uint16_t i = 0; i < pSma->numOfFuncIds; ++i) {
buf = taosDecodeFixedU16(buf, pSma->funcIds + i);
}
} else {
pSma->funcIds = NULL;
}
return buf;
}
static FORCE_INLINE void* tDecodeTSmaWrapper(void* buf, STSmaWrapper* pSW) {
buf = taosDecodeFixedU32(buf, &pSW->number);
pSW->tSma = (STSma*)calloc(pSW->number, sizeof(STSma));
if (pSW->tSma == NULL) {
return NULL;
}
for (uint32_t i = 0; i < pSW->number; ++i) {
if ((buf = tDecodeTSma(buf, pSW->tSma + i)) == NULL) {
for (uint32_t j = i; j >= 0; --i) {
tdDestroyTSma(pSW->tSma + j, false);
}
free(pSW->tSma);
return NULL;
}
}
return buf;
}
typedef struct {
int64_t uid;
......
......@@ -206,6 +206,7 @@ typedef enum ELogicConditionType {
#define TSDB_FUNC_TYPE_AGGREGATE 2
#define TSDB_FUNC_MAX_RETRIEVE 1024
#define TSDB_INDEX_NAME_LEN 32
#define TSDB_TYPE_STR_MAX_LEN 32
#define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN
......
......@@ -54,5 +54,5 @@ elseif(${META_DB_IMPL} STREQUAL "TDB")
endif()
if(${BUILD_TEST})
# add_subdirectory(test)
add_subdirectory(test)
endif(${BUILD_TEST})
......@@ -16,6 +16,10 @@
#ifndef _TD_TSDB_COMMIT_H_
#define _TD_TSDB_COMMIT_H_
#ifdef __cplusplus
extern "C" {
#endif
typedef struct {
int minFid;
int midFid;
......@@ -66,4 +70,8 @@ int tsdbApplyRtn(STsdbRepo *pRepo);
#endif
#ifdef __cplusplus
}
#endif
#endif /* _TD_TSDB_COMMIT_H_ */
\ No newline at end of file
......@@ -18,6 +18,10 @@
#include "tsdbFile.h"
#ifdef __cplusplus
extern "C" {
#endif
// ================== TSDB global config
extern bool tsdbForceKeepFile;
......@@ -111,4 +115,8 @@ static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) {
return 0;
}
#ifdef __cplusplus
}
#endif
#endif /* _TD_TSDB_FS_H_ */
......@@ -19,6 +19,10 @@
#include "tchecksum.h"
#include "tfs.h"
#ifdef __cplusplus
extern "C" {
#endif
#define TSDB_FILE_HEAD_SIZE 512
#define TSDB_FILE_DELIMITER 0xF00AFA0F
#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF
......@@ -410,4 +414,8 @@ static FORCE_INLINE bool tsdbFSetIsOk(SDFileSet* pSet) {
return true;
}
#ifdef __cplusplus
}
#endif
#endif /* _TS_TSDB_FILE_H_ */
\ No newline at end of file
......@@ -18,6 +18,10 @@
#include "tlog.h"
#ifdef __cplusplus
extern "C" {
#endif
extern int32_t tsdbDebugFlag;
#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
......@@ -27,4 +31,8 @@ extern int32_t tsdbDebugFlag;
#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TDB ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0)
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TDB ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
#ifdef __cplusplus
}
#endif
#endif /* _TD_TSDB_LOG_H_ */
\ No newline at end of file
......@@ -16,6 +16,10 @@
#ifndef _TD_TSDB_MEMORY_H_
#define _TD_TSDB_MEMORY_H_
#ifdef __cplusplus
extern "C" {
#endif
static void * taosTMalloc(size_t size);
static void * taosTCalloc(size_t nmemb, size_t size);
static void * taosTRealloc(void *ptr, size_t size);
......@@ -70,5 +74,8 @@ static FORCE_INLINE void* taosTZfree(void* ptr) {
return NULL;
}
#ifdef __cplusplus
}
#endif
#endif /* _TD_TSDB_MEMORY_H_ */
\ No newline at end of file
......@@ -24,6 +24,10 @@
#include "tsdbMemory.h"
#include "tcommon.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct SReadH SReadH;
typedef struct {
......@@ -244,4 +248,8 @@ static FORCE_INLINE int tsdbMakeRoom(void **ppBuf, size_t size) {
return 0;
}
#ifdef __cplusplus
}
#endif
#endif /*_TD_TSDB_READ_IMPL_H_*/
......@@ -38,6 +38,8 @@ struct SMetaDB {
// DB
DB *pTbDB;
DB *pSchemaDB;
DB *pSmaDB;
// IDX
DB *pNameIdx;
DB *pStbIdx;
......@@ -100,6 +102,11 @@ int metaOpenDB(SMeta *pMeta) {
return -1;
}
if (metaOpenBDBDb(&(pDB->pSmaDB), pDB->pEvn, "sma.db", false) < 0) {
metaCloseDB(pMeta);
return -1;
}
// Open Indices
if (metaOpenBDBIdx(&(pDB->pNameIdx), pDB->pEvn, "name.index", pDB->pTbDB, &metaNameIdxCb, false) < 0) {
metaCloseDB(pMeta);
......@@ -130,6 +137,7 @@ void metaCloseDB(SMeta *pMeta) {
metaCloseBDBIdx(pMeta->pDB->pNtbIdx);
metaCloseBDBIdx(pMeta->pDB->pStbIdx);
metaCloseBDBIdx(pMeta->pDB->pNameIdx);
metaCloseBDBDb(pMeta->pDB->pSmaDB);
metaCloseBDBDb(pMeta->pDB->pSchemaDB);
metaCloseBDBDb(pMeta->pDB->pTbDB);
metaCloseBDBEnv(pMeta->pDB->pEvn);
......
......@@ -41,10 +41,8 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
return 0;
}
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
SVCreateTbReq vCreateTbReq;
SVCreateTbBatchReq vCreateTbBatchReq;
void *ptr = NULL;
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
void *ptr = NULL;
if (pVnode->config.streamMode == 0) {
ptr = vnodeMalloc(pVnode, pMsg->contLen);
......@@ -64,7 +62,8 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
}
switch (pMsg->msgType) {
case TDMT_VND_CREATE_STB:
case TDMT_VND_CREATE_STB: {
SVCreateTbReq vCreateTbReq = {0};
tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbReq);
if (metaCreateTable(pVnode->pMeta, &(vCreateTbReq)) < 0) {
// TODO: handle error
......@@ -75,7 +74,9 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
free(vCreateTbReq.stbCfg.pTagSchema);
free(vCreateTbReq.name);
break;
case TDMT_VND_CREATE_TABLE:
}
case TDMT_VND_CREATE_TABLE: {
SVCreateTbBatchReq vCreateTbBatchReq = {0};
tDeserializeSVCreateTbBatchReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbBatchReq);
for (int i = 0; i < taosArrayGetSize(vCreateTbBatchReq.pArray); i++) {
SVCreateTbReq *pCreateTbReq = taosArrayGet(vCreateTbBatchReq.pArray, i);
......@@ -97,14 +98,16 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
vTrace("vgId:%d process create %" PRIzu " tables", pVnode->vgId, taosArrayGetSize(vCreateTbBatchReq.pArray));
taosArrayDestroy(vCreateTbBatchReq.pArray);
break;
case TDMT_VND_ALTER_STB:
}
case TDMT_VND_ALTER_STB: {
SVCreateTbReq vAlterTbReq = {0};
vTrace("vgId:%d, process alter stb req", pVnode->vgId);
tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbReq);
free(vCreateTbReq.stbCfg.pSchema);
free(vCreateTbReq.stbCfg.pTagSchema);
free(vCreateTbReq.name);
tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vAlterTbReq);
free(vAlterTbReq.stbCfg.pSchema);
free(vAlterTbReq.stbCfg.pTagSchema);
free(vAlterTbReq.name);
break;
}
case TDMT_VND_DROP_STB:
vTrace("vgId:%d, process drop stb req", pVnode->vgId);
break;
......
add_executable(tqTest "")
target_sources(tqTest
PRIVATE
"tqMetaTest.cpp"
)
target_include_directories(tqTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/server/vnode/tq"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
MESSAGE(STATUS "vnode unit test")
target_link_libraries(tqTest
tq
gtest_main
)
enable_testing()
add_test(
NAME tq_test
COMMAND tqTest
# GoogleTest requires at least C++11
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
# add_executable(tqTest "")
# target_sources(tqTest
# PRIVATE
# "tqMetaTest.cpp"
# )
# target_include_directories(tqTest
# PUBLIC
# "${CMAKE_SOURCE_DIR}/include/server/vnode/tq"
# "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
# )
# target_link_libraries(tqTest
# tq
# gtest_main
# )
# enable_testing()
# add_test(
# NAME tq_test
# COMMAND tqTest
# )
ADD_EXECUTABLE(tsdbSmaTest tsdbSmaTest.cpp)
TARGET_LINK_LIBRARIES(
tsdbSmaTest
PUBLIC os util common vnode gtest_main
)
TARGET_INCLUDE_DIRECTORIES(
tsdbSmaTest
PUBLIC "${CMAKE_SOURCE_DIR}/include/common"
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../src/inc"
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <taoserror.h>
#include <tglobal.h>
#include <iostream>
#include <tmsg.h>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
int main(int argc, char **argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
TEST(testCase, tSmaEncodeDecodeTest) {
// encode
STSma tSma = {0};
tSma.version = 0;
tSma.intervalUnit = TD_TIME_UNIT_DAY;
tSma.interval = 1;
tSma.slidingUnit = TD_TIME_UNIT_HOUR;
tSma.sliding = 0;
tstrncpy(tSma.indexName, "sma_index_test", TSDB_INDEX_NAME_LEN);
tSma.tableUid = 1234567890;
tSma.numOfColIds = 2;
tSma.numOfFuncIds = 5; // sum/min/max/avg/last
tSma.colIds = (col_id_t *)calloc(tSma.numOfColIds, sizeof(col_id_t));
tSma.funcIds = (uint16_t *)calloc(tSma.numOfFuncIds, sizeof(uint16_t));
for (int32_t i = 0; i < tSma.numOfColIds; ++i) {
*(tSma.colIds + i) = (i + PRIMARYKEY_TIMESTAMP_COL_ID);
}
for (int32_t i = 0; i < tSma.numOfFuncIds; ++i) {
*(tSma.funcIds + i) = (i + 2);
}
STSmaWrapper tSmaWrapper = {.number = 1, .tSma = &tSma};
uint32_t bufLen = tEncodeTSmaWrapper(NULL, &tSmaWrapper);
void *buf = calloc(bufLen, 1);
assert(buf != NULL);
STSmaWrapper *pSW = (STSmaWrapper *)buf;
uint32_t len = tEncodeTSmaWrapper(&buf, &tSmaWrapper);
EXPECT_EQ(len, bufLen);
// decode
STSmaWrapper dstTSmaWrapper = {0};
void * result = tDecodeTSmaWrapper(pSW, &dstTSmaWrapper);
assert(result != NULL);
EXPECT_EQ(tSmaWrapper.number, dstTSmaWrapper.number);
for (int i = 0; i < tSmaWrapper.number; ++i) {
STSma *pSma = tSmaWrapper.tSma + i;
STSma *qSma = dstTSmaWrapper.tSma + i;
EXPECT_EQ(pSma->version, qSma->version);
EXPECT_EQ(pSma->intervalUnit, qSma->intervalUnit);
EXPECT_EQ(pSma->slidingUnit, qSma->slidingUnit);
EXPECT_STRCASEEQ(pSma->indexName, qSma->indexName);
EXPECT_EQ(pSma->numOfColIds, qSma->numOfColIds);
EXPECT_EQ(pSma->numOfFuncIds, qSma->numOfFuncIds);
EXPECT_EQ(pSma->tableUid, qSma->tableUid);
EXPECT_EQ(pSma->interval, qSma->interval);
EXPECT_EQ(pSma->sliding, qSma->sliding);
for (uint32_t j = 0; j < pSma->numOfColIds; ++j) {
EXPECT_EQ(*(col_id_t *)(pSma->colIds + j), *(col_id_t *)(qSma->colIds + j));
}
for (uint32_t j = 0; j < pSma->numOfFuncIds; ++j) {
EXPECT_EQ(*(uint16_t *)(pSma->funcIds + j), *(uint16_t *)(qSma->funcIds + j));
}
}
// resource release
tdDestroyTSma(&tSma, false);
tdDestroyWrapper(&dstTSmaWrapper);
}
#if 0
TEST(testCase, tSmaInsertTest) {
STSma tSma = {0};
STSmaData* pSmaData = NULL;
STsdb tsdb = {0};
// init
tSma.intervalUnit = TD_TIME_UNIT_DAY;
tSma.interval = 1;
tSma.numOfFuncIds = 5; // sum/min/max/avg/last
int32_t blockSize = tSma.numOfFuncIds * sizeof(int64_t);
int32_t numOfColIds = 3;
int32_t numOfSmaBlocks = 10;
int32_t dataLen = numOfColIds * numOfSmaBlocks * blockSize;
pSmaData = (STSmaData*)malloc(sizeof(STSmaData) + dataLen);
ASSERT_EQ(pSmaData != NULL, true);
pSmaData->tableUid = 3232329230;
pSmaData->numOfColIds = numOfColIds;
pSmaData->numOfSmaBlocks = numOfSmaBlocks;
pSmaData->dataLen = dataLen;
pSmaData->tsWindow.skey = 1640000000;
pSmaData->tsWindow.ekey = 1645788649;
pSmaData->colIds = (col_id_t*)malloc(sizeof(col_id_t) * numOfColIds);
ASSERT_EQ(pSmaData->colIds != NULL, true);
for (int32_t i = 0; i < numOfColIds; ++i) {
*(pSmaData->colIds + i) = (i + PRIMARYKEY_TIMESTAMP_COL_ID);
}
// execute
EXPECT_EQ(tsdbInsertTSmaData(&tsdb, &tSma, pSmaData), TSDB_CODE_SUCCESS);
// release
tdDestroySmaData(pSmaData);
}
#endif
#pragma GCC diagnostic pop
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册