Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
35d9d801
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
35d9d801
编写于
5月 25, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into fix/mnode
上级
e0d3b53b
b37ca868
变更
30
显示空白变更内容
内联
并排
Showing
30 changed file
with
719 addition
and
292 deletion
+719
-292
cmake/cmake.define
cmake/cmake.define
+2
-2
include/common/tcommon.h
include/common/tcommon.h
+10
-0
include/common/tdatablock.h
include/common/tdatablock.h
+1
-1
include/common/tdataformat.h
include/common/tdataformat.h
+4
-3
include/common/tmsg.h
include/common/tmsg.h
+1
-2
include/util/taoserror.h
include/util/taoserror.h
+1
-0
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+2
-8
source/common/src/tdataformat.c
source/common/src/tdataformat.c
+50
-12
source/dnode/vnode/CMakeLists.txt
source/dnode/vnode/CMakeLists.txt
+4
-0
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+7
-5
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+21
-11
source/dnode/vnode/src/meta/metaSnapshot.c
source/dnode/vnode/src/meta/metaSnapshot.c
+93
-0
source/dnode/vnode/src/meta/metaTable.c
source/dnode/vnode/src/meta/metaTable.c
+96
-147
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+75
-48
source/dnode/vnode/src/tsdb/tsdbSnapshot.c
source/dnode/vnode/src/tsdb/tsdbSnapshot.c
+36
-0
source/dnode/vnode/src/vnd/vnodeSnapshot.c
source/dnode/vnode/src/vnd/vnodeSnapshot.c
+109
-0
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+4
-2
source/dnode/vnode/src/vnd/vnodeUtil.c
source/dnode/vnode/src/vnd/vnodeUtil.c
+45
-0
source/libs/command/src/explain.c
source/libs/command/src/explain.c
+39
-8
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+3
-7
source/libs/executor/inc/tsort.h
source/libs/executor/inc/tsort.h
+8
-0
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+2
-3
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+3
-2
source/libs/executor/src/sortoperator.c
source/libs/executor/src/sortoperator.c
+51
-12
source/libs/executor/src/tsort.c
source/libs/executor/src/tsort.c
+33
-12
source/libs/parser/src/parInsert.c
source/libs/parser/src/parInsert.c
+5
-0
source/util/src/terror.c
source/util/src/terror.c
+1
-0
source/util/src/thash.c
source/util/src/thash.c
+2
-2
source/util/src/tpagedbuf.c
source/util/src/tpagedbuf.c
+10
-5
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+1
-0
未找到文件。
cmake/cmake.define
浏览文件 @
35d9d801
...
@@ -71,8 +71,8 @@ ELSE ()
...
@@ -71,8 +71,8 @@ ELSE ()
ENDIF ()
ENDIF ()
IF (${SANITIZER} MATCHES "true")
IF (${SANITIZER} MATCHES "true")
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=
null
-fno-sanitize=alignment -g3")
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=
shift-base
-fno-sanitize=alignment -g3")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=
null
-fno-sanitize=alignment -g3")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=
shift-base
-fno-sanitize=alignment -g3")
MESSAGE(STATUS "Will compile with Address Sanitizer!")
MESSAGE(STATUS "Will compile with Address Sanitizer!")
ELSE ()
ELSE ()
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3")
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3")
...
...
include/common/tcommon.h
浏览文件 @
35d9d801
...
@@ -219,6 +219,16 @@ typedef struct {
...
@@ -219,6 +219,16 @@ typedef struct {
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
#define SORT_QSORT_T 0x1
#define SORT_SPILLED_MERGE_SORT_T 0x2
typedef
struct
SSortExecInfo
{
int32_t
sortMethod
;
int32_t
sortBuffer
;
int32_t
loops
;
// loop count
int32_t
writeBytes
;
// write io bytes
int32_t
readBytes
;
// read io bytes
}
SSortExecInfo
;
#ifdef __cplusplus
#ifdef __cplusplus
}
}
#endif
#endif
...
...
include/common/tdatablock.h
浏览文件 @
35d9d801
...
@@ -198,7 +198,7 @@ void colDataTrim(SColumnInfoData* pColumnInfoData);
...
@@ -198,7 +198,7 @@ void colDataTrim(SColumnInfoData* pColumnInfoData);
size_t
blockDataGetNumOfCols
(
const
SSDataBlock
*
pBlock
);
size_t
blockDataGetNumOfCols
(
const
SSDataBlock
*
pBlock
);
size_t
blockDataGetNumOfRows
(
const
SSDataBlock
*
pBlock
);
size_t
blockDataGetNumOfRows
(
const
SSDataBlock
*
pBlock
);
int32_t
blockDataMerge
(
SSDataBlock
*
pDest
,
const
SSDataBlock
*
pSrc
,
SArray
*
pIndexMap
);
int32_t
blockDataMerge
(
SSDataBlock
*
pDest
,
const
SSDataBlock
*
pSrc
);
int32_t
blockDataSplitRows
(
SSDataBlock
*
pBlock
,
bool
hasVarCol
,
int32_t
startIndex
,
int32_t
*
stopIndex
,
int32_t
blockDataSplitRows
(
SSDataBlock
*
pBlock
,
bool
hasVarCol
,
int32_t
startIndex
,
int32_t
*
stopIndex
,
int32_t
pageSize
);
int32_t
pageSize
);
int32_t
blockDataToBuf
(
char
*
buf
,
const
SSDataBlock
*
pBlock
);
int32_t
blockDataToBuf
(
char
*
buf
,
const
SSDataBlock
*
pBlock
);
...
...
include/common/tdataformat.h
浏览文件 @
35d9d801
...
@@ -61,9 +61,10 @@ int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow);
...
@@ -61,9 +61,10 @@ int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow);
// STag
// STag
int32_t
tTagNew
(
STagVal
*
pTagVals
,
int16_t
nTag
,
STag
**
ppTag
);
int32_t
tTagNew
(
STagVal
*
pTagVals
,
int16_t
nTag
,
STag
**
ppTag
);
void
tTagFree
(
STag
*
pTag
);
void
tTagFree
(
STag
*
pTag
);
void
tTagGet
(
STag
*
pTag
,
int16_t
cid
,
int8_t
type
,
uint8_t
**
ppData
,
int32_t
*
nData
);
int32_t
tTagSet
(
STag
*
pTag
,
SSchema
*
pSchema
,
int32_t
nCols
,
int
iCol
,
uint8_t
*
pData
,
uint32_t
nData
,
STag
**
ppTag
);
int32_t
tEncodeTag
(
SEncoder
*
pEncoder
,
STag
*
pTag
);
void
tTagGet
(
STag
*
pTag
,
int16_t
cid
,
int8_t
type
,
uint8_t
**
ppData
,
uint32_t
*
nData
);
int32_t
tDecodeTag
(
SDecoder
*
pDecoder
,
const
STag
**
ppTag
);
int32_t
tEncodeTag
(
SEncoder
*
pEncoder
,
const
STag
*
pTag
);
int32_t
tDecodeTag
(
SDecoder
*
pDecoder
,
STag
**
ppTag
);
// STRUCT =================
// STRUCT =================
struct
STColumn
{
struct
STColumn
{
...
...
include/common/tmsg.h
浏览文件 @
35d9d801
...
@@ -660,8 +660,7 @@ typedef struct {
...
@@ -660,8 +660,7 @@ typedef struct {
int32_t
tz
;
// query client timezone
int32_t
tz
;
// query client timezone
char
intervalUnit
;
char
intervalUnit
;
char
slidingUnit
;
char
slidingUnit
;
char
char
offsetUnit
;
offsetUnit
;
// TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration.
int8_t
precision
;
int8_t
precision
;
int64_t
interval
;
int64_t
interval
;
int64_t
sliding
;
int64_t
sliding
;
...
...
include/util/taoserror.h
浏览文件 @
35d9d801
...
@@ -313,6 +313,7 @@ int32_t* taosGetErrno();
...
@@ -313,6 +313,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_VND_INVALID_TABLE_ACTION TAOS_DEF_ERROR_CODE(0, 0x0519)
#define TSDB_CODE_VND_INVALID_TABLE_ACTION TAOS_DEF_ERROR_CODE(0, 0x0519)
#define TSDB_CODE_VND_COL_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051a)
#define TSDB_CODE_VND_COL_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051a)
#define TSDB_CODE_VND_TABLE_COL_NOT_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051b)
#define TSDB_CODE_VND_TABLE_COL_NOT_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051b)
#define TSDB_CODE_VND_READ_END TAOS_DEF_ERROR_CODE(0, 0x051c)
// tsdb
// tsdb
#define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600)
#define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600)
...
...
source/common/src/tdatablock.c
浏览文件 @
35d9d801
...
@@ -361,19 +361,13 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex)
...
@@ -361,19 +361,13 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex)
return
0
;
return
0
;
}
}
// if pIndexMap = NULL, merger one column by on column
int32_t
blockDataMerge
(
SSDataBlock
*
pDest
,
const
SSDataBlock
*
pSrc
)
{
int32_t
blockDataMerge
(
SSDataBlock
*
pDest
,
const
SSDataBlock
*
pSrc
,
SArray
*
pIndexMap
)
{
assert
(
pSrc
!=
NULL
&&
pDest
!=
NULL
);
assert
(
pSrc
!=
NULL
&&
pDest
!=
NULL
);
int32_t
capacity
=
pDest
->
info
.
capacity
;
int32_t
capacity
=
pDest
->
info
.
capacity
;
for
(
int32_t
i
=
0
;
i
<
pDest
->
info
.
numOfCols
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pDest
->
info
.
numOfCols
;
++
i
)
{
int32_t
mapIndex
=
i
;
// if (pIndexMap) {
// mapIndex = *(int32_t*)taosArrayGet(pIndexMap, i);
// }
SColumnInfoData
*
pCol2
=
taosArrayGet
(
pDest
->
pDataBlock
,
i
);
SColumnInfoData
*
pCol2
=
taosArrayGet
(
pDest
->
pDataBlock
,
i
);
SColumnInfoData
*
pCol1
=
taosArrayGet
(
pSrc
->
pDataBlock
,
mapIndex
);
SColumnInfoData
*
pCol1
=
taosArrayGet
(
pSrc
->
pDataBlock
,
i
);
capacity
=
pDest
->
info
.
capacity
;
capacity
=
pDest
->
info
.
capacity
;
colDataMergeCol
(
pCol2
,
pDest
->
info
.
rows
,
&
capacity
,
pCol1
,
pSrc
->
info
.
rows
);
colDataMergeCol
(
pCol2
,
pDest
->
info
.
rows
,
&
capacity
,
pCol1
,
pSrc
->
info
.
rows
);
...
...
source/common/src/tdataformat.c
浏览文件 @
35d9d801
...
@@ -581,7 +581,52 @@ void tTagFree(STag *pTag) {
...
@@ -581,7 +581,52 @@ void tTagFree(STag *pTag) {
if
(
pTag
)
taosMemoryFree
(
pTag
);
if
(
pTag
)
taosMemoryFree
(
pTag
);
}
}
void
tTagGet
(
STag
*
pTag
,
int16_t
cid
,
int8_t
type
,
uint8_t
**
ppData
,
int32_t
*
nData
)
{
int32_t
tTagSet
(
STag
*
pTag
,
SSchema
*
pSchema
,
int32_t
nCols
,
int
iCol
,
uint8_t
*
pData
,
uint32_t
nData
,
STag
**
ppTag
)
{
STagVal
*
pTagVals
;
int16_t
nTags
=
0
;
SSchema
*
pColumn
;
uint8_t
*
p
;
uint32_t
n
;
pTagVals
=
(
STagVal
*
)
taosMemoryMalloc
(
sizeof
(
*
pTagVals
)
*
nCols
);
if
(
pTagVals
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
for
(
int32_t
i
=
0
;
i
<
nCols
;
i
++
)
{
pColumn
=
&
pSchema
[
i
];
if
(
i
==
iCol
)
{
p
=
pData
;
n
=
nData
;
}
else
{
tTagGet
(
pTag
,
pColumn
->
colId
,
pColumn
->
type
,
&
p
,
&
n
);
}
if
(
p
==
NULL
)
continue
;
ASSERT
(
IS_VAR_DATA_TYPE
(
pColumn
->
type
)
||
n
==
pColumn
->
bytes
);
pTagVals
[
nTags
].
cid
=
pColumn
->
colId
;
pTagVals
[
nTags
].
type
=
pColumn
->
type
;
pTagVals
[
nTags
].
nData
=
n
;
pTagVals
[
nTags
].
pData
=
p
;
nTags
++
;
}
// create new tag
if
(
tTagNew
(
pTagVals
,
nTags
,
ppTag
)
<
0
)
{
taosMemoryFree
(
pTagVals
);
return
-
1
;
}
taosMemoryFree
(
pTagVals
);
return
0
;
}
void
tTagGet
(
STag
*
pTag
,
int16_t
cid
,
int8_t
type
,
uint8_t
**
ppData
,
uint32_t
*
nData
)
{
STagIdx
*
pTagIdx
=
bsearch
(
&
((
STagIdx
){.
cid
=
cid
}),
pTag
->
idx
,
pTag
->
nTag
,
sizeof
(
STagIdx
),
tTagIdxCmprFn
);
STagIdx
*
pTagIdx
=
bsearch
(
&
((
STagIdx
){.
cid
=
cid
}),
pTag
->
idx
,
pTag
->
nTag
,
sizeof
(
STagIdx
),
tTagIdxCmprFn
);
if
(
pTagIdx
==
NULL
)
{
if
(
pTagIdx
==
NULL
)
{
*
ppData
=
NULL
;
*
ppData
=
NULL
;
...
@@ -597,18 +642,11 @@ void tTagGet(STag *pTag, int16_t cid, int8_t type, uint8_t **ppData, int32_t *nD
...
@@ -597,18 +642,11 @@ void tTagGet(STag *pTag, int16_t cid, int8_t type, uint8_t **ppData, int32_t *nD
}
}
}
}
int32_t
tEncodeTag
(
SEncoder
*
pEncoder
,
STag
*
pTag
)
{
int32_t
tEncodeTag
(
SEncoder
*
pEncoder
,
const
STag
*
pTag
)
{
// return tEncodeBinary(pEncoder, (uint8_t *)pTag, pTag->len);
return
tEncodeBinary
(
pEncoder
,
(
const
uint8_t
*
)
pTag
,
pTag
->
len
);
ASSERT
(
0
);
return
0
;
}
}
int32_t
tDecodeTag
(
SDecoder
*
pDecoder
,
const
STag
**
ppTag
)
{
int32_t
tDecodeTag
(
SDecoder
*
pDecoder
,
STag
**
ppTag
)
{
return
tDecodeBinary
(
pDecoder
,
(
uint8_t
**
)
ppTag
,
NULL
);
}
// uint32_t n;
// return tDecodeBinary(pDecoder, (const uint8_t **)ppTag, &n);
ASSERT
(
0
);
return
0
;
}
#if 1 // ===================================================================================================================
#if 1 // ===================================================================================================================
static
void
dataColSetNEleNull
(
SDataCol
*
pCol
,
int
nEle
);
static
void
dataColSetNEleNull
(
SDataCol
*
pCol
,
int
nEle
);
...
@@ -1087,7 +1125,7 @@ SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) {
...
@@ -1087,7 +1125,7 @@ SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) {
kvRowSetNCols
(
row
,
pBuilder
->
nCols
);
kvRowSetNCols
(
row
,
pBuilder
->
nCols
);
kvRowSetLen
(
row
,
tlen
);
kvRowSetLen
(
row
,
tlen
);
if
(
pBuilder
->
nCols
>
0
)
{
if
(
pBuilder
->
nCols
>
0
)
{
memcpy
(
kvRowColIdx
(
row
),
pBuilder
->
pColIdx
,
sizeof
(
SColIdx
)
*
pBuilder
->
nCols
);
memcpy
(
kvRowColIdx
(
row
),
pBuilder
->
pColIdx
,
sizeof
(
SColIdx
)
*
pBuilder
->
nCols
);
memcpy
(
kvRowValues
(
row
),
pBuilder
->
buf
,
pBuilder
->
size
);
memcpy
(
kvRowValues
(
row
),
pBuilder
->
buf
,
pBuilder
->
size
);
}
}
...
...
source/dnode/vnode/CMakeLists.txt
浏览文件 @
35d9d801
...
@@ -13,6 +13,8 @@ target_sources(
...
@@ -13,6 +13,8 @@ target_sources(
"src/vnd/vnodeModule.c"
"src/vnd/vnodeModule.c"
"src/vnd/vnodeSvr.c"
"src/vnd/vnodeSvr.c"
"src/vnd/vnodeSync.c"
"src/vnd/vnodeSync.c"
"src/vnd/vnodeSnapshot.c"
"src/vnd/vnodeUtil.c"
# meta
# meta
"src/meta/metaOpen.c"
"src/meta/metaOpen.c"
...
@@ -22,6 +24,7 @@ target_sources(
...
@@ -22,6 +24,7 @@ target_sources(
"src/meta/metaQuery.c"
"src/meta/metaQuery.c"
"src/meta/metaCommit.c"
"src/meta/metaCommit.c"
"src/meta/metaEntry.c"
"src/meta/metaEntry.c"
"src/meta/metaSnapshot.c"
# sma
# sma
"src/sma/sma.c"
"src/sma/sma.c"
...
@@ -44,6 +47,7 @@ target_sources(
...
@@ -44,6 +47,7 @@ target_sources(
"src/tsdb/tsdbReadImpl.c"
"src/tsdb/tsdbReadImpl.c"
# "src/tsdb/tsdbSma.c"
# "src/tsdb/tsdbSma.c"
"src/tsdb/tsdbWrite.c"
"src/tsdb/tsdbWrite.c"
"src/tsdb/tsdbSnapshot.c"
# tq
# tq
"src/tq/tq.c"
"src/tq/tq.c"
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
35d9d801
...
@@ -42,6 +42,7 @@ extern "C" {
...
@@ -42,6 +42,7 @@ extern "C" {
typedef
struct
SVnode
SVnode
;
typedef
struct
SVnode
SVnode
;
typedef
struct
STsdbCfg
STsdbCfg
;
// todo: remove
typedef
struct
STsdbCfg
STsdbCfg
;
// todo: remove
typedef
struct
SVnodeCfg
SVnodeCfg
;
typedef
struct
SVnodeCfg
SVnodeCfg
;
typedef
struct
SVSnapshotReader
SVSnapshotReader
;
extern
const
SVnodeCfg
vnodeCfgDefault
;
extern
const
SVnodeCfg
vnodeCfgDefault
;
...
@@ -59,13 +60,14 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
...
@@ -59,13 +60,14 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
int32_t
vnodeProcessFetchMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SQueueInfo
*
pInfo
);
int32_t
vnodeProcessFetchMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SQueueInfo
*
pInfo
);
int32_t
vnodeGetLoad
(
SVnode
*
pVnode
,
SVnodeLoad
*
pLoad
);
int32_t
vnodeGetLoad
(
SVnode
*
pVnode
,
SVnodeLoad
*
pLoad
);
int32_t
vnodeValidateTableHash
(
SVnode
*
pVnode
,
char
*
tableFName
);
int32_t
vnodeValidateTableHash
(
SVnode
*
pVnode
,
char
*
tableFName
);
int32_t
vnodeStart
(
SVnode
*
pVnode
);
int32_t
vnodeStart
(
SVnode
*
pVnode
);
void
vnodeStop
(
SVnode
*
pVnode
);
void
vnodeStop
(
SVnode
*
pVnode
);
int64_t
vnodeGetSyncHandle
(
SVnode
*
pVnode
);
int64_t
vnodeGetSyncHandle
(
SVnode
*
pVnode
);
void
vnodeGetSnapshot
(
SVnode
*
pVnode
,
SSnapshot
*
pSnapshot
);
void
vnodeGetSnapshot
(
SVnode
*
pVnode
,
SSnapshot
*
pSnapshot
);
void
vnodeGetInfo
(
SVnode
*
pVnode
,
const
char
**
dbname
,
int32_t
*
vgId
);
void
vnodeGetInfo
(
SVnode
*
pVnode
,
const
char
**
dbname
,
int32_t
*
vgId
);
int32_t
vnodeSnapshotReaderOpen
(
SVnode
*
pVnode
,
SVSnapshotReader
**
ppReader
,
int64_t
sver
,
int64_t
ever
);
int32_t
vnodeSnapshotReaderClose
(
SVSnapshotReader
*
pReader
);
int32_t
vnodeSnapshotRead
(
SVSnapshotReader
*
pReader
,
const
void
**
ppData
,
uint32_t
*
nData
);
// meta
// meta
typedef
struct
SMeta
SMeta
;
// todo: remove
typedef
struct
SMeta
SMeta
;
// todo: remove
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
35d9d801
...
@@ -56,6 +56,8 @@ typedef struct SVState SVState;
...
@@ -56,6 +56,8 @@ typedef struct SVState SVState;
typedef
struct
SVBufPool
SVBufPool
;
typedef
struct
SVBufPool
SVBufPool
;
typedef
struct
SQWorker
SQHandle
;
typedef
struct
SQWorker
SQHandle
;
typedef
struct
STsdbKeepCfg
STsdbKeepCfg
;
typedef
struct
STsdbKeepCfg
STsdbKeepCfg
;
typedef
struct
SMetaSnapshotReader
SMetaSnapshotReader
;
typedef
struct
STsdbSnapshotReader
STsdbSnapshotReader
;
#define VNODE_META_DIR "meta"
#define VNODE_META_DIR "meta"
#define VNODE_TSDB_DIR "tsdb"
#define VNODE_TSDB_DIR "tsdb"
...
@@ -69,6 +71,8 @@ typedef struct STsdbKeepCfg STsdbKeepCfg;
...
@@ -69,6 +71,8 @@ typedef struct STsdbKeepCfg STsdbKeepCfg;
// vnd.h
// vnd.h
void
*
vnodeBufPoolMalloc
(
SVBufPool
*
pPool
,
int
size
);
void
*
vnodeBufPoolMalloc
(
SVBufPool
*
pPool
,
int
size
);
void
vnodeBufPoolFree
(
SVBufPool
*
pPool
,
void
*
p
);
void
vnodeBufPoolFree
(
SVBufPool
*
pPool
,
void
*
p
);
int32_t
vnodeRealloc
(
void
**
pp
,
int32_t
size
);
void
vnodeFree
(
void
*
p
);
// meta
// meta
typedef
struct
SMCtbCursor
SMCtbCursor
;
typedef
struct
SMCtbCursor
SMCtbCursor
;
...
@@ -95,6 +99,9 @@ STSma* metaGetSmaInfoByIndex(SMeta* pMeta, int64_t indexUid);
...
@@ -95,6 +99,9 @@ STSma* metaGetSmaInfoByIndex(SMeta* pMeta, int64_t indexUid);
STSmaWrapper
*
metaGetSmaInfoByTable
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
bool
deepCopy
);
STSmaWrapper
*
metaGetSmaInfoByTable
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
bool
deepCopy
);
SArray
*
metaGetSmaIdsByTable
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
SArray
*
metaGetSmaIdsByTable
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
SArray
*
metaGetSmaTbUids
(
SMeta
*
pMeta
);
SArray
*
metaGetSmaTbUids
(
SMeta
*
pMeta
);
int32_t
metaSnapshotReaderOpen
(
SMeta
*
pMeta
,
SMetaSnapshotReader
**
ppReader
,
int64_t
sver
,
int64_t
ever
);
int32_t
metaSnapshotReaderClose
(
SMetaSnapshotReader
*
pReader
);
int32_t
metaSnapshotRead
(
SMetaSnapshotReader
*
pReader
,
void
**
ppData
,
uint32_t
*
nData
);
int32_t
metaCreateTSma
(
SMeta
*
pMeta
,
int64_t
version
,
SSmaCfg
*
pCfg
);
int32_t
metaCreateTSma
(
SMeta
*
pMeta
,
int64_t
version
,
SSmaCfg
*
pCfg
);
int32_t
metaDropTSma
(
SMeta
*
pMeta
,
int64_t
indexUid
);
int32_t
metaDropTSma
(
SMeta
*
pMeta
,
int64_t
indexUid
);
...
@@ -112,6 +119,9 @@ tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableG
...
@@ -112,6 +119,9 @@ tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableG
tsdbReaderT
tsdbQueryCacheLastT
(
STsdb
*
tsdb
,
SQueryTableDataCond
*
pCond
,
STableGroupInfo
*
groupList
,
uint64_t
qId
,
tsdbReaderT
tsdbQueryCacheLastT
(
STsdb
*
tsdb
,
SQueryTableDataCond
*
pCond
,
STableGroupInfo
*
groupList
,
uint64_t
qId
,
void
*
pMemRef
);
void
*
pMemRef
);
int32_t
tsdbGetTableGroupFromIdListT
(
STsdb
*
tsdb
,
SArray
*
pTableIdList
,
STableGroupInfo
*
pGroupInfo
);
int32_t
tsdbGetTableGroupFromIdListT
(
STsdb
*
tsdb
,
SArray
*
pTableIdList
,
STableGroupInfo
*
pGroupInfo
);
int32_t
tsdbSnapshotReaderOpen
(
STsdb
*
pTsdb
,
STsdbSnapshotReader
**
ppReader
,
int64_t
sver
,
int64_t
ever
);
int32_t
tsdbSnapshotReaderClose
(
STsdbSnapshotReader
*
pReader
);
int32_t
tsdbSnapshotRead
(
STsdbSnapshotReader
*
pReader
,
void
**
ppData
,
uint32_t
*
nData
);
// tq
// tq
STQ
*
tqOpen
(
const
char
*
path
,
SVnode
*
pVnode
,
SWal
*
pWal
);
STQ
*
tqOpen
(
const
char
*
path
,
SVnode
*
pVnode
,
SWal
*
pWal
);
...
...
source/dnode/vnode/src/meta/metaSnapshot.c
0 → 100644
浏览文件 @
35d9d801
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "meta.h"
struct
SMetaSnapshotReader
{
SMeta
*
pMeta
;
TBC
*
pTbc
;
int64_t
sver
;
int64_t
ever
;
};
int32_t
metaSnapshotReaderOpen
(
SMeta
*
pMeta
,
SMetaSnapshotReader
**
ppReader
,
int64_t
sver
,
int64_t
ever
)
{
int32_t
code
=
0
;
int32_t
c
=
0
;
SMetaSnapshotReader
*
pMetaReader
=
NULL
;
pMetaReader
=
(
SMetaSnapshotReader
*
)
taosMemoryCalloc
(
1
,
sizeof
(
*
pMetaReader
));
if
(
pMetaReader
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
pMetaReader
->
pMeta
=
pMeta
;
pMetaReader
->
sver
=
sver
;
pMetaReader
->
ever
=
ever
;
code
=
tdbTbcOpen
(
pMeta
->
pTbDb
,
&
pMetaReader
->
pTbc
,
NULL
);
if
(
code
)
{
goto
_err
;
}
code
=
tdbTbcMoveTo
(
pMetaReader
->
pTbc
,
&
(
STbDbKey
){.
version
=
sver
,
.
uid
=
INT64_MIN
},
sizeof
(
STbDbKey
),
&
c
);
if
(
code
)
{
goto
_err
;
}
*
ppReader
=
pMetaReader
;
return
code
;
_err:
*
ppReader
=
NULL
;
return
code
;
}
int32_t
metaSnapshotReaderClose
(
SMetaSnapshotReader
*
pReader
)
{
if
(
pReader
)
{
tdbTbcClose
(
pReader
->
pTbc
);
taosMemoryFree
(
pReader
);
}
return
0
;
}
int32_t
metaSnapshotRead
(
SMetaSnapshotReader
*
pReader
,
void
**
ppData
,
uint32_t
*
nDatap
)
{
const
void
*
pKey
=
NULL
;
const
void
*
pData
=
NULL
;
int32_t
nKey
=
0
;
int32_t
nData
=
0
;
int32_t
code
=
0
;
for
(;;)
{
code
=
tdbTbcGet
(
pReader
->
pTbc
,
&
pKey
,
&
nKey
,
&
pData
,
&
nData
);
if
(
code
||
((
STbDbKey
*
)
pData
)
->
version
>
pReader
->
ever
)
{
return
TSDB_CODE_VND_READ_END
;
}
if
(((
STbDbKey
*
)
pData
)
->
version
<
pReader
->
sver
)
{
continue
;
}
break
;
}
// copy the data
if
(
vnodeRealloc
(
ppData
,
nData
)
<
0
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
return
code
;
}
memcpy
(
*
ppData
,
pData
,
nData
);
*
nDatap
=
nData
;
return
code
;
}
\ No newline at end of file
source/dnode/vnode/src/meta/metaTable.c
浏览文件 @
35d9d801
...
@@ -23,6 +23,7 @@ static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME);
...
@@ -23,6 +23,7 @@ static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME);
static
int
metaSaveToSkmDb
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pME
);
static
int
metaSaveToSkmDb
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pME
);
static
int
metaUpdateCtbIdx
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pME
);
static
int
metaUpdateCtbIdx
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pME
);
static
int
metaUpdateTagIdx
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pCtbEntry
);
static
int
metaUpdateTagIdx
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pCtbEntry
);
static
int
metaDropTableByUid
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
int
*
type
);
int
metaCreateSTable
(
SMeta
*
pMeta
,
int64_t
version
,
SVCreateStbReq
*
pReq
)
{
int
metaCreateSTable
(
SMeta
*
pMeta
,
int64_t
version
,
SVCreateStbReq
*
pReq
)
{
SMetaEntry
me
=
{
0
};
SMetaEntry
me
=
{
0
};
...
@@ -71,64 +72,71 @@ _err:
...
@@ -71,64 +72,71 @@ _err:
}
}
int
metaDropSTable
(
SMeta
*
pMeta
,
int64_t
verison
,
SVDropStbReq
*
pReq
)
{
int
metaDropSTable
(
SMeta
*
pMeta
,
int64_t
verison
,
SVDropStbReq
*
pReq
)
{
TBC
*
pNameIdxc
=
NULL
;
void
*
pKey
=
NULL
;
TBC
*
pUidIdxc
=
NULL
;
int
nKey
=
0
;
TBC
*
pCtbIdxc
=
NULL
;
void
*
pData
=
NULL
;
SCtbIdxKey
*
pCtbIdxKey
;
int
nData
=
0
;
const
void
*
pKey
=
NULL
;
int
c
=
0
;
int
nKey
;
int
rc
=
0
;
const
void
*
pData
=
NULL
;
int
nData
;
int
c
,
ret
;
// prepare uid idx cursor
tdbTbcOpen
(
pMeta
->
pUidIdx
,
&
pUidIdxc
,
&
pMeta
->
txn
);
ret
=
tdbTbcMoveTo
(
pUidIdxc
,
&
pReq
->
suid
,
sizeof
(
tb_uid_t
),
&
c
);
if
(
ret
<
0
||
c
!=
0
)
{
terrno
=
TSDB_CODE_VND_TB_NOT_EXIST
;
tdbTbcClose
(
pUidIdxc
);
goto
_err
;
}
//
prepare name idx cursor
//
check if super table exists
tdbTbcOpen
(
pMeta
->
pNameIdx
,
&
pNameIdxc
,
&
pMeta
->
txn
);
rc
=
tdbTbGet
(
pMeta
->
pNameIdx
,
pReq
->
name
,
strlen
(
pReq
->
name
)
+
1
,
&
pData
,
&
nData
);
ret
=
tdbTbcMoveTo
(
pNameIdxc
,
pReq
->
name
,
strlen
(
pReq
->
name
)
+
1
,
&
c
);
if
(
rc
<
0
||
*
(
tb_uid_t
*
)
pData
!=
pReq
->
suid
)
{
if
(
ret
<
0
||
c
!=
0
)
{
terrno
=
TSDB_CODE_VND_TABLE_NOT_EXIST
;
ASSERT
(
0
)
;
return
-
1
;
}
}
tdbTbcDelete
(
pUidIdxc
);
// drop all child tables
tdbTbcDelete
(
pNameIdxc
);
TBC
*
pCtbIdxc
=
NULL
;
tdbTbcClose
(
pUidIdxc
);
SArray
*
pArray
=
taosArrayInit
(
8
,
sizeof
(
tb_uid_t
));
tdbTbcClose
(
pNameIdxc
);
// loop to drop each child table
tdbTbcOpen
(
pMeta
->
pCtbIdx
,
&
pCtbIdxc
,
&
pMeta
->
txn
);
tdbTbcOpen
(
pMeta
->
pCtbIdx
,
&
pCtbIdxc
,
&
pMeta
->
txn
);
r
et
=
tdbTbcMoveTo
(
pCtbIdxc
,
&
(
SCtbIdxKey
){.
suid
=
pReq
->
suid
,
.
uid
=
INT64_MIN
},
sizeof
(
SCtbIdxKey
),
&
c
);
r
c
=
tdbTbcMoveTo
(
pCtbIdxc
,
&
(
SCtbIdxKey
){.
suid
=
pReq
->
suid
,
.
uid
=
INT64_MIN
},
sizeof
(
SCtbIdxKey
),
&
c
);
if
(
r
et
<
0
||
(
c
<
0
&&
tdbTbcMoveToNext
(
pCtbIdxc
)
<
0
)
)
{
if
(
r
c
<
0
)
{
tdbTbcClose
(
pCtbIdxc
);
tdbTbcClose
(
pCtbIdxc
);
goto
_exit
;
metaWLock
(
pMeta
);
goto
_drop_super_table
;
}
}
for
(;;)
{
for
(;;)
{
tdbTbcGet
(
pCtbIdxc
,
&
pKey
,
&
nKey
,
NULL
,
NULL
);
rc
=
tdbTbcNext
(
pCtbIdxc
,
&
pKey
,
&
nKey
,
NULL
,
NULL
);
pCtbIdxKey
=
(
SCtbIdxKey
*
)
pKey
;
if
(
rc
<
0
)
break
;
if
(((
SCtbIdxKey
*
)
pKey
)
->
suid
<
pReq
->
suid
)
{
continue
;
}
else
if
(((
SCtbIdxKey
*
)
pKey
)
->
suid
>
pReq
->
suid
)
{
break
;
}
taosArrayPush
(
pArray
,
&
(((
SCtbIdxKey
*
)
pKey
)
->
uid
));
}
if
(
pCtbIdxKey
->
suid
>
pReq
->
suid
)
break
;
tdbTbcClose
(
pCtbIdxc
)
;
// drop the child table (TODO)
metaWLock
(
pMeta
);
if
(
tdbTbcMoveToNext
(
pCtbIdxc
)
<
0
)
break
;
for
(
int32_t
iChild
=
0
;
iChild
<
taosArrayGetSize
(
pArray
);
iChild
++
)
{
tb_uid_t
uid
=
*
(
tb_uid_t
*
)
taosArrayGet
(
pArray
,
iChild
);
metaDropTableByUid
(
pMeta
,
uid
,
NULL
);
}
}
taosArrayDestroy
(
pArray
);
// drop super table
_drop_super_table:
tdbTbGet
(
pMeta
->
pUidIdx
,
&
pReq
->
suid
,
sizeof
(
tb_uid_t
),
&
pData
,
&
nData
);
tdbTbDelete
(
pMeta
->
pTbDb
,
&
(
STbDbKey
){.
version
=
*
(
int64_t
*
)
pData
,
.
uid
=
pReq
->
suid
},
sizeof
(
STbDbKey
),
&
pMeta
->
txn
);
tdbTbDelete
(
pMeta
->
pNameIdx
,
pReq
->
name
,
strlen
(
pReq
->
name
)
+
1
,
&
pMeta
->
txn
);
tdbTbDelete
(
pMeta
->
pUidIdx
,
&
pReq
->
suid
,
sizeof
(
tb_uid_t
),
&
pMeta
->
txn
);
metaULock
(
pMeta
);
_exit:
_exit:
tdbFree
(
pKey
);
tdbFree
(
pData
);
metaDebug
(
"vgId:%d super table %s uid:%"
PRId64
" is dropped"
,
TD_VID
(
pMeta
->
pVnode
),
pReq
->
name
,
pReq
->
suid
);
metaDebug
(
"vgId:%d super table %s uid:%"
PRId64
" is dropped"
,
TD_VID
(
pMeta
->
pVnode
),
pReq
->
name
,
pReq
->
suid
);
return
0
;
return
0
;
_err:
metaError
(
"vgId:%d failed to drop super table %s uid:%"
PRId64
" since %s"
,
TD_VID
(
pMeta
->
pVnode
),
pReq
->
name
,
pReq
->
suid
,
tstrerror
(
terrno
));
return
-
1
;
}
}
int
metaAlterSTable
(
SMeta
*
pMeta
,
int64_t
version
,
SVCreateStbReq
*
pReq
)
{
int
metaAlterSTable
(
SMeta
*
pMeta
,
int64_t
version
,
SVCreateStbReq
*
pReq
)
{
...
@@ -256,122 +264,63 @@ _err:
...
@@ -256,122 +264,63 @@ _err:
}
}
int
metaDropTable
(
SMeta
*
pMeta
,
int64_t
version
,
SVDropTbReq
*
pReq
,
SArray
*
tbUids
)
{
int
metaDropTable
(
SMeta
*
pMeta
,
int64_t
version
,
SVDropTbReq
*
pReq
,
SArray
*
tbUids
)
{
TBC
*
pTbDbc
=
NULL
;
void
*
pData
=
NULL
;
TBC
*
pUidIdxc
=
NULL
;
int
nData
=
0
;
TBC
*
pNameIdxc
=
NULL
;
int
rc
=
0
;
const
void
*
pData
;
int
nData
;
tb_uid_t
uid
;
tb_uid_t
uid
;
int64_t
tver
;
int
type
;
SMetaEntry
me
=
{
0
};
SDecoder
coder
=
{
0
};
int8_t
type
;
int64_t
ctime
;
tb_uid_t
suid
;
int
c
=
0
,
ret
;
// search & delete the name idx
tdbTbcOpen
(
pMeta
->
pNameIdx
,
&
pNameIdxc
,
&
pMeta
->
txn
);
ret
=
tdbTbcMoveTo
(
pNameIdxc
,
pReq
->
name
,
strlen
(
pReq
->
name
)
+
1
,
&
c
);
if
(
ret
<
0
||
!
tdbTbcIsValid
(
pNameIdxc
)
||
c
)
{
tdbTbcClose
(
pNameIdxc
);
terrno
=
TSDB_CODE_VND_TABLE_NOT_EXIST
;
return
-
1
;
}
r
et
=
tdbTbcGet
(
pNameIdxc
,
NULL
,
NULL
,
&
pData
,
&
nData
);
r
c
=
tdbTbGet
(
pMeta
->
pNameIdx
,
pReq
->
name
,
strlen
(
pReq
->
name
)
+
1
,
&
pData
,
&
nData
);
if
(
r
et
<
0
)
{
if
(
r
c
<
0
)
{
ASSERT
(
0
)
;
terrno
=
TSDB_CODE_VND_TABLE_NOT_EXIST
;
return
-
1
;
return
-
1
;
}
}
uid
=
*
(
tb_uid_t
*
)
pData
;
uid
=
*
(
tb_uid_t
*
)
pData
;
tdbTbcDelete
(
pNameIdxc
);
metaWLock
(
pMeta
);
tdbTbcClose
(
pNameIdxc
);
metaDropTableByUid
(
pMeta
,
uid
,
&
type
);
metaULock
(
pMeta
);
// search & delete uid idx
tdbTbcOpen
(
pMeta
->
pUidIdx
,
&
pUidIdxc
,
&
pMeta
->
txn
);
ret
=
tdbTbcMoveTo
(
pUidIdxc
,
&
uid
,
sizeof
(
uid
),
&
c
);
if
(
ret
<
0
||
c
!=
0
)
{
ASSERT
(
0
);
return
-
1
;
}
ret
=
tdbTbcGet
(
pUidIdxc
,
NULL
,
NULL
,
&
pData
,
&
nData
);
if
(
ret
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
tver
=
*
(
int64_t
*
)
pData
;
tdbTbcDelete
(
pUidIdxc
);
tdbTbcClose
(
pUidIdxc
);
// search and get meta entry
if
(
type
==
TSDB_CHILD_TABLE
&&
tbUids
)
{
tdbTbcOpen
(
pMeta
->
pTbDb
,
&
pTbDbc
,
&
pMeta
->
txn
);
taosArrayPush
(
tbUids
,
&
uid
);
ret
=
tdbTbcMoveTo
(
pTbDbc
,
&
(
STbDbKey
){.
uid
=
uid
,
.
version
=
tver
},
sizeof
(
STbDbKey
),
&
c
);
if
(
ret
<
0
||
c
!=
0
)
{
ASSERT
(
0
);
return
-
1
;
}
}
ret
=
tdbTbcGet
(
pTbDbc
,
NULL
,
NULL
,
&
pData
,
&
nData
);
tdbFree
(
pData
);
if
(
ret
<
0
)
{
return
0
;
ASSERT
(
0
);
}
return
-
1
;
}
// decode entry
void
*
pDataCopy
=
taosMemoryMalloc
(
nData
);
// remove the copy (todo)
memcpy
(
pDataCopy
,
pData
,
nData
);
tDecoderInit
(
&
coder
,
pDataCopy
,
nData
);
ret
=
metaDecodeEntry
(
&
coder
,
&
me
);
if
(
ret
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
type
=
me
.
type
;
static
int
metaDropTableByUid
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
int
*
type
)
{
if
(
type
==
TSDB_CHILD_TABLE
)
{
void
*
pData
=
NULL
;
ctime
=
me
.
ctbEntry
.
ctime
;
int
nData
=
0
;
suid
=
me
.
ctbEntry
.
suid
;
int
rc
=
0
;
taosArrayPush
(
tbUids
,
&
me
.
uid
);
int64_t
version
;
}
else
if
(
type
==
TSDB_NORMAL_TABLE
)
{
SMetaEntry
e
=
{
0
};
ctime
=
me
.
ntbEntry
.
ctime
;
SDecoder
dc
=
{
0
};
suid
=
0
;
}
else
{
ASSERT
(
0
);
}
taosMemoryFree
(
pDataCopy
);
rc
=
tdbTbGet
(
pMeta
->
pUidIdx
,
&
uid
,
sizeof
(
uid
),
&
pData
,
&
nData
);
tDecoderClear
(
&
coder
);
version
=
*
(
int64_t
*
)
pData
;
tdbTbcClose
(
pTbDbc
);
if
(
type
==
TSDB_CHILD_TABLE
)
{
tdbTbGet
(
pMeta
->
pTbDb
,
&
(
STbDbKey
){.
version
=
version
,
.
uid
=
uid
},
sizeof
(
STbDbKey
),
&
pData
,
&
nData
);
// remove the pCtbIdx
TBC
*
pCtbIdxc
=
NULL
;
tdbTbcOpen
(
pMeta
->
pCtbIdx
,
&
pCtbIdxc
,
&
pMeta
->
txn
);
ret
=
tdbTbcMoveTo
(
pCtbIdxc
,
&
(
SCtbIdxKey
){.
suid
=
suid
,
.
uid
=
uid
},
sizeof
(
SCtbIdxKey
),
&
c
);
tDecoderInit
(
&
dc
,
pData
,
nData
);
if
(
ret
<
0
||
c
!=
0
)
{
metaDecodeEntry
(
&
dc
,
&
e
);
ASSERT
(
0
);
return
-
1
;
}
tdbTbcDelete
(
pCtbIdxc
);
if
(
type
)
*
type
=
e
.
type
;
tdbTbcClose
(
pCtbIdxc
);
// remove tags from pTagIdx (todo)
tdbTbDelete
(
pMeta
->
pTbDb
,
&
(
STbDbKey
){.
version
=
version
,
.
uid
=
uid
},
sizeof
(
STbDbKey
),
&
pMeta
->
txn
);
}
else
if
(
type
==
TSDB_NORMAL_TABLE
)
{
tdbTbDelete
(
pMeta
->
pNameIdx
,
e
.
name
,
strlen
(
e
.
name
)
+
1
,
&
pMeta
->
txn
);
// remove from pSkmDb
tdbTbDelete
(
pMeta
->
pUidIdx
,
&
uid
,
sizeof
(
uid
),
&
pMeta
->
txn
);
}
else
{
if
(
e
.
type
==
TSDB_CHILD_TABLE
)
{
ASSERT
(
0
);
tdbTbDelete
(
pMeta
->
pCtbIdx
,
&
(
SCtbIdxKey
){.
suid
=
e
.
ctbEntry
.
suid
,
.
uid
=
uid
},
sizeof
(
SCtbIdxKey
),
&
pMeta
->
txn
);
}
else
if
(
e
.
type
==
TSDB_NORMAL_TABLE
)
{
// drop schema.db (todo)
// drop ttl.idx (todo)
}
else
if
(
e
.
type
==
TSDB_SUPER_TABLE
)
{
// drop schema.db (todo)
}
}
// remove from ttl (todo)
tDecoderClear
(
&
dc
);
if
(
ctime
>
0
)
{
tdbFree
(
pData
);
}
return
0
;
return
0
;
}
}
...
@@ -608,14 +557,14 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
...
@@ -608,14 +557,14 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
// TODO : need to update tag index
// TODO : need to update tag index
}
}
ctbEntry
.
version
=
version
;
ctbEntry
.
version
=
version
;
if
(
pTagSchema
->
nCols
==
1
&&
pTagSchema
->
pSchema
[
0
].
type
==
TSDB_DATA_TYPE_JSON
)
{
if
(
pTagSchema
->
nCols
==
1
&&
pTagSchema
->
pSchema
[
0
].
type
==
TSDB_DATA_TYPE_JSON
)
{
ctbEntry
.
ctbEntry
.
pTags
=
taosMemoryMalloc
(
pAlterTbReq
->
nTagVal
);
ctbEntry
.
ctbEntry
.
pTags
=
taosMemoryMalloc
(
pAlterTbReq
->
nTagVal
);
if
(
ctbEntry
.
ctbEntry
.
pTags
==
NULL
)
{
if
(
ctbEntry
.
ctbEntry
.
pTags
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
goto
_err
;
}
}
memcpy
((
void
*
)
ctbEntry
.
ctbEntry
.
pTags
,
pAlterTbReq
->
pTagVal
,
pAlterTbReq
->
nTagVal
);
memcpy
((
void
*
)
ctbEntry
.
ctbEntry
.
pTags
,
pAlterTbReq
->
pTagVal
,
pAlterTbReq
->
nTagVal
);
}
else
{
}
else
{
SKVRowBuilder
kvrb
=
{
0
};
SKVRowBuilder
kvrb
=
{
0
};
const
SKVRow
pOldTag
=
(
const
SKVRow
)
ctbEntry
.
ctbEntry
.
pTags
;
const
SKVRow
pOldTag
=
(
const
SKVRow
)
ctbEntry
.
ctbEntry
.
pTags
;
SKVRow
pNewTag
=
NULL
;
SKVRow
pNewTag
=
NULL
;
...
@@ -649,7 +598,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
...
@@ -649,7 +598,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
tDecoderClear
(
&
dc1
);
tDecoderClear
(
&
dc1
);
tDecoderClear
(
&
dc2
);
tDecoderClear
(
&
dc2
);
if
(
ctbEntry
.
ctbEntry
.
pTags
)
taosMemoryFree
((
void
*
)
ctbEntry
.
ctbEntry
.
pTags
);
if
(
ctbEntry
.
ctbEntry
.
pTags
)
taosMemoryFree
((
void
*
)
ctbEntry
.
ctbEntry
.
pTags
);
if
(
ctbEntry
.
pBuf
)
taosMemoryFree
(
ctbEntry
.
pBuf
);
if
(
ctbEntry
.
pBuf
)
taosMemoryFree
(
ctbEntry
.
pBuf
);
if
(
stbEntry
.
pBuf
)
tdbFree
(
stbEntry
.
pBuf
);
if
(
stbEntry
.
pBuf
)
tdbFree
(
stbEntry
.
pBuf
);
tdbTbcClose
(
pTbDbc
);
tdbTbcClose
(
pTbDbc
);
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
35d9d801
...
@@ -51,6 +51,47 @@ int tqExecKeyCompare(const void* pKey1, int32_t kLen1, const void* pKey2, int32_
...
@@ -51,6 +51,47 @@ int tqExecKeyCompare(const void* pKey1, int32_t kLen1, const void* pKey2, int32_
return
strcmp
(
pKey1
,
pKey2
);
return
strcmp
(
pKey1
,
pKey2
);
}
}
int32_t
tqStoreExec
(
STQ
*
pTq
,
const
char
*
key
,
const
STqExec
*
pExec
)
{
int32_t
code
;
int32_t
vlen
;
tEncodeSize
(
tEncodeSTqExec
,
pExec
,
vlen
,
code
);
ASSERT
(
code
==
0
);
void
*
buf
=
taosMemoryCalloc
(
1
,
vlen
);
if
(
buf
==
NULL
)
{
ASSERT
(
0
);
}
SEncoder
encoder
;
tEncoderInit
(
&
encoder
,
buf
,
vlen
);
if
(
tEncodeSTqExec
(
&
encoder
,
pExec
)
<
0
)
{
ASSERT
(
0
);
}
TXN
txn
;
if
(
tdbTxnOpen
(
&
txn
,
0
,
tdbDefaultMalloc
,
tdbDefaultFree
,
NULL
,
TDB_TXN_WRITE
|
TDB_TXN_READ_UNCOMMITTED
)
<
0
)
{
ASSERT
(
0
);
}
if
(
tdbBegin
(
pTq
->
pMetaStore
,
&
txn
)
<
0
)
{
ASSERT
(
0
);
}
if
(
tdbTbUpsert
(
pTq
->
pExecStore
,
key
,
(
int
)
strlen
(
key
),
buf
,
vlen
,
&
txn
)
<
0
)
{
ASSERT
(
0
);
}
if
(
tdbCommit
(
pTq
->
pMetaStore
,
&
txn
)
<
0
)
{
ASSERT
(
0
);
}
tEncoderClear
(
&
encoder
);
taosMemoryFree
(
buf
);
return
0
;
}
STQ
*
tqOpen
(
const
char
*
path
,
SVnode
*
pVnode
,
SWal
*
pWal
)
{
STQ
*
tqOpen
(
const
char
*
path
,
SVnode
*
pVnode
,
SWal
*
pWal
)
{
STQ
*
pTq
=
taosMemoryMalloc
(
sizeof
(
STQ
));
STQ
*
pTq
=
taosMemoryMalloc
(
sizeof
(
STQ
));
if
(
pTq
==
NULL
)
{
if
(
pTq
==
NULL
)
{
...
@@ -96,8 +137,31 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
...
@@ -96,8 +137,31 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
int
vLen
;
int
vLen
;
tdbTbcMoveToFirst
(
pCur
);
tdbTbcMoveToFirst
(
pCur
);
SDecoder
decoder
;
while
(
tdbTbcNext
(
pCur
,
&
pKey
,
&
kLen
,
&
pVal
,
&
vLen
)
==
0
)
{
while
(
tdbTbcNext
(
pCur
,
&
pKey
,
&
kLen
,
&
pVal
,
&
vLen
)
==
0
)
{
// create, put into execsj
STqExec
exec
;
tDecoderInit
(
&
decoder
,
(
uint8_t
*
)
pVal
,
vLen
);
tDecodeSTqExec
(
&
decoder
,
&
exec
);
exec
.
pWalReader
=
walOpenReadHandle
(
pTq
->
pVnode
->
pWal
);
if
(
exec
.
subType
==
TOPIC_SUB_TYPE__TABLE
)
{
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
exec
.
pExecReader
[
i
]
=
tqInitSubmitMsgScanner
(
pTq
->
pVnode
->
pMeta
);
SReadHandle
handle
=
{
.
reader
=
exec
.
pExecReader
[
i
],
.
meta
=
pTq
->
pVnode
->
pMeta
,
.
pMsgCb
=
&
pTq
->
pVnode
->
msgCb
,
};
exec
.
task
[
i
]
=
qCreateStreamExecTaskInfo
(
exec
.
qmsg
,
&
handle
);
ASSERT
(
exec
.
task
[
i
]);
}
}
else
{
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
exec
.
pExecReader
[
i
]
=
tqInitSubmitMsgScanner
(
pTq
->
pVnode
->
pMeta
);
}
exec
.
pDropTbUid
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
,
HASH_NO_LOCK
);
}
taosHashPut
(
pTq
->
execs
,
pKey
,
kLen
,
&
exec
,
sizeof
(
STqExec
));
}
}
if
(
tdbTxnClose
(
&
txn
)
<
0
)
{
if
(
tdbTxnClose
(
&
txn
)
<
0
)
{
...
@@ -604,7 +668,9 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
...
@@ -604,7 +668,9 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
ASSERT
(
0
);
ASSERT
(
0
);
}
}
tdbTbDelete
(
pTq
->
pExecStore
,
pReq
->
subKey
,
(
int
)
strlen
(
pReq
->
subKey
),
&
txn
);
if
(
tdbTbDelete
(
pTq
->
pExecStore
,
pReq
->
subKey
,
(
int
)
strlen
(
pReq
->
subKey
),
&
txn
)
<
0
)
{
/*ASSERT(0);*/
}
if
(
tdbCommit
(
pTq
->
pMetaStore
,
&
txn
)
<
0
)
{
if
(
tdbCommit
(
pTq
->
pMetaStore
,
&
txn
)
<
0
)
{
ASSERT
(
0
);
ASSERT
(
0
);
...
@@ -659,60 +725,21 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
...
@@ -659,60 +725,21 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
}
}
taosHashPut
(
pTq
->
execs
,
req
.
subKey
,
strlen
(
req
.
subKey
),
pExec
,
sizeof
(
STqExec
));
taosHashPut
(
pTq
->
execs
,
req
.
subKey
,
strlen
(
req
.
subKey
),
pExec
,
sizeof
(
STqExec
));
int32_t
code
;
if
(
tqStoreExec
(
pTq
,
req
.
subKey
,
pExec
)
<
0
)
{
int32_t
vlen
;
// TODO
tEncodeSize
(
tEncodeSTqExec
,
pExec
,
vlen
,
code
);
ASSERT
(
code
==
0
);
void
*
buf
=
taosMemoryCalloc
(
1
,
vlen
);
if
(
buf
==
NULL
)
{
ASSERT
(
0
);
}
SEncoder
encoder
;
tEncoderInit
(
&
encoder
,
buf
,
vlen
);
if
(
tEncodeSTqExec
(
&
encoder
,
pExec
)
<
0
)
{
ASSERT
(
0
);
}
TXN
txn
;
if
(
tdbTxnOpen
(
&
txn
,
0
,
tdbDefaultMalloc
,
tdbDefaultFree
,
NULL
,
TDB_TXN_WRITE
|
TDB_TXN_READ_UNCOMMITTED
)
<
0
)
{
ASSERT
(
0
);
}
if
(
tdbBegin
(
pTq
->
pMetaStore
,
&
txn
)
<
0
)
{
ASSERT
(
0
);
}
if
(
tdbTbUpsert
(
pTq
->
pExecStore
,
req
.
subKey
,
(
int
)
strlen
(
req
.
subKey
),
buf
,
vlen
,
&
txn
)
<
0
)
{
ASSERT
(
0
);
}
if
(
tdbCommit
(
pTq
->
pMetaStore
,
&
txn
)
<
0
)
{
ASSERT
(
0
);
}
}
tEncoderClear
(
&
encoder
);
taosMemoryFree
(
buf
);
return
0
;
return
0
;
}
else
{
}
else
{
/*if (req.newConsumerId != -1) {*/
/*ASSERT(pExec->consumerId == req.oldConsumerId);*/
/*taosWLockLatch(&pExec->lock);*/
ASSERT
(
pExec
->
consumerId
==
req
.
oldConsumerId
);
// TODO handle qmsg and exec modification
// TODO handle qmsg and exec modification
atomic_store_32
(
&
pExec
->
epoch
,
-
1
);
atomic_store_32
(
&
pExec
->
epoch
,
-
1
);
atomic_store_64
(
&
pExec
->
consumerId
,
req
.
newConsumerId
);
atomic_store_64
(
&
pExec
->
consumerId
,
req
.
newConsumerId
);
atomic_add_fetch_32
(
&
pExec
->
epoch
,
1
);
atomic_add_fetch_32
(
&
pExec
->
epoch
,
1
);
/*taosWUnLockLatch(&pExec->lock);*/
return
0
;
if
(
tqStoreExec
(
pTq
,
req
.
subKey
,
pExec
)
<
0
)
{
/*} else {*/
// TODO
// TODO
/*taosHashRemove(pTq->tqMetaNew, req.subKey, strlen(req.subKey));*/
}
/*return 0;*/
return
0
;
/*}*/
}
}
}
}
...
...
source/dnode/vnode/src/tsdb/tsdbSnapshot.c
0 → 100644
浏览文件 @
35d9d801
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdb.h"
struct
STsdbSnapshotReader
{
STsdb
*
pTsdb
;
// TODO
};
int32_t
tsdbSnapshotReaderOpen
(
STsdb
*
pTsdb
,
STsdbSnapshotReader
**
ppReader
,
int64_t
sver
,
int64_t
ever
)
{
// TODO
return
0
;
}
int32_t
tsdbSnapshotReaderClose
(
STsdbSnapshotReader
*
pReader
)
{
// TODO
return
0
;
}
int32_t
tsdbSnapshotRead
(
STsdbSnapshotReader
*
pReader
,
void
**
ppData
,
uint32_t
*
nData
)
{
// TODO
return
0
;
}
source/dnode/vnode/src/vnd/vnodeSnapshot.c
0 → 100644
浏览文件 @
35d9d801
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeInt.h"
struct
SVSnapshotReader
{
SVnode
*
pVnode
;
int64_t
sver
;
int64_t
ever
;
int8_t
isMetaEnd
;
int8_t
isTsdbEnd
;
SMetaSnapshotReader
*
pMetaReader
;
STsdbSnapshotReader
*
pTsdbReader
;
void
*
pData
;
int32_t
nData
;
};
int32_t
vnodeSnapshotReaderOpen
(
SVnode
*
pVnode
,
SVSnapshotReader
**
ppReader
,
int64_t
sver
,
int64_t
ever
)
{
SVSnapshotReader
*
pReader
=
NULL
;
pReader
=
(
SVSnapshotReader
*
)
taosMemoryCalloc
(
1
,
sizeof
(
*
pReader
));
if
(
pReader
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
pReader
->
pVnode
=
pVnode
;
pReader
->
sver
=
sver
;
pReader
->
ever
=
ever
;
pReader
->
isMetaEnd
=
0
;
pReader
->
isTsdbEnd
=
0
;
if
(
metaSnapshotReaderOpen
(
pVnode
->
pMeta
,
&
pReader
->
pMetaReader
,
sver
,
ever
)
<
0
)
{
taosMemoryFree
(
pReader
);
goto
_err
;
}
if
(
tsdbSnapshotReaderOpen
(
pVnode
->
pTsdb
,
&
pReader
->
pTsdbReader
,
sver
,
ever
)
<
0
)
{
metaSnapshotReaderClose
(
pReader
->
pMetaReader
);
taosMemoryFree
(
pReader
);
goto
_err
;
}
_exit:
*
ppReader
=
pReader
;
return
0
;
_err:
*
ppReader
=
NULL
;
return
-
1
;
}
int32_t
vnodeSnapshotReaderClose
(
SVSnapshotReader
*
pReader
)
{
if
(
pReader
)
{
vnodeFree
(
pReader
->
pData
);
tsdbSnapshotReaderClose
(
pReader
->
pTsdbReader
);
metaSnapshotReaderClose
(
pReader
->
pMetaReader
);
taosMemoryFree
(
pReader
);
}
return
0
;
}
int32_t
vnodeSnapshotRead
(
SVSnapshotReader
*
pReader
,
const
void
**
ppData
,
uint32_t
*
nData
)
{
int32_t
code
=
0
;
if
(
!
pReader
->
isMetaEnd
)
{
code
=
metaSnapshotRead
(
pReader
->
pMetaReader
,
&
pReader
->
pData
,
&
pReader
->
nData
);
if
(
code
)
{
if
(
code
==
TSDB_CODE_VND_READ_END
)
{
pReader
->
isMetaEnd
=
1
;
}
else
{
return
code
;
}
}
else
{
*
ppData
=
pReader
->
pData
;
*
nData
=
pReader
->
nData
;
return
code
;
}
}
if
(
!
pReader
->
isTsdbEnd
)
{
code
=
tsdbSnapshotRead
(
pReader
->
pTsdbReader
,
&
pReader
->
pData
,
&
pReader
->
nData
);
if
(
code
)
{
if
(
code
==
TSDB_CODE_VND_READ_END
)
{
pReader
->
isTsdbEnd
=
1
;
}
else
{
return
code
;
}
}
else
{
*
ppData
=
pReader
->
pData
;
*
nData
=
pReader
->
nData
;
return
code
;
}
}
code
=
TSDB_CODE_VND_READ_END
;
return
code
;
}
\ No newline at end of file
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
35d9d801
...
@@ -617,16 +617,18 @@ static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSub
...
@@ -617,16 +617,18 @@ static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSub
STSchema
*
pSchema
=
NULL
;
STSchema
*
pSchema
=
NULL
;
tb_uid_t
suid
=
0
;
tb_uid_t
suid
=
0
;
STSRow
*
row
=
NULL
;
STSRow
*
row
=
NULL
;
int32_t
rv
=
-
1
;
tInitSubmitBlkIter
(
msgIter
,
pBlock
,
&
blkIter
);
tInitSubmitBlkIter
(
msgIter
,
pBlock
,
&
blkIter
);
if
(
blkIter
.
row
==
NULL
)
return
0
;
if
(
blkIter
.
row
==
NULL
)
return
0
;
if
(
!
pSchema
||
(
suid
!=
msgIter
->
suid
))
{
if
(
!
pSchema
||
(
suid
!=
msgIter
->
suid
)
||
rv
!=
TD_ROW_SVER
(
blkIter
.
row
)
)
{
if
(
pSchema
)
{
if
(
pSchema
)
{
taosMemoryFreeClear
(
pSchema
);
taosMemoryFreeClear
(
pSchema
);
}
}
pSchema
=
metaGetTbTSchema
(
pMeta
,
msgIter
->
suid
,
1
);
// TODO: use the real schema
pSchema
=
metaGetTbTSchema
(
pMeta
,
msgIter
->
suid
,
TD_ROW_SVER
(
blkIter
.
row
)
);
// TODO: use the real schema
if
(
pSchema
)
{
if
(
pSchema
)
{
suid
=
msgIter
->
suid
;
suid
=
msgIter
->
suid
;
rv
=
TD_ROW_SVER
(
blkIter
.
row
);
}
}
}
}
if
(
!
pSchema
)
{
if
(
!
pSchema
)
{
...
...
source/dnode/vnode/src/vnd/vnodeUtil.c
0 → 100644
浏览文件 @
35d9d801
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnd.h"
int32_t
vnodeRealloc
(
void
**
pp
,
int32_t
size
)
{
uint8_t
*
p
=
NULL
;
int32_t
csize
=
0
;
if
(
*
pp
)
{
p
=
(
uint8_t
*
)(
*
pp
)
-
sizeof
(
int32_t
);
csize
=
*
(
int32_t
*
)
p
;
}
if
(
csize
>=
size
)
{
return
0
;
}
p
=
(
uint8_t
*
)
taosMemoryRealloc
(
p
,
size
);
if
(
p
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
*
(
int32_t
*
)
p
=
size
;
*
pp
=
p
+
sizeof
(
int32_t
);
return
0
;
}
void
vnodeFree
(
void
*
p
)
{
if
(
p
)
{
taosMemoryFree
(((
uint8_t
*
)
p
)
-
sizeof
(
int32_t
));
}
}
\ No newline at end of file
source/libs/command/src/explain.c
浏览文件 @
35d9d801
...
@@ -16,6 +16,7 @@
...
@@ -16,6 +16,7 @@
#include "commandInt.h"
#include "commandInt.h"
#include "plannodes.h"
#include "plannodes.h"
#include "query.h"
#include "query.h"
#include "tcommon.h"
int32_t
qExplainGenerateResNode
(
SPhysiNode
*
pNode
,
SExplainGroup
*
group
,
SExplainResNode
**
pRes
);
int32_t
qExplainGenerateResNode
(
SPhysiNode
*
pNode
,
SExplainGroup
*
group
,
SExplainResNode
**
pRes
);
int32_t
qExplainAppendGroupResRows
(
void
*
pCtx
,
int32_t
groupId
,
int32_t
level
);
int32_t
qExplainAppendGroupResRows
(
void
*
pCtx
,
int32_t
groupId
,
int32_t
level
);
...
@@ -637,13 +638,48 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
...
@@ -637,13 +638,48 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
QRY_ERR_RET
(
qExplainBufAppendExecInfo
(
pResNode
->
pExecInfo
,
tbuf
,
&
tlen
));
QRY_ERR_RET
(
qExplainBufAppendExecInfo
(
pResNode
->
pExecInfo
,
tbuf
,
&
tlen
));
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
}
}
EXPLAIN_ROW_APPEND
(
EXPLAIN_COLUMNS_FORMAT
,
pSortNode
->
pSortKeys
->
length
);
SDataBlockDescNode
*
pDescNode
=
pSortNode
->
node
.
pOutputDataBlockDesc
;
EXPLAIN_ROW_APPEND
(
EXPLAIN_COLUMNS_FORMAT
,
nodesGetOutputNumFromSlotList
(
pDescNode
->
pSlots
));
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_WIDTH_FORMAT
,
p
SortNode
->
node
.
pOutputDataBlockDesc
->
totalRowSize
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_WIDTH_FORMAT
,
p
DescNode
->
totalRowSize
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_RIGHT_PARENTHESIS_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_RIGHT_PARENTHESIS_FORMAT
);
EXPLAIN_ROW_END
();
EXPLAIN_ROW_END
();
QRY_ERR_RET
(
qExplainResAppendRow
(
ctx
,
tbuf
,
tlen
,
level
));
QRY_ERR_RET
(
qExplainResAppendRow
(
ctx
,
tbuf
,
tlen
,
level
));
if
(
EXPLAIN_MODE_ANALYZE
==
ctx
->
mode
)
{
// sort key
EXPLAIN_ROW_NEW
(
level
,
"Sort Key: "
);
if
(
pResNode
->
pExecInfo
)
{
for
(
int32_t
i
=
0
;
i
<
LIST_LENGTH
(
pSortNode
->
pSortKeys
);
++
i
)
{
SOrderByExprNode
*
ptn
=
nodesListGetNode
(
pSortNode
->
pSortKeys
,
i
);
EXPLAIN_ROW_APPEND
(
"%s "
,
nodesGetNameFromColumnNode
(
ptn
->
pExpr
));
}
}
EXPLAIN_ROW_END
();
QRY_ERR_RET
(
qExplainResAppendRow
(
ctx
,
tbuf
,
tlen
,
level
));
// sort method
EXPLAIN_ROW_NEW
(
level
,
"Sort Method: "
);
int32_t
nodeNum
=
taosArrayGetSize
(
pResNode
->
pExecInfo
);
SExplainExecInfo
*
execInfo
=
taosArrayGet
(
pResNode
->
pExecInfo
,
0
);
SSortExecInfo
*
pExecInfo
=
(
SSortExecInfo
*
)
execInfo
->
verboseInfo
;
EXPLAIN_ROW_APPEND
(
"%s"
,
pExecInfo
->
sortMethod
==
SORT_QSORT_T
?
"quicksort"
:
"merge sort"
);
if
(
pExecInfo
->
sortBuffer
>
1024
*
1024
)
{
EXPLAIN_ROW_APPEND
(
" Buffers:%.2f Mb"
,
pExecInfo
->
sortBuffer
/
(
1024
*
1024
.
0
));
}
else
if
(
pExecInfo
->
sortBuffer
>
1024
)
{
EXPLAIN_ROW_APPEND
(
" Buffers:%.2f Kb"
,
pExecInfo
->
sortBuffer
/
(
1024
.
0
));
}
else
{
EXPLAIN_ROW_APPEND
(
" Buffers:%d b"
,
pExecInfo
->
sortBuffer
);
}
EXPLAIN_ROW_APPEND
(
" loops:%d"
,
pExecInfo
->
loops
);
EXPLAIN_ROW_END
();
QRY_ERR_RET
(
qExplainResAppendRow
(
ctx
,
tbuf
,
tlen
,
level
));
}
if
(
verbose
)
{
if
(
verbose
)
{
EXPLAIN_ROW_NEW
(
level
+
1
,
EXPLAIN_OUTPUT_FORMAT
);
EXPLAIN_ROW_NEW
(
level
+
1
,
EXPLAIN_OUTPUT_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_COLUMNS_FORMAT
,
EXPLAIN_ROW_APPEND
(
EXPLAIN_COLUMNS_FORMAT
,
...
@@ -792,13 +828,8 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
...
@@ -792,13 +828,8 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
QRY_ERR_RET
(
qExplainBufAppendExecInfo
(
pResNode
->
pExecInfo
,
tbuf
,
&
tlen
));
QRY_ERR_RET
(
qExplainBufAppendExecInfo
(
pResNode
->
pExecInfo
,
tbuf
,
&
tlen
));
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
}
}
// EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pPartNode->length);
// EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND
(
EXPLAIN_WIDTH_FORMAT
,
pPartNode
->
node
.
pOutputDataBlockDesc
->
totalRowSize
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_WIDTH_FORMAT
,
pPartNode
->
node
.
pOutputDataBlockDesc
->
totalRowSize
);
// if (pPartNode->pGroupKeys) {
// EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
// EXPLAIN_ROW_APPEND(EXPLAIN_GROUPS_FORMAT, pPartNode->pGroupKeys->length);
// }
EXPLAIN_ROW_APPEND
(
EXPLAIN_RIGHT_PARENTHESIS_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_RIGHT_PARENTHESIS_FORMAT
);
EXPLAIN_ROW_END
();
EXPLAIN_ROW_END
();
QRY_ERR_RET
(
qExplainResAppendRow
(
ctx
,
tbuf
,
tlen
,
level
));
QRY_ERR_RET
(
qExplainResAppendRow
(
ctx
,
tbuf
,
tlen
,
level
));
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
35d9d801
...
@@ -628,12 +628,8 @@ typedef struct SSortOperatorInfo {
...
@@ -628,12 +628,8 @@ typedef struct SSortOperatorInfo {
SArray
*
pColMatchInfo
;
// for index map from table scan output
SArray
*
pColMatchInfo
;
// for index map from table scan output
int32_t
bufPageSize
;
int32_t
bufPageSize
;
// TODO extact struct
int64_t
startTs
;
// sort start time
int64_t
startTs
;
// sort start time
uint64_t
sortElapsed
;
// sort elapsed time, time to flush to disk not included.
uint64_t
sortElapsed
;
// sort elapsed time, time to flush to disk not included.
uint64_t
totalSize
;
// total load bytes from remote
uint64_t
totalRows
;
// total number of rows
uint64_t
totalElapsed
;
// total elapsed time
}
SSortOperatorInfo
;
}
SSortOperatorInfo
;
typedef
struct
STagFilterOperatorInfo
{
typedef
struct
STagFilterOperatorInfo
{
...
...
source/libs/executor/inc/tsort.h
浏览文件 @
35d9d801
...
@@ -137,6 +137,14 @@ void* tsortGetValue(STupleHandle* pVHandle, int32_t colId);
...
@@ -137,6 +137,14 @@ void* tsortGetValue(STupleHandle* pVHandle, int32_t colId);
*/
*/
SSDataBlock
*
tsortGetSortedDataBlock
(
const
SSortHandle
*
pSortHandle
);
SSDataBlock
*
tsortGetSortedDataBlock
(
const
SSortHandle
*
pSortHandle
);
/**
* return the sort execution information.
*
* @param pHandle
* @return
*/
SSortExecInfo
tsortGetSortExecInfo
(
SSortHandle
*
pHandle
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
#endif
#endif
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
35d9d801
...
@@ -1747,8 +1747,7 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t
...
@@ -1747,8 +1747,7 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t
SResultRow
*
pRow
=
doSetResultOutBufByKey
(
pSup
->
pResultBuf
,
pResultRowInfo
,
(
char
*
)
&
tid
,
sizeof
(
tid
),
true
,
groupId
,
SResultRow
*
pRow
=
doSetResultOutBufByKey
(
pSup
->
pResultBuf
,
pResultRowInfo
,
(
char
*
)
&
tid
,
sizeof
(
tid
),
true
,
groupId
,
pTaskInfo
,
false
,
pSup
);
pTaskInfo
,
false
,
pSup
);
ASSERT
(
pDataBlock
->
info
.
numOfCols
==
numOfExprs
);
for
(
int32_t
i
=
0
;
i
<
numOfExprs
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pDataBlock
->
info
.
numOfCols
;
++
i
)
{
struct
SResultRowEntryInfo
*
pEntry
=
getResultCell
(
pRow
,
i
,
rowCellInfoOffset
);
struct
SResultRowEntryInfo
*
pEntry
=
getResultCell
(
pRow
,
i
,
rowCellInfoOffset
);
cleanupResultRowEntry
(
pEntry
);
cleanupResultRowEntry
(
pEntry
);
...
@@ -1756,7 +1755,7 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t
...
@@ -1756,7 +1755,7 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t
pCtx
[
i
].
scanFlag
=
stage
;
pCtx
[
i
].
scanFlag
=
stage
;
}
}
initCtxOutputBuffer
(
pCtx
,
pDataBlock
->
info
.
numOfCol
s
);
initCtxOutputBuffer
(
pCtx
,
numOfExpr
s
);
}
}
void
updateOutputBuf
(
SOptrBasicInfo
*
pBInfo
,
int32_t
*
bufCapacity
,
int32_t
numOfInputRows
)
{
void
updateOutputBuf
(
SOptrBasicInfo
*
pBInfo
,
int32_t
*
bufCapacity
,
int32_t
numOfInputRows
)
{
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
35d9d801
...
@@ -806,7 +806,7 @@ static SSDataBlock* getDataFromCatch(SStreamBlockScanInfo* pInfo) {
...
@@ -806,7 +806,7 @@ static SSDataBlock* getDataFromCatch(SStreamBlockScanInfo* pInfo) {
SSDataBlock
*
pDB
=
createOneDataBlock
(
pInfo
->
pRes
,
false
);
SSDataBlock
*
pDB
=
createOneDataBlock
(
pInfo
->
pRes
,
false
);
blockDataFromBuf
(
pDB
,
buf
);
blockDataFromBuf
(
pDB
,
buf
);
SSDataBlock
*
pSub
=
blockDataExtractBlock
(
pDB
,
pos
->
rowId
,
1
);
SSDataBlock
*
pSub
=
blockDataExtractBlock
(
pDB
,
pos
->
rowId
,
1
);
blockDataMerge
(
pInfo
->
pRes
,
pSub
,
NULL
);
blockDataMerge
(
pInfo
->
pRes
,
pSub
);
blockDataDestroy
(
pDB
);
blockDataDestroy
(
pDB
);
blockDataDestroy
(
pSub
);
blockDataDestroy
(
pSub
);
}
}
...
@@ -1046,8 +1046,9 @@ static void destroySysScanOperator(void* param, int32_t numOfOutput) {
...
@@ -1046,8 +1046,9 @@ static void destroySysScanOperator(void* param, int32_t numOfOutput) {
blockDataDestroy
(
pInfo
->
pRes
);
blockDataDestroy
(
pInfo
->
pRes
);
const
char
*
name
=
tNameGetTableName
(
&
pInfo
->
name
);
const
char
*
name
=
tNameGetTableName
(
&
pInfo
->
name
);
if
(
strncasecmp
(
name
,
TSDB_INS_TABLE_USER_TABLES
,
TSDB_TABLE_FNAME_LEN
)
==
0
)
{
if
(
strncasecmp
(
name
,
TSDB_INS_TABLE_USER_TABLES
,
TSDB_TABLE_FNAME_LEN
)
==
0
||
pInfo
->
pCur
!=
NULL
)
{
metaCloseTbCursor
(
pInfo
->
pCur
);
metaCloseTbCursor
(
pInfo
->
pCur
);
pInfo
->
pCur
=
NULL
;
}
}
taosArrayDestroy
(
pInfo
->
scanCols
);
taosArrayDestroy
(
pInfo
->
scanCols
);
...
...
source/libs/executor/src/sortoperator.c
浏览文件 @
35d9d801
...
@@ -2,6 +2,9 @@
...
@@ -2,6 +2,9 @@
#include "executorimpl.h"
#include "executorimpl.h"
static
SSDataBlock
*
doSort
(
SOperatorInfo
*
pOperator
);
static
SSDataBlock
*
doSort
(
SOperatorInfo
*
pOperator
);
static
int32_t
doOpenSortOperator
(
SOperatorInfo
*
pOperator
);
static
int32_t
getExplainExecInfo
(
SOperatorInfo
*
pOptr
,
void
**
pOptrExplain
,
uint32_t
*
len
);
static
void
destroyOrderOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
);
static
void
destroyOrderOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
);
SOperatorInfo
*
createSortOperatorInfo
(
SOperatorInfo
*
downstream
,
SSDataBlock
*
pResBlock
,
SArray
*
pSortInfo
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SOperatorInfo
*
createSortOperatorInfo
(
SOperatorInfo
*
downstream
,
SSDataBlock
*
pResBlock
,
SArray
*
pSortInfo
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
...
@@ -35,7 +38,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR
...
@@ -35,7 +38,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
fpSet
=
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doSort
,
NULL
,
NULL
,
destroyOrderOperatorInfo
,
NULL
,
NULL
,
NULL
);
createOperatorFpSet
(
doOpenSortOperator
,
doSort
,
NULL
,
NULL
,
destroyOrderOperatorInfo
,
NULL
,
NULL
,
getExplainExecInfo
);
int32_t
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
int32_t
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
return
pOperator
;
return
pOperator
;
...
@@ -121,20 +124,17 @@ void applyScalarFunction(SSDataBlock* pBlock, void* param) {
...
@@ -121,20 +124,17 @@ void applyScalarFunction(SSDataBlock* pBlock, void* param) {
}
}
}
}
SSDataBlock
*
doSort
(
SOperatorInfo
*
pOperator
)
{
int32_t
doOpenSortOperator
(
SOperatorInfo
*
pOperator
)
{
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
}
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SSortOperatorInfo
*
pInfo
=
pOperator
->
info
;
SSortOperatorInfo
*
pInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
if
(
OPTR_IS_OPENED
(
pOperator
)
)
{
return
getSortedBlockData
(
pInfo
->
pSortHandle
,
pInfo
->
binfo
.
pRes
,
pOperator
->
resultInfo
.
capacity
,
pInfo
->
pColMatchInfo
)
;
return
TSDB_CODE_SUCCESS
;
}
}
// pInfo->binfo.pRes is not equalled to the input datablock.
pInfo
->
startTs
=
taosGetTimestampUs
();
// int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
// pInfo->binfo.pRes is not equalled to the input datablock.
pInfo
->
pSortHandle
=
tsortCreateSortHandle
(
pInfo
->
pSortInfo
,
pInfo
->
pColMatchInfo
,
SORT_SINGLESOURCE_SORT
,
pInfo
->
pSortHandle
=
tsortCreateSortHandle
(
pInfo
->
pSortInfo
,
pInfo
->
pColMatchInfo
,
SORT_SINGLESOURCE_SORT
,
-
1
,
-
1
,
NULL
,
pTaskInfo
->
id
.
str
);
-
1
,
-
1
,
NULL
,
pTaskInfo
->
id
.
str
);
...
@@ -146,12 +146,39 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
...
@@ -146,12 +146,39 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
int32_t
code
=
tsortOpen
(
pInfo
->
pSortHandle
);
int32_t
code
=
tsortOpen
(
pInfo
->
pSortHandle
);
taosMemoryFreeClear
(
ps
);
taosMemoryFreeClear
(
ps
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
terrno
);
longjmp
(
pTaskInfo
->
env
,
terrno
);
}
}
pOperator
->
cost
.
openCost
=
(
taosGetTimestampUs
()
-
pInfo
->
startTs
)
/
1000
.
0
;
pOperator
->
status
=
OP_RES_TO_RETURN
;
pOperator
->
status
=
OP_RES_TO_RETURN
;
return
getSortedBlockData
(
pInfo
->
pSortHandle
,
pInfo
->
binfo
.
pRes
,
pOperator
->
resultInfo
.
capacity
,
pInfo
->
pColMatchInfo
);
OPTR_SET_OPENED
(
pOperator
);
return
TSDB_CODE_SUCCESS
;
}
SSDataBlock
*
doSort
(
SOperatorInfo
*
pOperator
)
{
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
}
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SSortOperatorInfo
*
pInfo
=
pOperator
->
info
;
int32_t
code
=
pOperator
->
fpSet
.
_openFn
(
pOperator
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
code
);
}
SSDataBlock
*
pBlock
=
getSortedBlockData
(
pInfo
->
pSortHandle
,
pInfo
->
binfo
.
pRes
,
pOperator
->
resultInfo
.
capacity
,
pInfo
->
pColMatchInfo
);
if
(
pBlock
!=
NULL
)
{
pOperator
->
resultInfo
.
totalRows
+=
pBlock
->
info
.
rows
;
}
else
{
doSetOperatorCompleted
(
pOperator
);
}
return
pBlock
;
}
}
void
destroyOrderOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
void
destroyOrderOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
...
@@ -161,3 +188,15 @@ void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
...
@@ -161,3 +188,15 @@ void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
taosArrayDestroy
(
pInfo
->
pSortInfo
);
taosArrayDestroy
(
pInfo
->
pSortInfo
);
taosArrayDestroy
(
pInfo
->
pColMatchInfo
);
taosArrayDestroy
(
pInfo
->
pColMatchInfo
);
}
}
int32_t
getExplainExecInfo
(
SOperatorInfo
*
pOptr
,
void
**
pOptrExplain
,
uint32_t
*
len
)
{
ASSERT
(
pOptr
!=
NULL
);
SSortExecInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SSortExecInfo
));
SSortOperatorInfo
*
pOperatorInfo
=
(
SSortOperatorInfo
*
)
pOptr
->
info
;
*
pInfo
=
tsortGetSortExecInfo
(
pOperatorInfo
->
pSortHandle
);
*
pOptrExplain
=
pInfo
;
*
len
=
sizeof
(
SSortExecInfo
);
return
TSDB_CODE_SUCCESS
;
}
source/libs/executor/src/tsort.c
浏览文件 @
35d9d801
...
@@ -31,20 +31,16 @@ struct STupleHandle {
...
@@ -31,20 +31,16 @@ struct STupleHandle {
struct
SSortHandle
{
struct
SSortHandle
{
int32_t
type
;
int32_t
type
;
int32_t
pageSize
;
int32_t
pageSize
;
int32_t
numOfPages
;
int32_t
numOfPages
;
SDiskbasedBuf
*
pBuf
;
SDiskbasedBuf
*
pBuf
;
SArray
*
pSortInfo
;
SArray
*
pSortInfo
;
SArray
*
pIndexMap
;
SArray
*
pOrderedSource
;
SArray
*
pOrderedSource
;
_sort_fetch_block_fn_t
fetchfp
;
int32_t
loops
;
_sort_merge_compar_fn_t
comparFn
;
SMultiwayMergeTreeInfo
*
pMergeTree
;
int64_t
startTs
;
uint64_t
sortElapsed
;
uint64_t
sortElapsed
;
int64_t
startTs
;
uint64_t
totalElapsed
;
uint64_t
totalElapsed
;
int32_t
sourceId
;
int32_t
sourceId
;
...
@@ -53,13 +49,15 @@ struct SSortHandle {
...
@@ -53,13 +49,15 @@ struct SSortHandle {
int32_t
numOfCompletedSources
;
int32_t
numOfCompletedSources
;
bool
opened
;
bool
opened
;
const
char
*
idStr
;
const
char
*
idStr
;
bool
inMemSort
;
bool
inMemSort
;
bool
needAdjust
;
bool
needAdjust
;
STupleHandle
tupleHandle
;
STupleHandle
tupleHandle
;
void
*
param
;
void
*
param
;
void
(
*
beforeFp
)(
SSDataBlock
*
pBlock
,
void
*
param
);
void
(
*
beforeFp
)(
SSDataBlock
*
pBlock
,
void
*
param
);
_sort_fetch_block_fn_t
fetchfp
;
_sort_merge_compar_fn_t
comparFn
;
SMultiwayMergeTreeInfo
*
pMergeTree
;
};
};
static
int32_t
msortComparFn
(
const
void
*
pLeft
,
const
void
*
pRight
,
void
*
param
);
static
int32_t
msortComparFn
(
const
void
*
pLeft
,
const
void
*
pRight
,
void
*
param
);
...
@@ -80,7 +78,7 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, SArray* pIndexMap, int32_t
...
@@ -80,7 +78,7 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, SArray* pIndexMap, int32_t
pSortHandle
->
pageSize
=
pageSize
;
pSortHandle
->
pageSize
=
pageSize
;
pSortHandle
->
numOfPages
=
numOfPages
;
pSortHandle
->
numOfPages
=
numOfPages
;
pSortHandle
->
pSortInfo
=
pSortInfo
;
pSortHandle
->
pSortInfo
=
pSortInfo
;
pSortHandle
->
pIndexMap
=
pIndexMap
;
pSortHandle
->
loops
=
0
;
if
(
pBlock
!=
NULL
)
{
if
(
pBlock
!=
NULL
)
{
pSortHandle
->
pDataBlock
=
createOneDataBlock
(
pBlock
,
false
);
pSortHandle
->
pDataBlock
=
createOneDataBlock
(
pBlock
,
false
);
...
@@ -415,6 +413,9 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
...
@@ -415,6 +413,9 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
int32_t
numOfRows
=
blockDataGetCapacityInRow
(
pHandle
->
pDataBlock
,
pHandle
->
pageSize
);
int32_t
numOfRows
=
blockDataGetCapacityInRow
(
pHandle
->
pDataBlock
,
pHandle
->
pageSize
);
blockDataEnsureCapacity
(
pHandle
->
pDataBlock
,
numOfRows
);
blockDataEnsureCapacity
(
pHandle
->
pDataBlock
,
numOfRows
);
// the initial pass + sortPass + final mergePass
pHandle
->
loops
=
sortPass
+
2
;
size_t
numOfSorted
=
taosArrayGetSize
(
pHandle
->
pOrderedSource
);
size_t
numOfSorted
=
taosArrayGetSize
(
pHandle
->
pOrderedSource
);
for
(
int32_t
t
=
0
;
t
<
sortPass
;
++
t
)
{
for
(
int32_t
t
=
0
;
t
<
sortPass
;
++
t
)
{
int64_t
st
=
taosGetTimestampUs
();
int64_t
st
=
taosGetTimestampUs
();
...
@@ -502,12 +503,13 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
...
@@ -502,12 +503,13 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
return
0
;
return
0
;
}
}
static
int32_t
createInitialSo
rtedMultiSo
urces
(
SSortHandle
*
pHandle
)
{
static
int32_t
createInitialSources
(
SSortHandle
*
pHandle
)
{
size_t
sortBufSize
=
pHandle
->
numOfPages
*
pHandle
->
pageSize
;
size_t
sortBufSize
=
pHandle
->
numOfPages
*
pHandle
->
pageSize
;
if
(
pHandle
->
type
==
SORT_SINGLESOURCE_SORT
)
{
if
(
pHandle
->
type
==
SORT_SINGLESOURCE_SORT
)
{
SSortSource
*
source
=
taosArrayGetP
(
pHandle
->
pOrderedSource
,
0
);
SSortSource
*
source
=
taosArrayGetP
(
pHandle
->
pOrderedSource
,
0
);
taosArrayClear
(
pHandle
->
pOrderedSource
);
taosArrayClear
(
pHandle
->
pOrderedSource
);
while
(
1
)
{
while
(
1
)
{
SSDataBlock
*
pBlock
=
pHandle
->
fetchfp
(
source
->
param
);
SSDataBlock
*
pBlock
=
pHandle
->
fetchfp
(
source
->
param
);
if
(
pBlock
==
NULL
)
{
if
(
pBlock
==
NULL
)
{
...
@@ -524,6 +526,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
...
@@ -524,6 +526,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
}
else
{
}
else
{
pHandle
->
pageSize
=
4096
;
pHandle
->
pageSize
=
4096
;
}
}
// todo!!
// todo!!
pHandle
->
numOfPages
=
1024
;
pHandle
->
numOfPages
=
1024
;
sortBufSize
=
pHandle
->
numOfPages
*
pHandle
->
pageSize
;
sortBufSize
=
pHandle
->
numOfPages
*
pHandle
->
pageSize
;
...
@@ -535,7 +538,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
...
@@ -535,7 +538,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
}
}
// todo relocate the columns
// todo relocate the columns
int32_t
code
=
blockDataMerge
(
pHandle
->
pDataBlock
,
pBlock
,
pHandle
->
pIndexMap
);
int32_t
code
=
blockDataMerge
(
pHandle
->
pDataBlock
,
pBlock
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
return
code
;
return
code
;
}
}
...
@@ -569,6 +572,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
...
@@ -569,6 +572,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
pHandle
->
cmpParam
.
numOfSources
=
1
;
pHandle
->
cmpParam
.
numOfSources
=
1
;
pHandle
->
inMemSort
=
true
;
pHandle
->
inMemSort
=
true
;
pHandle
->
loops
=
1
;
pHandle
->
tupleHandle
.
rowIndex
=
-
1
;
pHandle
->
tupleHandle
.
rowIndex
=
-
1
;
pHandle
->
tupleHandle
.
pBlock
=
pHandle
->
pDataBlock
;
pHandle
->
tupleHandle
.
pBlock
=
pHandle
->
pDataBlock
;
return
0
;
return
0
;
...
@@ -592,7 +596,7 @@ int32_t tsortOpen(SSortHandle* pHandle) {
...
@@ -592,7 +596,7 @@ int32_t tsortOpen(SSortHandle* pHandle) {
pHandle
->
opened
=
true
;
pHandle
->
opened
=
true
;
int32_t
code
=
createInitialSo
rtedMultiSo
urces
(
pHandle
);
int32_t
code
=
createInitialSources
(
pHandle
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
return
code
;
}
}
...
@@ -692,3 +696,20 @@ void* tsortGetValue(STupleHandle* pVHandle, int32_t colIndex) {
...
@@ -692,3 +696,20 @@ void* tsortGetValue(STupleHandle* pVHandle, int32_t colIndex) {
SColumnInfoData
*
pColInfo
=
TARRAY_GET_ELEM
(
pVHandle
->
pBlock
->
pDataBlock
,
colIndex
);
SColumnInfoData
*
pColInfo
=
TARRAY_GET_ELEM
(
pVHandle
->
pBlock
->
pDataBlock
,
colIndex
);
return
colDataGetData
(
pColInfo
,
pVHandle
->
rowIndex
);
return
colDataGetData
(
pColInfo
,
pVHandle
->
rowIndex
);
}
}
SSortExecInfo
tsortGetSortExecInfo
(
SSortHandle
*
pHandle
)
{
SSortExecInfo
info
=
{
0
};
info
.
sortBuffer
=
pHandle
->
pageSize
*
pHandle
->
numOfPages
;
info
.
sortMethod
=
pHandle
->
inMemSort
?
SORT_QSORT_T
:
SORT_SPILLED_MERGE_SORT_T
;
info
.
loops
=
pHandle
->
loops
;
if
(
pHandle
->
pBuf
!=
NULL
)
{
SDiskbasedBufStatis
st
=
getDBufStatis
(
pHandle
->
pBuf
);
info
.
writeBytes
=
st
.
flushBytes
;
info
.
readBytes
=
st
.
loadBytes
;
}
return
info
;
}
source/libs/parser/src/parInsert.c
浏览文件 @
35d9d801
...
@@ -189,6 +189,7 @@ static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, con
...
@@ -189,6 +189,7 @@ static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, con
const
char
*
msg1
=
"name too long"
;
const
char
*
msg1
=
"name too long"
;
const
char
*
msg2
=
"invalid database name"
;
const
char
*
msg2
=
"invalid database name"
;
const
char
*
msg3
=
"db is not specified"
;
const
char
*
msg3
=
"db is not specified"
;
const
char
*
msg4
=
"invalid table name"
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
char
*
p
=
strnchr
(
pTableName
->
z
,
TS_PATH_DELIMITER
[
0
],
pTableName
->
n
,
true
);
char
*
p
=
strnchr
(
pTableName
->
z
,
TS_PATH_DELIMITER
[
0
],
pTableName
->
n
,
true
);
...
@@ -207,6 +208,10 @@ static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, con
...
@@ -207,6 +208,10 @@ static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, con
}
}
int32_t
tbLen
=
pTableName
->
n
-
dbLen
-
1
;
int32_t
tbLen
=
pTableName
->
n
-
dbLen
-
1
;
if
(
tbLen
<=
0
)
{
return
buildInvalidOperationMsg
(
pMsgBuf
,
msg4
);
}
char
tbname
[
TSDB_TABLE_FNAME_LEN
]
=
{
0
};
char
tbname
[
TSDB_TABLE_FNAME_LEN
]
=
{
0
};
strncpy
(
tbname
,
p
+
1
,
tbLen
);
strncpy
(
tbname
,
p
+
1
,
tbLen
);
/*tbLen = */
strdequote
(
tbname
);
/*tbLen = */
strdequote
(
tbname
);
...
...
source/util/src/terror.c
浏览文件 @
35d9d801
...
@@ -315,6 +315,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_TABLE_NOT_EXIST, "Table does not exists
...
@@ -315,6 +315,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_TABLE_NOT_EXIST, "Table does not exists
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_INVALID_TABLE_ACTION
,
"Invalid table action"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_INVALID_TABLE_ACTION
,
"Invalid table action"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_COL_ALREADY_EXISTS
,
"Table column already exists"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_COL_ALREADY_EXISTS
,
"Table column already exists"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_TABLE_COL_NOT_EXISTS
,
"Table column not exists"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_TABLE_COL_NOT_EXISTS
,
"Table column not exists"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_READ_END
,
"Read end"
)
// tsdb
// tsdb
...
...
source/util/src/thash.c
浏览文件 @
35d9d801
...
@@ -708,7 +708,7 @@ SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, s
...
@@ -708,7 +708,7 @@ SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, s
pNewNode
->
removed
=
0
;
pNewNode
->
removed
=
0
;
pNewNode
->
next
=
NULL
;
pNewNode
->
next
=
NULL
;
memcpy
(
GET_HASH_NODE_DATA
(
pNewNode
),
pData
,
dsize
);
if
(
pData
)
memcpy
(
GET_HASH_NODE_DATA
(
pNewNode
),
pData
,
dsize
);
memcpy
(
GET_HASH_NODE_KEY
(
pNewNode
),
key
,
keyLen
);
memcpy
(
GET_HASH_NODE_KEY
(
pNewNode
),
key
,
keyLen
);
return
pNewNode
;
return
pNewNode
;
...
@@ -774,7 +774,7 @@ static void *taosHashReleaseNode(SHashObj *pHashObj, void *p, int *slot) {
...
@@ -774,7 +774,7 @@ static void *taosHashReleaseNode(SHashObj *pHashObj, void *p, int *slot) {
ASSERT
(
prevNode
->
next
!=
prevNode
);
ASSERT
(
prevNode
->
next
!=
prevNode
);
}
else
{
}
else
{
pe
->
next
=
pOld
->
next
;
pe
->
next
=
pOld
->
next
;
SHashNode
*
x
=
pe
->
next
;
SHashNode
*
x
=
pe
->
next
;
if
(
x
!=
NULL
)
{
if
(
x
!=
NULL
)
{
ASSERT
(
x
->
next
!=
x
);
ASSERT
(
x
->
next
!=
x
);
}
}
...
...
source/util/src/tpagedbuf.c
浏览文件 @
35d9d801
...
@@ -549,12 +549,17 @@ void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) {
...
@@ -549,12 +549,17 @@ void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) {
// print the statistics information
// print the statistics information
{
{
SDiskbasedBufStatis
*
ps
=
&
pBuf
->
statis
;
SDiskbasedBufStatis
*
ps
=
&
pBuf
->
statis
;
if
(
ps
->
loadPages
==
0
)
{
uDebug
(
uDebug
(
"Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f "
"Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages)"
,
"Kb
\n
"
,
ps
->
getPages
,
ps
->
releasePages
,
ps
->
flushBytes
/
1024
.
0
f
,
ps
->
flushPages
,
ps
->
loadBytes
/
1024
.
0
f
,
ps
->
loadPages
);
}
else
{
uDebug
(
"Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f Kb"
,
ps
->
getPages
,
ps
->
releasePages
,
ps
->
flushBytes
/
1024
.
0
f
,
ps
->
flushPages
,
ps
->
loadBytes
/
1024
.
0
f
,
ps
->
getPages
,
ps
->
releasePages
,
ps
->
flushBytes
/
1024
.
0
f
,
ps
->
flushPages
,
ps
->
loadBytes
/
1024
.
0
f
,
ps
->
loadPages
,
ps
->
loadBytes
/
(
1024
.
0
*
ps
->
loadPages
));
ps
->
loadPages
,
ps
->
loadBytes
/
(
1024
.
0
*
ps
->
loadPages
));
}
}
}
taosRemoveFile
(
pBuf
->
path
);
taosRemoveFile
(
pBuf
->
path
);
taosMemoryFreeClear
(
pBuf
->
path
);
taosMemoryFreeClear
(
pBuf
->
path
);
...
...
tests/script/jenkins/basic.txt
浏览文件 @
35d9d801
...
@@ -67,6 +67,7 @@
...
@@ -67,6 +67,7 @@
# ---- stream
# ---- stream
./test.sh -f tsim/stream/basic0.sim
./test.sh -f tsim/stream/basic0.sim
./test.sh -f tsim/stream/basic1.sim
./test.sh -f tsim/stream/basic1.sim
./test.sh -f tsim/stream/basic2.sim
./test.sh -f tsim/stream/session0.sim
./test.sh -f tsim/stream/session0.sim
./test.sh -f tsim/stream/session1.sim
./test.sh -f tsim/stream/session1.sim
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录