Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
abcdeb84
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1191
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
abcdeb84
编写于
5月 05, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into feature/dnode
上级
d2f889a3
e882eea6
变更
27
展开全部
隐藏空白更改
内联
并排
Showing
27 changed file
with
1406 addition
and
2143 deletion
+1406
-2143
cmake/cmake.define
cmake/cmake.define
+7
-1
include/libs/index/index.h
include/libs/index/index.h
+11
-1
include/util/tskiplist2.h
include/util/tskiplist2.h
+70
-0
source/client/src/clientStmt.c
source/client/src/clientStmt.c
+17
-1
source/dnode/vnode/src/inc/meta.h
source/dnode/vnode/src/inc/meta.h
+0
-2
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+9
-9
source/dnode/vnode/src/meta/metaTDBImpl.c
source/dnode/vnode/src/meta/metaTDBImpl.c
+8
-8
source/dnode/vnode/src/meta/metaTable.c
source/dnode/vnode/src/meta/metaTable.c
+166
-45
source/dnode/vnode/src/tsdb/tsdbTDBImpl.c
source/dnode/vnode/src/tsdb/tsdbTDBImpl.c
+1
-1
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+35
-4
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+24
-11
source/libs/executor/src/indexoperator.c
source/libs/executor/src/indexoperator.c
+8
-6
source/libs/index/inc/indexFst.h
source/libs/index/inc/indexFst.h
+3
-4
source/libs/index/inc/indexInt.h
source/libs/index/inc/indexInt.h
+1
-1
source/libs/index/src/index.c
source/libs/index/src/index.c
+0
-1
source/libs/index/src/indexCache.c
source/libs/index/src/indexCache.c
+66
-1
source/libs/index/src/indexTfile.c
source/libs/index/src/indexTfile.c
+49
-1
source/libs/scalar/src/sclvector.c
source/libs/scalar/src/sclvector.c
+21
-1
source/libs/tdb/inc/tdb.h
source/libs/tdb/inc/tdb.h
+5
-5
source/libs/tdb/src/db/tdbBtree.c
source/libs/tdb/src/db/tdbBtree.c
+202
-36
source/libs/tdb/src/db/tdbDb.c
source/libs/tdb/src/db/tdbDb.c
+12
-18
source/libs/tdb/src/db/tdbPage.c
source/libs/tdb/src/db/tdbPage.c
+5
-0
source/libs/tdb/src/inc/tdbInt.h
source/libs/tdb/src/inc/tdbInt.h
+5
-0
source/libs/tdb/test/tdbTest.cpp
source/libs/tdb/test/tdbTest.cpp
+172
-8
source/util/src/tskiplist2.c
source/util/src/tskiplist2.c
+175
-0
tests/script/api/batchprepare.c
tests/script/api/batchprepare.c
+333
-1977
tools/taos-tools
tools/taos-tools
+1
-1
未找到文件。
cmake/cmake.define
浏览文件 @
abcdeb84
...
@@ -67,7 +67,13 @@ ELSE ()
...
@@ -67,7 +67,13 @@ ELSE ()
IF (${CMAKE_SYSTEM_PROCESSOR} MATCHES "arm64" OR ${CMAKE_SYSTEM_PROCESSOR} MATCHES "aarch64")
IF (${CMAKE_SYSTEM_PROCESSOR} MATCHES "arm64" OR ${CMAKE_SYSTEM_PROCESSOR} MATCHES "aarch64")
ADD_DEFINITIONS("-D_TD_ARM_")
ADD_DEFINITIONS("-D_TD_ARM_")
ELSE ()
ELSE ()
ADD_DEFINITIONS("-msse4.2 -mfma")
ADD_DEFINITIONS("-msse4.2")
IF("${FMA_SUPPORT}" MATCHES "true")
MESSAGE(STATUS "turn fma function support on")
ADD_DEFINITIONS("-mfma")
ELSE ()
MESSAGE(STATUS "turn fma function support off")
ENDIF()
ENDIF ()
ENDIF ()
ENDIF ()
ENDIF ()
include/libs/index/index.h
浏览文件 @
abcdeb84
...
@@ -47,7 +47,17 @@ typedef enum {
...
@@ -47,7 +47,17 @@ typedef enum {
}
SIndexOperOnColumn
;
}
SIndexOperOnColumn
;
typedef
enum
{
MUST
=
0
,
SHOULD
,
NOT
}
EIndexOperatorType
;
typedef
enum
{
MUST
=
0
,
SHOULD
,
NOT
}
EIndexOperatorType
;
typedef
enum
{
QUERY_TERM
=
0
,
QUERY_PREFIX
,
QUERY_SUFFIX
,
QUERY_REGEX
,
QUERY_RANGE
}
EIndexQueryType
;
typedef
enum
{
QUERY_TERM
=
0
,
QUERY_PREFIX
,
QUERY_SUFFIX
,
QUERY_REGEX
,
QUERY_LESS_THAN
,
QUERY_LESS_EQUAL
,
QUERY_GREATER_THAN
,
QUERY_GREATER_EQUAL
,
QUERY_RANGE
}
EIndexQueryType
;
/*
/*
* create multi query
* create multi query
...
...
include/util/tskiplist2.h
0 → 100644
浏览文件 @
abcdeb84
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_UTIL_SKIPLIST2_H_
#define _TD_UTIL_SKIPLIST2_H_
#include "os.h"
#ifdef __cplusplus
extern
"C"
{
#endif
#define SL_MAX_LEVEL 15
typedef
struct
SSkipList2
SSkipList2
;
typedef
struct
SSLCursor
SSLCursor
;
typedef
struct
SSLCfg
SSLCfg
;
typedef
struct
SSLNode
SSLNode
;
typedef
int32_t
(
*
tslCmprFn
)(
const
void
*
pKey1
,
int32_t
nKey1
,
const
void
*
pKey2
,
int32_t
nKey2
);
// SSkipList2
int32_t
slOpen
(
const
SSLCfg
*
pCfg
,
SSkipList2
**
ppSl
);
int32_t
slClose
(
SSkipList2
*
pSl
);
int32_t
slClear
(
SSkipList2
*
pSl
);
// SSLCursor
int32_t
slcOpen
(
SSkipList2
*
pSl
,
SSLCursor
*
pSlc
);
int32_t
slcClose
(
SSLCursor
*
pSlc
);
int32_t
slcMoveTo
(
SSLCursor
*
pSlc
,
const
void
*
pKey
,
int32_t
nKey
);
int32_t
slcMoveToNext
(
SSLCursor
*
pSlc
);
int32_t
slcMoveToPrev
(
SSLCursor
*
pSlc
);
int32_t
slcMoveToFirst
(
SSLCursor
*
pSlc
);
int32_t
slcMoveToLast
(
SSLCursor
*
pSlc
);
int32_t
slcPut
(
SSLCursor
*
pSlc
,
const
void
*
pKey
,
int32_t
nKey
,
const
void
*
pData
,
int32_t
nData
);
int32_t
slcGet
(
SSLCursor
*
pSlc
,
const
void
**
ppKey
,
int32_t
*
nKey
,
const
void
**
ppData
,
int32_t
*
nData
);
int32_t
slcDrop
(
SSLCursor
*
pSlc
);
// struct
struct
SSLCfg
{
int8_t
maxLevel
;
int32_t
nKey
;
int32_t
nData
;
tslCmprFn
cmprFn
;
void
*
pPool
;
void
*
(
*
xMalloc
)(
void
*
,
int32_t
size
);
void
(
*
xFree
)(
void
*
,
void
*
);
};
struct
SSLCursor
{
SSkipList2
*
pSl
;
SSLNode
**
forwards
[
SL_MAX_LEVEL
];
};
#ifdef __cplusplus
}
#endif
#endif
/*_TD_UTIL_SKIPLIST2_H_*/
\ No newline at end of file
source/client/src/clientStmt.c
浏览文件 @
abcdeb84
...
@@ -649,6 +649,19 @@ int stmtGetParamNum(TAOS_STMT *stmt, int *nums) {
...
@@ -649,6 +649,19 @@ int stmtGetParamNum(TAOS_STMT *stmt, int *nums) {
STMT_ERR_RET
(
stmtSwitchStatus
(
pStmt
,
STMT_FETCH_FIELDS
));
STMT_ERR_RET
(
stmtSwitchStatus
(
pStmt
,
STMT_FETCH_FIELDS
));
if
(
pStmt
->
bInfo
.
needParse
&&
pStmt
->
sql
.
runTimes
&&
pStmt
->
sql
.
type
>
0
&&
STMT_TYPE_MULTI_INSERT
!=
pStmt
->
sql
.
type
)
{
pStmt
->
bInfo
.
needParse
=
false
;
}
if
(
pStmt
->
exec
.
pRequest
&&
STMT_TYPE_QUERY
==
pStmt
->
sql
.
type
&&
pStmt
->
sql
.
runTimes
)
{
taos_free_result
(
pStmt
->
exec
.
pRequest
);
pStmt
->
exec
.
pRequest
=
NULL
;
}
if
(
NULL
==
pStmt
->
exec
.
pRequest
)
{
STMT_ERR_RET
(
buildRequest
(
pStmt
->
taos
,
pStmt
->
sql
.
sqlStr
,
pStmt
->
sql
.
sqlLen
,
&
pStmt
->
exec
.
pRequest
));
}
if
(
pStmt
->
bInfo
.
needParse
)
{
if
(
pStmt
->
bInfo
.
needParse
)
{
STMT_ERR_RET
(
stmtParseSql
(
pStmt
));
STMT_ERR_RET
(
stmtParseSql
(
pStmt
));
}
}
...
@@ -658,8 +671,11 @@ int stmtGetParamNum(TAOS_STMT *stmt, int *nums) {
...
@@ -658,8 +671,11 @@ int stmtGetParamNum(TAOS_STMT *stmt, int *nums) {
STMT_ERR_RET
(
getQueryPlan
(
pStmt
->
exec
.
pRequest
,
pStmt
->
sql
.
pQuery
,
&
pStmt
->
sql
.
nodeList
));
STMT_ERR_RET
(
getQueryPlan
(
pStmt
->
exec
.
pRequest
,
pStmt
->
sql
.
pQuery
,
&
pStmt
->
sql
.
nodeList
));
pStmt
->
sql
.
pQueryPlan
=
pStmt
->
exec
.
pRequest
->
body
.
pDag
;
pStmt
->
sql
.
pQueryPlan
=
pStmt
->
exec
.
pRequest
->
body
.
pDag
;
pStmt
->
exec
.
pRequest
->
body
.
pDag
=
NULL
;
pStmt
->
exec
.
pRequest
->
body
.
pDag
=
NULL
;
STMT_ERR_RET
(
stmtBackupQueryFields
(
pStmt
));
}
else
{
STMT_ERR_RET
(
stmtRestoreQueryFields
(
pStmt
));
}
}
*
nums
=
taosArrayGetSize
(
pStmt
->
sql
.
pQueryPlan
->
pPlaceholderValues
);
*
nums
=
taosArrayGetSize
(
pStmt
->
sql
.
pQueryPlan
->
pPlaceholderValues
);
}
else
{
}
else
{
STMT_ERR_RET
(
stmtFetchColFields
(
stmt
,
nums
,
NULL
));
STMT_ERR_RET
(
stmtFetchColFields
(
stmt
,
nums
,
NULL
));
...
...
source/dnode/vnode/src/inc/meta.h
浏览文件 @
abcdeb84
...
@@ -103,8 +103,6 @@ typedef struct {
...
@@ -103,8 +103,6 @@ typedef struct {
#if 1
#if 1
// int metaCreateTable(SMeta* pMeta, STbCfg* pTbCfg, STbDdlH* pHandle);
int
metaDropTable
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
SMSmaCursor
*
metaOpenSmaCursor
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
SMSmaCursor
*
metaOpenSmaCursor
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
void
metaCloseSmaCursor
(
SMSmaCursor
*
pSmaCur
);
void
metaCloseSmaCursor
(
SMSmaCursor
*
pSmaCur
);
int64_t
metaSmaCursorNext
(
SMSmaCursor
*
pSmaCur
);
int64_t
metaSmaCursorNext
(
SMSmaCursor
*
pSmaCur
);
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
abcdeb84
...
@@ -46,13 +46,13 @@
...
@@ -46,13 +46,13 @@
extern
"C"
{
extern
"C"
{
#endif
#endif
typedef
struct
SVnodeInfo
SVnodeInfo
;
typedef
struct
SVnodeInfo
SVnodeInfo
;
typedef
struct
SMeta
SMeta
;
typedef
struct
SMeta
SMeta
;
typedef
struct
STsdb
STsdb
;
typedef
struct
STsdb
STsdb
;
typedef
struct
STQ
STQ
;
typedef
struct
STQ
STQ
;
typedef
struct
SVState
SVState
;
typedef
struct
SVState
SVState
;
typedef
struct
SVBufPool
SVBufPool
;
typedef
struct
SVBufPool
SVBufPool
;
typedef
struct
SQWorker
SQHandle
;
typedef
struct
SQWorker
SQHandle
;
#define VNODE_META_DIR "meta"
#define VNODE_META_DIR "meta"
#define VNODE_TSDB_DIR "tsdb"
#define VNODE_TSDB_DIR "tsdb"
...
@@ -77,6 +77,7 @@ int metaCommit(SMeta* pMeta);
...
@@ -77,6 +77,7 @@ int metaCommit(SMeta* pMeta);
int
metaCreateSTable
(
SMeta
*
pMeta
,
int64_t
version
,
SVCreateStbReq
*
pReq
);
int
metaCreateSTable
(
SMeta
*
pMeta
,
int64_t
version
,
SVCreateStbReq
*
pReq
);
int
metaDropSTable
(
SMeta
*
pMeta
,
int64_t
verison
,
SVDropStbReq
*
pReq
);
int
metaDropSTable
(
SMeta
*
pMeta
,
int64_t
verison
,
SVDropStbReq
*
pReq
);
int
metaCreateTable
(
SMeta
*
pMeta
,
int64_t
version
,
SVCreateTbReq
*
pReq
);
int
metaCreateTable
(
SMeta
*
pMeta
,
int64_t
version
,
SVCreateTbReq
*
pReq
);
int
metaDropTable
(
SMeta
*
pMeta
,
int64_t
version
,
SVDropTbReq
*
pReq
);
SSchemaWrapper
*
metaGetTableSchema
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
int32_t
sver
,
bool
isinline
);
SSchemaWrapper
*
metaGetTableSchema
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
int32_t
sver
,
bool
isinline
);
STSchema
*
metaGetTbTSchema
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
int32_t
sver
);
STSchema
*
metaGetTbTSchema
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
int32_t
sver
);
int
metaGetTableEntryByName
(
SMetaReader
*
pReader
,
const
char
*
name
);
int
metaGetTableEntryByName
(
SMetaReader
*
pReader
,
const
char
*
name
);
...
@@ -100,7 +101,7 @@ int32_t tsdbCreateTSma(STsdb* pTsdb, char* pMsg);
...
@@ -100,7 +101,7 @@ int32_t tsdbCreateTSma(STsdb* pTsdb, char* pMsg);
int32_t
tsdbInsertTSmaData
(
STsdb
*
pTsdb
,
int64_t
indexUid
,
const
char
*
msg
);
int32_t
tsdbInsertTSmaData
(
STsdb
*
pTsdb
,
int64_t
indexUid
,
const
char
*
msg
);
int
tsdbInsertData
(
STsdb
*
pTsdb
,
int64_t
version
,
SSubmitReq
*
pMsg
,
SSubmitRsp
*
pRsp
);
int
tsdbInsertData
(
STsdb
*
pTsdb
,
int64_t
version
,
SSubmitReq
*
pMsg
,
SSubmitRsp
*
pRsp
);
tsdbReaderT
*
tsdbQueryTables
(
SVnode
*
pVnode
,
SQueryTableDataCond
*
pCond
,
STableGroupInfo
*
groupList
,
uint64_t
qId
,
tsdbReaderT
*
tsdbQueryTables
(
SVnode
*
pVnode
,
SQueryTableDataCond
*
pCond
,
STableGroupInfo
*
groupList
,
uint64_t
qId
,
uint64_t
taskId
);
uint64_t
taskId
);
tsdbReaderT
tsdbQueryCacheLastT
(
STsdb
*
tsdb
,
SQueryTableDataCond
*
pCond
,
STableGroupInfo
*
groupList
,
uint64_t
qId
,
tsdbReaderT
tsdbQueryCacheLastT
(
STsdb
*
tsdb
,
SQueryTableDataCond
*
pCond
,
STableGroupInfo
*
groupList
,
uint64_t
qId
,
void
*
pMemRef
);
void
*
pMemRef
);
int32_t
tsdbGetTableGroupFromIdListT
(
STsdb
*
tsdb
,
SArray
*
pTableIdList
,
STableGroupInfo
*
pGroupInfo
);
int32_t
tsdbGetTableGroupFromIdListT
(
STsdb
*
tsdb
,
SArray
*
pTableIdList
,
STableGroupInfo
*
pGroupInfo
);
...
@@ -189,7 +190,6 @@ struct STbUidStore {
...
@@ -189,7 +190,6 @@ struct STbUidStore {
#define TD_VID(PVNODE) (PVNODE)->config.vgId
#define TD_VID(PVNODE) (PVNODE)->config.vgId
static
FORCE_INLINE
bool
vnodeIsRollup
(
SVnode
*
pVnode
)
{
static
FORCE_INLINE
bool
vnodeIsRollup
(
SVnode
*
pVnode
)
{
SRetention
*
pRetention
=
&
(
pVnode
->
config
.
tsdbCfg
.
retentions
[
0
]);
SRetention
*
pRetention
=
&
(
pVnode
->
config
.
tsdbCfg
.
retentions
[
0
]);
return
(
pRetention
->
freq
>
0
&&
pRetention
->
keep
>
0
);
return
(
pRetention
->
freq
>
0
&&
pRetention
->
keep
>
0
);
...
...
source/dnode/vnode/src/meta/metaTDBImpl.c
浏览文件 @
abcdeb84
...
@@ -289,7 +289,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg, STbDdlH *pHandle) {
...
@@ -289,7 +289,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg, STbDdlH *pHandle) {
pVal
=
pBuf
=
buf
;
pVal
=
pBuf
=
buf
;
metaEncodeTbInfo
(
&
pBuf
,
pTbCfg
);
metaEncodeTbInfo
(
&
pBuf
,
pTbCfg
);
vLen
=
POINTER_DISTANCE
(
pBuf
,
buf
);
vLen
=
POINTER_DISTANCE
(
pBuf
,
buf
);
ret
=
tdbDb
Pu
t
(
pMetaDb
->
pTbDB
,
pKey
,
kLen
,
pVal
,
vLen
,
&
pMetaDb
->
txn
);
ret
=
tdbDb
Inser
t
(
pMetaDb
->
pTbDB
,
pKey
,
kLen
,
pVal
,
vLen
,
&
pMetaDb
->
txn
);
if
(
ret
<
0
)
{
if
(
ret
<
0
)
{
return
-
1
;
return
-
1
;
}
}
...
@@ -311,7 +311,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg, STbDdlH *pHandle) {
...
@@ -311,7 +311,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg, STbDdlH *pHandle) {
pVal
=
pBuf
=
buf
;
pVal
=
pBuf
=
buf
;
metaEncodeSchemaEx
(
&
pBuf
,
&
schemaWrapper
);
metaEncodeSchemaEx
(
&
pBuf
,
&
schemaWrapper
);
vLen
=
POINTER_DISTANCE
(
pBuf
,
buf
);
vLen
=
POINTER_DISTANCE
(
pBuf
,
buf
);
ret
=
tdbDb
Pu
t
(
pMetaDb
->
pSchemaDB
,
pKey
,
kLen
,
pVal
,
vLen
,
&
pMeta
->
pDB
->
txn
);
ret
=
tdbDb
Inser
t
(
pMetaDb
->
pSchemaDB
,
pKey
,
kLen
,
pVal
,
vLen
,
&
pMeta
->
pDB
->
txn
);
if
(
ret
<
0
)
{
if
(
ret
<
0
)
{
return
-
1
;
return
-
1
;
}
}
...
@@ -325,7 +325,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg, STbDdlH *pHandle) {
...
@@ -325,7 +325,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg, STbDdlH *pHandle) {
kLen
=
nameLen
+
1
+
sizeof
(
uid
);
kLen
=
nameLen
+
1
+
sizeof
(
uid
);
pVal
=
NULL
;
pVal
=
NULL
;
vLen
=
0
;
vLen
=
0
;
ret
=
tdbDb
Pu
t
(
pMetaDb
->
pNameIdx
,
pKey
,
kLen
,
pVal
,
vLen
,
&
pMetaDb
->
txn
);
ret
=
tdbDb
Inser
t
(
pMetaDb
->
pNameIdx
,
pKey
,
kLen
,
pVal
,
vLen
,
&
pMetaDb
->
txn
);
if
(
ret
<
0
)
{
if
(
ret
<
0
)
{
return
-
1
;
return
-
1
;
}
}
...
@@ -336,7 +336,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg, STbDdlH *pHandle) {
...
@@ -336,7 +336,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg, STbDdlH *pHandle) {
kLen
=
sizeof
(
uid
);
kLen
=
sizeof
(
uid
);
pVal
=
NULL
;
pVal
=
NULL
;
vLen
=
0
;
vLen
=
0
;
ret
=
tdbDb
Pu
t
(
pMetaDb
->
pStbIdx
,
pKey
,
kLen
,
pVal
,
vLen
,
&
pMetaDb
->
txn
);
ret
=
tdbDb
Inser
t
(
pMetaDb
->
pStbIdx
,
pKey
,
kLen
,
pVal
,
vLen
,
&
pMetaDb
->
txn
);
if
(
ret
<
0
)
{
if
(
ret
<
0
)
{
return
-
1
;
return
-
1
;
}
}
...
@@ -347,7 +347,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg, STbDdlH *pHandle) {
...
@@ -347,7 +347,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg, STbDdlH *pHandle) {
kLen
=
sizeof
(
ctbIdxKey
);
kLen
=
sizeof
(
ctbIdxKey
);
pVal
=
NULL
;
pVal
=
NULL
;
vLen
=
0
;
vLen
=
0
;
ret
=
tdbDb
Pu
t
(
pMetaDb
->
pCtbIdx
,
pKey
,
kLen
,
pVal
,
vLen
,
&
pMetaDb
->
txn
);
ret
=
tdbDb
Inser
t
(
pMetaDb
->
pCtbIdx
,
pKey
,
kLen
,
pVal
,
vLen
,
&
pMetaDb
->
txn
);
if
(
ret
<
0
)
{
if
(
ret
<
0
)
{
return
-
1
;
return
-
1
;
}
}
...
@@ -362,7 +362,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg, STbDdlH *pHandle) {
...
@@ -362,7 +362,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg, STbDdlH *pHandle) {
kLen
=
sizeof
(
uid
);
kLen
=
sizeof
(
uid
);
pVal
=
NULL
;
pVal
=
NULL
;
vLen
=
0
;
vLen
=
0
;
ret
=
tdbDb
Pu
t
(
pMetaDb
->
pNtbIdx
,
pKey
,
kLen
,
pVal
,
vLen
,
&
pMetaDb
->
txn
);
ret
=
tdbDb
Inser
t
(
pMetaDb
->
pNtbIdx
,
pKey
,
kLen
,
pVal
,
vLen
,
&
pMetaDb
->
txn
);
if
(
ret
<
0
)
{
if
(
ret
<
0
)
{
return
-
1
;
return
-
1
;
}
}
...
@@ -530,7 +530,7 @@ int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) {
...
@@ -530,7 +530,7 @@ int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) {
int32_t
kLen
=
sizeof
(
pSmaCfg
->
indexUid
);
int32_t
kLen
=
sizeof
(
pSmaCfg
->
indexUid
);
int32_t
vLen
=
POINTER_DISTANCE
(
qBuf
,
pBuf
);
int32_t
vLen
=
POINTER_DISTANCE
(
qBuf
,
pBuf
);
ret
=
tdbDb
Pu
t
(
pMeta
->
pDB
->
pSmaDB
,
key
,
kLen
,
val
,
vLen
,
&
pMetaDb
->
txn
);
ret
=
tdbDb
Inser
t
(
pMeta
->
pDB
->
pSmaDB
,
key
,
kLen
,
val
,
vLen
,
&
pMetaDb
->
txn
);
if
(
ret
<
0
)
{
if
(
ret
<
0
)
{
taosMemoryFreeClear
(
pBuf
);
taosMemoryFreeClear
(
pBuf
);
return
-
1
;
return
-
1
;
...
@@ -545,7 +545,7 @@ int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) {
...
@@ -545,7 +545,7 @@ int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) {
val
=
NULL
;
val
=
NULL
;
vLen
=
0
;
vLen
=
0
;
ret
=
tdbDb
Pu
t
(
pMeta
->
pDB
->
pSmaIdx
,
key
,
kLen
,
val
,
vLen
,
&
pMetaDb
->
txn
);
ret
=
tdbDb
Inser
t
(
pMeta
->
pDB
->
pSmaIdx
,
key
,
kLen
,
val
,
vLen
,
&
pMetaDb
->
txn
);
if
(
ret
<
0
)
{
if
(
ret
<
0
)
{
taosMemoryFreeClear
(
pBuf
);
taosMemoryFreeClear
(
pBuf
);
return
-
1
;
return
-
1
;
...
...
source/dnode/vnode/src/meta/metaTable.c
浏览文件 @
abcdeb84
...
@@ -72,44 +72,61 @@ _err:
...
@@ -72,44 +72,61 @@ _err:
}
}
int
metaDropSTable
(
SMeta
*
pMeta
,
int64_t
verison
,
SVDropStbReq
*
pReq
)
{
int
metaDropSTable
(
SMeta
*
pMeta
,
int64_t
verison
,
SVDropStbReq
*
pReq
)
{
SMetaReader
mr
=
{
0
};
TDBC
*
pNameIdxc
=
NULL
;
TDBC
*
pUidIdxc
=
NULL
;
// validate req
TDBC
*
pCtbIdxc
=
NULL
;
metaReaderInit
(
&
mr
,
pMeta
,
0
);
SCtbIdxKey
*
pCtbIdxKey
;
if
(
metaGetTableEntryByUid
(
&
mr
,
pReq
->
suid
)
<
0
)
{
const
void
*
pKey
=
NULL
;
terrno
=
TSDB_CODE_VND_TABLE_NOT_EXIST
;
int
nKey
;
const
void
*
pData
=
NULL
;
int
nData
;
int
c
,
ret
;
// prepare uid idx cursor
tdbDbcOpen
(
pMeta
->
pUidIdx
,
&
pUidIdxc
,
&
pMeta
->
txn
);
ret
=
tdbDbcMoveTo
(
pUidIdxc
,
&
pReq
->
suid
,
sizeof
(
tb_uid_t
),
&
c
);
if
(
ret
<
0
||
c
!=
0
)
{
terrno
=
TSDB_CODE_VND_TB_NOT_EXIST
;
tdbDbcClose
(
pUidIdxc
);
goto
_err
;
goto
_err
;
}
}
// do drop
// prepare name idx cursor
// drop from pTbDb
tdbDbcOpen
(
pMeta
->
pNameIdx
,
&
pNameIdxc
,
&
pMeta
->
txn
);
// drop from pSkmDb
ret
=
tdbDbcMoveTo
(
pNameIdxc
,
pReq
->
name
,
strlen
(
pReq
->
name
)
+
1
,
&
c
);
// drop from pUidIdx
if
(
ret
<
0
||
c
!=
0
)
{
// drop from pNameIdx
ASSERT
(
0
);
// {
}
// TDBC *pDbc1 = NULL;
// void *pKey = NULL;
tdbDbcDelete
(
pUidIdxc
);
// void *pVal = NULL;
tdbDbcDelete
(
pNameIdxc
);
// int kLen = 0;
tdbDbcClose
(
pUidIdxc
);
// int vLen = 0;
tdbDbcClose
(
pNameIdxc
);
// int ret = 0;
// loop to drop each child table
// // drop from pCtbIdx
tdbDbcOpen
(
pMeta
->
pCtbIdx
,
&
pCtbIdxc
,
&
pMeta
->
txn
);
// ret = tdbDbcOpen(pMeta->pCtbIdx, &pDbc1);
ret
=
tdbDbcMoveTo
(
pCtbIdxc
,
&
(
SCtbIdxKey
){.
suid
=
pReq
->
suid
,
.
uid
=
INT64_MIN
},
sizeof
(
SCtbIdxKey
),
&
c
);
// tdbDbcMoveTo(pDbc1, &pReq->suid, sizeof(pReq->suid), NULL /*cmpr*/, 0 /*TDB_FORWARD_SEARCH*/);
if
(
ret
<
0
||
(
c
<
0
&&
tdbDbcMoveToNext
(
pCtbIdxc
)
<
0
))
{
// tdbDbcGet(pDbc1, &pKey, &kLen, &pVal, vLen);
tdbDbcClose
(
pCtbIdxc
);
// tdbDbcDrop(pDbc1);
goto
_exit
;
// // drop from pTagIdx
}
// // drop from pTtlIdx
// }
for
(;;)
{
tdbDbcGet
(
pCtbIdxc
,
&
pKey
,
&
nKey
,
NULL
,
NULL
);
// clear and return
pCtbIdxKey
=
(
SCtbIdxKey
*
)
pKey
;
metaReaderClear
(
&
mr
);
metaError
(
"vgId:%d super table %s uid:%"
PRId64
" is dropped"
,
TD_VID
(
pMeta
->
pVnode
),
pReq
->
name
,
pReq
->
suid
);
if
(
pCtbIdxKey
->
suid
>
pReq
->
suid
)
break
;
// drop the child table (TODO)
if
(
tdbDbcMoveToNext
(
pCtbIdxc
)
<
0
)
break
;
}
_exit:
metaDebug
(
"vgId:%d super table %s uid:%"
PRId64
" is dropped"
,
TD_VID
(
pMeta
->
pVnode
),
pReq
->
name
,
pReq
->
suid
);
return
0
;
return
0
;
_err:
_err:
metaReaderClear
(
&
mr
);
metaError
(
"vgId:%d failed to drop super table %s uid:%"
PRId64
" since %s"
,
TD_VID
(
pMeta
->
pVnode
),
pReq
->
name
,
metaError
(
"vgId:%d failed to drop super table %s uid:%"
PRId64
" since %s"
,
TD_VID
(
pMeta
->
pVnode
),
pReq
->
name
,
pReq
->
suid
,
tstrerror
(
terrno
));
pReq
->
suid
,
tstrerror
(
terrno
));
return
-
1
;
return
-
1
;
...
@@ -166,18 +183,122 @@ _err:
...
@@ -166,18 +183,122 @@ _err:
return
-
1
;
return
-
1
;
}
}
int
metaDropTable
(
SMeta
*
pMeta
,
tb_uid_t
uid
)
{
int
metaDropTable
(
SMeta
*
pMeta
,
int64_t
version
,
SVDropTbReq
*
pReq
)
{
#if 0
TDBC
*
pTbDbc
=
NULL
;
if (metaRemoveTableFromIdx(pMeta, uid) < 0) {
TDBC
*
pUidIdxc
=
NULL
;
// TODO: handle error
TDBC
*
pNameIdxc
=
NULL
;
const
void
*
pData
;
int
nData
;
tb_uid_t
uid
;
int64_t
tver
;
SMetaEntry
me
=
{
0
};
SCoder
coder
=
{
0
};
int8_t
type
;
int64_t
ctime
;
tb_uid_t
suid
;
int
c
,
ret
;
// search & delete the name idx
tdbDbcOpen
(
pMeta
->
pNameIdx
,
&
pNameIdxc
,
&
pMeta
->
txn
);
ret
=
tdbDbcMoveTo
(
pNameIdxc
,
pReq
->
name
,
strlen
(
pReq
->
name
)
+
1
,
&
c
);
if
(
ret
<
0
||
c
)
{
tdbDbcClose
(
pNameIdxc
);
terrno
=
TSDB_CODE_VND_TABLE_NOT_EXIST
;
return
-
1
;
return
-
1
;
}
}
if (metaRemoveTableFromIdx(pMeta, uid) < 0) {
ret
=
tdbDbcGet
(
pNameIdxc
,
NULL
,
NULL
,
&
pData
,
&
nData
);
// TODO
if
(
ret
<
0
)
{
ASSERT
(
0
);
return
-
1
;
return
-
1
;
}
}
#endif
uid
=
*
(
tb_uid_t
*
)
pData
;
tdbDbcDelete
(
pNameIdxc
);
tdbDbcClose
(
pNameIdxc
);
// search & delete uid idx
tdbDbcOpen
(
pMeta
->
pUidIdx
,
&
pUidIdxc
,
&
pMeta
->
txn
);
ret
=
tdbDbcMoveTo
(
pUidIdxc
,
&
uid
,
sizeof
(
uid
),
&
c
);
if
(
ret
<
0
||
c
!=
0
)
{
ASSERT
(
0
);
return
-
1
;
}
ret
=
tdbDbcGet
(
pUidIdxc
,
NULL
,
NULL
,
&
pData
,
&
nData
);
if
(
ret
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
tver
=
*
(
int64_t
*
)
pData
;
tdbDbcDelete
(
pUidIdxc
);
tdbDbcClose
(
pUidIdxc
);
// search and get meta entry
tdbDbcOpen
(
pMeta
->
pTbDb
,
&
pTbDbc
,
&
pMeta
->
txn
);
ret
=
tdbDbcMoveTo
(
pTbDbc
,
&
(
STbDbKey
){.
uid
=
uid
,
.
version
=
tver
},
sizeof
(
STbDbKey
),
&
c
);
if
(
ret
<
0
||
c
!=
0
)
{
ASSERT
(
0
);
return
-
1
;
}
ret
=
tdbDbcGet
(
pTbDbc
,
NULL
,
NULL
,
&
pData
,
&
nData
);
if
(
ret
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
// decode entry
void
*
pDataCopy
=
taosMemoryMalloc
(
nData
);
// remove the copy (todo)
memcpy
(
pDataCopy
,
pData
,
nData
);
tCoderInit
(
&
coder
,
TD_LITTLE_ENDIAN
,
pDataCopy
,
nData
,
TD_DECODER
);
ret
=
metaDecodeEntry
(
&
coder
,
&
me
);
if
(
ret
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
type
=
me
.
type
;
if
(
type
==
TSDB_CHILD_TABLE
)
{
ctime
=
me
.
ctbEntry
.
ctime
;
suid
=
me
.
ctbEntry
.
suid
;
}
else
if
(
type
==
TSDB_NORMAL_TABLE
)
{
ctime
=
me
.
ntbEntry
.
ctime
;
suid
=
0
;
}
else
{
ASSERT
(
0
);
}
taosMemoryFree
(
pDataCopy
);
tCoderClear
(
&
coder
);
tdbDbcClose
(
pTbDbc
);
if
(
type
==
TSDB_CHILD_TABLE
)
{
// remove the pCtbIdx
TDBC
*
pCtbIdxc
=
NULL
;
tdbDbcOpen
(
pMeta
->
pCtbIdx
,
&
pCtbIdxc
,
&
pMeta
->
txn
);
ret
=
tdbDbcMoveTo
(
pCtbIdxc
,
&
(
SCtbIdxKey
){.
suid
=
suid
,
.
uid
=
uid
},
sizeof
(
SCtbIdxKey
),
&
c
);
if
(
ret
<
0
||
c
!=
0
)
{
ASSERT
(
0
);
return
-
1
;
}
tdbDbcDelete
(
pCtbIdxc
);
tdbDbcClose
(
pCtbIdxc
);
// remove tags from pTagIdx (todo)
}
else
if
(
type
==
TSDB_NORMAL_TABLE
)
{
// remove from pSkmDb
}
else
{
ASSERT
(
0
);
}
// remove from ttl (todo)
if
(
ctime
>
0
)
{
}
return
0
;
return
0
;
}
}
...
@@ -218,7 +339,7 @@ static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) {
...
@@ -218,7 +339,7 @@ static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) {
tCoderClear
(
&
coder
);
tCoderClear
(
&
coder
);
// write to table.db
// write to table.db
if
(
tdbDb
Pu
t
(
pMeta
->
pTbDb
,
pKey
,
kLen
,
pVal
,
vLen
,
&
pMeta
->
txn
)
<
0
)
{
if
(
tdbDb
Inser
t
(
pMeta
->
pTbDb
,
pKey
,
kLen
,
pVal
,
vLen
,
&
pMeta
->
txn
)
<
0
)
{
goto
_err
;
goto
_err
;
}
}
...
@@ -231,11 +352,11 @@ _err:
...
@@ -231,11 +352,11 @@ _err:
}
}
static
int
metaUpdateUidIdx
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pME
)
{
static
int
metaUpdateUidIdx
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pME
)
{
return
tdbDb
Pu
t
(
pMeta
->
pUidIdx
,
&
pME
->
uid
,
sizeof
(
tb_uid_t
),
&
pME
->
version
,
sizeof
(
int64_t
),
&
pMeta
->
txn
);
return
tdbDb
Inser
t
(
pMeta
->
pUidIdx
,
&
pME
->
uid
,
sizeof
(
tb_uid_t
),
&
pME
->
version
,
sizeof
(
int64_t
),
&
pMeta
->
txn
);
}
}
static
int
metaUpdateNameIdx
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pME
)
{
static
int
metaUpdateNameIdx
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pME
)
{
return
tdbDb
Pu
t
(
pMeta
->
pNameIdx
,
pME
->
name
,
strlen
(
pME
->
name
)
+
1
,
&
pME
->
uid
,
sizeof
(
tb_uid_t
),
&
pMeta
->
txn
);
return
tdbDb
Inser
t
(
pMeta
->
pNameIdx
,
pME
->
name
,
strlen
(
pME
->
name
)
+
1
,
&
pME
->
uid
,
sizeof
(
tb_uid_t
),
&
pMeta
->
txn
);
}
}
static
int
metaUpdateTtlIdx
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pME
)
{
static
int
metaUpdateTtlIdx
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pME
)
{
...
@@ -258,12 +379,12 @@ static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME) {
...
@@ -258,12 +379,12 @@ static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME) {
ttlKey
.
dtime
=
ctime
+
ttlDays
*
24
*
60
*
60
;
ttlKey
.
dtime
=
ctime
+
ttlDays
*
24
*
60
*
60
;
ttlKey
.
uid
=
pME
->
uid
;
ttlKey
.
uid
=
pME
->
uid
;
return
tdbDb
Pu
t
(
pMeta
->
pTtlIdx
,
&
ttlKey
,
sizeof
(
ttlKey
),
NULL
,
0
,
&
pMeta
->
txn
);
return
tdbDb
Inser
t
(
pMeta
->
pTtlIdx
,
&
ttlKey
,
sizeof
(
ttlKey
),
NULL
,
0
,
&
pMeta
->
txn
);
}
}
static
int
metaUpdateCtbIdx
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pME
)
{
static
int
metaUpdateCtbIdx
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pME
)
{
SCtbIdxKey
ctbIdxKey
=
{.
suid
=
pME
->
ctbEntry
.
suid
,
.
uid
=
pME
->
uid
};
SCtbIdxKey
ctbIdxKey
=
{.
suid
=
pME
->
ctbEntry
.
suid
,
.
uid
=
pME
->
uid
};
return
tdbDb
Pu
t
(
pMeta
->
pCtbIdx
,
&
ctbIdxKey
,
sizeof
(
ctbIdxKey
),
NULL
,
0
,
&
pMeta
->
txn
);
return
tdbDb
Inser
t
(
pMeta
->
pCtbIdx
,
&
ctbIdxKey
,
sizeof
(
ctbIdxKey
),
NULL
,
0
,
&
pMeta
->
txn
);
}
}
static
int
metaUpdateTagIdx
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pME
)
{
static
int
metaUpdateTagIdx
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pME
)
{
...
@@ -304,7 +425,7 @@ static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) {
...
@@ -304,7 +425,7 @@ static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) {
tCoderInit
(
&
coder
,
TD_LITTLE_ENDIAN
,
pVal
,
vLen
,
TD_ENCODER
);
tCoderInit
(
&
coder
,
TD_LITTLE_ENDIAN
,
pVal
,
vLen
,
TD_ENCODER
);
tEncodeSSchemaWrapper
(
&
coder
,
pSW
);
tEncodeSSchemaWrapper
(
&
coder
,
pSW
);
if
(
tdbDb
Pu
t
(
pMeta
->
pSkmDb
,
&
skmDbKey
,
sizeof
(
skmDbKey
),
pVal
,
vLen
,
&
pMeta
->
txn
)
<
0
)
{
if
(
tdbDb
Inser
t
(
pMeta
->
pSkmDb
,
&
skmDbKey
,
sizeof
(
skmDbKey
),
pVal
,
vLen
,
&
pMeta
->
txn
)
<
0
)
{
rcode
=
-
1
;
rcode
=
-
1
;
goto
_exit
;
goto
_exit
;
}
}
...
...
source/dnode/vnode/src/tsdb/tsdbTDBImpl.c
浏览文件 @
abcdeb84
...
@@ -97,7 +97,7 @@ int32_t tsdbCloseDBF(SDBFile *pDBF) {
...
@@ -97,7 +97,7 @@ int32_t tsdbCloseDBF(SDBFile *pDBF) {
int32_t
tsdbSaveSmaToDB
(
SDBFile
*
pDBF
,
void
*
pKey
,
int32_t
keyLen
,
void
*
pVal
,
int32_t
valLen
,
TXN
*
txn
)
{
int32_t
tsdbSaveSmaToDB
(
SDBFile
*
pDBF
,
void
*
pKey
,
int32_t
keyLen
,
void
*
pVal
,
int32_t
valLen
,
TXN
*
txn
)
{
int32_t
ret
;
int32_t
ret
;
ret
=
tdbDb
Pu
t
(
pDBF
->
pDB
,
pKey
,
keyLen
,
pVal
,
valLen
,
txn
);
ret
=
tdbDb
Inser
t
(
pDBF
->
pDB
,
pKey
,
keyLen
,
pVal
,
valLen
,
txn
);
if
(
ret
<
0
)
{
if
(
ret
<
0
)
{
tsdbError
(
"Failed to create insert sma data into db, ret = %d"
,
ret
);
tsdbError
(
"Failed to create insert sma data into db, ret = %d"
,
ret
);
return
-
1
;
return
-
1
;
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
abcdeb84
...
@@ -445,14 +445,45 @@ static int vnodeProcessAlterTbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcM
...
@@ -445,14 +445,45 @@ static int vnodeProcessAlterTbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcM
}
}
static
int
vnodeProcessDropTbReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
)
{
static
int
vnodeProcessDropTbReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
)
{
SVDropTbReq
req
=
{
0
};
SVDropTbBatchReq
req
=
{
0
};
SVDropTbReq
rsp
=
{
0
};
SVDropTbBatchRsp
rsp
=
{
0
};
SCoder
coder
=
{
0
};
int
ret
;
pRsp
->
msgType
=
TDMT_VND_CREATE_STB_RSP
;
pRsp
->
pCont
=
NULL
;
pRsp
->
contLen
=
0
;
pRsp
->
code
=
TSDB_CODE_SUCCESS
;
// decode req
// decode req
tCoderInit
(
&
coder
,
TD_LITTLE_ENDIAN
,
pReq
,
len
,
TD_DECODER
);
ret
=
tDecodeSVDropTbBatchReq
(
&
coder
,
&
req
);
if
(
ret
<
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
pRsp
->
code
=
terrno
;
goto
_exit
;
}
// process req
// process req
rsp
.
pArray
=
taosArrayInit
(
sizeof
(
SVDropTbRsp
),
req
.
nReqs
);
for
(
int
iReq
=
0
;
iReq
<
req
.
nReqs
;
iReq
++
)
{
SVDropTbReq
*
pDropTbReq
=
req
.
pReqs
+
iReq
;
SVDropTbRsp
dropTbRsp
=
{
0
};
// return rsp
/* code */
ret
=
metaDropTable
(
pVnode
->
pMeta
,
version
,
pDropTbReq
);
if
(
ret
<
0
)
{
dropTbRsp
.
code
=
TSDB_CODE_SUCCESS
;
}
else
{
dropTbRsp
.
code
=
terrno
;
}
taosArrayPush
(
rsp
.
pArray
,
&
dropTbRsp
);
}
_exit:
tCoderClear
(
&
coder
);
// encode rsp (TODO)
return
0
;
return
0
;
}
}
...
@@ -482,7 +513,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
...
@@ -482,7 +513,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
}
}
int32_t
tsdbProcessSubmitReq
(
STsdb
*
pTsdb
,
int64_t
version
,
void
*
pReq
)
{
int32_t
tsdbProcessSubmitReq
(
STsdb
*
pTsdb
,
int64_t
version
,
void
*
pReq
)
{
if
(
!
pReq
)
{
if
(
!
pReq
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
terrno
=
TSDB_CODE_INVALID_PTR
;
return
TSDB_CODE_FAILED
;
return
TSDB_CODE_FAILED
;
}
}
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
abcdeb84
...
@@ -4830,6 +4830,7 @@ static SArray* createSortInfo(SNodeList* pNodeList, SNodeList* pNodeListTarget);
...
@@ -4830,6 +4830,7 @@ static SArray* createSortInfo(SNodeList* pNodeList, SNodeList* pNodeListTarget);
static
SArray
*
createIndexMap
(
SNodeList
*
pNodeList
);
static
SArray
*
createIndexMap
(
SNodeList
*
pNodeList
);
static
SArray
*
extractPartitionColInfo
(
SNodeList
*
pNodeList
);
static
SArray
*
extractPartitionColInfo
(
SNodeList
*
pNodeList
);
static
int32_t
initQueryTableDataCond
(
SQueryTableDataCond
*
pCond
,
const
STableScanPhysiNode
*
pTableScanNode
);
static
int32_t
initQueryTableDataCond
(
SQueryTableDataCond
*
pCond
,
const
STableScanPhysiNode
*
pTableScanNode
);
static
void
setJoinColumnInfo
(
SColumnInfo
*
pInfo
,
const
SColumnNode
*
pLeftNode
);
static
SInterval
extractIntervalInfo
(
const
STableScanPhysiNode
*
pTableScanNode
)
{
static
SInterval
extractIntervalInfo
(
const
STableScanPhysiNode
*
pTableScanNode
)
{
SInterval
interval
=
{
SInterval
interval
=
{
...
@@ -5624,25 +5625,29 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOf
...
@@ -5624,25 +5625,29 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOf
goto
_error
;
goto
_error
;
}
}
pOperator
->
resultInfo
.
capacity
=
4096
;
initResultSizeInfo
(
pOperator
,
4096
);
pOperator
->
resultInfo
.
threshold
=
4096
*
0
.
75
;
// initResultRowInf
// o(&pInfo->binfo.resultRowInfo, 8);
pInfo
->
pRes
=
pResBlock
;
pOperator
->
name
=
"JoinOperator"
;
pInfo
->
pRes
=
pResBlock
;
pOperator
->
name
=
"MergeJoinOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_JOIN
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_JOIN
;
pOperator
->
blocking
=
false
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
pExpr
=
pExprInfo
;
pOperator
->
pExpr
=
pExprInfo
;
pOperator
->
numOfOutput
=
numOfCols
;
pOperator
->
numOfOutput
=
numOfCols
;
pOperator
->
info
=
pInfo
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
SOperatorNode
*
pNode
=
(
SOperatorNode
*
)
pOnCondition
;
setJoinColumnInfo
(
&
pInfo
->
leftCol
,
(
SColumnNode
*
)
pNode
->
pLeft
);
setJoinColumnInfo
(
&
pInfo
->
rightCol
,
(
SColumnNode
*
)
pNode
->
pRight
);
pOperator
->
fpSet
=
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doMergeJoin
,
NULL
,
NULL
,
destroyBasicOperatorInfo
,
NULL
,
NULL
,
NULL
);
createOperatorFpSet
(
operatorDummyOpenFn
,
doMergeJoin
,
NULL
,
NULL
,
destroyBasicOperatorInfo
,
NULL
,
NULL
,
NULL
);
int32_t
code
=
appendDownstream
(
pOperator
,
pDownstream
,
numOfDownstream
);
int32_t
code
=
appendDownstream
(
pOperator
,
pDownstream
,
numOfDownstream
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
return
pOperator
;
return
pOperator
;
_error:
_error:
...
@@ -5651,3 +5656,11 @@ _error:
...
@@ -5651,3 +5656,11 @@ _error:
pTaskInfo
->
code
=
TSDB_CODE_OUT_OF_MEMORY
;
pTaskInfo
->
code
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
return
NULL
;
}
}
void
setJoinColumnInfo
(
SColumnInfo
*
pColumn
,
const
SColumnNode
*
pColumnNode
)
{
pColumn
->
slotId
=
pColumnNode
->
slotId
;
pColumn
->
type
=
pColumnNode
->
node
.
resType
.
type
;
pColumn
->
bytes
=
pColumnNode
->
node
.
resType
.
bytes
;
pColumn
->
precision
=
pColumnNode
->
node
.
resType
.
precision
;
pColumn
->
scale
=
pColumnNode
->
node
.
resType
.
scale
;
}
source/libs/executor/src/indexoperator.c
浏览文件 @
abcdeb84
...
@@ -63,9 +63,14 @@ typedef struct SIFParam {
...
@@ -63,9 +63,14 @@ typedef struct SIFParam {
}
SIFParam
;
}
SIFParam
;
static
int32_t
sifGetFuncFromSql
(
EOperatorType
src
,
EIndexQueryType
*
dst
)
{
static
int32_t
sifGetFuncFromSql
(
EOperatorType
src
,
EIndexQueryType
*
dst
)
{
if
(
src
==
OP_TYPE_GREATER_THAN
||
src
==
OP_TYPE_GREATER_EQUAL
||
src
==
OP_TYPE_LOWER_THAN
||
if
(
src
==
OP_TYPE_GREATER_THAN
)
{
src
==
OP_TYPE_LOWER_EQUAL
)
{
*
dst
=
QUERY_GREATER_THAN
;
*
dst
=
QUERY_RANGE
;
}
else
if
(
src
==
OP_TYPE_GREATER_EQUAL
)
{
*
dst
=
QUERY_GREATER_EQUAL
;
}
else
if
(
src
==
OP_TYPE_LOWER_THAN
)
{
*
dst
=
QUERY_LESS_THAN
;
}
else
if
(
src
==
OP_TYPE_LOWER_EQUAL
)
{
*
dst
=
QUERY_LESS_EQUAL
;
}
else
if
(
src
==
OP_TYPE_EQUAL
)
{
}
else
if
(
src
==
OP_TYPE_EQUAL
)
{
*
dst
=
QUERY_TERM
;
*
dst
=
QUERY_TERM
;
}
else
if
(
src
==
OP_TYPE_LIKE
||
src
==
OP_TYPE_MATCH
||
src
==
OP_TYPE_NMATCH
)
{
}
else
if
(
src
==
OP_TYPE_LIKE
||
src
==
OP_TYPE_MATCH
||
src
==
OP_TYPE_NMATCH
)
{
...
@@ -249,9 +254,6 @@ static int32_t sifExecFunction(SFunctionNode *node, SIFCtx *ctx, SIFParam *outpu
...
@@ -249,9 +254,6 @@ static int32_t sifExecFunction(SFunctionNode *node, SIFCtx *ctx, SIFParam *outpu
static
int32_t
sifDoIndex
(
SIFParam
*
left
,
SIFParam
*
right
,
int8_t
operType
,
SIFParam
*
output
)
{
static
int32_t
sifDoIndex
(
SIFParam
*
left
,
SIFParam
*
right
,
int8_t
operType
,
SIFParam
*
output
)
{
SIndexTerm
*
tm
=
indexTermCreate
(
left
->
suid
,
DEFAULT
,
operType
,
left
->
colValType
,
left
->
colName
,
SIndexTerm
*
tm
=
indexTermCreate
(
left
->
suid
,
DEFAULT
,
operType
,
left
->
colValType
,
left
->
colName
,
strlen
(
left
->
colName
),
right
->
condValue
,
strlen
(
right
->
condValue
));
strlen
(
left
->
colName
),
right
->
condValue
,
strlen
(
right
->
condValue
));
if
(
operType
==
OP_TYPE_LOWER_EQUAL
||
operType
==
OP_TYPE_GREATER_EQUAL
||
operType
==
OP_TYPE_GREATER_THAN
||
operType
==
OP_TYPE_LOWER_THAN
)
{
}
if
(
tm
==
NULL
)
{
if
(
tm
==
NULL
)
{
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
}
...
...
source/libs/index/inc/indexFst.h
浏览文件 @
abcdeb84
...
@@ -52,7 +52,6 @@ typedef struct FstRange {
...
@@ -52,7 +52,6 @@ typedef struct FstRange {
uint64_t
end
;
uint64_t
end
;
}
FstRange
;
}
FstRange
;
typedef
enum
{
GE
,
GT
,
LE
,
LT
}
RangeType
;
typedef
enum
{
OneTransNext
,
OneTrans
,
AnyTrans
,
EmptyFinal
}
State
;
typedef
enum
{
OneTransNext
,
OneTrans
,
AnyTrans
,
EmptyFinal
}
State
;
typedef
enum
{
Ordered
,
OutOfOrdered
,
DuplicateKey
}
OrderType
;
typedef
enum
{
Ordered
,
OutOfOrdered
,
DuplicateKey
}
OrderType
;
...
@@ -174,9 +173,9 @@ Output fstStateFinalOutput(FstState* state, uint64_t version, FstSlice* date,
...
@@ -174,9 +173,9 @@ Output fstStateFinalOutput(FstState* state, uint64_t version, FstSlice* date,
uint64_t
fstStateFindInput
(
FstState
*
state
,
FstNode
*
node
,
uint8_t
b
,
bool
*
null
);
uint64_t
fstStateFindInput
(
FstState
*
state
,
FstNode
*
node
,
uint8_t
b
,
bool
*
null
);
#define FST_STATE_ONE_TRNAS_NEXT(node) (node->state.state == OneTransNext)
#define FST_STATE_ONE_TRNAS_NEXT(node) (node->state.state == OneTransNext)
#define FST_STATE_ONE_TRNAS(node) (node->state.state == OneTrans)
#define FST_STATE_ONE_TRNAS(node)
(node->state.state == OneTrans)
#define FST_STATE_ANY_TRANS(node) (node->state.state == AnyTrans)
#define FST_STATE_ANY_TRANS(node)
(node->state.state == AnyTrans)
#define FST_STATE_EMPTY_FINAL(node) (node->state.state == EmptyFinal)
#define FST_STATE_EMPTY_FINAL(node)
(node->state.state == EmptyFinal)
typedef
struct
FstLastTransition
{
typedef
struct
FstLastTransition
{
uint8_t
inp
;
uint8_t
inp
;
...
...
source/libs/index/inc/indexInt.h
浏览文件 @
abcdeb84
...
@@ -34,6 +34,7 @@
...
@@ -34,6 +34,7 @@
extern
"C"
{
extern
"C"
{
#endif
#endif
typedef
enum
{
LT
,
LE
,
GT
,
GE
}
RangeType
;
typedef
enum
{
kTypeValue
,
kTypeDeletion
}
STermValueType
;
typedef
enum
{
kTypeValue
,
kTypeDeletion
}
STermValueType
;
typedef
struct
SIndexStat
{
typedef
struct
SIndexStat
{
...
@@ -86,7 +87,6 @@ typedef struct SIndexTerm {
...
@@ -86,7 +87,6 @@ typedef struct SIndexTerm {
int32_t
nColName
;
int32_t
nColName
;
char
*
colVal
;
char
*
colVal
;
int32_t
nColVal
;
int32_t
nColVal
;
int8_t
qType
;
// just use for range
}
SIndexTerm
;
}
SIndexTerm
;
typedef
struct
SIndexTermQuery
{
typedef
struct
SIndexTermQuery
{
...
...
source/libs/index/src/index.c
浏览文件 @
abcdeb84
...
@@ -262,7 +262,6 @@ SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, int8_t queryT
...
@@ -262,7 +262,6 @@ SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, int8_t queryT
tm
->
colVal
=
(
char
*
)
taosMemoryCalloc
(
1
,
nColVal
+
1
);
tm
->
colVal
=
(
char
*
)
taosMemoryCalloc
(
1
,
nColVal
+
1
);
memcpy
(
tm
->
colVal
,
colVal
,
nColVal
);
memcpy
(
tm
->
colVal
,
colVal
,
nColVal
);
tm
->
nColVal
=
nColVal
;
tm
->
nColVal
=
nColVal
;
tm
->
qType
=
queryType
;
return
tm
;
return
tm
;
}
}
...
...
source/libs/index/src/indexCache.c
浏览文件 @
abcdeb84
...
@@ -38,10 +38,29 @@ static int32_t cacheSearchTerm(void* cache, CacheTerm* ct, SIdxTempResult* tr, S
...
@@ -38,10 +38,29 @@ static int32_t cacheSearchTerm(void* cache, CacheTerm* ct, SIdxTempResult* tr, S
static
int32_t
cacheSearchPrefix
(
void
*
cache
,
CacheTerm
*
ct
,
SIdxTempResult
*
tr
,
STermValueType
*
s
);
static
int32_t
cacheSearchPrefix
(
void
*
cache
,
CacheTerm
*
ct
,
SIdxTempResult
*
tr
,
STermValueType
*
s
);
static
int32_t
cacheSearchSuffix
(
void
*
cache
,
CacheTerm
*
ct
,
SIdxTempResult
*
tr
,
STermValueType
*
s
);
static
int32_t
cacheSearchSuffix
(
void
*
cache
,
CacheTerm
*
ct
,
SIdxTempResult
*
tr
,
STermValueType
*
s
);
static
int32_t
cacheSearchRegex
(
void
*
cache
,
CacheTerm
*
ct
,
SIdxTempResult
*
tr
,
STermValueType
*
s
);
static
int32_t
cacheSearchRegex
(
void
*
cache
,
CacheTerm
*
ct
,
SIdxTempResult
*
tr
,
STermValueType
*
s
);
static
int32_t
cacheSearchLessThan
(
void
*
cache
,
CacheTerm
*
ct
,
SIdxTempResult
*
tr
,
STermValueType
*
s
);
static
int32_t
cacheSearchLessEqual
(
void
*
cache
,
CacheTerm
*
ct
,
SIdxTempResult
*
tr
,
STermValueType
*
s
);
static
int32_t
cacheSearchGreaterThan
(
void
*
cache
,
CacheTerm
*
ct
,
SIdxTempResult
*
tr
,
STermValueType
*
s
);
static
int32_t
cacheSearchGreaterEqual
(
void
*
cache
,
CacheTerm
*
ct
,
SIdxTempResult
*
tr
,
STermValueType
*
s
);
static
int32_t
cacheSearchRange
(
void
*
cache
,
CacheTerm
*
ct
,
SIdxTempResult
*
tr
,
STermValueType
*
s
);
static
int32_t
cacheSearchRange
(
void
*
cache
,
CacheTerm
*
ct
,
SIdxTempResult
*
tr
,
STermValueType
*
s
);
/*comm func of compare, used in (LE/LT/GE/GT compare)*/
static
int32_t
cacheSearchCompareFunc
(
void
*
cache
,
CacheTerm
*
ct
,
SIdxTempResult
*
tr
,
STermValueType
*
s
,
RangeType
type
);
typedef
enum
{
MATCH
,
CONTINUE
,
BREAK
}
TExeCond
;
typedef
TExeCond
(
*
_cache_range_compare
)(
void
*
a
,
void
*
b
,
int8_t
type
);
static
TExeCond
tCompareLessThan
(
void
*
a
,
void
*
b
,
int8_t
type
)
{
return
MATCH
;
}
static
TExeCond
tCompareLessEqual
(
void
*
a
,
void
*
b
,
int8_t
type
)
{
return
MATCH
;
}
static
TExeCond
tCompareGreaterThan
(
void
*
a
,
void
*
b
,
int8_t
type
)
{
return
MATCH
;
}
static
TExeCond
tCompareGreaterEqual
(
void
*
a
,
void
*
b
,
int8_t
type
)
{
return
MATCH
;
}
static
TExeCond
(
*
rangeCompare
[])(
void
*
a
,
void
*
b
,
int8_t
type
)
=
{
tCompareLessThan
,
tCompareLessEqual
,
tCompareGreaterThan
,
tCompareGreaterEqual
};
static
int32_t
(
*
cacheSearch
[])(
void
*
cache
,
CacheTerm
*
ct
,
SIdxTempResult
*
tr
,
STermValueType
*
s
)
=
{
static
int32_t
(
*
cacheSearch
[])(
void
*
cache
,
CacheTerm
*
ct
,
SIdxTempResult
*
tr
,
STermValueType
*
s
)
=
{
cacheSearchTerm
,
cacheSearchPrefix
,
cacheSearchSuffix
,
cacheSearchRegex
,
cacheSearchRange
};
cacheSearchTerm
,
cacheSearchPrefix
,
cacheSearchSuffix
,
cacheSearchRegex
,
cacheSearchLessThan
,
cacheSearchLessEqual
,
cacheSearchGreaterThan
,
cacheSearchGreaterEqual
,
cacheSearchRange
};
static
void
doMergeWork
(
SSchedMsg
*
msg
);
static
void
doMergeWork
(
SSchedMsg
*
msg
);
static
bool
indexCacheIteratorNext
(
Iterate
*
itera
);
static
bool
indexCacheIteratorNext
(
Iterate
*
itera
);
...
@@ -88,6 +107,52 @@ static int32_t cacheSearchRegex(void* cache, CacheTerm* ct, SIdxTempResult* tr,
...
@@ -88,6 +107,52 @@ static int32_t cacheSearchRegex(void* cache, CacheTerm* ct, SIdxTempResult* tr,
// impl later
// impl later
return
0
;
return
0
;
}
}
static
int32_t
cacheSearchCompareFunc
(
void
*
cache
,
CacheTerm
*
ct
,
SIdxTempResult
*
tr
,
STermValueType
*
s
,
RangeType
type
)
{
if
(
cache
==
NULL
)
{
return
0
;
}
_cache_range_compare
cmpFn
=
rangeCompare
[
type
];
MemTable
*
mem
=
cache
;
char
*
key
=
indexCacheTermGet
(
ct
);
SSkipListIterator
*
iter
=
tSkipListCreateIter
(
mem
->
mem
);
while
(
tSkipListIterNext
(
iter
))
{
SSkipListNode
*
node
=
tSkipListIterGet
(
iter
);
if
(
node
==
NULL
)
{
break
;
}
CacheTerm
*
c
=
(
CacheTerm
*
)
SL_GET_NODE_DATA
(
node
);
TExeCond
cond
=
cmpFn
(
c
->
colVal
,
ct
->
colVal
,
ct
->
colType
);
if
(
cond
==
MATCH
)
{
if
(
c
->
operaType
==
ADD_VALUE
)
{
INDEX_MERGE_ADD_DEL
(
tr
->
deled
,
tr
->
added
,
c
->
uid
)
// taosArrayPush(result, &c->uid);
*
s
=
kTypeValue
;
}
else
if
(
c
->
operaType
==
DEL_VALUE
)
{
INDEX_MERGE_ADD_DEL
(
tr
->
added
,
tr
->
deled
,
c
->
uid
)
}
}
else
if
(
cond
==
CONTINUE
)
{
}
else
if
(
cond
==
BREAK
)
{
break
;
}
}
tSkipListDestroyIter
(
iter
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
cacheSearchLessThan
(
void
*
cache
,
CacheTerm
*
ct
,
SIdxTempResult
*
tr
,
STermValueType
*
s
)
{
return
cacheSearchCompareFunc
(
cache
,
ct
,
tr
,
s
,
LT
);
}
static
int32_t
cacheSearchLessEqual
(
void
*
cache
,
CacheTerm
*
ct
,
SIdxTempResult
*
tr
,
STermValueType
*
s
)
{
return
cacheSearchCompareFunc
(
cache
,
ct
,
tr
,
s
,
LE
);
}
static
int32_t
cacheSearchGreaterThan
(
void
*
cache
,
CacheTerm
*
ct
,
SIdxTempResult
*
tr
,
STermValueType
*
s
)
{
return
cacheSearchCompareFunc
(
cache
,
ct
,
tr
,
s
,
GT
);
}
static
int32_t
cacheSearchGreaterEqual
(
void
*
cache
,
CacheTerm
*
ct
,
SIdxTempResult
*
tr
,
STermValueType
*
s
)
{
return
cacheSearchCompareFunc
(
cache
,
ct
,
tr
,
s
,
GE
);
}
static
int32_t
cacheSearchRange
(
void
*
cache
,
CacheTerm
*
ct
,
SIdxTempResult
*
tr
,
STermValueType
*
s
)
{
static
int32_t
cacheSearchRange
(
void
*
cache
,
CacheTerm
*
ct
,
SIdxTempResult
*
tr
,
STermValueType
*
s
)
{
// impl later
// impl later
return
0
;
return
0
;
...
...
source/libs/index/src/indexTfile.c
浏览文件 @
abcdeb84
...
@@ -64,10 +64,17 @@ static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
...
@@ -64,10 +64,17 @@ static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static
int32_t
tfSearchPrefix
(
void
*
reader
,
SIndexTerm
*
tem
,
SIdxTempResult
*
tr
);
static
int32_t
tfSearchPrefix
(
void
*
reader
,
SIndexTerm
*
tem
,
SIdxTempResult
*
tr
);
static
int32_t
tfSearchSuffix
(
void
*
reader
,
SIndexTerm
*
tem
,
SIdxTempResult
*
tr
);
static
int32_t
tfSearchSuffix
(
void
*
reader
,
SIndexTerm
*
tem
,
SIdxTempResult
*
tr
);
static
int32_t
tfSearchRegex
(
void
*
reader
,
SIndexTerm
*
tem
,
SIdxTempResult
*
tr
);
static
int32_t
tfSearchRegex
(
void
*
reader
,
SIndexTerm
*
tem
,
SIdxTempResult
*
tr
);
static
int32_t
tfSearchLessThan
(
void
*
reader
,
SIndexTerm
*
tem
,
SIdxTempResult
*
tr
);
static
int32_t
tfSearchLessEqual
(
void
*
reader
,
SIndexTerm
*
tem
,
SIdxTempResult
*
tr
);
static
int32_t
tfSearchGreaterThan
(
void
*
reader
,
SIndexTerm
*
tem
,
SIdxTempResult
*
tr
);
static
int32_t
tfSearchGreaterEqual
(
void
*
reader
,
SIndexTerm
*
tem
,
SIdxTempResult
*
tr
);
static
int32_t
tfSearchRange
(
void
*
reader
,
SIndexTerm
*
tem
,
SIdxTempResult
*
tr
);
static
int32_t
tfSearchRange
(
void
*
reader
,
SIndexTerm
*
tem
,
SIdxTempResult
*
tr
);
static
int32_t
tfSearchCompareFunc
(
void
*
reader
,
SIndexTerm
*
tem
,
SIdxTempResult
*
tr
,
RangeType
ctype
);
static
int32_t
(
*
tfSearch
[])(
void
*
reader
,
SIndexTerm
*
tem
,
SIdxTempResult
*
tr
)
=
{
static
int32_t
(
*
tfSearch
[])(
void
*
reader
,
SIndexTerm
*
tem
,
SIdxTempResult
*
tr
)
=
{
tfSearchTerm
,
tfSearchPrefix
,
tfSearchSuffix
,
tfSearchRegex
,
tfSearchRange
};
tfSearchTerm
,
tfSearchPrefix
,
tfSearchSuffix
,
tfSearchRegex
,
tfSearchLessThan
,
tfSearchLessEqual
,
tfSearchGreaterThan
,
tfSearchGreaterEqual
,
tfSearchRange
};
TFileCache
*
tfileCacheCreate
(
const
char
*
path
)
{
TFileCache
*
tfileCacheCreate
(
const
char
*
path
)
{
TFileCache
*
tcache
=
taosMemoryCalloc
(
1
,
sizeof
(
TFileCache
));
TFileCache
*
tcache
=
taosMemoryCalloc
(
1
,
sizeof
(
TFileCache
));
...
@@ -299,6 +306,47 @@ static int32_t tfSearchRegex(void* reader, SIndexTerm* tem, SIdxTempResult* tr)
...
@@ -299,6 +306,47 @@ static int32_t tfSearchRegex(void* reader, SIndexTerm* tem, SIdxTempResult* tr)
fstSliceDestroy
(
&
key
);
fstSliceDestroy
(
&
key
);
return
0
;
return
0
;
}
}
static
int32_t
tfSearchCompareFunc
(
void
*
reader
,
SIndexTerm
*
tem
,
SIdxTempResult
*
tr
,
RangeType
type
)
{
bool
hasJson
=
INDEX_TYPE_CONTAIN_EXTERN_TYPE
(
tem
->
colType
,
TSDB_DATA_TYPE_JSON
);
int
ret
=
0
;
char
*
p
=
tem
->
colVal
;
uint64_t
sz
=
tem
->
nColVal
;
if
(
hasJson
)
{
p
=
indexPackJsonData
(
tem
);
sz
=
strlen
(
p
);
}
SArray
*
offsets
=
taosArrayInit
(
16
,
sizeof
(
uint64_t
));
AutomationCtx
*
ctx
=
automCtxCreate
((
void
*
)
p
,
AUTOMATION_ALWAYS
);
FstStreamBuilder
*
sb
=
fstSearch
(((
TFileReader
*
)
reader
)
->
fst
,
ctx
);
FstSlice
h
=
fstSliceCreate
((
uint8_t
*
)
p
,
sz
);
fstStreamBuilderSetRange
(
sb
,
&
h
,
type
);
fstSliceDestroy
(
&
h
);
StreamWithState
*
st
=
streamBuilderIntoStream
(
sb
);
StreamWithStateResult
*
rt
=
NULL
;
while
((
rt
=
streamWithStateNextWith
(
st
,
NULL
))
!=
NULL
)
{
taosArrayPush
(
offsets
,
&
(
rt
->
out
.
out
));
swsResultDestroy
(
rt
);
}
streamWithStateDestroy
(
st
);
fstStreamBuilderDestroy
(
sb
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
tfSearchLessThan
(
void
*
reader
,
SIndexTerm
*
tem
,
SIdxTempResult
*
tr
)
{
return
tfSearchCompareFunc
(
reader
,
tem
,
tr
,
LT
);
}
static
int32_t
tfSearchLessEqual
(
void
*
reader
,
SIndexTerm
*
tem
,
SIdxTempResult
*
tr
)
{
return
tfSearchCompareFunc
(
reader
,
tem
,
tr
,
LE
);
}
static
int32_t
tfSearchGreaterThan
(
void
*
reader
,
SIndexTerm
*
tem
,
SIdxTempResult
*
tr
)
{
return
tfSearchCompareFunc
(
reader
,
tem
,
tr
,
GT
);
}
static
int32_t
tfSearchGreaterEqual
(
void
*
reader
,
SIndexTerm
*
tem
,
SIdxTempResult
*
tr
)
{
return
tfSearchCompareFunc
(
reader
,
tem
,
tr
,
GE
);
}
static
int32_t
tfSearchRange
(
void
*
reader
,
SIndexTerm
*
tem
,
SIdxTempResult
*
tr
)
{
static
int32_t
tfSearchRange
(
void
*
reader
,
SIndexTerm
*
tem
,
SIdxTempResult
*
tr
)
{
bool
hasJson
=
INDEX_TYPE_CONTAIN_EXTERN_TYPE
(
tem
->
colType
,
TSDB_DATA_TYPE_JSON
);
bool
hasJson
=
INDEX_TYPE_CONTAIN_EXTERN_TYPE
(
tem
->
colType
,
TSDB_DATA_TYPE_JSON
);
int
ret
=
0
;
int
ret
=
0
;
...
...
source/libs/scalar/src/sclvector.c
浏览文件 @
abcdeb84
...
@@ -297,6 +297,22 @@ static FORCE_INLINE void varToNchar(char* buf, SScalarParam* pOut, int32_t rowIn
...
@@ -297,6 +297,22 @@ static FORCE_INLINE void varToNchar(char* buf, SScalarParam* pOut, int32_t rowIn
taosMemoryFree
(
t
);
taosMemoryFree
(
t
);
}
}
static
FORCE_INLINE
void
ncharToVar
(
char
*
buf
,
SScalarParam
*
pOut
,
int32_t
rowIndex
)
{
int32_t
inputLen
=
varDataLen
(
buf
);
char
*
t
=
taosMemoryCalloc
(
1
,
inputLen
+
VARSTR_HEADER_SIZE
);
int32_t
len
=
taosUcs4ToMbs
((
TdUcs4
*
)
varDataVal
(
buf
),
varDataLen
(
buf
),
varDataVal
(
t
));
if
(
len
<
0
)
{
taosMemoryFree
(
t
);
return
;
}
varDataSetLen
(
t
,
len
);
colDataAppend
(
pOut
->
columnData
,
rowIndex
,
t
,
false
);
taosMemoryFree
(
t
);
}
//TODO opt performance, tmp is not needed.
//TODO opt performance, tmp is not needed.
int32_t
vectorConvertFromVarData
(
const
SScalarParam
*
pIn
,
SScalarParam
*
pOut
,
int32_t
inType
,
int32_t
outType
)
{
int32_t
vectorConvertFromVarData
(
const
SScalarParam
*
pIn
,
SScalarParam
*
pOut
,
int32_t
inType
,
int32_t
outType
)
{
int32_t
bufSize
=
pIn
->
columnData
->
info
.
bytes
;
int32_t
bufSize
=
pIn
->
columnData
->
info
.
bytes
;
...
@@ -313,6 +329,10 @@ int32_t vectorConvertFromVarData(const SScalarParam* pIn, SScalarParam* pOut, in
...
@@ -313,6 +329,10 @@ int32_t vectorConvertFromVarData(const SScalarParam* pIn, SScalarParam* pOut, in
func
=
varToUnsigned
;
func
=
varToUnsigned
;
}
else
if
(
IS_FLOAT_TYPE
(
outType
))
{
}
else
if
(
IS_FLOAT_TYPE
(
outType
))
{
func
=
varToFloat
;
func
=
varToFloat
;
}
else
if
(
outType
==
TSDB_DATA_TYPE_BINARY
)
{
// nchar -> binary
ASSERT
(
inType
==
TSDB_DATA_TYPE_NCHAR
);
func
=
ncharToVar
;
vton
=
true
;
}
else
if
(
outType
==
TSDB_DATA_TYPE_NCHAR
)
{
// binary -> nchar
}
else
if
(
outType
==
TSDB_DATA_TYPE_NCHAR
)
{
// binary -> nchar
ASSERT
(
inType
==
TSDB_DATA_TYPE_VARCHAR
);
ASSERT
(
inType
==
TSDB_DATA_TYPE_VARCHAR
);
func
=
varToNchar
;
func
=
varToNchar
;
...
@@ -608,7 +628,7 @@ int8_t gConvertTypes[TSDB_DATA_TYPE_BLOB+1][TSDB_DATA_TYPE_BLOB+1] = {
...
@@ -608,7 +628,7 @@ int8_t gConvertTypes[TSDB_DATA_TYPE_BLOB+1][TSDB_DATA_TYPE_BLOB+1] = {
/*BIGI*/
0
,
0
,
0
,
0
,
0
,
0
,
6
,
7
,
7
,
0
,
7
,
5
,
5
,
5
,
7
,
0
,
7
,
0
,
0
,
/*BIGI*/
0
,
0
,
0
,
0
,
0
,
0
,
6
,
7
,
7
,
0
,
7
,
5
,
5
,
5
,
7
,
0
,
7
,
0
,
0
,
/*FLOA*/
0
,
0
,
0
,
0
,
0
,
0
,
0
,
7
,
7
,
6
,
7
,
6
,
6
,
6
,
6
,
0
,
7
,
0
,
0
,
/*FLOA*/
0
,
0
,
0
,
0
,
0
,
0
,
0
,
7
,
7
,
6
,
7
,
6
,
6
,
6
,
6
,
0
,
7
,
0
,
0
,
/*DOUB*/
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
7
,
7
,
7
,
7
,
7
,
7
,
7
,
0
,
7
,
0
,
0
,
/*DOUB*/
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
7
,
7
,
7
,
7
,
7
,
7
,
7
,
0
,
7
,
0
,
0
,
/*VARC*/
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
9
,
0
,
7
,
7
,
7
,
7
,
0
,
0
,
0
,
0
,
/*VARC*/
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
9
,
8
,
7
,
7
,
7
,
7
,
0
,
0
,
0
,
0
,
/*TIME*/
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
9
,
9
,
9
,
9
,
7
,
0
,
7
,
0
,
0
,
/*TIME*/
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
9
,
9
,
9
,
9
,
7
,
0
,
7
,
0
,
0
,
/*NCHA*/
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
7
,
7
,
7
,
7
,
0
,
0
,
0
,
0
,
/*NCHA*/
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
7
,
7
,
7
,
7
,
0
,
0
,
0
,
0
,
/*UTIN*/
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
12
,
13
,
14
,
0
,
7
,
0
,
0
,
/*UTIN*/
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
12
,
13
,
14
,
0
,
7
,
0
,
0
,
...
...
source/libs/tdb/inc/tdb.h
浏览文件 @
abcdeb84
...
@@ -40,7 +40,9 @@ int tdbCommit(TENV *pEnv, TXN *pTxn);
...
@@ -40,7 +40,9 @@ int tdbCommit(TENV *pEnv, TXN *pTxn);
int
tdbDbOpen
(
const
char
*
fname
,
int
keyLen
,
int
valLen
,
tdb_cmpr_fn_t
keyCmprFn
,
TENV
*
pEnv
,
TDB
**
ppDb
);
int
tdbDbOpen
(
const
char
*
fname
,
int
keyLen
,
int
valLen
,
tdb_cmpr_fn_t
keyCmprFn
,
TENV
*
pEnv
,
TDB
**
ppDb
);
int
tdbDbClose
(
TDB
*
pDb
);
int
tdbDbClose
(
TDB
*
pDb
);
int
tdbDbDrop
(
TDB
*
pDb
);
int
tdbDbDrop
(
TDB
*
pDb
);
int
tdbDbPut
(
TDB
*
pDb
,
const
void
*
pKey
,
int
keyLen
,
const
void
*
pVal
,
int
valLen
,
TXN
*
pTxn
);
int
tdbDbInsert
(
TDB
*
pDb
,
const
void
*
pKey
,
int
keyLen
,
const
void
*
pVal
,
int
valLen
,
TXN
*
pTxn
);
int
tdbDbDelete
(
TDB
*
pDb
,
const
void
*
pKey
,
int
kLen
,
TXN
*
pTxn
);
int
tdbDbUpsert
(
TDB
*
pDb
,
const
void
*
pKey
,
int
kLen
,
const
void
*
pVal
,
int
vLen
,
TXN
*
pTxn
);
int
tdbDbGet
(
TDB
*
pDb
,
const
void
*
pKey
,
int
kLen
,
void
**
ppVal
,
int
*
vLen
);
int
tdbDbGet
(
TDB
*
pDb
,
const
void
*
pKey
,
int
kLen
,
void
**
ppVal
,
int
*
vLen
);
int
tdbDbPGet
(
TDB
*
pDb
,
const
void
*
pKey
,
int
kLen
,
void
**
ppKey
,
int
*
pkLen
,
void
**
ppVal
,
int
*
vLen
);
int
tdbDbPGet
(
TDB
*
pDb
,
const
void
*
pKey
,
int
kLen
,
void
**
ppKey
,
int
*
pkLen
,
void
**
ppVal
,
int
*
vLen
);
...
@@ -53,11 +55,9 @@ int tdbDbcMoveToLast(TDBC *pDbc);
...
@@ -53,11 +55,9 @@ int tdbDbcMoveToLast(TDBC *pDbc);
int
tdbDbcMoveToNext
(
TDBC
*
pDbc
);
int
tdbDbcMoveToNext
(
TDBC
*
pDbc
);
int
tdbDbcMoveToPrev
(
TDBC
*
pDbc
);
int
tdbDbcMoveToPrev
(
TDBC
*
pDbc
);
int
tdbDbcGet
(
TDBC
*
pDbc
,
const
void
**
ppKey
,
int
*
pkLen
,
const
void
**
ppVal
,
int
*
pvLen
);
int
tdbDbcGet
(
TDBC
*
pDbc
,
const
void
**
ppKey
,
int
*
pkLen
,
const
void
**
ppVal
,
int
*
pvLen
);
int
tdbDbcDelete
(
TDBC
*
pDbc
);
int
tdbDbcPut
(
TDBC
*
pDbc
,
const
void
*
pKey
,
int
keyLen
,
const
void
*
pVal
,
int
valLen
);
int
tdbDbcUpdate
(
TDBC
*
pDbc
,
const
void
*
pKey
,
int
kLen
,
const
void
*
pVal
,
int
vLen
);
int
tdbDbcDrop
(
TDBC
*
pDbc
);
int
tdbDbcNext
(
TDBC
*
pDbc
,
void
**
ppKey
,
int
*
kLen
,
void
**
ppVal
,
int
*
vLen
);
int
tdbDbcNext
(
TDBC
*
pDbc
,
void
**
ppKey
,
int
*
kLen
,
void
**
ppVal
,
int
*
vLen
);
int
tdbDbcUpsert
(
TDBC
*
pDbc
,
const
void
*
pKey
,
int
nKey
,
const
void
*
pData
,
int
nData
,
int
insert
);
// TXN
// TXN
#define TDB_TXN_WRITE 0x1
#define TDB_TXN_WRITE 0x1
...
...
source/libs/tdb/src/db/tdbBtree.c
浏览文件 @
abcdeb84
...
@@ -138,67 +138,90 @@ int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, in
...
@@ -138,67 +138,90 @@ int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, in
}
}
if
(
btc
.
idx
==
-
1
)
{
if
(
btc
.
idx
==
-
1
)
{
idx
=
0
;
btc
.
idx
=
0
;
}
else
{
}
else
{
if
(
c
>
0
)
{
if
(
c
>
0
)
{
idx
=
btc
.
idx
+
1
;
btc
.
idx
++
;
}
else
if
(
c
<
0
)
{
}
else
if
(
c
==
0
)
{
idx
=
btc
.
idx
;
// dup key not allowed
}
else
{
// TDB does NOT allow same key
tdbBtcClose
(
&
btc
);
ASSERT
(
0
);
ASSERT
(
0
);
return
-
1
;
return
-
1
;
}
}
}
}
// make sure enough space to hold the cell
ret
=
tdbBtcUpsert
(
&
btc
,
pKey
,
kLen
,
pVal
,
vLen
,
1
);
szBuf
=
kLen
+
vLen
+
14
;
if
(
ret
<
0
)
{
pBuf
=
tdbRealloc
(
pBt
->
pBuf
,
pBt
->
pageSize
>
szBuf
?
szBuf
:
pBt
->
pageSize
);
if
(
pBuf
==
NULL
)
{
tdbBtcClose
(
&
btc
);
ASSERT
(
0
);
ASSERT
(
0
);
tdbBtcClose
(
&
btc
);
return
-
1
;
return
-
1
;
}
}
pBt
->
pBuf
=
pBuf
;
pCell
=
(
SCell
*
)
pBt
->
pBuf
;
// encode cell
tdbBtcClose
(
&
btc
);
ret
=
tdbBtreeEncodeCell
(
btc
.
pPage
,
pKey
,
kLen
,
pVal
,
vLen
,
pCell
,
&
szCell
);
return
0
;
}
int
tdbBtreeDelete
(
SBTree
*
pBt
,
const
void
*
pKey
,
int
kLen
,
TXN
*
pTxn
)
{
SBTC
btc
;
int
c
;
int
ret
;
tdbBtcOpen
(
&
btc
,
pBt
,
pTxn
);
// move the cursor
ret
=
tdbBtcMoveTo
(
&
btc
,
pKey
,
kLen
,
&
c
);
if
(
ret
<
0
)
{
if
(
ret
<
0
)
{
tdbBtcClose
(
&
btc
);
tdbBtcClose
(
&
btc
);
ASSERT
(
0
);
ASSERT
(
0
);
return
-
1
;
return
-
1
;
}
}
// mark the page dirty
if
(
btc
.
idx
<
0
||
c
!=
0
)
{
ret
=
tdbPagerWrite
(
pBt
->
pPager
,
btc
.
pPage
);
if
(
ret
<
0
)
{
tdbBtcClose
(
&
btc
);
tdbBtcClose
(
&
btc
);
ASSERT
(
0
);
return
-
1
;
return
-
1
;
}
}
// insert the cell
// delete the key
ret
=
tdbPageInsertCell
(
btc
.
pPage
,
idx
,
pCell
,
szCell
,
0
);
if
(
tdbBtcDelete
(
&
btc
)
<
0
)
{
if
(
ret
<
0
)
{
tdbBtcClose
(
&
btc
);
tdbBtcClose
(
&
btc
);
return
-
1
;
}
tdbBtcClose
(
&
btc
);
return
0
;
}
int
tdbBtreeUpsert
(
SBTree
*
pBt
,
const
void
*
pKey
,
int
nKey
,
const
void
*
pData
,
int
nData
,
TXN
*
pTxn
)
{
SBTC
btc
;
int
c
;
int
ret
;
tdbBtcOpen
(
&
btc
,
pBt
,
pTxn
);
// move the cursor
ret
=
tdbBtcMoveTo
(
&
btc
,
pKey
,
nKey
,
&
c
);
if
(
ret
<
0
)
{
ASSERT
(
0
);
ASSERT
(
0
);
tdbBtcClose
(
&
btc
);
return
-
1
;
return
-
1
;
}
}
// check if need balance
if
(
btc
.
idx
==
-
1
)
{
if
(
btc
.
pPage
->
nOverflow
>
0
)
{
btc
.
idx
=
0
;
ret
=
tdbBtreeBalance
(
&
btc
);
c
=
1
;
if
(
ret
<
0
)
{
}
else
{
tdbBtcClose
(
&
btc
);
if
(
c
>
0
)
{
ASSERT
(
0
);
btc
.
idx
=
btc
.
idx
+
1
;
return
-
1
;
}
}
}
}
tdbBtcClose
(
&
btc
);
ret
=
tdbBtcUpsert
(
&
btc
,
pKey
,
nKey
,
pData
,
nData
,
c
);
if
(
ret
<
0
)
{
ASSERT
(
0
);
tdbBtcClose
(
&
btc
);
return
-
1
;
}
tdbBtcClose
(
&
btc
);
return
0
;
return
0
;
}
}
...
@@ -552,14 +575,14 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
...
@@ -552,14 +575,14 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
SCell
*
pCell
;
SCell
*
pCell
;
int
szLCell
,
szRCell
;
int
szLCell
,
szRCell
;
// balance page (iNew) and (iNew-1)
for
(;;)
{
for
(;;)
{
pCell
=
tdbPageGetCell
(
pOlds
[
infoNews
[
iNew
-
1
].
iPage
],
infoNews
[
iNew
-
1
].
oIdx
);
pCell
=
tdbPageGetCell
(
pOlds
[
infoNews
[
iNew
-
1
].
iPage
],
infoNews
[
iNew
-
1
].
oIdx
);
if
(
childNotLeaf
)
{
szLCell
=
tdbBtreeCellSize
(
pOlds
[
infoNews
[
iNew
-
1
].
iPage
],
pCell
);
szLCell
=
szRCell
=
tdbBtreeCellSize
(
pOlds
[
infoNews
[
iNew
-
1
].
iPage
],
pCell
);
if
(
!
childNotLeaf
)
{
szRCell
=
szLCell
;
}
else
{
}
else
{
szLCell
=
tdbBtreeCellSize
(
pOlds
[
infoNews
[
iNew
-
1
].
iPage
],
pCell
);
int
iPage
=
infoNews
[
iNew
-
1
].
iPage
;
int
iPage
=
infoNews
[
iNew
-
1
].
iPage
;
int
oIdx
=
infoNews
[
iNew
-
1
].
oIdx
+
1
;
int
oIdx
=
infoNews
[
iNew
-
1
].
oIdx
+
1
;
SPage
*
pPage
;
SPage
*
pPage
;
...
@@ -736,6 +759,13 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
...
@@ -736,6 +759,13 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
}
}
}
}
if
(
TDB_BTREE_PAGE_IS_ROOT
(
pParent
)
&&
TDB_PAGE_TOTAL_CELLS
(
pParent
)
==
0
)
{
i8
flags
=
TDB_BTREE_ROOT
|
TDB_BTREE_PAGE_IS_LEAF
(
pNews
[
0
]);
// copy content to the parent page
tdbBtreeInitPage
(
pParent
,
&
(
SBtreeInitPageArg
){.
flags
=
flags
,
.
pBt
=
pBt
},
0
);
tdbPageCopy
(
pNews
[
0
],
pParent
);
}
for
(
int
i
=
0
;
i
<
3
;
i
++
)
{
for
(
int
i
=
0
;
i
<
3
;
i
++
)
{
if
(
pDivCell
[
i
])
{
if
(
pDivCell
[
i
])
{
tdbOsFree
(
pDivCell
[
i
]);
tdbOsFree
(
pDivCell
[
i
]);
...
@@ -1357,7 +1387,143 @@ int tdbBtcGet(SBTC *pBtc, const void **ppKey, int *kLen, const void **ppVal, int
...
@@ -1357,7 +1387,143 @@ int tdbBtcGet(SBTC *pBtc, const void **ppKey, int *kLen, const void **ppVal, int
if
(
ppVal
)
{
if
(
ppVal
)
{
*
ppVal
=
(
void
*
)
pBtc
->
coder
.
pVal
;
*
ppVal
=
(
void
*
)
pBtc
->
coder
.
pVal
;
*
kLen
=
pBtc
->
coder
.
vLen
;
*
vLen
=
pBtc
->
coder
.
vLen
;
}
return
0
;
}
int
tdbBtcDelete
(
SBTC
*
pBtc
)
{
int
idx
=
pBtc
->
idx
;
int
nCells
=
TDB_PAGE_TOTAL_CELLS
(
pBtc
->
pPage
);
SPager
*
pPager
=
pBtc
->
pBt
->
pPager
;
const
void
*
pKey
;
i8
iPage
;
SPage
*
pPage
;
SPgno
pgno
;
SCell
*
pCell
;
int
szCell
;
int
nKey
;
int
ret
;
ASSERT
(
idx
>=
0
&&
idx
<
nCells
);
// drop the cell on the leaf
ret
=
tdbPagerWrite
(
pPager
,
pBtc
->
pPage
);
if
(
ret
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
tdbPageDropCell
(
pBtc
->
pPage
,
idx
);
// update interior page or do balance
if
(
idx
==
nCells
-
1
)
{
if
(
idx
)
{
pBtc
->
idx
--
;
tdbBtcGet
(
pBtc
,
&
pKey
,
&
nKey
,
NULL
,
NULL
);
// loop to update the interial page
pgno
=
TDB_PAGE_PGNO
(
pBtc
->
pPage
);
for
(
iPage
=
pBtc
->
iPage
-
1
;
iPage
>=
0
;
iPage
--
)
{
pPage
=
pBtc
->
pgStack
[
iPage
];
idx
=
pBtc
->
idxStack
[
iPage
];
nCells
=
TDB_PAGE_TOTAL_CELLS
(
pPage
);
if
(
idx
<
nCells
)
{
ret
=
tdbPagerWrite
(
pPager
,
pPage
);
if
(
ret
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
// update the cell with new key
pCell
=
tdbOsMalloc
(
nKey
+
9
);
tdbBtreeEncodeCell
(
pPage
,
pKey
,
nKey
,
&
pgno
,
sizeof
(
pgno
),
pCell
,
&
szCell
);
ret
=
tdbPageUpdateCell
(
pPage
,
idx
,
pCell
,
szCell
);
if
(
ret
<
0
)
{
tdbOsFree
(
pCell
);
ASSERT
(
0
);
return
-
1
;
}
tdbOsFree
(
pCell
);
break
;
}
else
{
pgno
=
TDB_PAGE_PGNO
(
pPage
);
}
}
}
else
{
// delete the leaf page and do balance
ASSERT
(
TDB_PAGE_TOTAL_CELLS
(
pBtc
->
pPage
)
==
0
);
ret
=
tdbBtreeBalance
(
pBtc
);
if
(
ret
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
}
}
return
0
;
}
int
tdbBtcUpsert
(
SBTC
*
pBtc
,
const
void
*
pKey
,
int
kLen
,
const
void
*
pData
,
int
nData
,
int
insert
)
{
SCell
*
pCell
;
int
szCell
;
int
nCells
=
TDB_PAGE_TOTAL_CELLS
(
pBtc
->
pPage
);
int
szBuf
;
void
*
pBuf
;
int
ret
;
ASSERT
(
pBtc
->
idx
>=
0
);
// alloc space
szBuf
=
kLen
+
nData
+
14
;
pBuf
=
tdbRealloc
(
pBtc
->
pBt
->
pBuf
,
pBtc
->
pBt
->
pageSize
>
szBuf
?
szBuf
:
pBtc
->
pBt
->
pageSize
);
if
(
pBuf
==
NULL
)
{
ASSERT
(
0
);
return
-
1
;
}
pBtc
->
pBt
->
pBuf
=
pBuf
;
pCell
=
(
SCell
*
)
pBtc
->
pBt
->
pBuf
;
// encode cell
ret
=
tdbBtreeEncodeCell
(
pBtc
->
pPage
,
pKey
,
kLen
,
pData
,
nData
,
pCell
,
&
szCell
);
if
(
ret
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
// mark dirty
ret
=
tdbPagerWrite
(
pBtc
->
pBt
->
pPager
,
pBtc
->
pPage
);
if
(
ret
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
// insert or update
if
(
insert
)
{
ASSERT
(
pBtc
->
idx
<=
nCells
);
ret
=
tdbPageInsertCell
(
pBtc
->
pPage
,
pBtc
->
idx
,
pCell
,
szCell
,
0
);
}
else
{
ASSERT
(
pBtc
->
idx
<
nCells
);
ret
=
tdbPageUpdateCell
(
pBtc
->
pPage
,
pBtc
->
idx
,
pCell
,
szCell
);
}
if
(
ret
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
// check balance
if
(
pBtc
->
pPage
->
nOverflow
>
0
)
{
ret
=
tdbBtreeBalance
(
pBtc
);
if
(
ret
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
}
}
return
0
;
return
0
;
...
...
source/libs/tdb/src/db/tdbDb.c
浏览文件 @
abcdeb84
...
@@ -75,10 +75,16 @@ int tdbDbDrop(TDB *pDb) {
...
@@ -75,10 +75,16 @@ int tdbDbDrop(TDB *pDb) {
return
0
;
return
0
;
}
}
int
tdbDb
Pu
t
(
TDB
*
pDb
,
const
void
*
pKey
,
int
keyLen
,
const
void
*
pVal
,
int
valLen
,
TXN
*
pTxn
)
{
int
tdbDb
Inser
t
(
TDB
*
pDb
,
const
void
*
pKey
,
int
keyLen
,
const
void
*
pVal
,
int
valLen
,
TXN
*
pTxn
)
{
return
tdbBtreeInsert
(
pDb
->
pBt
,
pKey
,
keyLen
,
pVal
,
valLen
,
pTxn
);
return
tdbBtreeInsert
(
pDb
->
pBt
,
pKey
,
keyLen
,
pVal
,
valLen
,
pTxn
);
}
}
int
tdbDbDelete
(
TDB
*
pDb
,
const
void
*
pKey
,
int
kLen
,
TXN
*
pTxn
)
{
return
tdbBtreeDelete
(
pDb
->
pBt
,
pKey
,
kLen
,
pTxn
);
}
int
tdbDbUpsert
(
TDB
*
pDb
,
const
void
*
pKey
,
int
kLen
,
const
void
*
pVal
,
int
vLen
,
TXN
*
pTxn
)
{
return
tdbBtreeUpsert
(
pDb
->
pBt
,
pKey
,
kLen
,
pVal
,
vLen
,
pTxn
);
}
int
tdbDbGet
(
TDB
*
pDb
,
const
void
*
pKey
,
int
kLen
,
void
**
ppVal
,
int
*
vLen
)
{
int
tdbDbGet
(
TDB
*
pDb
,
const
void
*
pKey
,
int
kLen
,
void
**
ppVal
,
int
*
vLen
)
{
return
tdbBtreeGet
(
pDb
->
pBt
,
pKey
,
kLen
,
ppVal
,
vLen
);
return
tdbBtreeGet
(
pDb
->
pBt
,
pKey
,
kLen
,
ppVal
,
vLen
);
}
}
...
@@ -117,28 +123,16 @@ int tdbDbcGet(TDBC *pDbc, const void **ppKey, int *pkLen, const void **ppVal, in
...
@@ -117,28 +123,16 @@ int tdbDbcGet(TDBC *pDbc, const void **ppKey, int *pkLen, const void **ppVal, in
return
tdbBtcGet
(
&
pDbc
->
btc
,
ppKey
,
pkLen
,
ppVal
,
pvLen
);
return
tdbBtcGet
(
&
pDbc
->
btc
,
ppKey
,
pkLen
,
ppVal
,
pvLen
);
}
}
int
tdbDbcPut
(
TDBC
*
pDbc
,
const
void
*
pKey
,
int
keyLen
,
const
void
*
pVal
,
int
valLen
)
{
int
tdbDbcDelete
(
TDBC
*
pDbc
)
{
return
tdbBtcDelete
(
&
pDbc
->
btc
);
}
// TODO
ASSERT
(
0
);
return
0
;
}
int
tdbDbcUpdate
(
TDBC
*
pDbc
,
const
void
*
pKey
,
int
kLen
,
const
void
*
pVal
,
int
vLen
)
{
// TODO
ASSERT
(
0
);
return
0
;
}
int
tdbDbcDrop
(
TDBC
*
pDbc
)
{
// TODO
ASSERT
(
0
);
return
0
;
}
int
tdbDbcNext
(
TDBC
*
pDbc
,
void
**
ppKey
,
int
*
kLen
,
void
**
ppVal
,
int
*
vLen
)
{
int
tdbDbcNext
(
TDBC
*
pDbc
,
void
**
ppKey
,
int
*
kLen
,
void
**
ppVal
,
int
*
vLen
)
{
return
tdbBtreeNext
(
&
pDbc
->
btc
,
ppKey
,
kLen
,
ppVal
,
vLen
);
return
tdbBtreeNext
(
&
pDbc
->
btc
,
ppKey
,
kLen
,
ppVal
,
vLen
);
}
}
int
tdbDbcUpsert
(
TDBC
*
pDbc
,
const
void
*
pKey
,
int
nKey
,
const
void
*
pData
,
int
nData
,
int
insert
)
{
return
tdbBtcUpsert
(
&
pDbc
->
btc
,
pKey
,
nKey
,
pData
,
nData
,
insert
);
}
int
tdbDbcClose
(
TDBC
*
pDbc
)
{
int
tdbDbcClose
(
TDBC
*
pDbc
)
{
if
(
pDbc
)
{
if
(
pDbc
)
{
tdbBtcClose
(
&
pDbc
->
btc
);
tdbBtcClose
(
&
pDbc
->
btc
);
...
...
source/libs/tdb/src/db/tdbPage.c
浏览文件 @
abcdeb84
...
@@ -171,6 +171,11 @@ int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell, u8 asOvfl
...
@@ -171,6 +171,11 @@ int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell, u8 asOvfl
return
0
;
return
0
;
}
}
int
tdbPageUpdateCell
(
SPage
*
pPage
,
int
idx
,
SCell
*
pCell
,
int
szCell
)
{
tdbPageDropCell
(
pPage
,
idx
);
return
tdbPageInsertCell
(
pPage
,
idx
,
pCell
,
szCell
,
0
);
}
int
tdbPageDropCell
(
SPage
*
pPage
,
int
idx
)
{
int
tdbPageDropCell
(
SPage
*
pPage
,
int
idx
)
{
int
lidx
;
int
lidx
;
SCell
*
pCell
;
SCell
*
pCell
;
...
...
source/libs/tdb/src/inc/tdbInt.h
浏览文件 @
abcdeb84
...
@@ -128,6 +128,8 @@ struct SBTC {
...
@@ -128,6 +128,8 @@ struct SBTC {
int
tdbBtreeOpen
(
int
keyLen
,
int
valLen
,
SPager
*
pFile
,
tdb_cmpr_fn_t
kcmpr
,
SBTree
**
ppBt
);
int
tdbBtreeOpen
(
int
keyLen
,
int
valLen
,
SPager
*
pFile
,
tdb_cmpr_fn_t
kcmpr
,
SBTree
**
ppBt
);
int
tdbBtreeClose
(
SBTree
*
pBt
);
int
tdbBtreeClose
(
SBTree
*
pBt
);
int
tdbBtreeInsert
(
SBTree
*
pBt
,
const
void
*
pKey
,
int
kLen
,
const
void
*
pVal
,
int
vLen
,
TXN
*
pTxn
);
int
tdbBtreeInsert
(
SBTree
*
pBt
,
const
void
*
pKey
,
int
kLen
,
const
void
*
pVal
,
int
vLen
,
TXN
*
pTxn
);
int
tdbBtreeDelete
(
SBTree
*
pBt
,
const
void
*
pKey
,
int
kLen
,
TXN
*
pTxn
);
int
tdbBtreeUpsert
(
SBTree
*
pBt
,
const
void
*
pKey
,
int
nKey
,
const
void
*
pData
,
int
nData
,
TXN
*
pTxn
);
int
tdbBtreeGet
(
SBTree
*
pBt
,
const
void
*
pKey
,
int
kLen
,
void
**
ppVal
,
int
*
vLen
);
int
tdbBtreeGet
(
SBTree
*
pBt
,
const
void
*
pKey
,
int
kLen
,
void
**
ppVal
,
int
*
vLen
);
int
tdbBtreePGet
(
SBTree
*
pBt
,
const
void
*
pKey
,
int
kLen
,
void
**
ppKey
,
int
*
pkLen
,
void
**
ppVal
,
int
*
vLen
);
int
tdbBtreePGet
(
SBTree
*
pBt
,
const
void
*
pKey
,
int
kLen
,
void
**
ppKey
,
int
*
pkLen
,
void
**
ppVal
,
int
*
vLen
);
...
@@ -141,6 +143,8 @@ int tdbBtcMoveToNext(SBTC *pBtc);
...
@@ -141,6 +143,8 @@ int tdbBtcMoveToNext(SBTC *pBtc);
int
tdbBtcMoveToPrev
(
SBTC
*
pBtc
);
int
tdbBtcMoveToPrev
(
SBTC
*
pBtc
);
int
tdbBtreeNext
(
SBTC
*
pBtc
,
void
**
ppKey
,
int
*
kLen
,
void
**
ppVal
,
int
*
vLen
);
int
tdbBtreeNext
(
SBTC
*
pBtc
,
void
**
ppKey
,
int
*
kLen
,
void
**
ppVal
,
int
*
vLen
);
int
tdbBtcGet
(
SBTC
*
pBtc
,
const
void
**
ppKey
,
int
*
kLen
,
const
void
**
ppVal
,
int
*
vLen
);
int
tdbBtcGet
(
SBTC
*
pBtc
,
const
void
**
ppKey
,
int
*
kLen
,
const
void
**
ppVal
,
int
*
vLen
);
int
tdbBtcDelete
(
SBTC
*
pBtc
);
int
tdbBtcUpsert
(
SBTC
*
pBtc
,
const
void
*
pKey
,
int
kLen
,
const
void
*
pData
,
int
nData
,
int
insert
);
// tdbPager.c ====================================
// tdbPager.c ====================================
...
@@ -278,6 +282,7 @@ void tdbPageZero(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell
...
@@ -278,6 +282,7 @@ void tdbPageZero(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell
void
tdbPageInit
(
SPage
*
pPage
,
u8
szAmHdr
,
int
(
*
xCellSize
)(
const
SPage
*
,
SCell
*
));
void
tdbPageInit
(
SPage
*
pPage
,
u8
szAmHdr
,
int
(
*
xCellSize
)(
const
SPage
*
,
SCell
*
));
int
tdbPageInsertCell
(
SPage
*
pPage
,
int
idx
,
SCell
*
pCell
,
int
szCell
,
u8
asOvfl
);
int
tdbPageInsertCell
(
SPage
*
pPage
,
int
idx
,
SCell
*
pCell
,
int
szCell
,
u8
asOvfl
);
int
tdbPageDropCell
(
SPage
*
pPage
,
int
idx
);
int
tdbPageDropCell
(
SPage
*
pPage
,
int
idx
);
int
tdbPageUpdateCell
(
SPage
*
pPage
,
int
idx
,
SCell
*
pCell
,
int
szCell
);
void
tdbPageCopy
(
SPage
*
pFromPage
,
SPage
*
pToPage
);
void
tdbPageCopy
(
SPage
*
pFromPage
,
SPage
*
pToPage
);
int
tdbPageCapacity
(
int
pageSize
,
int
amHdrSize
);
int
tdbPageCapacity
(
int
pageSize
,
int
amHdrSize
);
...
...
source/libs/tdb/test/tdbTest.cpp
浏览文件 @
abcdeb84
...
@@ -115,12 +115,12 @@ static int tDefaultKeyCmpr(const void *pKey1, int keyLen1, const void *pKey2, in
...
@@ -115,12 +115,12 @@ static int tDefaultKeyCmpr(const void *pKey1, int keyLen1, const void *pKey2, in
return
cret
;
return
cret
;
}
}
TEST
(
tdb_test
,
simple_
test
)
{
TEST
(
tdb_test
,
simple_
insert1
)
{
int
ret
;
int
ret
;
TENV
*
pEnv
;
TENV
*
pEnv
;
TDB
*
pDb
;
TDB
*
pDb
;
tdb_cmpr_fn_t
compFunc
;
tdb_cmpr_fn_t
compFunc
;
int
nData
=
1000000
0
;
int
nData
=
1000000
;
TXN
txn
;
TXN
txn
;
taosRemoveDir
(
"tdb"
);
taosRemoveDir
(
"tdb"
);
...
@@ -152,7 +152,7 @@ TEST(tdb_test, simple_test) {
...
@@ -152,7 +152,7 @@ TEST(tdb_test, simple_test) {
for
(
int
iData
=
1
;
iData
<=
nData
;
iData
++
)
{
for
(
int
iData
=
1
;
iData
<=
nData
;
iData
++
)
{
sprintf
(
key
,
"key%d"
,
iData
);
sprintf
(
key
,
"key%d"
,
iData
);
sprintf
(
val
,
"value%d"
,
iData
);
sprintf
(
val
,
"value%d"
,
iData
);
ret
=
tdbDb
Pu
t
(
pDb
,
key
,
strlen
(
key
),
val
,
strlen
(
val
),
&
txn
);
ret
=
tdbDb
Inser
t
(
pDb
,
key
,
strlen
(
key
),
val
,
strlen
(
val
),
&
txn
);
GTEST_ASSERT_EQ
(
ret
,
0
);
GTEST_ASSERT_EQ
(
ret
,
0
);
// if pool is full, commit the transaction and start a new one
// if pool is full, commit the transaction and start a new one
...
@@ -202,6 +202,8 @@ TEST(tdb_test, simple_test) {
...
@@ -202,6 +202,8 @@ TEST(tdb_test, simple_test) {
ret
=
tdbDbcOpen
(
pDb
,
&
pDBC
,
NULL
);
ret
=
tdbDbcOpen
(
pDb
,
&
pDBC
,
NULL
);
GTEST_ASSERT_EQ
(
ret
,
0
);
GTEST_ASSERT_EQ
(
ret
,
0
);
tdbDbcMoveToFirst
(
pDBC
);
for
(;;)
{
for
(;;)
{
ret
=
tdbDbcNext
(
pDBC
,
&
pKey
,
&
kLen
,
&
pVal
,
&
vLen
);
ret
=
tdbDbcNext
(
pDBC
,
&
pKey
,
&
kLen
,
&
pVal
,
&
vLen
);
if
(
ret
<
0
)
break
;
if
(
ret
<
0
)
break
;
...
@@ -233,7 +235,7 @@ TEST(tdb_test, simple_test) {
...
@@ -233,7 +235,7 @@ TEST(tdb_test, simple_test) {
GTEST_ASSERT_EQ
(
ret
,
0
);
GTEST_ASSERT_EQ
(
ret
,
0
);
}
}
TEST
(
tdb_test
,
simple_
tes
t2
)
{
TEST
(
tdb_test
,
simple_
inser
t2
)
{
int
ret
;
int
ret
;
TENV
*
pEnv
;
TENV
*
pEnv
;
TDB
*
pDb
;
TDB
*
pDb
;
...
@@ -269,7 +271,7 @@ TEST(tdb_test, simple_test2) {
...
@@ -269,7 +271,7 @@ TEST(tdb_test, simple_test2) {
for
(
int
iData
=
1
;
iData
<=
nData
;
iData
++
)
{
for
(
int
iData
=
1
;
iData
<=
nData
;
iData
++
)
{
sprintf
(
key
,
"key%d"
,
iData
);
sprintf
(
key
,
"key%d"
,
iData
);
sprintf
(
val
,
"value%d"
,
iData
);
sprintf
(
val
,
"value%d"
,
iData
);
ret
=
tdbDb
Pu
t
(
pDb
,
key
,
strlen
(
key
),
val
,
strlen
(
val
),
&
txn
);
ret
=
tdbDb
Inser
t
(
pDb
,
key
,
strlen
(
key
),
val
,
strlen
(
val
),
&
txn
);
GTEST_ASSERT_EQ
(
ret
,
0
);
GTEST_ASSERT_EQ
(
ret
,
0
);
}
}
...
@@ -283,13 +285,15 @@ TEST(tdb_test, simple_test2) {
...
@@ -283,13 +285,15 @@ TEST(tdb_test, simple_test2) {
ret
=
tdbDbcOpen
(
pDb
,
&
pDBC
,
NULL
);
ret
=
tdbDbcOpen
(
pDb
,
&
pDBC
,
NULL
);
GTEST_ASSERT_EQ
(
ret
,
0
);
GTEST_ASSERT_EQ
(
ret
,
0
);
tdbDbcMoveToFirst
(
pDBC
);
for
(;;)
{
for
(;;)
{
ret
=
tdbDbcNext
(
pDBC
,
&
pKey
,
&
kLen
,
&
pVal
,
&
vLen
);
ret
=
tdbDbcNext
(
pDBC
,
&
pKey
,
&
kLen
,
&
pVal
,
&
vLen
);
if
(
ret
<
0
)
break
;
if
(
ret
<
0
)
break
;
std
::
cout
.
write
((
char
*
)
pKey
,
kLen
)
/* << " " << kLen */
<<
" "
;
//
std::cout.write((char *)pKey, kLen) /* << " " << kLen */ << " ";
std
::
cout
.
write
((
char
*
)
pVal
,
vLen
)
/* << " " << vLen */
;
//
std::cout.write((char *)pVal, vLen) /* << " " << vLen */;
std
::
cout
<<
std
::
endl
;
//
std::cout << std::endl;
count
++
;
count
++
;
}
}
...
@@ -316,4 +320,164 @@ TEST(tdb_test, simple_test2) {
...
@@ -316,4 +320,164 @@ TEST(tdb_test, simple_test2) {
// Close Env
// Close Env
ret
=
tdbEnvClose
(
pEnv
);
ret
=
tdbEnvClose
(
pEnv
);
GTEST_ASSERT_EQ
(
ret
,
0
);
GTEST_ASSERT_EQ
(
ret
,
0
);
}
TEST
(
tdb_test
,
simple_delete1
)
{
int
ret
;
TDB
*
pDb
;
char
key
[
128
];
char
data
[
128
];
TXN
txn
;
TENV
*
pEnv
;
SPoolMem
*
pPool
;
void
*
pKey
=
NULL
;
void
*
pData
=
NULL
;
int
nKey
;
TDBC
*
pDbc
;
int
nData
;
int
nKV
=
69
;
taosRemoveDir
(
"tdb"
);
pPool
=
openPool
();
// open env
ret
=
tdbEnvOpen
(
"tdb"
,
1024
,
256
,
&
pEnv
);
GTEST_ASSERT_EQ
(
ret
,
0
);
// open database
ret
=
tdbDbOpen
(
"db.db"
,
-
1
,
-
1
,
tKeyCmpr
,
pEnv
,
&
pDb
);
GTEST_ASSERT_EQ
(
ret
,
0
);
tdbTxnOpen
(
&
txn
,
0
,
poolMalloc
,
poolFree
,
pPool
,
TDB_TXN_WRITE
|
TDB_TXN_READ_UNCOMMITTED
);
tdbBegin
(
pEnv
,
&
txn
);
// loop to insert batch data
for
(
int
iData
=
0
;
iData
<
nKV
;
iData
++
)
{
sprintf
(
key
,
"key%d"
,
iData
);
sprintf
(
data
,
"data%d"
,
iData
);
ret
=
tdbDbInsert
(
pDb
,
key
,
strlen
(
key
),
data
,
strlen
(
data
),
&
txn
);
GTEST_ASSERT_EQ
(
ret
,
0
);
}
// query the data
for
(
int
iData
=
0
;
iData
<
nKV
;
iData
++
)
{
sprintf
(
key
,
"key%d"
,
iData
);
sprintf
(
data
,
"data%d"
,
iData
);
ret
=
tdbDbGet
(
pDb
,
key
,
strlen
(
key
),
&
pData
,
&
nData
);
GTEST_ASSERT_EQ
(
ret
,
0
);
GTEST_ASSERT_EQ
(
memcmp
(
data
,
pData
,
nData
),
0
);
}
// loop to delete some data
for
(
int
iData
=
nKV
-
1
;
iData
>
30
;
iData
--
)
{
sprintf
(
key
,
"key%d"
,
iData
);
ret
=
tdbDbDelete
(
pDb
,
key
,
strlen
(
key
),
&
txn
);
GTEST_ASSERT_EQ
(
ret
,
0
);
}
// query the data
for
(
int
iData
=
0
;
iData
<
nKV
;
iData
++
)
{
sprintf
(
key
,
"key%d"
,
iData
);
ret
=
tdbDbGet
(
pDb
,
key
,
strlen
(
key
),
&
pData
,
&
nData
);
if
(
iData
<=
30
)
{
GTEST_ASSERT_EQ
(
ret
,
0
);
}
else
{
GTEST_ASSERT_EQ
(
ret
,
-
1
);
}
}
// loop to iterate the data
tdbDbcOpen
(
pDb
,
&
pDbc
,
NULL
);
ret
=
tdbDbcMoveToFirst
(
pDbc
);
GTEST_ASSERT_EQ
(
ret
,
0
);
pKey
=
NULL
;
pData
=
NULL
;
for
(;;)
{
ret
=
tdbDbcNext
(
pDbc
,
&
pKey
,
&
nKey
,
&
pData
,
&
nData
);
if
(
ret
<
0
)
break
;
std
::
cout
.
write
((
char
*
)
pKey
,
nKey
)
/* << " " << kLen */
<<
" "
;
std
::
cout
.
write
((
char
*
)
pData
,
nData
)
/* << " " << vLen */
;
std
::
cout
<<
std
::
endl
;
}
tdbDbcClose
(
pDbc
);
tdbCommit
(
pEnv
,
&
txn
);
closePool
(
pPool
);
tdbDbClose
(
pDb
);
tdbEnvClose
(
pEnv
);
}
TEST
(
tdb_test
,
simple_upsert1
)
{
int
ret
;
TENV
*
pEnv
;
TDB
*
pDb
;
int
nData
=
100000
;
char
key
[
64
];
char
data
[
64
];
void
*
pData
=
NULL
;
SPoolMem
*
pPool
;
TXN
txn
;
taosRemoveDir
(
"tdb"
);
// open env
ret
=
tdbEnvOpen
(
"tdb"
,
4096
,
64
,
&
pEnv
);
GTEST_ASSERT_EQ
(
ret
,
0
);
// open database
ret
=
tdbDbOpen
(
"db.db"
,
-
1
,
-
1
,
NULL
,
pEnv
,
&
pDb
);
GTEST_ASSERT_EQ
(
ret
,
0
);
pPool
=
openPool
();
// insert some data
tdbTxnOpen
(
&
txn
,
0
,
poolMalloc
,
poolFree
,
pPool
,
TDB_TXN_WRITE
|
TDB_TXN_READ_UNCOMMITTED
);
tdbBegin
(
pEnv
,
&
txn
);
for
(
int
iData
=
0
;
iData
<
nData
;
iData
++
)
{
sprintf
(
key
,
"key%d"
,
iData
);
sprintf
(
data
,
"data%d"
,
iData
);
ret
=
tdbDbInsert
(
pDb
,
key
,
strlen
(
key
),
data
,
strlen
(
data
),
&
txn
);
GTEST_ASSERT_EQ
(
ret
,
0
);
}
// query the data
for
(
int
iData
=
0
;
iData
<
nData
;
iData
++
)
{
sprintf
(
key
,
"key%d"
,
iData
);
sprintf
(
data
,
"data%d"
,
iData
);
ret
=
tdbDbGet
(
pDb
,
key
,
strlen
(
key
),
&
pData
,
&
nData
);
GTEST_ASSERT_EQ
(
ret
,
0
);
GTEST_ASSERT_EQ
(
memcmp
(
pData
,
data
,
nData
),
0
);
}
// upsert some data
for
(
int
iData
=
0
;
iData
<
nData
;
iData
++
)
{
sprintf
(
key
,
"key%d"
,
iData
);
sprintf
(
data
,
"data%d-u"
,
iData
);
ret
=
tdbDbUpsert
(
pDb
,
key
,
strlen
(
key
),
data
,
strlen
(
data
),
&
txn
);
GTEST_ASSERT_EQ
(
ret
,
0
);
}
tdbCommit
(
pEnv
,
&
txn
);
// query the data
for
(
int
iData
=
0
;
iData
<
nData
;
iData
++
)
{
sprintf
(
key
,
"key%d"
,
iData
);
sprintf
(
data
,
"data%d-u"
,
iData
);
ret
=
tdbDbGet
(
pDb
,
key
,
strlen
(
key
),
&
pData
,
&
nData
);
GTEST_ASSERT_EQ
(
ret
,
0
);
GTEST_ASSERT_EQ
(
memcmp
(
pData
,
data
,
nData
),
0
);
}
tdbDbClose
(
pDb
);
tdbEnvClose
(
pEnv
);
}
}
\ No newline at end of file
source/util/src/tskiplist2.c
0 → 100644
浏览文件 @
abcdeb84
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tskiplist2.h"
struct
SSLNode
{
int8_t
level
;
SSLNode
*
forwards
[];
};
struct
SSkipList2
{
int8_t
level
;
uint32_t
seed
;
int32_t
size
;
const
SSLCfg
*
pCfg
;
SSLNode
*
pHead
[];
};
static
void
*
slMalloc
(
void
*
pPool
,
int32_t
size
);
static
void
slFree
(
void
*
pPool
,
void
*
p
);
static
int32_t
slCmprFn
(
const
void
*
pKey
,
int32_t
nKey
,
const
void
*
pData
,
int32_t
nData
);
const
SSLCfg
slDefaultCfg
=
{.
maxLevel
=
SL_MAX_LEVEL
,
.
nKey
=
-
1
,
.
nData
=
-
1
,
.
cmprFn
=
slCmprFn
,
.
pPool
=
NULL
,
.
xMalloc
=
slMalloc
,
.
xFree
=
slFree
};
int32_t
slOpen
(
const
SSLCfg
*
pCfg
,
SSkipList2
**
ppSl
)
{
SSkipList2
*
pSl
=
NULL
;
int32_t
size
;
*
ppSl
=
NULL
;
if
(
pCfg
==
NULL
)
pCfg
=
&
slDefaultCfg
;
// check config (TODO)
// malloc handle
size
=
sizeof
(
*
pSl
)
+
sizeof
(
SSLNode
*
)
*
pCfg
->
maxLevel
*
2
;
pSl
=
pCfg
->
xMalloc
(
pCfg
->
pPool
,
size
);
if
(
pSl
==
NULL
)
{
return
-
1
;
}
pSl
->
level
=
0
;
pSl
->
seed
=
taosRand
();
pSl
->
size
=
0
;
pSl
->
pCfg
=
pCfg
;
// init an empty skiplist
for
(
int32_t
i
=
0
;
i
<
pCfg
->
maxLevel
*
2
;
i
++
)
{
pSl
->
pHead
[
i
]
=
NULL
;
}
*
ppSl
=
pSl
;
return
0
;
}
int32_t
slClose
(
SSkipList2
*
pSl
)
{
if
(
pSl
)
{
slClear
(
pSl
);
if
(
pSl
->
pCfg
->
xFree
)
{
pSl
->
pCfg
->
xFree
(
pSl
->
pCfg
->
pPool
,
pSl
);
}
}
return
0
;
}
int32_t
slClear
(
SSkipList2
*
pSl
)
{
// loop to clear sl
for
(;;)
{
// (TODO)
}
// init sl (TODO)
return
0
;
}
int32_t
slcOpen
(
SSkipList2
*
pSl
,
SSLCursor
*
pSlc
)
{
pSlc
->
pSl
=
pSl
;
for
(
int
i
=
0
;
i
<
SL_MAX_LEVEL
;
i
++
)
{
if
(
i
<
pSl
->
pCfg
->
maxLevel
)
{
}
else
{
pSlc
->
forwards
[
i
]
=
NULL
;
}
}
// TODO
return
0
;
}
int32_t
slcClose
(
SSLCursor
*
pSlc
)
{
// TODO
return
0
;
}
int32_t
slcMoveTo
(
SSLCursor
*
pSlc
,
const
void
*
pKey
,
int32_t
nKey
)
{
// TODO
return
0
;
}
int32_t
slcMoveToNext
(
SSLCursor
*
pSlc
)
{
// TODO
return
0
;
}
int32_t
slcMoveToPrev
(
SSLCursor
*
pSlc
)
{
// TODO
return
0
;
}
int32_t
slcMoveToFirst
(
SSLCursor
*
pSlc
)
{
// TODO
return
0
;
}
int32_t
slcMoveToLast
(
SSLCursor
*
pSlc
)
{
// TODO
return
0
;
}
int32_t
slcPut
(
SSLCursor
*
pSlc
,
const
void
*
pKey
,
int32_t
nKey
,
const
void
*
pData
,
int32_t
nData
)
{
// TODO
return
0
;
}
int32_t
slcGet
(
SSLCursor
*
pSlc
,
const
void
**
ppKey
,
int32_t
*
nKey
,
const
void
**
ppData
,
int32_t
*
nData
)
{
// TODO
return
0
;
}
int32_t
slcDrop
(
SSLCursor
*
pSlc
)
{
// TODO
return
0
;
}
static
FORCE_INLINE
void
*
slMalloc
(
void
*
pPool
,
int32_t
size
)
{
return
taosMemoryMalloc
(
size
);
}
static
FORCE_INLINE
void
slFree
(
void
*
pPool
,
void
*
p
)
{
taosMemoryFree
(
p
);
}
static
int32_t
slCmprFn
(
const
void
*
pKey1
,
int32_t
nKey1
,
const
void
*
pKey2
,
int32_t
nKey2
)
{
ASSERT
(
nKey1
>=
0
&&
nKey2
>=
0
);
int32_t
nKey
=
nKey1
>
nKey2
?
nKey2
:
nKey1
;
int32_t
c
;
c
=
memcmp
(
pKey1
,
pKey2
,
nKey
);
if
(
c
==
0
)
{
if
(
nKey1
>
nKey2
)
{
c
=
1
;
}
else
if
(
nKey1
<
nKey2
)
{
c
=
-
1
;
}
}
return
c
;
}
\ No newline at end of file
tests/script/api/batchprepare.c
浏览文件 @
abcdeb84
此差异已折叠。
点击以展开。
taos-tools
@
2f3dfddd
比较
bf6c7669
...
2f3dfddd
Subproject commit
bf6c766986c61ff4fc80421fdea682a8fd4b5b32
Subproject commit
2f3dfddd4d9a869e706ba3cf98fb6d769404cd7c
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录