Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
f91cf03c
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
f91cf03c
编写于
12月 15, 2021
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into feature/dnode3
上级
9673e0ed
2e643995
变更
30
显示空白变更内容
内联
并排
Showing
30 changed file
with
635 addition
and
716 deletion
+635
-716
include/common/taosmsg.h
include/common/taosmsg.h
+23
-2
include/libs/catalog/catalog.h
include/libs/catalog/catalog.h
+1
-7
include/libs/planner/planner.h
include/libs/planner/planner.h
+16
-68
include/libs/query/query.h
include/libs/query/query.h
+16
-0
include/libs/scheduler/scheduler.h
include/libs/scheduler/scheduler.h
+36
-1
include/libs/wal/wal.h
include/libs/wal/wal.h
+15
-27
include/os/osMemory.h
include/os/osMemory.h
+2
-2
include/util/tref.h
include/util/tref.h
+2
-0
source/libs/catalog/inc/catalogInt.h
source/libs/catalog/inc/catalogInt.h
+1
-0
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+103
-13
source/libs/index/inc/index_fst_automation.h
source/libs/index/inc/index_fst_automation.h
+5
-2
source/libs/index/src/index_fst.c
source/libs/index/src/index_fst.c
+2
-0
source/libs/index/src/index_fst_automation.c
source/libs/index/src/index_fst_automation.c
+34
-19
source/libs/parser/CMakeLists.txt
source/libs/parser/CMakeLists.txt
+2
-2
source/libs/parser/test/CMakeLists.txt
source/libs/parser/test/CMakeLists.txt
+1
-1
source/libs/parser/test/plannerTest.cpp
source/libs/parser/test/plannerTest.cpp
+5
-4
source/libs/planner/CMakeLists.txt
source/libs/planner/CMakeLists.txt
+2
-2
source/libs/planner/inc/plannerInt.h
source/libs/planner/inc/plannerInt.h
+99
-42
source/libs/planner/src/physicalPlan.c
source/libs/planner/src/physicalPlan.c
+36
-0
source/libs/planner/src/planner.c
source/libs/planner/src/planner.c
+9
-12
source/libs/planner/test/CMakeLists.txt
source/libs/planner/test/CMakeLists.txt
+1
-1
source/libs/query/src/querymsg.c
source/libs/query/src/querymsg.c
+145
-1
source/libs/wal/inc/walInt.h
source/libs/wal/inc/walInt.h
+10
-4
source/libs/wal/src/walMeta.c
source/libs/wal/src/walMeta.c
+9
-9
source/libs/wal/src/walMgmt.c
source/libs/wal/src/walMgmt.c
+40
-55
source/libs/wal/src/walRead.c
source/libs/wal/src/walRead.c
+4
-4
source/libs/wal/src/walSeek.c
source/libs/wal/src/walSeek.c
+1
-5
source/libs/wal/src/walUtil.c
source/libs/wal/src/walUtil.c
+0
-120
source/libs/wal/src/walWrite.c
source/libs/wal/src/walWrite.c
+6
-304
source/libs/wal/test/walMetaTest.cpp
source/libs/wal/test/walMetaTest.cpp
+9
-9
未找到文件。
include/common/taosmsg.h
浏览文件 @
f91cf03c
...
...
@@ -219,6 +219,13 @@ typedef struct SBuildTableMetaInput {
char
*
tableFullName
;
}
SBuildTableMetaInput
;
typedef
struct
SBuildUseDBInput
{
char
db
[
TSDB_TABLE_FNAME_LEN
];
int32_t
vgroupVersion
;
int32_t
dbGroupVersion
;
}
SBuildUseDBInput
;
#pragma pack(push, 1)
// null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta
...
...
@@ -620,9 +627,12 @@ typedef struct {
typedef
struct
{
char
db
[
TSDB_TABLE_FNAME_LEN
];
int8_t
ignoreNotExists
;
int32_t
vgroupVersion
;
int32_t
dbGroupVersion
;
int32_t
reserve
[
8
];
}
SUseDbMsg
;
typedef
struct
{
char
db
[
TSDB_TABLE_FNAME_LEN
];
int32_t
reserve
[
8
];
...
...
@@ -809,8 +819,6 @@ typedef struct SVgroupListRspMsg {
SVgroupInfo
vgroupInfo
[];
}
SVgroupListRspMsg
;
typedef
SVgroupListRspMsg
SVgroupListInfo
;
typedef
struct
{
int32_t
vgId
;
int8_t
numOfEps
;
...
...
@@ -855,6 +863,19 @@ typedef struct {
char
*
data
;
}
STagData
;
typedef
struct
{
int32_t
vgroupNum
;
int32_t
vgroupVersion
;
char
db
[
TSDB_TABLE_FNAME_LEN
];
int32_t
dbVgroupVersion
;
int32_t
dbVgroupNum
;
int32_t
dbHashRange
;
SVgroupInfo
vgroupInfo
[];
//int32_t vgIdList[];
}
SUseDbRspMsg
;
/*
* sql: show tables like '%a_%'
* payload is the query condition, e.g., '%a_%'
...
...
include/libs/catalog/catalog.h
浏览文件 @
f91cf03c
...
...
@@ -27,16 +27,10 @@ extern "C" {
#include "transport.h"
#include "common.h"
#include "taosmsg.h"
#include "query.h"
struct
SCatalog
;
typedef
struct
SDBVgroupInfo
{
int32_t
vgroupVersion
;
SArray
*
vgId
;
int32_t
hashRange
;
int32_t
hashNum
;
}
SDBVgroupInfo
;
typedef
struct
SCatalogReq
{
char
clusterId
[
TSDB_CLUSTER_ID_LEN
];
//????
SArray
*
pTableName
;
// table full name
...
...
include/libs/planner/planner.h
浏览文件 @
f91cf03c
...
...
@@ -22,6 +22,7 @@ extern "C" {
#define QUERY_TYPE_MERGE 1
#define QUERY_TYPE_PARTIAL 2
#define QUERY_TYPE_SCAN 3
enum
OPERATOR_TYPE_E
{
OP_TableScan
=
1
,
...
...
@@ -54,90 +55,37 @@ enum OPERATOR_TYPE_E {
struct
SEpSet
;
struct
SQueryPlanNode
;
struct
S
QueryDistPlan
Node
;
struct
S
Phy
Node
;
struct
SQueryStmtInfo
;
typedef
struct
SSubquery
{
int64_t
queryId
;
// the subquery id created by qnode
int32_t
type
;
// QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL
int32_t
level
;
// the execution level of current subquery, starting from 0.
SArray
*
pUpstream
;
// the upstream,from which to fetch the result
struct
SQueryDistPlanNode
*
pNode
;
// physical plan of current subquery
}
SSubquery
;
typedef
struct
SSubplan
{
int32_t
type
;
// QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN
SArray
*
pDatasource
;
// the datasource subplan,from which to fetch the result
struct
SPhyNode
*
pNode
;
// physical plan of current subplan
}
SSubplan
;
typedef
struct
SQueryJob
{
SArray
**
pSubqueries
;
int32_t
numOfLevels
;
int32_t
currentLevel
;
}
SQueryJob
;
/**
* Optimize the query execution plan, currently not implement yet.
* @param pQueryNode
* @return
*/
int32_t
qOptimizeQueryPlan
(
struct
SQueryPlanNode
*
pQueryNode
);
/**
* Create the query plan according to the bound AST, which is in the form of pQueryInfo
* @param pQueryInfo
* @param pQueryNode
* @return
*/
int32_t
qCreateQueryPlan
(
const
struct
SQueryStmtInfo
*
pQueryInfo
,
struct
SQueryPlanNode
**
pQueryNode
);
/**
* Convert the query plan to string, in order to display it in the shell.
* @param pQueryNode
* @return
*/
int32_t
qQueryPlanToString
(
struct
SQueryPlanNode
*
pQueryNode
,
char
**
str
);
typedef
struct
SQueryDag
{
SArray
**
pSubplans
;
}
SQueryDag
;
/**
* Restore the SQL statement according to the logic query plan.
* @param pQueryNode
* @param sql
* @return
* Create the physical plan for the query, according to the AST.
*/
int32_t
q
QueryPlanToSql
(
struct
SQueryPlanNode
*
pQueryNode
,
char
**
sql
);
int32_t
q
CreateQueryDag
(
const
struct
SQueryStmtInfo
*
pQueryInfo
,
struct
SEpSet
*
pQnode
,
struct
SQueryDag
**
pDag
);
/**
* Create the physical plan for the query, according to the logic plan.
* @param pQueryNode
* @param pPhyNode
* @return
*/
int32_t
qCreatePhysicalPlan
(
struct
SQueryPlanNode
*
pQueryNode
,
struct
SEpSet
*
pQnode
,
struct
SQueryDistPlanNode
*
pPhyNode
);
int32_t
qExplainQuery
(
const
struct
SQueryStmtInfo
*
pQueryInfo
,
struct
SEpSet
*
pQnode
,
char
**
str
);
/**
* Convert to physical plan to string to enable to print it out in the shell.
* @param pPhyNode
* @param str
* @return
* Convert to subplan to string for the scheduler to send to the executor
*/
int32_t
qPhyPlanToString
(
struct
SQueryDistPlanNode
*
pPhyNode
,
char
**
str
);
/**
* Destroy the query plan object.
* @return
*/
void
*
qDestroyQueryPlan
(
struct
SQueryPlanNode
*
pQueryNode
);
int32_t
qSubPlanToString
(
struct
SSubplan
*
pPhyNode
,
char
**
str
);
/**
* Destroy the physical plan.
* @param pQueryPhyNode
* @return
*/
void
*
qDestroyQueryPhyPlan
(
struct
SQueryDistPlanNode
*
pQueryPhyNode
);
/**
* Create the query job from the physical execution plan
* @param pPhyNode
* @param pJob
* @return
*/
int32_t
qCreateQueryJob
(
const
struct
SQueryDistPlanNode
*
pPhyNode
,
struct
SQueryJob
**
pJob
);
void
*
qDestroyQueryDag
(
struct
SQueryDag
*
pDag
);
#ifdef __cplusplus
}
...
...
include/libs/query/query.h
浏览文件 @
f91cf03c
...
...
@@ -20,6 +20,22 @@
extern
"C"
{
#endif
#include "tarray.h"
typedef
SVgroupListRspMsg
SVgroupListInfo
;
typedef
struct
SDBVgroupInfo
{
int32_t
vgroupVersion
;
SArray
*
vgId
;
int32_t
hashRange
;
}
SDBVgroupInfo
;
typedef
struct
SUseDbOutput
{
SVgroupListInfo
*
vgroupList
;
char
db
[
TSDB_TABLE_FNAME_LEN
];
SDBVgroupInfo
*
dbVgroup
;
}
SUseDbOutput
;
extern
int32_t
(
*
queryBuildMsg
[
TSDB_MSG_TYPE_MAX
])(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
);
extern
int32_t
(
*
queryProcessMsgRsp
[
TSDB_MSG_TYPE_MAX
])(
void
*
output
,
char
*
msg
,
int32_t
msgSize
);
...
...
include/libs/scheduler/scheduler.h
浏览文件 @
f91cf03c
...
...
@@ -20,7 +20,42 @@
extern
"C"
{
#endif
struct
SQueryJob
;
typedef
struct
SQueryProfileSummary
{
int64_t
startTs
;
// Object created and added into the message queue
int64_t
endTs
;
// the timestamp when the task is completed
int64_t
cputime
;
// total cpu cost, not execute elapsed time
int64_t
loadRemoteDataDuration
;
// remote io time
int64_t
loadNativeDataDuration
;
// native disk io time
uint64_t
loadNativeData
;
// blocks + SMA + header files
uint64_t
loadRemoteData
;
// remote data acquired by exchange operator.
uint64_t
waitDuration
;
// the time to waiting to be scheduled in queue does matter, so we need to record it
int64_t
addQTs
;
// the time to be added into the message queue, used to calculate the waiting duration in queue.
uint64_t
totalRows
;
uint64_t
loadRows
;
uint32_t
totalBlocks
;
uint32_t
loadBlocks
;
uint32_t
loadBlockAgg
;
uint32_t
skipBlocks
;
uint64_t
resultSize
;
// generated result size in Kb.
}
SQueryProfileSummary
;
typedef
struct
SQueryTask
{
uint64_t
queryId
;
// query id
uint64_t
taskId
;
// task id
char
*
pSubplan
;
// operator tree
uint64_t
status
;
// task status
SQueryProfileSummary
summary
;
// task execution summary
void
*
pOutputHandle
;
// result buffer handle, to temporarily keep the output result for next stage
}
SQueryTask
;
typedef
struct
SQueryJob
{
SArray
**
pSubtasks
;
// todo
}
SQueryJob
;
/**
* Process the query job, generated according to the query physical plan.
...
...
include/libs/wal/wal.h
浏览文件 @
f91cf03c
...
...
@@ -32,23 +32,19 @@ extern int32_t wDebugFlag;
#define wDebug(...) { if (wDebugFlag & DEBUG_DEBUG) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }}
#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }}
#define WAL_PREFIX "wal"
#define WAL_PREFIX_LEN 3
#define WAL_HEAD_VER 0
#define WAL_NOSUFFIX_LEN 20
#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN+1)
#define WAL_LOG_SUFFIX "log"
#define WAL_INDEX_SUFFIX "idx"
#define WAL_REFRESH_MS 1000
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead)
+ 16
)
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead))
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
#define WAL_IDX_ENTRY_SIZE (sizeof(int64_t)*2)
#define WAL_CUR_POS_WRITABLE 1
#define WAL_CUR_FILE_WRITABLE 2
#define WAL_CUR_FAILED 4
#define WAL_CUR_FAILED 1
#pragma pack(push,1)
#pragma pack(push,
1)
typedef
enum
{
TAOS_WAL_NOLOG
=
0
,
TAOS_WAL_WRITE
=
1
,
...
...
@@ -56,11 +52,11 @@ typedef enum {
}
EWalType
;
typedef
struct
SWalReadHead
{
int8_t
sv
er
;
int8_t
headV
er
;
uint8_t
msgType
;
int8_t
reserved
[
2
];
int32_t
len
;
//
int64_t ingestTs; //not implemented
int64_t
ingestTs
;
//not implemented
int64_t
version
;
char
body
[];
}
SWalReadHead
;
...
...
@@ -76,14 +72,6 @@ typedef struct {
}
SWalCfg
;
typedef
struct
{
//union {
//uint32_t info;
//struct {
//uint32_t sver:3;
//uint32_t msgtype: 5;
//uint32_t reserved : 24;
//};
//};
uint32_t
cksumHead
;
uint32_t
cksumBody
;
SWalReadHead
head
;
...
...
@@ -102,16 +90,16 @@ typedef struct SWal {
SWalCfg
cfg
;
SWalVer
vers
;
//file set
int32_t
writeCur
;
int64_t
writeLogTfd
;
int64_t
writeIdxTfd
;
int32_t
writeCur
;
SArray
*
fileInfoSet
;
//statistics
int64_t
totSize
;
int64_t
lastRollSeq
;
//ctl
int32_t
curStatus
;
int32_t
fsyncSeq
;
int64_t
totSize
;
int64_t
refId
;
int64_t
lastRollSeq
;
pthread_mutex_t
mutex
;
//path
char
path
[
WAL_PATH_LEN
];
...
...
@@ -131,7 +119,7 @@ typedef struct SWalReadHandle {
}
SWalReadHandle
;
#pragma pack(pop)
typedef
int32_t
(
*
FWalWrite
)(
void
*
ahandle
,
void
*
pHead
);
//
typedef int32_t (*FWalWrite)(void *ahandle, void *pHead);
// module initialization
int32_t
walInit
();
...
...
@@ -151,8 +139,8 @@ int32_t walCommit(SWal *, int64_t ver);
// truncate after
int32_t
walRollback
(
SWal
*
,
int64_t
ver
);
// notify that previous logs can be pruned safely
int32_t
walBegin
Take
Snapshot
(
SWal
*
,
int64_t
ver
);
int32_t
walEnd
Take
Snapshot
(
SWal
*
);
int32_t
walBeginSnapshot
(
SWal
*
,
int64_t
ver
);
int32_t
walEndSnapshot
(
SWal
*
);
//int32_t walDataCorrupted(SWal*);
// read
...
...
@@ -161,7 +149,7 @@ void walCloseReadHandle(SWalReadHandle *);
int32_t
walReadWithHandle
(
SWalReadHandle
*
pRead
,
int64_t
ver
);
int32_t
walRead
(
SWal
*
,
SWalHead
**
,
int64_t
ver
);
int32_t
walReadWithFp
(
SWal
*
,
FWalWrite
writeFp
,
int64_t
verStart
,
int32_t
readNum
);
//
int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum);
// lifecycle check
int64_t
walGetFirstVer
(
SWal
*
);
...
...
include/os/osMemory.h
浏览文件 @
f91cf03c
...
...
@@ -23,8 +23,8 @@ extern "C" {
#define tfree(x) \
do { \
if (x) { \
free((void *)
x
); \
x
= 0; \
free((void *)
(x)
); \
(x)
= 0; \
} \
} while (0)
...
...
include/util/tref.h
浏览文件 @
f91cf03c
...
...
@@ -17,6 +17,8 @@
#ifndef _TD_UTIL_REF_H
#define _TD_UTIL_REF_H
#include "os.h"
#ifdef __cplusplus
extern
"C"
{
#endif
...
...
source/libs/catalog/inc/catalogInt.h
浏览文件 @
f91cf03c
...
...
@@ -26,6 +26,7 @@ extern "C" {
#define CTG_DEFAULT_CLUSTER_NUMBER 6
#define CTG_DEFAULT_VGROUP_NUMBER 100
#define CTG_DEFAULT_DB_NUMBER 20
#define CTG_DEFAULT_INVALID_VERSION (-1)
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
f91cf03c
...
...
@@ -63,21 +63,69 @@ int32_t ctgGetVgroupFromCache(SCatalog* pCatalog, SArray** pVgroupList, int32_t*
}
int32_t
ctgGetDBVgroupFromCache
(
SCatalog
*
pCatalog
,
char
*
dbName
,
SDBVgroupInfo
**
dbInfo
,
int32_t
*
exist
)
{
/*
int32_t
ctgGetDBVgroupFromCache
(
SCatalog
*
pCatalog
,
const
char
*
dbName
,
SDBVgroupInfo
**
dbInfo
,
int32_t
*
exist
)
{
if
(
NULL
==
pCatalog
->
dbCache
.
cache
)
{
*
exist
=
0
;
return
TSDB_CODE_SUCCESS
;
}
taosHashGet(SHashObj * pHashObj, const void * key, size_t keyLen)
SDBVgroupInfo
*
info
=
taosHashGet
(
pCatalog
->
dbCache
.
cache
,
dbName
,
strlen
(
dbName
));
if
(
NULL
==
info
||
info
->
vgroupVersion
<
pCatalog
->
vgroupCache
.
vgroupVersion
)
{
*
exist
=
0
;
return
TSDB_CODE_SUCCESS
;
}
if
(
dbInfo
)
{
*pVgroupList = taosArrayDup(pCatalog->vgroupCache.arrayCache);
*
dbInfo
=
calloc
(
1
,
sizeof
(
**
dbInfo
));
if
(
NULL
==
*
dbInfo
)
{
ctgError
(
"calloc size[%d] failed"
,
(
int32_t
)
sizeof
(
**
dbInfo
));
return
TSDB_CODE_CTG_MEM_ERROR
;
}
(
*
dbInfo
)
->
vgId
=
taosArrayDup
(
info
->
vgId
);
if
(
NULL
==
(
*
dbInfo
)
->
vgId
)
{
ctgError
(
"taos array duplicate failed"
);
tfree
(
*
dbInfo
);
return
TSDB_CODE_CTG_MEM_ERROR
;
}
(
*
dbInfo
)
->
vgroupVersion
=
info
->
vgroupVersion
;
(
*
dbInfo
)
->
hashRange
=
info
->
hashRange
;
}
*
exist
=
1
;
*/
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgGetDBVgroupFromMnode
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
SBuildUseDBInput
*
input
,
SUseDbOutput
*
out
)
{
char
*
msg
=
NULL
;
SEpSet
*
pVnodeEpSet
=
NULL
;
int32_t
msgLen
=
0
;
int32_t
code
=
queryBuildMsg
[
TSDB_MSG_TYPE_USE_DB
](
input
,
&
msg
,
0
,
&
msgLen
);
if
(
code
)
{
return
code
;
}
SRpcMsg
rpcMsg
=
{
.
msgType
=
TSDB_MSG_TYPE_USE_DB
,
.
pCont
=
msg
,
.
contLen
=
msgLen
,
};
SRpcMsg
rpcRsp
=
{
0
};
rpcSendRecv
(
pRpc
,
(
SEpSet
*
)
pMgmtEps
,
&
rpcMsg
,
&
rpcRsp
);
code
=
queryProcessMsgRsp
[
TSDB_MSG_TYPE_USE_DB
](
out
,
rpcRsp
.
pCont
,
rpcRsp
.
contLen
);
if
(
code
)
{
return
code
;
}
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -144,7 +192,7 @@ int32_t catalogGetVgroupVersion(struct SCatalog* pCatalog, int32_t* version) {
int32_t
catalogUpdateVgroup
(
struct
SCatalog
*
pCatalog
,
SVgroupListInfo
*
pVgroup
)
{
if
(
NULL
==
pVgroup
)
{
ctgError
(
"
vgroup get from mnode succeed, but no output
"
);
ctgError
(
"
no valid vgroup list info to update
"
);
return
TSDB_CODE_CTG_INTERNAL_ERROR
;
}
...
...
@@ -262,7 +310,33 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName,
}
int32_t
catalogUpdateDBVgroup
(
struct
SCatalog
*
pCatalog
,
const
char
*
dbName
,
SDBVgroupInfo
*
dbInfo
)
{
if
(
NULL
==
pCatalog
||
NULL
==
dbName
||
NULL
==
dbInfo
)
{
return
TSDB_CODE_CTG_INVALID_INPUT
;
}
if
(
dbInfo
->
vgroupVersion
<
0
)
{
if
(
pCatalog
->
dbCache
.
cache
)
{
taosHashRemove
(
pCatalog
->
dbCache
.
cache
,
dbName
,
strlen
(
dbName
));
}
ctgWarn
(
"remove db [%s] from cache"
,
dbName
);
return
TSDB_CODE_SUCCESS
;
}
if
(
NULL
==
pCatalog
->
dbCache
.
cache
)
{
pCatalog
->
dbCache
.
cache
=
taosHashInit
(
CTG_DEFAULT_DB_NUMBER
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
pCatalog
->
dbCache
.
cache
)
{
ctgError
(
"init hash[%d] for db cache failed"
,
CTG_DEFAULT_DB_NUMBER
);
return
TSDB_CODE_CTG_MEM_ERROR
;
}
}
if
(
taosHashPut
(
pCatalog
->
dbCache
.
cache
,
dbName
,
strlen
(
dbName
),
dbInfo
,
sizeof
(
*
dbInfo
))
!=
0
)
{
ctgError
(
"push to vgroup hash cache failed"
);
return
TSDB_CODE_CTG_MEM_ERROR
;
}
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -273,8 +347,8 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet*
return
TSDB_CODE_CTG_INVALID_INPUT
;
}
/*
int32_t
exist
=
0
;
int32_t
code
=
0
;
if
(
0
==
forceUpdate
)
{
CTG_ERR_RET
(
ctgGetDBVgroupFromCache
(
pCatalog
,
dbName
,
dbInfo
,
&
exist
));
...
...
@@ -284,18 +358,34 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet*
}
}
SDBVgroupInfo* newDbInfo = NULL;
SUseDbOutput
DbOut
=
{
0
};
SBuildUseDBInput
input
=
{
0
};
strncpy
(
input
.
db
,
dbName
,
sizeof
(
input
.
db
));
input
.
db
[
sizeof
(
input
.
db
)
-
1
]
=
0
;
input
.
vgroupVersion
=
pCatalog
->
vgroupCache
.
vgroupVersion
;
input
.
dbGroupVersion
=
CTG_DEFAULT_INVALID_VERSION
;
CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, dbName, &newDbInfo));
CTG_ERR_RET
(
ctgGetDBVgroupFromMnode
(
pCatalog
,
pRpc
,
pMgmtEps
,
&
input
,
&
DbOut
));
if
(
DbOut
.
vgroupList
)
{
CTG_ERR_JRET
(
catalogUpdateVgroup
(
pCatalog
,
DbOut
.
vgroupList
));
}
CTG_ERR_RET(catalogUpdateDBVgroup(pCatalog, dbName, newDbInfo));
if
(
DbOut
.
dbVgroup
)
{
CTG_ERR_JRET
(
catalogUpdateDBVgroup
(
pCatalog
,
dbName
,
DbOut
.
dbVgroup
));
}
if
(
dbInfo
)
{
*dbInfo = newDbInfo;
*
dbInfo
=
DbOut
.
dbVgroup
;
DbOut
.
dbVgroup
=
NULL
;
}
*/
return
TSDB_CODE_SUCCESS
;
_return:
tfree
(
DbOut
.
dbVgroup
);
tfree
(
DbOut
.
vgroupList
);
return
code
;
}
...
...
source/libs/index/inc/index_fst_automation.h
浏览文件 @
f91cf03c
...
...
@@ -40,7 +40,8 @@ typedef struct Complement {
// automation
typedef
struct
AutomationCtx
{
AutomationType
type
;
void
*
data
;
void
*
stdata
;
char
*
data
;
}
AutomationCtx
;
...
...
@@ -58,7 +59,9 @@ typedef struct StartWithStateValue {
}
;
}
StartWithStateValue
;
StartWithStateValue
*
startWithStateValueCreate
(
StartWithStateKind
kind
,
ValueType
ty
,
void
*
val
);
StartWithStateValue
*
startWithStateValueDump
(
StartWithStateValue
*
sv
);
void
startWithStateValueDestroy
(
void
*
sv
);
typedef
struct
AutomationFunc
{
...
...
@@ -70,7 +73,7 @@ typedef struct AutomationFunc {
void
*
(
*
acceptEof
)(
AutomationCtx
*
ct
,
void
*
state
);
}
AutomationFunc
;
AutomationCtx
*
automCtxCreate
(
void
*
data
,
AutomationType
type
);
AutomationCtx
*
automCtxCreate
(
void
*
data
,
AutomationType
a
type
);
void
automCtxDestroy
(
AutomationCtx
*
ctx
);
extern
AutomationFunc
automFuncs
[];
...
...
source/libs/index/src/index_fst.c
浏览文件 @
f91cf03c
...
...
@@ -17,6 +17,7 @@
#include "tcoding.h"
#include "tchecksum.h"
#include "indexInt.h"
#include "index_fst_automation.h"
static
void
fstPackDeltaIn
(
FstCountingWriter
*
wrt
,
CompiledAddr
nodeAddr
,
CompiledAddr
transAddr
,
uint8_t
nBytes
)
{
...
...
@@ -1402,6 +1403,7 @@ void swsResultDestroy(StreamWithStateResult *result) {
if
(
NULL
==
result
)
{
return
;
}
fstSliceDestroy
(
&
result
->
data
);
startWithStateValueDestroy
(
result
->
state
);
free
(
result
);
}
...
...
source/libs/index/src/index_fst_automation.c
浏览文件 @
f91cf03c
...
...
@@ -34,7 +34,8 @@ StartWithStateValue *startWithStateValueCreate(StartWithStateKind kind, ValueTyp
}
return
nsv
;
}
void
startWithStateValueDestroy
(
StartWithStateValue
*
sv
)
{
void
startWithStateValueDestroy
(
void
*
val
)
{
StartWithStateValue
*
sv
=
(
StartWithStateValue
*
)
val
;
if
(
sv
==
NULL
)
{
return
;
}
if
(
sv
->
type
==
FST_INT
)
{
...
...
@@ -68,19 +69,28 @@ StartWithStateValue *startWithStateValueDump(StartWithStateValue *sv) {
static
void
*
prefixStart
(
AutomationCtx
*
ctx
)
{
StartWithStateValue
*
data
=
(
StartWithStateValue
*
)(
ctx
->
data
);
return
data
;
return
startWithStateValueDump
(
data
);
};
static
bool
prefixIsMatch
(
AutomationCtx
*
ctx
,
void
*
data
)
{
return
true
;
static
bool
prefixIsMatch
(
AutomationCtx
*
ctx
,
void
*
sv
)
{
StartWithStateValue
*
ssv
=
(
StartWithStateValue
*
)
sv
;
return
ssv
->
val
==
strlen
(
ctx
->
data
);
}
static
bool
prefixCanMatch
(
AutomationCtx
*
ctx
,
void
*
data
)
{
return
true
;
static
bool
prefixCanMatch
(
AutomationCtx
*
ctx
,
void
*
sv
)
{
StartWithStateValue
*
ssv
=
(
StartWithStateValue
*
)
sv
;
return
ssv
->
val
>=
0
;
}
static
bool
prefixWillAlwaysMatch
(
AutomationCtx
*
ctx
,
void
*
state
)
{
return
true
;
}
static
void
*
prefixAccept
(
AutomationCtx
*
ctx
,
void
*
state
,
uint8_t
byte
)
{
StartWithStateValue
*
ssv
=
(
StartWithStateValue
*
)
state
;
if
(
ssv
==
NULL
||
ctx
==
NULL
)
{
return
NULL
;}
char
*
data
=
ctx
->
data
;
if
((
strlen
(
data
)
>
ssv
->
val
)
&&
data
[
ssv
->
val
]
==
byte
)
{
int
val
=
ssv
->
val
+
1
;
return
startWithStateValueCreate
(
Running
,
FST_INT
,
&
val
);
}
return
NULL
;
}
static
void
*
prefixAcceptEof
(
AutomationCtx
*
ctx
,
void
*
state
)
{
...
...
@@ -129,28 +139,33 @@ AutomationFunc automFuncs[] = {{
// add more search type
};
AutomationCtx
*
automCtxCreate
(
void
*
data
,
AutomationType
type
)
{
AutomationCtx
*
automCtxCreate
(
void
*
data
,
AutomationType
a
type
)
{
AutomationCtx
*
ctx
=
calloc
(
1
,
sizeof
(
AutomationCtx
));
if
(
ctx
==
NULL
)
{
return
NULL
;
}
if
(
type
==
AUTOMATION_PREFIX
)
{
StartWithStateValue
*
swsv
=
(
StartWithStateValue
*
)
calloc
(
1
,
sizeof
(
StartWithStateValue
));
swsv
->
kind
=
Done
;
//swsv->value = NULL;
ctx
->
data
=
(
void
*
)
swsv
;
}
else
if
(
type
==
AUTMMATION_MATCH
)
{
StartWithStateValue
*
sv
=
NULL
;
if
(
atype
==
AUTOMATION_PREFIX
)
{
sv
=
startWithStateValueCreate
(
Running
,
FST_INT
,
0
);
ctx
->
stdata
=
(
void
*
)
sv
;
}
else
if
(
atype
==
AUTMMATION_MATCH
)
{
}
else
{
// add more search type
}
ctx
->
type
=
type
;
char
*
src
=
(
char
*
)
data
;
size_t
len
=
strlen
(
src
);
char
*
dst
=
(
char
*
)
malloc
(
len
*
sizeof
(
char
)
+
1
);
memcpy
(
dst
,
src
,
len
);
dst
[
len
]
=
0
;
ctx
->
data
=
dst
;
ctx
->
type
=
atype
;
ctx
->
stdata
=
(
void
*
)
sv
;
return
ctx
;
}
void
automCtxDestroy
(
AutomationCtx
*
ctx
)
{
if
(
ctx
->
type
==
AUTOMATION_PREFIX
)
{
startWithStateValueDestroy
(
ctx
->
stdata
);
free
(
ctx
->
data
);
}
else
if
(
ctx
->
type
==
AUTMMATION_MATCH
)
{
}
free
(
ctx
);
}
source/libs/parser/CMakeLists.txt
浏览文件 @
f91cf03c
...
...
@@ -8,7 +8,7 @@ target_include_directories(
target_link_libraries
(
parser
PRIVATE os util common catalog function transport
PRIVATE os util common catalog function transport
query
)
ADD_SUBDIRECTORY
(
test
)
source/libs/parser/test/CMakeLists.txt
浏览文件 @
f91cf03c
...
...
@@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE
(
parserTest
${
SOURCE_LIST
}
)
TARGET_LINK_LIBRARIES
(
parserTest
PUBLIC os util common parser catalog transport gtest function planner
PUBLIC os util common parser catalog transport gtest function planner
query
)
TARGET_INCLUDE_DIRECTORIES
(
...
...
source/libs/parser/test/plannerTest.cpp
浏览文件 @
f91cf03c
...
...
@@ -30,6 +30,7 @@
#include "tdef.h"
#include "tvariant.h"
#include "planner.h"
#include "../../planner/inc/plannerInt.h"
namespace
{
void
setSchema
(
SSchema
*
p
,
int32_t
type
,
int32_t
bytes
,
const
char
*
name
,
int32_t
colId
)
{
...
...
@@ -92,10 +93,10 @@ void generateLogicplan(const char* sql) {
ASSERT_EQ
(
ret
,
0
);
struct
SQueryPlanNode
*
n
=
nullptr
;
code
=
qC
reateQueryPlan
(
pQueryInfo
,
&
n
);
code
=
c
reateQueryPlan
(
pQueryInfo
,
&
n
);
char
*
str
=
NULL
;
q
Q
ueryPlanToString
(
n
,
&
str
);
queryPlanToString
(
n
,
&
str
);
printf
(
"--------SQL:%s
\n
"
,
sql
);
printf
(
"%s
\n
"
,
str
);
...
...
@@ -155,10 +156,10 @@ TEST(testCase, planner_test) {
ASSERT_EQ
(
pQueryInfo
->
fieldsInfo
.
numOfOutput
,
2
);
struct
SQueryPlanNode
*
n
=
nullptr
;
code
=
qC
reateQueryPlan
(
pQueryInfo
,
&
n
);
code
=
c
reateQueryPlan
(
pQueryInfo
,
&
n
);
char
*
str
=
NULL
;
q
Q
ueryPlanToString
(
n
,
&
str
);
queryPlanToString
(
n
,
&
str
);
printf
(
"%s
\n
"
,
str
);
destroyQueryInfo
(
pQueryInfo
);
...
...
source/libs/planner/CMakeLists.txt
浏览文件 @
f91cf03c
...
...
@@ -8,7 +8,7 @@ target_include_directories(
target_link_libraries
(
planner
PRIVATE os util common catalog parser transport function
PRIVATE os util common catalog parser transport function
query
)
ADD_SUBDIRECTORY
(
test
)
source/libs/planner/inc/plannerInt.h
浏览文件 @
f91cf03c
...
...
@@ -25,6 +25,19 @@ extern "C" {
#include "planner.h"
#include "taosmsg.h"
enum
LOGIC_PLAN_E
{
LP_SCAN
=
1
,
LP_SESSION
=
2
,
LP_STATE
=
3
,
LP_INTERVAL
=
4
,
LP_FILL
=
5
,
LP_AGG
=
6
,
LP_JOIN
=
7
,
LP_PROJECT
=
8
,
LP_DISTINCT
=
9
,
LP_ORDER
=
10
};
typedef
struct
SQueryNodeBasicInfo
{
int32_t
type
;
// operator type
char
*
name
;
// operator name
...
...
@@ -57,50 +70,94 @@ typedef struct SQueryPlanNode {
struct
SQueryPlanNode
*
nextNode
;
}
SQueryPlanNode
;
typedef
struct
SQueryDistPlanNode
{
typedef
SSchema
SSlotSchema
;
typedef
struct
SDataBlockSchema
{
int32_t
index
;
SSlotSchema
*
pSchema
;
int32_t
numOfCols
;
// number of columns
}
SDataBlockSchema
;
typedef
struct
SPhyNode
{
SQueryNodeBasicInfo
info
;
SSchema
*
pSchema
;
// the schema of the input SSDatablock
int32_t
numOfCols
;
// number of input columns
SArray
*
pExpr
;
// the query functions or sql aggregations
int32_t
numOfExpr
;
// number of result columns, which is also the number of pExprs
void
*
pExtInfo
;
// additional information
SArray
*
pTargets
;
// target list to be computed or scanned at this node
SArray
*
pConditions
;
// implicitly-ANDed qual conditions
SDataBlockSchema
targetSchema
;
// children plan to generated result for current node to process
// in case of join, multiple plan nodes exist.
SArray
*
pChildren
;
}
SPhyNode
;
typedef
struct
SScanPhyNode
{
SPhyNode
node
;
uint64_t
uid
;
// unique id of the table
}
SScanPhyNode
;
typedef
SScanPhyNode
STagScanPhyNode
;
typedef
SScanPhyNode
SSystemTableScanPhyNode
;
typedef
struct
SMultiTableScanPhyNode
{
SScanPhyNode
scan
;
SArray
*
pTagsConditions
;
// implicitly-ANDed tag qual conditions
}
SMultiTableScanPhyNode
;
typedef
SMultiTableScanPhyNode
SMultiTableSeqScanPhyNode
;
typedef
struct
SProjectPhyNode
{
SPhyNode
node
;
}
SProjectPhyNode
;
/**
* Optimize the query execution plan, currently not implement yet.
* @param pQueryNode
* @return
*/
int32_t
optimizeQueryPlan
(
struct
SQueryPlanNode
*
pQueryNode
);
// previous operator to generated result for current node to process
// in case of join, multiple prev nodes exist.
SArray
*
pPrevNodes
;
// upstream nodes, or exchange operator to load data from multiple sources.
}
SQueryDistPlanNode
;
typedef
struct
SQueryCostSummary
{
int64_t
startTs
;
// Object created and added into the message queue
int64_t
endTs
;
// the timestamp when the task is completed
int64_t
cputime
;
// total cpu cost, not execute elapsed time
int64_t
loadRemoteDataDuration
;
// remote io time
int64_t
loadNativeDataDuration
;
// native disk io time
uint64_t
loadNativeData
;
// blocks + SMA + header files
uint64_t
loadRemoteData
;
// remote data acquired by exchange operator.
uint64_t
waitDuration
;
// the time to waiting to be scheduled in queue does matter, so we need to record it
int64_t
addQTs
;
// the time to be added into the message queue, used to calculate the waiting duration in queue.
uint64_t
totalRows
;
uint64_t
loadRows
;
uint32_t
totalBlocks
;
uint32_t
loadBlocks
;
uint32_t
loadBlockAgg
;
uint32_t
skipBlocks
;
uint64_t
resultSize
;
// generated result size in Kb.
}
SQueryCostSummary
;
typedef
struct
SQueryTask
{
uint64_t
queryId
;
// query id
uint64_t
taskId
;
// task id
SQueryDistPlanNode
*
pNode
;
// operator tree
uint64_t
status
;
// task status
SQueryCostSummary
summary
;
// task execution summary
void
*
pOutputHandle
;
// result buffer handle, to temporarily keep the output result for next stage
}
SQueryTask
;
/**
* Create the query plan according to the bound AST, which is in the form of pQueryInfo
* @param pQueryInfo
* @param pQueryNode
* @return
*/
int32_t
createQueryPlan
(
const
struct
SQueryStmtInfo
*
pQueryInfo
,
struct
SQueryPlanNode
**
pQueryNode
);
/**
* Convert the query plan to string, in order to display it in the shell.
* @param pQueryNode
* @return
*/
int32_t
queryPlanToString
(
struct
SQueryPlanNode
*
pQueryNode
,
char
**
str
);
/**
* Restore the SQL statement according to the logic query plan.
* @param pQueryNode
* @param sql
* @return
*/
int32_t
queryPlanToSql
(
struct
SQueryPlanNode
*
pQueryNode
,
char
**
sql
);
/**
* Convert to physical plan to string to enable to print it out in the shell.
* @param pPhyNode
* @param str
* @return
*/
int32_t
phyPlanToString
(
struct
SPhyNode
*
pPhyNode
,
char
**
str
);
/**
* Destroy the query plan object.
* @return
*/
void
*
destroyQueryPlan
(
struct
SQueryPlanNode
*
pQueryNode
);
/**
* Destroy the physical plan.
* @param pQueryPhyNode
* @return
*/
void
*
destroyQueryPhyPlan
(
struct
SPhyNode
*
pQueryPhyNode
);
#ifdef __cplusplus
}
...
...
source/libs/planner/src/physicalPlan.c
0 → 100644
浏览文件 @
f91cf03c
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "plannerInt.h"
SPhyNode
*
createScanNode
(
SQueryPlanNode
*
pPlanNode
)
{
return
NULL
;
}
SPhyNode
*
createPhyNode
(
SQueryPlanNode
*
node
)
{
switch
(
node
->
info
.
type
)
{
case
LP_SCAN
:
return
createScanNode
(
node
);
}
return
NULL
;
}
SPhyNode
*
createSubplan
(
SQueryPlanNode
*
pSubquery
)
{
return
NULL
;
}
int32_t
createDag
(
struct
SQueryPlanNode
*
pQueryNode
,
struct
SEpSet
*
pQnode
,
struct
SQueryDag
**
pDag
)
{
return
0
;
}
source/libs/planner/src/planner.c
浏览文件 @
f91cf03c
...
...
@@ -48,11 +48,11 @@ static SArray* createQueryPlanImpl(SQueryStmtInfo* pQueryInfo);
static
void
doDestroyQueryNode
(
SQueryPlanNode
*
pQueryNode
);
int32_t
printExprInfo
(
const
char
*
buf
,
const
SQueryPlanNode
*
pQueryNode
,
int32_t
len
);
int32_t
qO
ptimizeQueryPlan
(
struct
SQueryPlanNode
*
pQueryNode
)
{
int32_t
o
ptimizeQueryPlan
(
struct
SQueryPlanNode
*
pQueryNode
)
{
return
0
;
}
int32_t
qC
reateQueryPlan
(
const
struct
SQueryStmtInfo
*
pQueryInfo
,
struct
SQueryPlanNode
**
pQueryNode
)
{
int32_t
c
reateQueryPlan
(
const
struct
SQueryStmtInfo
*
pQueryInfo
,
struct
SQueryPlanNode
**
pQueryNode
)
{
SArray
*
upstream
=
createQueryPlanImpl
((
struct
SQueryStmtInfo
*
)
pQueryInfo
);
assert
(
taosArrayGetSize
(
upstream
)
==
1
);
...
...
@@ -62,19 +62,20 @@ int32_t qCreateQueryPlan(const struct SQueryStmtInfo* pQueryInfo, struct SQueryP
return
TSDB_CODE_SUCCESS
;
}
int32_t
q
Q
ueryPlanToSql
(
struct
SQueryPlanNode
*
pQueryNode
,
char
**
sql
)
{
int32_t
queryPlanToSql
(
struct
SQueryPlanNode
*
pQueryNode
,
char
**
sql
)
{
return
0
;
}
int32_t
qCreatePhysicalPlan
(
struct
SQueryPlanNode
*
pQueryNode
,
struct
SEpSet
*
pQnode
,
struct
SQueryDistPlanNode
*
pPhyNode
)
{
int32_t
qCreatePhysicalPlan
(
struct
SQueryPlanNode
*
pQueryNode
,
struct
SEpSet
*
pQnode
,
struct
SQueryDag
**
pDag
)
{
return
0
;
}
int32_t
qPhyPlanToString
(
struct
SQueryDistPlan
Node
*
pPhyNode
,
char
**
str
)
{
int32_t
phyPlanToString
(
struct
SPhy
Node
*
pPhyNode
,
char
**
str
)
{
return
0
;
}
void
*
qD
estroyQueryPlan
(
SQueryPlanNode
*
pQueryNode
)
{
void
*
d
estroyQueryPlan
(
SQueryPlanNode
*
pQueryNode
)
{
if
(
pQueryNode
==
NULL
)
{
return
NULL
;
}
...
...
@@ -83,14 +84,10 @@ void* qDestroyQueryPlan(SQueryPlanNode* pQueryNode) {
return
NULL
;
}
void
*
qDestroyQueryPhyPlan
(
struct
SQueryDistPlan
Node
*
pQueryPhyNode
)
{
void
*
destroyQueryPhyPlan
(
struct
SPhy
Node
*
pQueryPhyNode
)
{
return
NULL
;
}
int32_t
qCreateQueryJob
(
const
struct
SQueryDistPlanNode
*
pPhyNode
,
struct
SQueryJob
**
pJob
)
{
return
0
;
}
//======================================================================================================================
static
SQueryPlanNode
*
createQueryNode
(
int32_t
type
,
const
char
*
name
,
SQueryPlanNode
**
prev
,
int32_t
numOfPrev
,
...
...
@@ -619,7 +616,7 @@ int32_t queryPlanToStringImpl(char* buf, SQueryPlanNode* pQueryNode, int32_t lev
return
len
;
}
int32_t
q
Q
ueryPlanToString
(
struct
SQueryPlanNode
*
pQueryNode
,
char
**
str
)
{
int32_t
queryPlanToString
(
struct
SQueryPlanNode
*
pQueryNode
,
char
**
str
)
{
assert
(
pQueryNode
);
*
str
=
calloc
(
1
,
4096
);
...
...
source/libs/planner/test/CMakeLists.txt
浏览文件 @
f91cf03c
...
...
@@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE
(
plannerTest
${
SOURCE_LIST
}
)
TARGET_LINK_LIBRARIES
(
plannerTest
PUBLIC os util common planner parser catalog transport gtest function
PUBLIC os util common planner parser catalog transport gtest function
query
)
TARGET_INCLUDE_DIRECTORIES
(
...
...
source/libs/query/src/querymsg.c
浏览文件 @
f91cf03c
...
...
@@ -15,7 +15,7 @@
#include "taosmsg.h"
#include "queryInt.h"
#include "query.h"
int32_t
(
*
queryBuildMsg
[
TSDB_MSG_TYPE_MAX
])(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
)
=
{
0
};
...
...
@@ -60,6 +60,36 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3
return
TSDB_CODE_SUCCESS
;
}
int32_t
queryBuildUseDbMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
)
{
if
(
NULL
==
input
||
NULL
==
msg
||
NULL
==
msgLen
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
SBuildUseDBInput
*
bInput
=
(
SBuildUseDBInput
*
)
input
;
int32_t
estimateSize
=
sizeof
(
SUseDbMsg
);
if
(
NULL
==
*
msg
||
msgSize
<
estimateSize
)
{
tfree
(
*
msg
);
*
msg
=
calloc
(
1
,
estimateSize
);
if
(
NULL
==
*
msg
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
}
SUseDbMsg
*
bMsg
=
(
SUseDbMsg
*
)
*
msg
;
strncpy
(
bMsg
->
db
,
bInput
->
db
,
sizeof
(
bMsg
->
db
));
bMsg
->
db
[
sizeof
(
bMsg
->
db
)
-
1
]
=
0
;
bMsg
->
vgroupVersion
=
bInput
->
vgroupVersion
;
bMsg
->
dbGroupVersion
=
bInput
->
dbGroupVersion
;
*
msgLen
=
(
int32_t
)
sizeof
(
*
bMsg
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
queryProcessVgroupListRsp
(
void
*
output
,
char
*
msg
,
int32_t
msgSize
)
{
if
(
NULL
==
output
||
NULL
==
msg
||
msgSize
<=
0
)
{
...
...
@@ -103,12 +133,126 @@ int32_t queryProcessVgroupListRsp(void* output, char *msg, int32_t msgSize) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
queryProcessUseDBRsp
(
void
*
output
,
char
*
msg
,
int32_t
msgSize
)
{
if
(
NULL
==
output
||
NULL
==
msg
||
msgSize
<=
0
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
SUseDbRspMsg
*
pRsp
=
(
SUseDbRspMsg
*
)
msg
;
SUseDbOutput
*
pOut
=
(
SUseDbOutput
*
)
output
;
int32_t
code
=
0
;
if
(
msgSize
<=
sizeof
(
*
pRsp
))
{
qError
(
"invalid use db rsp msg size, msgSize:%d"
,
msgSize
);
return
TSDB_CODE_TSC_VALUE_OUT_OF_RANGE
;
}
pRsp
->
vgroupVersion
=
htonl
(
pRsp
->
vgroupVersion
);
pRsp
->
dbVgroupVersion
=
htonl
(
pRsp
->
dbVgroupVersion
);
pRsp
->
vgroupNum
=
htonl
(
pRsp
->
vgroupNum
);
pRsp
->
dbVgroupNum
=
htonl
(
pRsp
->
dbVgroupNum
);
if
(
pRsp
->
vgroupNum
<
0
)
{
qError
(
"invalid vgroup number[%d]"
,
pRsp
->
vgroupNum
);
return
TSDB_CODE_TSC_INVALID_VALUE
;
}
if
(
pRsp
->
dbVgroupNum
<
0
)
{
qError
(
"invalid db vgroup number[%d]"
,
pRsp
->
dbVgroupNum
);
return
TSDB_CODE_TSC_INVALID_VALUE
;
}
int32_t
expectSize
=
pRsp
->
vgroupNum
*
sizeof
(
pRsp
->
vgroupInfo
[
0
])
+
pRsp
->
dbVgroupNum
*
sizeof
(
int32_t
)
+
sizeof
(
*
pRsp
);
if
(
msgSize
!=
expectSize
)
{
qError
(
"vgroup list msg size mis-match, msgSize:%d, expected:%d, vgroup number:%d, db vgroup number:%d"
,
msgSize
,
expectSize
,
pRsp
->
vgroupNum
,
pRsp
->
dbVgroupNum
);
return
TSDB_CODE_TSC_VALUE_OUT_OF_RANGE
;
}
if
(
pRsp
->
vgroupVersion
<
0
)
{
qInfo
(
"no new vgroup list info"
);
if
(
pRsp
->
vgroupNum
!=
0
)
{
qError
(
"invalid vgroup number[%d] for no new vgroup list case"
,
pRsp
->
vgroupNum
);
return
TSDB_CODE_TSC_INVALID_VALUE
;
}
}
else
{
int32_t
s
=
sizeof
(
*
pOut
->
vgroupList
)
+
sizeof
(
pOut
->
vgroupList
->
vgroupInfo
[
0
])
*
pRsp
->
vgroupNum
;
pOut
->
vgroupList
=
calloc
(
1
,
s
);
if
(
NULL
==
pOut
->
vgroupList
)
{
qError
(
"calloc size[%d] failed"
,
s
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
pOut
->
vgroupList
->
vgroupNum
=
pRsp
->
vgroupNum
;
pOut
->
vgroupList
->
vgroupVersion
=
pRsp
->
vgroupVersion
;
for
(
int32_t
i
=
0
;
i
<
pRsp
->
vgroupNum
;
++
i
)
{
pRsp
->
vgroupInfo
[
i
].
vgId
=
htonl
(
pRsp
->
vgroupInfo
[
i
].
vgId
);
for
(
int32_t
n
=
0
;
n
<
pRsp
->
vgroupInfo
[
i
].
numOfEps
;
++
n
)
{
pRsp
->
vgroupInfo
[
i
].
epAddr
[
n
].
port
=
htonl
(
pRsp
->
vgroupInfo
[
i
].
epAddr
[
n
].
port
);
}
memcpy
(
&
pOut
->
vgroupList
->
vgroupInfo
[
i
],
&
pRsp
->
vgroupInfo
[
i
],
sizeof
(
pRsp
->
vgroupInfo
[
i
]));
}
}
int32_t
*
vgIdList
=
(
int32_t
*
)((
char
*
)
pRsp
->
vgroupInfo
+
sizeof
(
pRsp
->
vgroupInfo
[
0
])
*
pRsp
->
vgroupNum
);
memcpy
(
pOut
->
db
,
pRsp
->
db
,
sizeof
(
pOut
->
db
));
if
(
pRsp
->
dbVgroupVersion
<
0
)
{
qInfo
(
"no new vgroup info for db[%s]"
,
pRsp
->
db
);
}
else
{
pOut
->
dbVgroup
=
calloc
(
1
,
sizeof
(
*
pOut
->
dbVgroup
));
if
(
NULL
==
pOut
->
dbVgroup
)
{
qError
(
"calloc size[%d] failed"
,
(
int32_t
)
sizeof
(
*
pOut
->
dbVgroup
));
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
goto
_exit
;
}
pOut
->
dbVgroup
->
vgId
=
taosArrayInit
(
pRsp
->
dbVgroupNum
,
sizeof
(
int32_t
));
if
(
NULL
==
pOut
->
dbVgroup
->
vgId
)
{
qError
(
"taosArrayInit size[%d] failed"
,
pRsp
->
dbVgroupNum
);
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
goto
_exit
;
}
pOut
->
dbVgroup
->
vgroupVersion
=
pRsp
->
dbVgroupVersion
;
pOut
->
dbVgroup
->
hashRange
=
htonl
(
pRsp
->
dbHashRange
);
for
(
int32_t
i
=
0
;
i
<
pRsp
->
dbVgroupNum
;
++
i
)
{
*
(
vgIdList
+
i
)
=
htonl
(
*
(
vgIdList
+
i
));
taosArrayPush
(
pOut
->
dbVgroup
->
vgId
,
vgIdList
+
i
)
;
}
}
return
code
;
_exit:
if
(
pOut
->
dbVgroup
&&
pOut
->
dbVgroup
->
vgId
)
{
taosArrayDestroy
(
pOut
->
dbVgroup
->
vgId
);
pOut
->
dbVgroup
->
vgId
=
NULL
;
}
tfree
(
pOut
->
dbVgroup
);
tfree
(
pOut
->
vgroupList
);
return
code
;
}
void
msgInit
()
{
queryBuildMsg
[
TSDB_MSG_TYPE_TABLE_META
]
=
queryBuildTableMetaReqMsg
;
queryBuildMsg
[
TSDB_MSG_TYPE_VGROUP_LIST
]
=
queryBuildVgroupListReqMsg
;
queryBuildMsg
[
TSDB_MSG_TYPE_USE_DB
]
=
queryBuildUseDbMsg
;
//tscProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META] = tscProcessTableMetaRsp;
queryProcessMsgRsp
[
TSDB_MSG_TYPE_VGROUP_LIST
]
=
queryProcessVgroupListRsp
;
queryProcessMsgRsp
[
TSDB_MSG_TYPE_USE_DB
]
=
queryProcessUseDBRsp
;
/*
tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
...
...
source/libs/wal/inc/walInt.h
浏览文件 @
f91cf03c
...
...
@@ -33,12 +33,10 @@ typedef struct WalFileInfo {
int64_t
fileSize
;
}
WalFileInfo
;
#pragma pack(push,1)
typedef
struct
WalIdxEntry
{
int64_t
ver
;
int64_t
offset
;
}
WalIdxEntry
;
#pragma pack(pop)
static
inline
int32_t
compareWalFileInfo
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
WalFileInfo
*
pInfoLeft
=
(
WalFileInfo
*
)
pLeft
;
...
...
@@ -107,8 +105,16 @@ static inline uint32_t walCalcBodyCksum(const void* body, uint32_t len) {
return
taosCalcChecksum
(
0
,
(
uint8_t
*
)
body
,
len
);
}
int
walReadMeta
(
SWal
*
pWal
);
int
walWriteMeta
(
SWal
*
pWal
);
static
inline
void
walResetVer
(
SWalVer
*
pVer
)
{
pVer
->
firstVer
=
-
1
;
pVer
->
verInSnapshotting
=
-
1
;
pVer
->
snapshotVer
=
-
1
;
pVer
->
commitVer
=
-
1
;
pVer
->
lastVer
=
-
1
;
}
int
walLoadMeta
(
SWal
*
pWal
);
int
walSaveMeta
(
SWal
*
pWal
);
int
walRollFileInfo
(
SWal
*
pWal
);
char
*
walMetaSerialize
(
SWal
*
pWal
);
...
...
source/libs/wal/src/walMeta.c
浏览文件 @
f91cf03c
...
...
@@ -24,18 +24,22 @@
#include <libgen.h>
#include <regex.h>
int64_t
walGetFirstVer
(
SWal
*
pWal
)
{
int64_t
inline
walGetFirstVer
(
SWal
*
pWal
)
{
return
pWal
->
vers
.
firstVer
;
}
int64_t
walGetSnaphostVer
(
SWal
*
pWal
)
{
int64_t
inline
walGetSnaphostVer
(
SWal
*
pWal
)
{
return
pWal
->
vers
.
snapshotVer
;
}
int64_t
walGetLastVer
(
SWal
*
pWal
)
{
int64_t
inline
walGetLastVer
(
SWal
*
pWal
)
{
return
pWal
->
vers
.
lastVer
;
}
static
inline
int
walBuildMetaName
(
SWal
*
pWal
,
int
metaVer
,
char
*
buf
)
{
return
sprintf
(
buf
,
"%s/meta-ver%d"
,
pWal
->
path
,
metaVer
);
}
int
walRollFileInfo
(
SWal
*
pWal
)
{
int64_t
ts
=
taosGetTimestampSec
();
...
...
@@ -150,10 +154,6 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) {
return
0
;
}
static
inline
int
walBuildMetaName
(
SWal
*
pWal
,
int
metaVer
,
char
*
buf
)
{
return
sprintf
(
buf
,
"%s/meta-ver%d"
,
pWal
->
path
,
metaVer
);
}
static
int
walFindCurMetaVer
(
SWal
*
pWal
)
{
const
char
*
pattern
=
"^meta-ver[0-9]+$"
;
regex_t
walMetaRegexPattern
;
...
...
@@ -182,7 +182,7 @@ static int walFindCurMetaVer(SWal* pWal) {
return
metaVer
;
}
int
wal
Writ
eMeta
(
SWal
*
pWal
)
{
int
wal
Sav
eMeta
(
SWal
*
pWal
)
{
int
metaVer
=
walFindCurMetaVer
(
pWal
);
char
fnameStr
[
WAL_FILE_LEN
];
walBuildMetaName
(
pWal
,
metaVer
+
1
,
fnameStr
);
...
...
@@ -207,7 +207,7 @@ int walWriteMeta(SWal* pWal) {
return
0
;
}
int
wal
Re
adMeta
(
SWal
*
pWal
)
{
int
wal
Lo
adMeta
(
SWal
*
pWal
)
{
ASSERT
(
pWal
->
fileInfoSet
->
size
==
0
);
//find existing meta file
int
metaVer
=
walFindCurMetaVer
(
pWal
);
...
...
source/libs/wal/src/walMgmt.c
浏览文件 @
f91cf03c
...
...
@@ -21,23 +21,17 @@
#include "compare.h"
#include "walInt.h"
//internal
int32_t
walGetNextFile
(
SWal
*
pWal
,
int64_t
*
nextFileId
);
int32_t
walGetOldFile
(
SWal
*
pWal
,
int64_t
curFileId
,
int32_t
minDiff
,
int64_t
*
oldFileId
);
int32_t
walGetNewFile
(
SWal
*
pWal
,
int64_t
*
newFileId
);
typedef
struct
{
int32_t
refSetId
;
uint32_t
seq
;
int8_t
stop
;
int8_t
inited
;
uint32_t
seq
;
int32_t
refSetId
;
pthread_t
thread
;
}
SWalMgmt
;
static
SWalMgmt
tsWal
=
{
0
,
.
seq
=
1
};
static
int32_t
walCreateThread
();
static
void
walStopThread
();
static
int32_t
walInitObj
(
SWal
*
pWal
);
static
void
walFreeObj
(
void
*
pWal
);
int64_t
walGetSeq
()
{
...
...
@@ -68,7 +62,7 @@ int32_t walInit() {
}
void
walCleanUp
()
{
int
old
=
atomic_val_compare_exchange_8
(
&
tsWal
.
inited
,
1
,
0
);
int
8_t
old
=
atomic_val_compare_exchange_8
(
&
tsWal
.
inited
,
1
,
0
);
if
(
old
==
0
)
{
return
;
}
...
...
@@ -83,48 +77,59 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
NULL
;
}
memset
(
pWal
,
0
,
sizeof
(
SWal
));
pWal
->
writeLogTfd
=
-
1
;
pWal
->
writeIdxTfd
=
-
1
;
pWal
->
writeCur
=
-
1
;
//set config
memcpy
(
&
pWal
->
cfg
,
pCfg
,
sizeof
(
SWalCfg
));
pWal
->
fsyncSeq
=
pCfg
->
fsyncPeriod
/
1000
;
if
(
pWal
->
fsyncSeq
<=
0
)
pWal
->
fsyncSeq
=
1
;
//init version info
pWal
->
vers
.
firstVer
=
-
1
;
pWal
->
vers
.
commitVer
=
-
1
;
pWal
->
vers
.
snapshotVer
=
-
1
;
pWal
->
vers
.
lastVer
=
-
1
;
pWal
->
vers
.
verInSnapshotting
=
-
1
;
tstrncpy
(
pWal
->
path
,
path
,
sizeof
(
pWal
->
path
));
if
(
taosMkDir
(
pWal
->
path
)
!=
0
)
{
wError
(
"vgId:%d, path:%s, failed to create directory since %s"
,
pWal
->
cfg
.
vgId
,
pWal
->
path
,
strerror
(
errno
));
return
NULL
;
}
pWal
->
totSize
=
0
;
//open meta
pWal
->
writeLogTfd
=
-
1
;
pWal
->
writeIdxTfd
=
-
1
;
pWal
->
writeCur
=
-
1
;
pWal
->
fileInfoSet
=
taosArrayInit
(
8
,
sizeof
(
WalFileInfo
));
if
(
pWal
->
fileInfoSet
==
NULL
)
{
wError
(
"vgId:%d, path:%s, failed to init taosArray %s"
,
pWal
->
cfg
.
vgId
,
pWal
->
path
,
strerror
(
errno
));
free
(
pWal
);
return
NULL
;
}
//init status
walResetVer
(
&
pWal
->
vers
);
pWal
->
totSize
=
0
;
pWal
->
lastRollSeq
=
-
1
;
//init write buffer
memset
(
&
pWal
->
writeHead
,
0
,
sizeof
(
SWalHead
));
pWal
->
writeHead
.
head
.
sver
=
0
;
tstrncpy
(
pWal
->
path
,
path
,
sizeof
(
pWal
->
path
));
pthread_mutex_init
(
&
pWal
->
mutex
,
NULL
);
pWal
->
fsyncSeq
=
pCfg
->
fsyncPeriod
/
1000
;
if
(
pWal
->
fsyncSeq
<=
0
)
pWal
->
fsyncSeq
=
1
;
pWal
->
writeHead
.
head
.
headVer
=
WAL_HEAD_VER
;
if
(
walInitObj
(
pWal
)
!=
0
)
{
walFreeObj
(
pWal
);
if
(
pthread_mutex_init
(
&
pWal
->
mutex
,
NULL
)
<
0
)
{
taosArrayDestroy
(
pWal
->
fileInfoSet
);
free
(
pWal
);
return
NULL
;
}
pWal
->
refId
=
taosAddRef
(
tsWal
.
refSetId
,
pWal
);
if
(
pWal
->
refId
<
0
)
{
walFreeObj
(
pWal
);
if
(
pWal
->
refId
<
0
)
{
pthread_mutex_destroy
(
&
pWal
->
mutex
);
taosArrayDestroy
(
pWal
->
fileInfoSet
);
free
(
pWal
);
return
NULL
;
}
if
(
walLoadMeta
(
pWal
)
<
0
)
{
taosRemoveRef
(
tsWal
.
refSetId
,
pWal
->
refId
);
pthread_mutex_destroy
(
&
pWal
->
mutex
);
taosArrayDestroy
(
pWal
->
fileInfoSet
);
free
(
pWal
);
return
NULL
;
}
walReadMeta
(
pWal
);
wDebug
(
"vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d"
,
pWal
->
cfg
.
vgId
,
pWal
,
pWal
->
cfg
.
level
,
pWal
->
cfg
.
fsyncPeriod
);
...
...
@@ -152,43 +157,23 @@ int32_t walAlter(SWal *pWal, SWalCfg *pCfg) {
}
void
walClose
(
SWal
*
pWal
)
{
if
(
pWal
==
NULL
)
return
;
pthread_mutex_lock
(
&
pWal
->
mutex
);
tfClose
(
pWal
->
writeLogTfd
);
pWal
->
writeLogTfd
=
-
1
;
tfClose
(
pWal
->
writeIdxTfd
);
pWal
->
writeIdxTfd
=
-
1
;
wal
Writ
eMeta
(
pWal
);
wal
Sav
eMeta
(
pWal
);
taosArrayDestroy
(
pWal
->
fileInfoSet
);
pWal
->
fileInfoSet
=
NULL
;
pthread_mutex_unlock
(
&
pWal
->
mutex
);
taosRemoveRef
(
tsWal
.
refSetId
,
pWal
->
refId
);
}
static
int32_t
walInitObj
(
SWal
*
pWal
)
{
if
(
taosMkDir
(
pWal
->
path
)
!=
0
)
{
wError
(
"vgId:%d, path:%s, failed to create directory since %s"
,
pWal
->
cfg
.
vgId
,
pWal
->
path
,
strerror
(
errno
));
return
TAOS_SYSTEM_ERROR
(
errno
);
}
pWal
->
fileInfoSet
=
taosArrayInit
(
8
,
sizeof
(
WalFileInfo
));
if
(
pWal
->
fileInfoSet
==
NULL
)
{
wError
(
"vgId:%d, path:%s, failed to init taosArray %s"
,
pWal
->
cfg
.
vgId
,
pWal
->
path
,
strerror
(
errno
));
return
TAOS_SYSTEM_ERROR
(
errno
);
}
wDebug
(
"vgId:%d, object is initialized"
,
pWal
->
cfg
.
vgId
);
return
0
;
taosRemoveRef
(
tsWal
.
refSetId
,
pWal
->
refId
);
}
static
void
walFreeObj
(
void
*
wal
)
{
SWal
*
pWal
=
wal
;
wDebug
(
"vgId:%d, wal:%p is freed"
,
pWal
->
cfg
.
vgId
,
pWal
);
tfClose
(
pWal
->
writeLogTfd
);
tfClose
(
pWal
->
writeIdxTfd
);
taosArrayDestroy
(
pWal
->
fileInfoSet
);
pWal
->
fileInfoSet
=
NULL
;
pthread_mutex_destroy
(
&
pWal
->
mutex
);
tfree
(
pWal
);
}
...
...
source/libs/wal/src/walRead.c
浏览文件 @
f91cf03c
...
...
@@ -54,7 +54,7 @@ static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, i
int64_t
logTfd
=
pRead
->
readLogTfd
;
//seek position
int64_t
offset
=
(
ver
-
fileFirstVer
)
*
WAL_IDX_ENTRY_SIZE
;
int64_t
offset
=
(
ver
-
fileFirstVer
)
*
sizeof
(
WalIdxEntry
)
;
code
=
tfLseek
(
idxTfd
,
offset
,
SEEK_SET
);
if
(
code
<
0
)
{
return
-
1
;
...
...
@@ -210,6 +210,6 @@ int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
return
0
;
}
int32_t
walReadWithFp
(
SWal
*
pWal
,
FWalWrite
writeFp
,
int64_t
verStart
,
int32_t
readNum
)
{
return
0
;
}
/*int32_t walReadWithFp(SWal *pWal, FWalWrite writeFp, int64_t verStart, int32_t readNum) {*/
/*return 0;*/
/*}*/
source/libs/wal/src/walSeek.c
浏览文件 @
f91cf03c
...
...
@@ -27,7 +27,7 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) {
int64_t
logTfd
=
pWal
->
writeLogTfd
;
//seek position
int64_t
offset
=
(
ver
-
walGetCurFileFirstVer
(
pWal
))
*
WAL_IDX_ENTRY_SIZE
;
int64_t
offset
=
(
ver
-
walGetCurFileFirstVer
(
pWal
))
*
sizeof
(
WalIdxEntry
)
;
code
=
tfLseek
(
idxTfd
,
offset
,
SEEK_SET
);
if
(
code
!=
0
)
{
return
-
1
;
...
...
@@ -66,8 +66,6 @@ int walChangeFileToLast(SWal *pWal) {
//switch file
pWal
->
writeIdxTfd
=
idxTfd
;
pWal
->
writeLogTfd
=
logTfd
;
//change status
pWal
->
curStatus
=
WAL_CUR_FILE_WRITABLE
;
return
0
;
}
...
...
@@ -93,13 +91,11 @@ int walChangeFile(SWal *pWal, int64_t ver) {
int64_t
fileFirstVer
=
pRet
->
firstVer
;
//closed
if
(
taosArrayGetLast
(
pWal
->
fileInfoSet
)
!=
pRet
)
{
pWal
->
curStatus
&=
~
WAL_CUR_FILE_WRITABLE
;
walBuildIdxName
(
pWal
,
fileFirstVer
,
fnameStr
);
idxTfd
=
tfOpenRead
(
fnameStr
);
walBuildLogName
(
pWal
,
fileFirstVer
,
fnameStr
);
logTfd
=
tfOpenRead
(
fnameStr
);
}
else
{
pWal
->
curStatus
|=
WAL_CUR_FILE_WRITABLE
;
walBuildIdxName
(
pWal
,
fileFirstVer
,
fnameStr
);
idxTfd
=
tfOpenReadWrite
(
fnameStr
);
walBuildLogName
(
pWal
,
fileFirstVer
,
fnameStr
);
...
...
source/libs/wal/src/walUtil.c
已删除
100644 → 0
浏览文件 @
9673e0ed
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "walInt.h"
#if 0
int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId) {
int64_t curFileId = *nextFileId;
int64_t minFileId = INT64_MAX;
DIR *dir = opendir(pWal->path);
if (dir == NULL) {
wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno));
return -1;
}
struct dirent *ent;
while ((ent = readdir(dir)) != NULL) {
char *name = ent->d_name;
if (strncmp(name, WAL_PREFIX, WAL_PREFIX_LEN) == 0) {
int64_t id = atoll(name + WAL_PREFIX_LEN);
if (id <= curFileId) continue;
if (id < minFileId) {
minFileId = id;
}
}
}
closedir(dir);
if (minFileId == INT64_MAX) return -1;
*nextFileId = minFileId;
wTrace("vgId:%d, path:%s, curFileId:%" PRId64 " nextFileId:%" PRId64, pWal->vgId, pWal->path, curFileId, *nextFileId);
return 0;
}
int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId) {
int64_t minFileId = INT64_MAX;
DIR *dir = opendir(pWal->path);
if (dir == NULL) {
wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno));
return -1;
}
struct dirent *ent;
while ((ent = readdir(dir)) != NULL) {
char *name = ent->d_name;
if (strncmp(name, WAL_PREFIX, WAL_PREFIX_LEN) == 0) {
int64_t id = atoll(name + WAL_PREFIX_LEN);
if (id >= curFileId) continue;
minDiff--;
if (id < minFileId) {
minFileId = id;
}
}
}
closedir(dir);
if (minFileId == INT64_MAX) return -1;
if (minDiff > 0) return -1;
*oldFileId = minFileId;
wTrace("vgId:%d, path:%s, curFileId:%" PRId64 " oldFildId:%" PRId64, pWal->vgId, pWal->path, curFileId, *oldFileId);
return 0;
}
int32_t walGetNewFile(SWal *pWal, int64_t *newFileId) {
int64_t maxFileId = INT64_MIN;
DIR *dir = opendir(pWal->path);
if (dir == NULL) {
wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno));
return -1;
}
struct dirent *ent;
while ((ent = readdir(dir)) != NULL) {
char *name = ent->d_name;
if (strncmp(name, WAL_PREFIX, WAL_PREFIX_LEN) == 0) {
int64_t id = atoll(name + WAL_PREFIX_LEN);
if (id > maxFileId) {
maxFileId = id;
}
}
}
closedir(dir);
if (maxFileId == INT64_MIN) {
*newFileId = 0;
} else {
*newFileId = maxFileId;
}
wTrace("vgId:%d, path:%s, newFileId:%" PRId64, pWal->vgId, pWal->path, *newFileId);
return 0;
}
#endif
source/libs/wal/src/walWrite.c
浏览文件 @
f91cf03c
...
...
@@ -21,98 +21,6 @@
#include "tfile.h"
#include "walInt.h"
#if 0
static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name, int64_t fileId);
int32_t walRenew(void *handle) {
if (handle == NULL) return 0;
SWal * pWal = handle;
int32_t code = 0;
/*if (pWal->stop) {*/
/*wDebug("vgId:%d, do not create a new wal file", pWal->vgId);*/
/*return 0;*/
/*}*/
pthread_mutex_lock(&pWal->mutex);
if (tfValid(pWal->logTfd)) {
tfClose(pWal->logTfd);
wDebug("vgId:%d, file:%s, it is closed while renew", pWal->vgId, pWal->logName);
}
/*if (pWal->keep == TAOS_WAL_KEEP) {*/
/*pWal->fileId = 0;*/
/*} else {*/
/*if (walGetNewFile(pWal, &pWal->fileId) != 0) pWal->fileId = 0;*/
/*pWal->fileId++;*/
/*}*/
snprintf(pWal->logName, sizeof(pWal->logName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->curFileId);
pWal->logTfd = tfOpenCreateWrite(pWal->logName);
if (!tfValid(pWal->logTfd)) {
code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->logName, strerror(errno));
} else {
wDebug("vgId:%d, file:%s, it is created and open while renew", pWal->vgId, pWal->logName);
}
pthread_mutex_unlock(&pWal->mutex);
return code;
}
void walRemoveOneOldFile(void *handle) {
SWal *pWal = handle;
if (pWal == NULL) return;
/*if (pWal->keep == TAOS_WAL_KEEP) return;*/
if (!tfValid(pWal->logTfd)) return;
pthread_mutex_lock(&pWal->mutex);
// remove the oldest wal file
int64_t oldFileId = -1;
if (walGetOldFile(pWal, pWal->curFileId, WAL_FILE_NUM, &oldFileId) == 0) {
char walName[WAL_FILE_LEN] = {0};
snprintf(walName, sizeof(walName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, oldFileId);
if (remove(walName) < 0) {
wError("vgId:%d, file:%s, failed to remove since %s", pWal->vgId, walName, strerror(errno));
} else {
wInfo("vgId:%d, file:%s, it is removed", pWal->vgId, walName);
}
}
pthread_mutex_unlock(&pWal->mutex);
}
void walRemoveAllOldFiles(void *handle) {
if (handle == NULL) return;
SWal * pWal = handle;
int64_t fileId = -1;
pthread_mutex_lock(&pWal->mutex);
tfClose(pWal->logTfd);
wDebug("vgId:%d, file:%s, it is closed before remove all wals", pWal->vgId, pWal->logName);
while (walGetNextFile(pWal, &fileId) >= 0) {
snprintf(pWal->logName, sizeof(pWal->logName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId);
if (remove(pWal->logName) < 0) {
wError("vgId:%d, wal:%p file:%s, failed to remove since %s", pWal->vgId, pWal, pWal->logName, strerror(errno));
} else {
wInfo("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->logName);
}
}
pthread_mutex_unlock(&pWal->mutex);
}
#endif
int32_t
walCommit
(
SWal
*
pWal
,
int64_t
ver
)
{
ASSERT
(
pWal
->
vers
.
commitVer
>=
pWal
->
vers
.
snapshotVer
);
ASSERT
(
pWal
->
vers
.
commitVer
<=
pWal
->
vers
.
lastVer
);
...
...
@@ -166,7 +74,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
pthread_mutex_unlock
(
&
pWal
->
mutex
);
return
-
1
;
}
int
idxOff
=
(
ver
-
walGetCurFileFirstVer
(
pWal
))
*
WAL_IDX_ENTRY_SIZE
;
int
idxOff
=
(
ver
-
walGetCurFileFirstVer
(
pWal
))
*
sizeof
(
WalIdxEntry
)
;
code
=
tfLseek
(
idxTfd
,
idxOff
,
SEEK_SET
);
if
(
code
<
0
)
{
pthread_mutex_unlock
(
&
pWal
->
mutex
);
...
...
@@ -229,7 +137,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
return
0
;
}
int32_t
walBegin
Take
Snapshot
(
SWal
*
pWal
,
int64_t
ver
)
{
int32_t
walBeginSnapshot
(
SWal
*
pWal
,
int64_t
ver
)
{
pWal
->
vers
.
verInSnapshotting
=
ver
;
//check file rolling
if
(
pWal
->
cfg
.
retentionPeriod
==
0
)
{
...
...
@@ -239,7 +147,7 @@ int32_t walBeginTakeSnapshot(SWal* pWal, int64_t ver) {
return
0
;
}
int32_t
walEnd
Take
Snapshot
(
SWal
*
pWal
)
{
int32_t
walEndSnapshot
(
SWal
*
pWal
)
{
int64_t
ver
=
pWal
->
vers
.
verInSnapshotting
;
if
(
ver
==
-
1
)
return
-
1
;
...
...
@@ -287,7 +195,7 @@ int32_t walEndTakeSnapshot(SWal *pWal) {
pWal
->
vers
.
verInSnapshotting
=
-
1
;
//save snapshot ver, commit ver
int
code
=
wal
Writ
eMeta
(
pWal
);
int
code
=
wal
Sav
eMeta
(
pWal
);
if
(
code
!=
0
)
{
return
-
1
;
}
...
...
@@ -314,13 +222,13 @@ int walRoll(SWal *pWal) {
int64_t
newFileFirstVersion
=
pWal
->
vers
.
lastVer
+
1
;
char
fnameStr
[
WAL_FILE_LEN
];
walBuildIdxName
(
pWal
,
newFileFirstVersion
,
fnameStr
);
idxTfd
=
tfOpenCreateWrite
(
fnameStr
);
idxTfd
=
tfOpenCreateWrite
Append
(
fnameStr
);
if
(
idxTfd
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
walBuildLogName
(
pWal
,
newFileFirstVersion
,
fnameStr
);
logTfd
=
tfOpenCreateWrite
(
fnameStr
);
logTfd
=
tfOpenCreateWrite
Append
(
fnameStr
);
if
(
logTfd
<
0
)
{
ASSERT
(
0
);
return
-
1
;
...
...
@@ -335,8 +243,6 @@ int walRoll(SWal *pWal) {
pWal
->
writeIdxTfd
=
idxTfd
;
pWal
->
writeLogTfd
=
logTfd
;
pWal
->
writeCur
=
taosArrayGetSize
(
pWal
->
fileInfoSet
)
-
1
;
//change status
pWal
->
curStatus
=
WAL_CUR_FILE_WRITABLE
&
WAL_CUR_POS_WRITABLE
;
pWal
->
lastRollSeq
=
walGetSeq
();
return
0
;
...
...
@@ -425,74 +331,6 @@ void walFsync(SWal *pWal, bool forceFsync) {
}
}
#if 0
int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) {
if (handle == NULL) return -1;
SWal * pWal = handle;
int32_t count = 0;
int32_t code = 0;
int64_t fileId = -1;
while ((code = walGetNextFile(pWal, &fileId)) >= 0) {
/*if (fileId == pWal->curFileId) continue;*/
char walName[WAL_FILE_LEN];
snprintf(walName, sizeof(pWal->logName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId);
wInfo("vgId:%d, file:%s, will be restored", pWal->vgId, walName);
code = walRestoreWalFile(pWal, pVnode, writeFp, walName, fileId);
if (code != TSDB_CODE_SUCCESS) {
wError("vgId:%d, file:%s, failed to restore since %s", pWal->vgId, walName, tstrerror(code));
continue;
}
wInfo("vgId:%d, file:%s, restore success, wver:%" PRIu64, pWal->vgId, walName, pWal->curVersion);
count++;
}
/*if (pWal->keep != TAOS_WAL_KEEP) return TSDB_CODE_SUCCESS;*/
if (count == 0) {
wDebug("vgId:%d, wal file not exist, renew it", pWal->vgId);
return walRenew(pWal);
} else {
// open the existing WAL file in append mode
/*pWal->curFileId = 0;*/
snprintf(pWal->logName, sizeof(pWal->logName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->curFileId);
pWal->logTfd = tfOpenCreateWriteAppend(pWal->logName);
if (!tfValid(pWal->logTfd)) {
wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->logName, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
}
wDebug("vgId:%d, file:%s, it is created and open while restore", pWal->vgId, pWal->logName);
}
return TSDB_CODE_SUCCESS;
}
int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) {
if (handle == NULL) return -1;
SWal *pWal = handle;
if (*fileId == 0) *fileId = -1;
pthread_mutex_lock(&(pWal->mutex));
int32_t code = walGetNextFile(pWal, fileId);
if (code >= 0) {
sprintf(fileName, "wal/%s%" PRId64, WAL_PREFIX, *fileId);
/*code = (*fileId == pWal->curFileId) ? 0 : 1;*/
}
wDebug("vgId:%d, get wal file, code:%d curId:%" PRId64 " outId:%" PRId64, pWal->vgId, code, pWal->curFileId, *fileId);
pthread_mutex_unlock(&(pWal->mutex));
return code;
}
#endif
/*static int walValidateOffset(SWal* pWal, int64_t ver) {*/
/*int code = 0;*/
/*SWalHead *pHead = NULL;*/
...
...
@@ -516,139 +354,3 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) {
/*return 0;*/
/*}*/
#if 0
static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd, int64_t *offset) {
int64_t pos = *offset;
while (1) {
pos++;
if (tfLseek(tfd, pos, SEEK_SET) < 0) {
wError("vgId:%d, failed to seek from corrupted wal file since %s", pWal->vgId, strerror(errno));
return TSDB_CODE_WAL_FILE_CORRUPTED;
}
if (tfRead(tfd, pHead, sizeof(SWalHead)) <= 0) {
wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos);
return TSDB_CODE_WAL_FILE_CORRUPTED;
}
if (pHead->signature != WAL_SIGNATURE) {
continue;
}
if (pHead->sver >= 1) {
if (tfRead(tfd, pHead->cont, pHead->len) < pHead->len) {
wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos);
return TSDB_CODE_WAL_FILE_CORRUPTED;
}
if (walValidateChecksum(pHead)) {
wInfo("vgId:%d, wal whole cksum check passed, offset:%" PRId64, pWal->vgId, pos);
*offset = pos;
return TSDB_CODE_SUCCESS;
}
}
}
return TSDB_CODE_WAL_FILE_CORRUPTED;
}
static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name, int64_t fileId) {
int32_t size = WAL_MAX_SIZE;
void * buffer = malloc(size);
if (buffer == NULL) {
wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
}
int64_t tfd = tfOpenReadWrite(name);
if (!tfValid(tfd)) {
wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno));
tfree(buffer);
return TAOS_SYSTEM_ERROR(errno);
} else {
wDebug("vgId:%d, file:%s, open for restore", pWal->vgId, name);
}
int32_t code = TSDB_CODE_SUCCESS;
int64_t offset = 0;
SWalHead *pHead = buffer;
while (1) {
int32_t ret = (int32_t)tfRead(tfd, pHead, sizeof(SWalHead));
if (ret == 0) break;
if (ret < 0) {
wError("vgId:%d, file:%s, failed to read wal head since %s", pWal->vgId, name, strerror(errno));
code = TAOS_SYSTEM_ERROR(errno);
break;
}
if (ret < sizeof(SWalHead)) {
wError("vgId:%d, file:%s, failed to read wal head, ret is %d", pWal->vgId, name, ret);
walFtruncate(pWal, tfd, offset);
break;
}
if ((pHead->sver == 0 && !walValidateChecksum(pHead)) || pHead->sver < 0 || pHead->sver > 2) {
wError("vgId:%d, file:%s, wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
pHead->version, pHead->len, offset);
code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
if (code != TSDB_CODE_SUCCESS) {
walFtruncate(pWal, tfd, offset);
break;
}
}
if (pHead->len < 0 || pHead->len > size - sizeof(SWalHead)) {
wError("vgId:%d, file:%s, wal head len out of range, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
pHead->version, pHead->len, offset);
code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
if (code != TSDB_CODE_SUCCESS) {
walFtruncate(pWal, tfd, offset);
break;
}
}
ret = (int32_t)tfRead(tfd, pHead->cont, pHead->len);
if (ret < 0) {
wError("vgId:%d, file:%s, failed to read wal body since %s", pWal->vgId, name, strerror(errno));
code = TAOS_SYSTEM_ERROR(errno);
break;
}
if (ret < pHead->len) {
wError("vgId:%d, file:%s, failed to read wal body, ret:%d len:%d", pWal->vgId, name, ret, pHead->len);
offset += sizeof(SWalHead);
continue;
}
if ((pHead->sver >= 1) && !walValidateChecksum(pHead)) {
wError("vgId:%d, file:%s, wal whole cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
pHead->version, pHead->len, offset);
code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
if (code != TSDB_CODE_SUCCESS) {
walFtruncate(pWal, tfd, offset);
break;
}
}
offset = offset + sizeof(SWalHead) + pHead->len;
wTrace("vgId:%d, restore wal, fileId:%" PRId64 " hver:%" PRIu64 " wver:%" PRIu64 " len:%d offset:%" PRId64,
pWal->vgId, fileId, pHead->version, pWal->curVersion, pHead->len, offset);
pWal->curVersion = pHead->version;
// wInfo("writeFp: %ld", offset);
(*writeFp)(pVnode, pHead);
}
tfClose(tfd);
tfree(buffer);
wDebug("vgId:%d, file:%s, it is closed after restore", pWal->vgId, name);
return code;
}
#endif
source/libs/wal/test/walMetaTest.cpp
浏览文件 @
f91cf03c
...
...
@@ -142,7 +142,7 @@ TEST_F(WalCleanEnv, serialize) {
char
*
ss
=
walMetaSerialize
(
pWal
);
printf
(
"%s
\n
"
,
ss
);
free
(
ss
);
code
=
wal
Writ
eMeta
(
pWal
);
code
=
wal
Sav
eMeta
(
pWal
);
ASSERT
(
code
==
0
);
}
...
...
@@ -150,11 +150,11 @@ TEST_F(WalCleanEnv, removeOldMeta) {
int
code
=
walRollFileInfo
(
pWal
);
ASSERT
(
code
==
0
);
ASSERT
(
pWal
->
fileInfoSet
!=
NULL
);
code
=
wal
Writ
eMeta
(
pWal
);
code
=
wal
Sav
eMeta
(
pWal
);
ASSERT
(
code
==
0
);
code
=
walRollFileInfo
(
pWal
);
ASSERT
(
code
==
0
);
code
=
wal
Writ
eMeta
(
pWal
);
code
=
wal
Sav
eMeta
(
pWal
);
ASSERT
(
code
==
0
);
}
...
...
@@ -199,7 +199,7 @@ TEST_F(WalCleanEnv, write) {
ASSERT_EQ
(
code
,
-
1
);
ASSERT_EQ
(
pWal
->
vers
.
lastVer
,
i
);
}
code
=
wal
Writ
eMeta
(
pWal
);
code
=
wal
Sav
eMeta
(
pWal
);
ASSERT_EQ
(
code
,
0
);
}
...
...
@@ -216,7 +216,7 @@ TEST_F(WalCleanEnv, rollback) {
code
=
walRollback
(
pWal
,
3
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
pWal
->
vers
.
lastVer
,
2
);
code
=
wal
Writ
eMeta
(
pWal
);
code
=
wal
Sav
eMeta
(
pWal
);
ASSERT_EQ
(
code
,
0
);
}
...
...
@@ -231,9 +231,9 @@ TEST_F(WalCleanDeleteEnv, roll) {
ASSERT_EQ
(
pWal
->
vers
.
commitVer
,
i
);
}
walBegin
Take
Snapshot
(
pWal
,
i
-
1
);
walBeginSnapshot
(
pWal
,
i
-
1
);
ASSERT_EQ
(
pWal
->
vers
.
verInSnapshotting
,
i
-
1
);
walEnd
Take
Snapshot
(
pWal
);
walEndSnapshot
(
pWal
);
ASSERT_EQ
(
pWal
->
vers
.
snapshotVer
,
i
-
1
);
ASSERT_EQ
(
pWal
->
vers
.
verInSnapshotting
,
-
1
);
...
...
@@ -247,9 +247,9 @@ TEST_F(WalCleanDeleteEnv, roll) {
ASSERT_EQ
(
pWal
->
vers
.
commitVer
,
i
);
}
code
=
walBegin
Take
Snapshot
(
pWal
,
i
-
1
);
code
=
walBeginSnapshot
(
pWal
,
i
-
1
);
ASSERT_EQ
(
code
,
0
);
code
=
walEnd
Take
Snapshot
(
pWal
);
code
=
walEndSnapshot
(
pWal
);
ASSERT_EQ
(
code
,
0
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录