Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
87b59d21
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
87b59d21
编写于
9月 15, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of
https://github.com/taosdata/TDengine
into fix/long_query
上级
151fd39d
4ff617ae
变更
21
隐藏空白更改
内联
并排
Showing
21 changed file
with
662 addition
and
547 deletion
+662
-547
cmake/taostools_CMakeLists.txt.in
cmake/taostools_CMakeLists.txt.in
+1
-1
docs/zh/10-deployment/01-deploy.md
docs/zh/10-deployment/01-deploy.md
+1
-1
docs/zh/12-taos-sql/06-select.md
docs/zh/12-taos-sql/06-select.md
+1
-1
docs/zh/14-reference/14-taosKeeper.md
docs/zh/14-reference/14-taosKeeper.md
+3
-3
include/common/tdataformat.h
include/common/tdataformat.h
+34
-8
include/libs/nodes/plannodes.h
include/libs/nodes/plannodes.h
+4
-0
include/libs/nodes/querynodes.h
include/libs/nodes/querynodes.h
+2
-0
source/common/src/tdataformat.c
source/common/src/tdataformat.c
+495
-5
source/dnode/vnode/src/inc/tsdb.h
source/dnode/vnode/src/inc/tsdb.h
+0
-26
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+1
-1
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+6
-157
source/dnode/vnode/src/tsdb/tsdbUtil.c
source/dnode/vnode/src/tsdb/tsdbUtil.c
+16
-320
source/libs/executor/src/dataDeleter.c
source/libs/executor/src/dataDeleter.c
+24
-14
source/libs/nodes/src/nodesCloneFuncs.c
source/libs/nodes/src/nodesCloneFuncs.c
+2
-0
source/libs/nodes/src/nodesCodeFuncs.c
source/libs/nodes/src/nodesCodeFuncs.c
+14
-0
source/libs/nodes/src/nodesMsgFuncs.c
source/libs/nodes/src/nodesMsgFuncs.c
+15
-1
source/libs/nodes/src/nodesUtilFuncs.c
source/libs/nodes/src/nodesUtilFuncs.c
+6
-0
source/libs/parser/src/parAstCreater.c
source/libs/parser/src/parAstCreater.c
+6
-4
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+7
-1
source/libs/planner/src/planLogicCreater.c
source/libs/planner/src/planLogicCreater.c
+15
-1
source/libs/planner/src/planPhysiCreater.c
source/libs/planner/src/planPhysiCreater.c
+9
-3
未找到文件。
cmake/taostools_CMakeLists.txt.in
浏览文件 @
87b59d21
...
...
@@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG
3588b3d
GIT_TAG
e7270c9
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
...
...
docs/zh/10-deployment/01-deploy.md
浏览文件 @
87b59d21
...
...
@@ -168,7 +168,7 @@ Query OK, 8 row(s) in set (0.001154s)
## 删除数据节点
先停止要删除的数据节点的 taosd 进程,
然后启动 CLI 程序 taos,执行:
然后启动 CLI 程序 taos,执行:
```sql
DROP DNODE "fqdn:port";
...
...
docs/zh/12-taos-sql/06-select.md
浏览文件 @
87b59d21
...
...
@@ -104,7 +104,7 @@ SELECT location, groupid, current FROM d1001 LIMIT 2;
### 结果去重
`DIS
INT
CT`
关键字可以对结果集中的一列或多列进行去重,去除的列既可以是标签列也可以是数据列。
`DIS
TIN
CT`
关键字可以对结果集中的一列或多列进行去重,去除的列既可以是标签列也可以是数据列。
对标签列去重:
...
...
docs/zh/14-reference/14-taosKeeper.md
浏览文件 @
87b59d21
...
...
@@ -79,7 +79,7 @@ password = "taosdata"
# 需要被监控的 taosAdapter
[taosAdapter]
address
=
["127.0.0.1:6041"
,"192.168.1.95:6041"
]
address
=
["127.0.0.1:6041"]
[metrics]
# 监控指标前缀
...
...
@@ -92,7 +92,7 @@ cluster = "production"
database
=
"log"
# 指定需要监控的普通表
tables
=
[
"normal_table"
]
tables
=
[]
```
### 获取监控指标
...
...
@@ -141,4 +141,4 @@ taos_cluster_info_dnodes_total{cluster_id="5981392874047724755"} 1
# HELP taos_cluster_info_first_ep
# TYPE taos_cluster_info_first_ep gauge
taos_cluster_info_first_ep
{
cluster_id
=
"5981392874047724755"
,value
=
"hlb:6030"
}
1
```
\ No newline at end of file
```
include/common/tdataformat.h
浏览文件 @
87b59d21
...
...
@@ -36,8 +36,13 @@ typedef struct STSRow2 STSRow2;
typedef
struct
STSRowBuilder
STSRowBuilder
;
typedef
struct
STagVal
STagVal
;
typedef
struct
STag
STag
;
typedef
struct
SColData
SColData
;
// bitmap
#define HAS_NONE ((uint8_t)0x1)
#define HAS_NULL ((uint8_t)0x2)
#define HAS_VALUE ((uint8_t)0x4)
// bitmap ================================
const
static
uint8_t
BIT2_MAP
[
4
][
4
]
=
{{
0
b00000000
,
0
b00000001
,
0
b00000010
,
0
},
{
0
b00000000
,
0
b00000100
,
0
b00001000
,
2
},
{
0
b00000000
,
0
b00010000
,
0
b00100000
,
4
},
...
...
@@ -51,21 +56,21 @@ const static uint8_t BIT2_MAP[4][4] = {{0b00000000, 0b00000001, 0b00000010, 0},
#define SET_BIT2(p, i, v) ((p)[(i) >> 2] = (p)[(i) >> 2] & N1(BIT2_MAP[(i)&3][3]) | BIT2_MAP[(i)&3][(v)])
#define GET_BIT2(p, i) (((p)[(i) >> 2] >> BIT2_MAP[(i)&3][3]) & ((uint8_t)3))
// STSchema
// STSchema
================================
int32_t
tTSchemaCreate
(
int32_t
sver
,
SSchema
*
pSchema
,
int32_t
nCols
,
STSchema
**
ppTSchema
);
void
tTSchemaDestroy
(
STSchema
*
pTSchema
);
// SValue
// SValue
================================
int32_t
tPutValue
(
uint8_t
*
p
,
SValue
*
pValue
,
int8_t
type
);
int32_t
tGetValue
(
uint8_t
*
p
,
SValue
*
pValue
,
int8_t
type
);
int
tValueCmprFn
(
const
SValue
*
pValue1
,
const
SValue
*
pValue2
,
int8_t
type
);
// SColVal
// SColVal
================================
#define COL_VAL_NONE(CID, TYPE) ((SColVal){.cid = (CID), .type = (TYPE), .isNone = 1})
#define COL_VAL_NULL(CID, TYPE) ((SColVal){.cid = (CID), .type = (TYPE), .isNull = 1})
#define COL_VAL_VALUE(CID, TYPE, V) ((SColVal){.cid = (CID), .type = (TYPE), .value = (V)})
// STSRow2
// STSRow2
================================
#define TSROW_LEN(PROW, V) tGetI32v((uint8_t *)(PROW)->data, (V) ? &(V) : NULL)
#define TSROW_SVER(PROW, V) tGetI32v((PROW)->data + TSROW_LEN(PROW, NULL), (V) ? &(V) : NULL)
...
...
@@ -77,7 +82,7 @@ int32_t tTSRowToArray(STSRow2 *pRow, STSchema *pTSchema, SArray **ppArray);
int32_t
tPutTSRow
(
uint8_t
*
p
,
STSRow2
*
pRow
);
int32_t
tGetTSRow
(
uint8_t
*
p
,
STSRow2
**
ppRow
);
// STSRowBuilder
// STSRowBuilder
================================
#define tsRowBuilderInit() ((STSRowBuilder){0})
#define tsRowBuilderClear(B) \
do { \
...
...
@@ -86,7 +91,7 @@ int32_t tGetTSRow(uint8_t *p, STSRow2 **ppRow);
} \
} while (0)
// STag
// STag
================================
int32_t
tTagNew
(
SArray
*
pArray
,
int32_t
version
,
int8_t
isJson
,
STag
**
ppTag
);
void
tTagFree
(
STag
*
pTag
);
bool
tTagIsJson
(
const
void
*
pTag
);
...
...
@@ -100,7 +105,16 @@ void tTagSetCid(const STag *pTag, int16_t iTag, int16_t cid);
void
debugPrintSTag
(
STag
*
pTag
,
const
char
*
tag
,
int32_t
ln
);
// TODO: remove
int32_t
parseJsontoTagData
(
const
char
*
json
,
SArray
*
pTagVals
,
STag
**
ppTag
,
void
*
pMsgBuf
);
// STRUCT =================
// SColData ================================
void
tColDataDestroy
(
void
*
ph
);
void
tColDataInit
(
SColData
*
pColData
,
int16_t
cid
,
int8_t
type
,
int8_t
smaOn
);
void
tColDataClear
(
SColData
*
pColData
);
int32_t
tColDataAppendValue
(
SColData
*
pColData
,
SColVal
*
pColVal
);
void
tColDataGetValue
(
SColData
*
pColData
,
int32_t
iVal
,
SColVal
*
pColVal
);
uint8_t
tColDataGetBitValue
(
SColData
*
pColData
,
int32_t
iVal
);
int32_t
tColDataCopy
(
SColData
*
pColDataSrc
,
SColData
*
pColDataDest
);
// STRUCT ================================
struct
STColumn
{
col_id_t
colId
;
int8_t
type
;
...
...
@@ -166,6 +180,18 @@ struct SColVal {
SValue
value
;
};
struct
SColData
{
int16_t
cid
;
int8_t
type
;
int8_t
smaOn
;
int32_t
nVal
;
uint8_t
flag
;
uint8_t
*
pBitMap
;
int32_t
*
aOffset
;
int32_t
nData
;
uint8_t
*
pData
;
};
#pragma pack(push, 1)
struct
STagVal
{
// char colName[TSDB_COL_NAME_LEN]; // only used for tmq_get_meta
...
...
include/libs/nodes/plannodes.h
浏览文件 @
87b59d21
...
...
@@ -151,6 +151,8 @@ typedef struct SVnodeModifyLogicNode {
SArray
*
pDataBlocks
;
SVgDataBlocks
*
pVgDataBlocks
;
SNode
*
pAffectedRows
;
// SColumnNode
SNode
*
pStartTs
;
// SColumnNode
SNode
*
pEndTs
;
// SColumnNode
uint64_t
tableId
;
uint64_t
stableId
;
int8_t
tableType
;
// table type
...
...
@@ -525,6 +527,8 @@ typedef struct SDataDeleterNode {
char
tsColName
[
TSDB_COL_NAME_LEN
];
STimeWindow
deleteTimeRange
;
SNode
*
pAffectedRows
;
SNode
*
pStartTs
;
SNode
*
pEndTs
;
}
SDataDeleterNode
;
typedef
struct
SSubplan
{
...
...
include/libs/nodes/querynodes.h
浏览文件 @
87b59d21
...
...
@@ -315,6 +315,8 @@ typedef struct SDeleteStmt {
SNode
*
pFromTable
;
// FROM clause
SNode
*
pWhere
;
// WHERE clause
SNode
*
pCountFunc
;
// count the number of rows affected
SNode
*
pFirstFunc
;
// the start timestamp when the data was actually deleted
SNode
*
pLastFunc
;
// the end timestamp when the data was actually deleted
SNode
*
pTagCond
;
// pWhere divided into pTagCond and timeRange
STimeWindow
timeRange
;
uint8_t
precision
;
...
...
source/common/src/tdataformat.c
浏览文件 @
87b59d21
...
...
@@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE
#include "tdataformat.h"
#include "tRealloc.h"
#include "tcoding.h"
#include "tdatablock.h"
#include "tlog.h"
...
...
@@ -680,7 +681,7 @@ int32_t tGetTSRow(uint8_t *p, STSRow2 **ppRow) {
return
n
;
}
// STSchema
// STSchema
========================================
int32_t
tTSchemaCreate
(
int32_t
sver
,
SSchema
*
pSchema
,
int32_t
ncols
,
STSchema
**
ppTSchema
)
{
*
ppTSchema
=
(
STSchema
*
)
taosMemoryMalloc
(
sizeof
(
STSchema
)
+
sizeof
(
STColumn
)
*
ncols
);
if
(
*
ppTSchema
==
NULL
)
{
...
...
@@ -720,9 +721,7 @@ void tTSchemaDestroy(STSchema *pTSchema) {
if
(
pTSchema
)
taosMemoryFree
(
pTSchema
);
}
// STSRowBuilder
// STag
// STag ========================================
static
int
tTagValCmprFn
(
const
void
*
p1
,
const
void
*
p2
)
{
if
(((
STagVal
*
)
p1
)
->
cid
<
((
STagVal
*
)
p2
)
->
cid
)
{
return
-
1
;
...
...
@@ -1172,4 +1171,495 @@ STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder) {
return
pSchema
;
}
#endif
\ No newline at end of file
#endif
// SColData ========================================
void
tColDataDestroy
(
void
*
ph
)
{
SColData
*
pColData
=
(
SColData
*
)
ph
;
tFree
(
pColData
->
pBitMap
);
tFree
((
uint8_t
*
)
pColData
->
aOffset
);
tFree
(
pColData
->
pData
);
}
void
tColDataInit
(
SColData
*
pColData
,
int16_t
cid
,
int8_t
type
,
int8_t
smaOn
)
{
pColData
->
cid
=
cid
;
pColData
->
type
=
type
;
pColData
->
smaOn
=
smaOn
;
tColDataClear
(
pColData
);
}
void
tColDataClear
(
SColData
*
pColData
)
{
pColData
->
nVal
=
0
;
pColData
->
flag
=
0
;
pColData
->
nData
=
0
;
}
static
FORCE_INLINE
int32_t
tColDataPutValue
(
SColData
*
pColData
,
SColVal
*
pColVal
)
{
int32_t
code
=
0
;
if
(
IS_VAR_DATA_TYPE
(
pColData
->
type
))
{
code
=
tRealloc
((
uint8_t
**
)(
&
pColData
->
aOffset
),
sizeof
(
int32_t
)
*
(
pColData
->
nVal
+
1
));
if
(
code
)
goto
_exit
;
pColData
->
aOffset
[
pColData
->
nVal
]
=
pColData
->
nData
;
if
(
pColVal
->
value
.
nData
)
{
code
=
tRealloc
(
&
pColData
->
pData
,
pColData
->
nData
+
pColVal
->
value
.
nData
);
if
(
code
)
goto
_exit
;
memcpy
(
pColData
->
pData
+
pColData
->
nData
,
pColVal
->
value
.
pData
,
pColVal
->
value
.
nData
);
pColData
->
nData
+=
pColVal
->
value
.
nData
;
}
}
else
{
ASSERT
(
pColData
->
nData
==
tDataTypes
[
pColData
->
type
].
bytes
*
pColData
->
nVal
);
code
=
tRealloc
(
&
pColData
->
pData
,
pColData
->
nData
+
tDataTypes
[
pColData
->
type
].
bytes
);
if
(
code
)
goto
_exit
;
pColData
->
nData
+=
tPutValue
(
pColData
->
pData
+
pColData
->
nData
,
&
pColVal
->
value
,
pColVal
->
type
);
}
_exit:
return
code
;
}
static
FORCE_INLINE
int32_t
tColDataAppendValue0
(
SColData
*
pColData
,
SColVal
*
pColVal
)
{
// 0
int32_t
code
=
0
;
if
(
pColVal
->
isNone
)
{
pColData
->
flag
=
HAS_NONE
;
}
else
if
(
pColVal
->
isNull
)
{
pColData
->
flag
=
HAS_NULL
;
}
else
{
pColData
->
flag
=
HAS_VALUE
;
code
=
tColDataPutValue
(
pColData
,
pColVal
);
if
(
code
)
goto
_exit
;
}
pColData
->
nVal
++
;
_exit:
return
code
;
}
static
FORCE_INLINE
int32_t
tColDataAppendValue1
(
SColData
*
pColData
,
SColVal
*
pColVal
)
{
// HAS_NONE
int32_t
code
=
0
;
if
(
!
pColVal
->
isNone
)
{
int32_t
nBit
=
BIT1_SIZE
(
pColData
->
nVal
+
1
);
code
=
tRealloc
(
&
pColData
->
pBitMap
,
nBit
);
if
(
code
)
goto
_exit
;
memset
(
pColData
->
pBitMap
,
0
,
nBit
);
SET_BIT1
(
pColData
->
pBitMap
,
pColData
->
nVal
,
1
);
if
(
pColVal
->
isNull
)
{
pColData
->
flag
|=
HAS_NULL
;
}
else
{
pColData
->
flag
|=
HAS_VALUE
;
if
(
pColData
->
nVal
)
{
if
(
IS_VAR_DATA_TYPE
(
pColData
->
type
))
{
int32_t
nOffset
=
sizeof
(
int32_t
)
*
pColData
->
nVal
;
code
=
tRealloc
((
uint8_t
**
)(
&
pColData
->
aOffset
),
nOffset
);
if
(
code
)
goto
_exit
;
memset
(
pColData
->
aOffset
,
0
,
nOffset
);
}
else
{
pColData
->
nData
=
tDataTypes
[
pColData
->
type
].
bytes
*
pColData
->
nVal
;
code
=
tRealloc
(
&
pColData
->
pData
,
pColData
->
nData
);
if
(
code
)
goto
_exit
;
memset
(
pColData
->
pData
,
0
,
pColData
->
nData
);
}
}
code
=
tColDataPutValue
(
pColData
,
pColVal
);
if
(
code
)
goto
_exit
;
}
}
pColData
->
nVal
++
;
_exit:
return
code
;
}
static
FORCE_INLINE
int32_t
tColDataAppendValue2
(
SColData
*
pColData
,
SColVal
*
pColVal
)
{
// HAS_NULL
int32_t
code
=
0
;
if
(
!
pColVal
->
isNull
)
{
int32_t
nBit
=
BIT1_SIZE
(
pColData
->
nVal
+
1
);
code
=
tRealloc
(
&
pColData
->
pBitMap
,
nBit
);
if
(
code
)
goto
_exit
;
if
(
pColVal
->
isNone
)
{
pColData
->
flag
|=
HAS_NONE
;
memset
(
pColData
->
pBitMap
,
255
,
nBit
);
SET_BIT1
(
pColData
->
pBitMap
,
pColData
->
nVal
,
0
);
}
else
{
pColData
->
flag
|=
HAS_VALUE
;
memset
(
pColData
->
pBitMap
,
0
,
nBit
);
SET_BIT1
(
pColData
->
pBitMap
,
pColData
->
nVal
,
1
);
if
(
pColData
->
nVal
)
{
if
(
IS_VAR_DATA_TYPE
(
pColData
->
type
))
{
int32_t
nOffset
=
sizeof
(
int32_t
)
*
pColData
->
nVal
;
code
=
tRealloc
((
uint8_t
**
)(
&
pColData
->
aOffset
),
nOffset
);
if
(
code
)
goto
_exit
;
memset
(
pColData
->
aOffset
,
0
,
nOffset
);
}
else
{
pColData
->
nData
=
tDataTypes
[
pColData
->
type
].
bytes
*
pColData
->
nVal
;
code
=
tRealloc
(
&
pColData
->
pData
,
pColData
->
nData
);
if
(
code
)
goto
_exit
;
memset
(
pColData
->
pData
,
0
,
pColData
->
nData
);
}
}
code
=
tColDataPutValue
(
pColData
,
pColVal
);
if
(
code
)
goto
_exit
;
}
}
pColData
->
nVal
++
;
_exit:
return
code
;
}
static
FORCE_INLINE
int32_t
tColDataAppendValue3
(
SColData
*
pColData
,
SColVal
*
pColVal
)
{
// HAS_NULL|HAS_NONE
int32_t
code
=
0
;
if
(
pColVal
->
isNone
)
{
code
=
tRealloc
(
&
pColData
->
pBitMap
,
BIT1_SIZE
(
pColData
->
nVal
+
1
));
if
(
code
)
goto
_exit
;
SET_BIT1
(
pColData
->
pBitMap
,
pColData
->
nVal
,
0
);
}
else
if
(
pColVal
->
isNull
)
{
code
=
tRealloc
(
&
pColData
->
pBitMap
,
BIT1_SIZE
(
pColData
->
nVal
+
1
));
if
(
code
)
goto
_exit
;
SET_BIT1
(
pColData
->
pBitMap
,
pColData
->
nVal
,
1
);
}
else
{
pColData
->
flag
|=
HAS_VALUE
;
uint8_t
*
pBitMap
=
NULL
;
code
=
tRealloc
(
&
pBitMap
,
BIT2_SIZE
(
pColData
->
nVal
+
1
));
if
(
code
)
goto
_exit
;
for
(
int32_t
iVal
=
0
;
iVal
<
pColData
->
nVal
;
iVal
++
)
{
SET_BIT2
(
pBitMap
,
iVal
,
GET_BIT1
(
pColData
->
pBitMap
,
iVal
));
}
SET_BIT2
(
pBitMap
,
pColData
->
nVal
,
2
);
tFree
(
pColData
->
pBitMap
);
pColData
->
pBitMap
=
pBitMap
;
if
(
pColData
->
nVal
)
{
if
(
IS_VAR_DATA_TYPE
(
pColData
->
type
))
{
int32_t
nOffset
=
sizeof
(
int32_t
)
*
pColData
->
nVal
;
code
=
tRealloc
((
uint8_t
**
)(
&
pColData
->
aOffset
),
nOffset
);
if
(
code
)
goto
_exit
;
memset
(
pColData
->
aOffset
,
0
,
nOffset
);
}
else
{
pColData
->
nData
=
tDataTypes
[
pColData
->
type
].
bytes
*
pColData
->
nVal
;
code
=
tRealloc
(
&
pColData
->
pData
,
pColData
->
nData
);
if
(
code
)
goto
_exit
;
memset
(
pColData
->
pData
,
0
,
pColData
->
nData
);
}
}
code
=
tColDataPutValue
(
pColData
,
pColVal
);
if
(
code
)
goto
_exit
;
}
pColData
->
nVal
++
;
_exit:
return
code
;
}
static
FORCE_INLINE
int32_t
tColDataAppendValue4
(
SColData
*
pColData
,
SColVal
*
pColVal
)
{
// HAS_VALUE
int32_t
code
=
0
;
if
(
pColVal
->
isNone
||
pColVal
->
isNull
)
{
if
(
pColVal
->
isNone
)
{
pColData
->
flag
|=
HAS_NONE
;
}
else
{
pColData
->
flag
|=
HAS_NULL
;
}
int32_t
nBit
=
BIT1_SIZE
(
pColData
->
nVal
+
1
);
code
=
tRealloc
(
&
pColData
->
pBitMap
,
nBit
);
if
(
code
)
goto
_exit
;
memset
(
pColData
->
pBitMap
,
255
,
nBit
);
SET_BIT1
(
pColData
->
pBitMap
,
pColData
->
nVal
,
0
);
code
=
tColDataPutValue
(
pColData
,
pColVal
);
if
(
code
)
goto
_exit
;
}
else
{
code
=
tColDataPutValue
(
pColData
,
pColVal
);
if
(
code
)
goto
_exit
;
}
pColData
->
nVal
++
;
_exit:
return
code
;
}
static
FORCE_INLINE
int32_t
tColDataAppendValue5
(
SColData
*
pColData
,
SColVal
*
pColVal
)
{
// HAS_VALUE|HAS_NONE
int32_t
code
=
0
;
if
(
pColVal
->
isNull
)
{
pColData
->
flag
|=
HAS_NULL
;
uint8_t
*
pBitMap
=
NULL
;
code
=
tRealloc
(
&
pBitMap
,
BIT2_SIZE
(
pColData
->
nVal
+
1
));
if
(
code
)
goto
_exit
;
for
(
int32_t
iVal
=
0
;
iVal
<
pColData
->
nVal
;
iVal
++
)
{
SET_BIT2
(
pBitMap
,
iVal
,
GET_BIT1
(
pColData
->
pBitMap
,
iVal
)
?
2
:
0
);
}
SET_BIT2
(
pBitMap
,
pColData
->
nVal
,
1
);
tFree
(
pColData
->
pBitMap
);
pColData
->
pBitMap
=
pBitMap
;
}
else
{
code
=
tRealloc
(
&
pColData
->
pBitMap
,
BIT1_SIZE
(
pColData
->
nVal
+
1
));
if
(
code
)
goto
_exit
;
if
(
pColVal
->
isNone
)
{
SET_BIT1
(
pColData
->
pBitMap
,
pColData
->
nVal
,
0
);
}
else
{
SET_BIT1
(
pColData
->
pBitMap
,
pColData
->
nVal
,
1
);
}
}
code
=
tColDataPutValue
(
pColData
,
pColVal
);
if
(
code
)
goto
_exit
;
pColData
->
nVal
++
;
_exit:
return
code
;
}
static
FORCE_INLINE
int32_t
tColDataAppendValue6
(
SColData
*
pColData
,
SColVal
*
pColVal
)
{
// HAS_VALUE|HAS_NULL
int32_t
code
=
0
;
if
(
pColVal
->
isNone
)
{
pColData
->
flag
|=
HAS_NONE
;
uint8_t
*
pBitMap
=
NULL
;
code
=
tRealloc
(
&
pBitMap
,
BIT2_SIZE
(
pColData
->
nVal
+
1
));
if
(
code
)
goto
_exit
;
for
(
int32_t
iVal
=
0
;
iVal
<
pColData
->
nVal
;
iVal
++
)
{
SET_BIT2
(
pBitMap
,
iVal
,
GET_BIT1
(
pColData
->
pBitMap
,
iVal
)
?
2
:
1
);
}
SET_BIT2
(
pBitMap
,
pColData
->
nVal
,
0
);
tFree
(
pColData
->
pBitMap
);
pColData
->
pBitMap
=
pBitMap
;
}
else
{
code
=
tRealloc
(
&
pColData
->
pBitMap
,
BIT1_SIZE
(
pColData
->
nVal
+
1
));
if
(
code
)
goto
_exit
;
if
(
pColVal
->
isNull
)
{
SET_BIT1
(
pColData
->
pBitMap
,
pColData
->
nVal
,
0
);
}
else
{
SET_BIT1
(
pColData
->
pBitMap
,
pColData
->
nVal
,
1
);
}
}
code
=
tColDataPutValue
(
pColData
,
pColVal
);
if
(
code
)
goto
_exit
;
pColData
->
nVal
++
;
_exit:
return
code
;
}
static
FORCE_INLINE
int32_t
tColDataAppendValue7
(
SColData
*
pColData
,
SColVal
*
pColVal
)
{
// HAS_VALUE|HAS_NULL|HAS_NONE
int32_t
code
=
0
;
code
=
tRealloc
(
&
pColData
->
pBitMap
,
BIT2_SIZE
(
pColData
->
nVal
+
1
));
if
(
code
)
goto
_exit
;
if
(
pColVal
->
isNone
)
{
SET_BIT2
(
pColData
->
pBitMap
,
pColData
->
nVal
,
0
);
}
else
if
(
pColVal
->
isNull
)
{
SET_BIT2
(
pColData
->
pBitMap
,
pColData
->
nVal
,
1
);
}
else
{
SET_BIT2
(
pColData
->
pBitMap
,
pColData
->
nVal
,
2
);
}
code
=
tColDataPutValue
(
pColData
,
pColVal
);
if
(
code
)
goto
_exit
;
pColData
->
nVal
++
;
_exit:
return
code
;
}
static
int32_t
(
*
tColDataAppendValueImpl
[])(
SColData
*
pColData
,
SColVal
*
pColVal
)
=
{
tColDataAppendValue0
,
// 0
tColDataAppendValue1
,
// HAS_NONE
tColDataAppendValue2
,
// HAS_NULL
tColDataAppendValue3
,
// HAS_NULL|HAS_NONE
tColDataAppendValue4
,
// HAS_VALUE
tColDataAppendValue5
,
// HAS_VALUE|HAS_NONE
tColDataAppendValue6
,
// HAS_VALUE|HAS_NULL
tColDataAppendValue7
// HAS_VALUE|HAS_NULL|HAS_NONE
};
int32_t
tColDataAppendValue
(
SColData
*
pColData
,
SColVal
*
pColVal
)
{
ASSERT
(
pColData
->
cid
==
pColVal
->
cid
&&
pColData
->
type
==
pColVal
->
type
);
return
tColDataAppendValueImpl
[
pColData
->
flag
](
pColData
,
pColVal
);
}
static
FORCE_INLINE
void
tColDataGetValue1
(
SColData
*
pColData
,
int32_t
iVal
,
SColVal
*
pColVal
)
{
// HAS_NONE
*
pColVal
=
COL_VAL_NONE
(
pColData
->
cid
,
pColData
->
type
);
}
static
FORCE_INLINE
void
tColDataGetValue2
(
SColData
*
pColData
,
int32_t
iVal
,
SColVal
*
pColVal
)
{
// HAS_NULL
*
pColVal
=
COL_VAL_NULL
(
pColData
->
cid
,
pColData
->
type
);
}
static
FORCE_INLINE
void
tColDataGetValue3
(
SColData
*
pColData
,
int32_t
iVal
,
SColVal
*
pColVal
)
{
// HAS_NULL|HAS_NONE
switch
(
GET_BIT1
(
pColData
->
pBitMap
,
iVal
))
{
case
0
:
*
pColVal
=
COL_VAL_NONE
(
pColData
->
cid
,
pColData
->
type
);
break
;
case
1
:
*
pColVal
=
COL_VAL_NULL
(
pColData
->
cid
,
pColData
->
type
);
break
;
default:
ASSERT
(
0
);
}
}
static
FORCE_INLINE
void
tColDataGetValue4
(
SColData
*
pColData
,
int32_t
iVal
,
SColVal
*
pColVal
)
{
// HAS_VALUE
SValue
value
;
if
(
IS_VAR_DATA_TYPE
(
pColData
->
type
))
{
if
(
iVal
+
1
<
pColData
->
nVal
)
{
value
.
nData
=
pColData
->
aOffset
[
iVal
+
1
]
-
pColData
->
aOffset
[
iVal
];
}
else
{
value
.
nData
=
pColData
->
nData
-
pColData
->
aOffset
[
iVal
];
}
value
.
pData
=
pColData
->
pData
+
pColData
->
aOffset
[
iVal
];
}
else
{
tGetValue
(
pColData
->
pData
+
tDataTypes
[
pColData
->
type
].
bytes
*
iVal
,
&
value
,
pColData
->
type
);
}
*
pColVal
=
COL_VAL_VALUE
(
pColData
->
cid
,
pColData
->
type
,
value
);
}
static
FORCE_INLINE
void
tColDataGetValue5
(
SColData
*
pColData
,
int32_t
iVal
,
SColVal
*
pColVal
)
{
// HAS_VALUE|HAS_NONE
switch
(
GET_BIT1
(
pColData
->
pBitMap
,
iVal
))
{
case
0
:
*
pColVal
=
COL_VAL_NONE
(
pColData
->
cid
,
pColData
->
type
);
break
;
case
1
:
tColDataGetValue4
(
pColData
,
iVal
,
pColVal
);
break
;
default:
ASSERT
(
0
);
}
}
static
FORCE_INLINE
void
tColDataGetValue6
(
SColData
*
pColData
,
int32_t
iVal
,
SColVal
*
pColVal
)
{
// HAS_VALUE|HAS_NULL
switch
(
GET_BIT1
(
pColData
->
pBitMap
,
iVal
))
{
case
0
:
*
pColVal
=
COL_VAL_NULL
(
pColData
->
cid
,
pColData
->
type
);
break
;
case
1
:
tColDataGetValue4
(
pColData
,
iVal
,
pColVal
);
break
;
default:
ASSERT
(
0
);
}
}
static
FORCE_INLINE
void
tColDataGetValue7
(
SColData
*
pColData
,
int32_t
iVal
,
SColVal
*
pColVal
)
{
// HAS_VALUE|HAS_NULL|HAS_NONE
switch
(
GET_BIT2
(
pColData
->
pBitMap
,
iVal
))
{
case
0
:
*
pColVal
=
COL_VAL_NONE
(
pColData
->
cid
,
pColData
->
type
);
break
;
case
1
:
*
pColVal
=
COL_VAL_NULL
(
pColData
->
cid
,
pColData
->
type
);
break
;
case
2
:
tColDataGetValue4
(
pColData
,
iVal
,
pColVal
);
break
;
default:
ASSERT
(
0
);
}
}
static
void
(
*
tColDataGetValueImpl
[])(
SColData
*
pColData
,
int32_t
iVal
,
SColVal
*
pColVal
)
=
{
NULL
,
// 0
tColDataGetValue1
,
// HAS_NONE
tColDataGetValue2
,
// HAS_NULL
tColDataGetValue3
,
// HAS_NULL | HAS_NONE
tColDataGetValue4
,
// HAS_VALUE
tColDataGetValue5
,
// HAS_VALUE | HAS_NONE
tColDataGetValue6
,
// HAS_VALUE | HAS_NULL
tColDataGetValue7
// HAS_VALUE | HAS_NULL | HAS_NONE
};
void
tColDataGetValue
(
SColData
*
pColData
,
int32_t
iVal
,
SColVal
*
pColVal
)
{
ASSERT
(
iVal
>=
0
&&
iVal
<
pColData
->
nVal
&&
pColData
->
flag
);
tColDataGetValueImpl
[
pColData
->
flag
](
pColData
,
iVal
,
pColVal
);
}
uint8_t
tColDataGetBitValue
(
SColData
*
pColData
,
int32_t
iVal
)
{
uint8_t
v
;
switch
(
pColData
->
flag
)
{
case
HAS_NONE
:
v
=
0
;
break
;
case
HAS_NULL
:
v
=
1
;
break
;
case
(
HAS_NULL
|
HAS_NONE
):
v
=
GET_BIT1
(
pColData
->
pBitMap
,
iVal
);
break
;
case
HAS_VALUE
:
v
=
2
;
break
;
case
(
HAS_VALUE
|
HAS_NONE
):
v
=
GET_BIT1
(
pColData
->
pBitMap
,
iVal
);
if
(
v
)
v
=
2
;
break
;
case
(
HAS_VALUE
|
HAS_NULL
):
v
=
GET_BIT1
(
pColData
->
pBitMap
,
iVal
)
+
1
;
break
;
case
(
HAS_VALUE
|
HAS_NULL
|
HAS_NONE
):
v
=
GET_BIT2
(
pColData
->
pBitMap
,
iVal
);
break
;
default:
ASSERT
(
0
);
break
;
}
return
v
;
}
int32_t
tColDataCopy
(
SColData
*
pColDataSrc
,
SColData
*
pColDataDest
)
{
int32_t
code
=
0
;
int32_t
size
;
ASSERT
(
pColDataSrc
->
nVal
>
0
);
ASSERT
(
pColDataDest
->
cid
=
pColDataSrc
->
cid
);
ASSERT
(
pColDataDest
->
type
=
pColDataSrc
->
type
);
pColDataDest
->
smaOn
=
pColDataSrc
->
smaOn
;
pColDataDest
->
nVal
=
pColDataSrc
->
nVal
;
pColDataDest
->
flag
=
pColDataSrc
->
flag
;
// bitmap
if
(
pColDataSrc
->
flag
!=
HAS_NONE
&&
pColDataSrc
->
flag
!=
HAS_NULL
&&
pColDataSrc
->
flag
!=
HAS_VALUE
)
{
size
=
BIT2_SIZE
(
pColDataSrc
->
nVal
);
code
=
tRealloc
(
&
pColDataDest
->
pBitMap
,
size
);
if
(
code
)
goto
_exit
;
memcpy
(
pColDataDest
->
pBitMap
,
pColDataSrc
->
pBitMap
,
size
);
}
// offset
if
(
IS_VAR_DATA_TYPE
(
pColDataDest
->
type
))
{
size
=
sizeof
(
int32_t
)
*
pColDataSrc
->
nVal
;
code
=
tRealloc
((
uint8_t
**
)
&
pColDataDest
->
aOffset
,
size
);
if
(
code
)
goto
_exit
;
memcpy
(
pColDataDest
->
aOffset
,
pColDataSrc
->
aOffset
,
size
);
}
// value
pColDataDest
->
nData
=
pColDataSrc
->
nData
;
code
=
tRealloc
(
&
pColDataDest
->
pData
,
pColDataSrc
->
nData
);
if
(
code
)
goto
_exit
;
memcpy
(
pColDataDest
->
pData
,
pColDataSrc
->
pData
,
pColDataDest
->
nData
);
_exit:
return
code
;
}
\ No newline at end of file
source/dnode/vnode/src/inc/tsdb.h
浏览文件 @
87b59d21
...
...
@@ -44,7 +44,6 @@ typedef struct SMapData SMapData;
typedef
struct
SBlockIdx
SBlockIdx
;
typedef
struct
SDataBlk
SDataBlk
;
typedef
struct
SSttBlk
SSttBlk
;
typedef
struct
SColData
SColData
;
typedef
struct
SDiskDataHdr
SDiskDataHdr
;
typedef
struct
SBlockData
SBlockData
;
typedef
struct
SDelFile
SDelFile
;
...
...
@@ -71,10 +70,6 @@ typedef struct SLDataIter SLDataIter;
#define TSDB_MAX_SUBBLOCKS 8
#define TSDB_FHDR_SIZE 512
#define HAS_NONE ((int8_t)0x1)
#define HAS_NULL ((int8_t)0x2)
#define HAS_VALUE ((int8_t)0x4)
#define VERSION_MIN 0
#define VERSION_MAX INT64_MAX
...
...
@@ -148,15 +143,6 @@ int32_t tPutBlockIdx(uint8_t *p, void *ph);
int32_t
tGetBlockIdx
(
uint8_t
*
p
,
void
*
ph
);
int32_t
tCmprBlockIdx
(
void
const
*
lhs
,
void
const
*
rhs
);
int32_t
tCmprBlockL
(
void
const
*
lhs
,
void
const
*
rhs
);
// SColdata
void
tColDataInit
(
SColData
*
pColData
,
int16_t
cid
,
int8_t
type
,
int8_t
smaOn
);
void
tColDataReset
(
SColData
*
pColData
);
void
tColDataClear
(
void
*
ph
);
int32_t
tColDataAppendValue
(
SColData
*
pColData
,
SColVal
*
pColVal
);
int32_t
tColDataGetValue
(
SColData
*
pColData
,
int32_t
iRow
,
SColVal
*
pColVal
);
int32_t
tColDataCopy
(
SColData
*
pColDataSrc
,
SColData
*
pColDataDest
);
int32_t
tPutColData
(
uint8_t
*
p
,
SColData
*
pColData
);
int32_t
tGetColData
(
uint8_t
*
p
,
SColData
*
pColData
);
// SBlockData
#define tBlockDataFirstRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, 0)
#define tBlockDataLastRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, (PBLOCKDATA)->nRow - 1)
...
...
@@ -473,18 +459,6 @@ struct SSttBlk {
SBlockInfo
bInfo
;
};
struct
SColData
{
int16_t
cid
;
int8_t
type
;
int8_t
smaOn
;
int32_t
nVal
;
uint8_t
flag
;
uint8_t
*
pBitMap
;
int32_t
*
aOffset
;
int32_t
nData
;
uint8_t
*
pData
;
};
// (SBlockData){.suid = 0, .uid = 0}: block data not initialized
// (SBlockData){.suid = suid, .uid = uid}: block data for ONE child table int .data file
// (SBlockData){.suid = suid, .uid = 0}: block data for N child tables int .last file
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
87b59d21
...
...
@@ -832,7 +832,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
tDecoderClear
(
pCoder
);
int32_t
sz
=
taosArrayGetSize
(
pRes
->
uidList
);
if
(
sz
==
0
)
{
if
(
sz
==
0
||
pRes
->
affectedRows
==
0
)
{
taosArrayDestroy
(
pRes
->
uidList
);
return
0
;
}
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
87b59d21
...
...
@@ -895,7 +895,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
// null value exists, check one-by-one
if
(
pData
->
flag
!=
HAS_VALUE
)
{
for
(
int32_t
j
=
pDumpInfo
->
rowIndex
;
rowIndex
<
remain
;
j
+=
step
,
rowIndex
++
)
{
uint8_t
v
=
GET_BIT2
(
pData
->
pBitMap
,
j
);
uint8_t
v
=
tColDataGetBitValue
(
pData
,
j
);
if
(
v
==
0
||
v
==
1
)
{
colDataSetNull_f
(
pColData
->
nullbitmap
,
rowIndex
);
}
...
...
@@ -1354,15 +1354,10 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo*
pInfo
->
hasDupTs
=
(
pBlock
->
nSubBlock
==
1
)
?
pBlock
->
hasDup
:
true
;
pInfo
->
overlapWithDelInfo
=
overlapWithDelSkyline
(
pScanInfo
,
pBlock
,
pReader
->
order
);
// todo here we need to each key in the last files to identify if it is really overlapped with last block
// todo
bool
overlapWithlastBlock
=
false
;
#if 0
if (taosArrayGetSize(pLastBlockReader->pSstBlk) > 0 && (pLastBlockReader->currentBlockIndex != -1)) {
SSttBlk* pSstBlk = taosArrayGet(pLastBlockReader->pSstBlk, pLastBlockReader->currentBlockIndex);
overlapWithlastBlock = !(pBlock->maxKey.ts < pSstBlk->minKey || pBlock->minKey.ts > pSstBlk->maxKey);
if
(
hasDataInLastBlock
(
pLastBlockReader
))
{
int64_t
tsLast
=
getCurrentKeyInLastBlock
(
pLastBlockReader
);
pInfo
->
overlapWithLastBlock
=
!
(
pBlock
->
maxKey
.
ts
<
tsLast
||
pBlock
->
minKey
.
ts
>
tsLast
);
}
#endif
pInfo
->
moreThanCapcity
=
pBlock
->
nRow
>
pReader
->
capacity
;
pInfo
->
partiallyRequired
=
dataBlockPartiallyRequired
(
&
pReader
->
window
,
&
pReader
->
verRange
,
pBlock
);
...
...
@@ -1888,151 +1883,6 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
return
code
;
}
#if 0
static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
SRowMerger merge = {0};
STSRow* pTSRow = NULL;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SArray* pDelList = pBlockScanInfo->delSkyline;
TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
ASSERT(pRow != NULL && piRow != NULL);
int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
bool freeTSRow = false;
uint64_t uid = pBlockScanInfo->uid;
TSDBKEY k = TSDBROW_KEY(pRow);
TSDBKEY ik = TSDBROW_KEY(piRow);
if (ASCENDING_TRAVERSE(pReader->order)) {
// [1&2] key <= [k.ts && ik.ts]
if (key <= k.ts && key <= ik.ts) {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
if (ik.ts == key) {
tRowMerge(&merge, piRow);
doMergeRowsInBuf(&pBlockScanInfo->iiter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader);
}
if (k.ts == key) {
tRowMerge(&merge, pRow);
doMergeRowsInBuf(&pBlockScanInfo->iter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader);
}
tRowMergerGetRow(&merge, &pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
return TSDB_CODE_SUCCESS;
} else { // key > ik.ts || key > k.ts
ASSERT(key != ik.ts);
// [3] ik.ts < key <= k.ts
// [4] ik.ts < k.ts <= key
if (ik.ts < k.ts) {
doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
if (freeTSRow) {
taosMemoryFree(pTSRow);
}
return TSDB_CODE_SUCCESS;
}
// [5] k.ts < key <= ik.ts
// [6] k.ts < ik.ts <= key
if (k.ts < ik.ts) {
doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, &pTSRow, pReader, &freeTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
if (freeTSRow) {
taosMemoryFree(pTSRow);
}
return TSDB_CODE_SUCCESS;
}
// [7] k.ts == ik.ts < key
if (k.ts == ik.ts) {
ASSERT(key > ik.ts && key > k.ts);
doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, &pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
taosMemoryFree(pTSRow);
return TSDB_CODE_SUCCESS;
}
}
} else { // descending order scan
// [1/2] k.ts >= ik.ts && k.ts >= key
if (k.ts >= ik.ts && k.ts >= key) {
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
tRowMergerInit(&merge, pRow, pSchema);
doMergeRowsInBuf(&pBlockScanInfo->iter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader);
if (ik.ts == k.ts) {
tRowMerge(&merge, piRow);
doMergeRowsInBuf(&pBlockScanInfo->iiter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader);
}
if (k.ts == key) {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
tRowMerge(&merge, &fRow);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
}
tRowMergerGetRow(&merge, &pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
return TSDB_CODE_SUCCESS;
} else {
ASSERT(ik.ts != k.ts); // this case has been included in the previous if branch
// [3] ik.ts > k.ts >= Key
// [4] ik.ts > key >= k.ts
if (ik.ts > key) {
doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
if (freeTSRow) {
taosMemoryFree(pTSRow);
}
return TSDB_CODE_SUCCESS;
}
// [5] key > ik.ts > k.ts
// [6] key > k.ts > ik.ts
if (key > ik.ts) {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
tRowMergerGetRow(&merge, &pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
taosMemoryFree(pTSRow);
return TSDB_CODE_SUCCESS;
}
//[7] key = ik.ts > k.ts
if (key == ik.ts) {
doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow);
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
tRowMerge(&merge, &fRow);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
tRowMergerGetRow(&merge, &pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
taosMemoryFree(pTSRow);
return TSDB_CODE_SUCCESS;
}
}
}
ASSERT(0);
return -1;
}
#endif
static
int32_t
initMemDataIterator
(
STableBlockScanInfo
*
pBlockScanInfo
,
STsdbReader
*
pReader
)
{
if
(
pBlockScanInfo
->
iterInit
)
{
return
TSDB_CODE_SUCCESS
;
...
...
@@ -2250,7 +2100,6 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
int32_t
step
=
ASCENDING_TRAVERSE
(
pReader
->
order
)
?
1
:
-
1
;
while
(
1
)
{
// todo check the validate of row in file block
bool
hasBlockData
=
false
;
{
while
(
pBlockData
->
nRow
>
0
)
{
// find the first qualified row in data block
...
...
@@ -2611,7 +2460,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
code
=
buildComposedDataBlock
(
pReader
);
}
else
if
(
bufferDataInFileBlockGap
(
pReader
->
order
,
keyInBuf
,
pBlock
))
{
// data in memory that are earlier than current file block
//
todo
rows in buffer should be less than the file block in asc, greater than file block in desc
// rows in buffer should be less than the file block in asc, greater than file block in desc
int64_t
endKey
=
(
ASCENDING_TRAVERSE
(
pReader
->
order
))
?
pBlock
->
minKey
.
ts
:
pBlock
->
maxKey
.
ts
;
code
=
buildDataBlockFromBuf
(
pReader
,
pScanInfo
,
endKey
);
}
else
{
...
...
@@ -4072,4 +3921,4 @@ void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap) {
}
tsdbTrace
(
"vgId:%d, untake read snapshot"
,
TD_VID
(
pTsdb
->
pVnode
));
}
\ No newline at end of file
}
source/dnode/vnode/src/tsdb/tsdbUtil.c
浏览文件 @
87b59d21
...
...
@@ -909,248 +909,6 @@ int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SAr
return
code
;
}
// SColData ========================================
void
tColDataInit
(
SColData
*
pColData
,
int16_t
cid
,
int8_t
type
,
int8_t
smaOn
)
{
pColData
->
cid
=
cid
;
pColData
->
type
=
type
;
pColData
->
smaOn
=
smaOn
;
tColDataReset
(
pColData
);
}
void
tColDataReset
(
SColData
*
pColData
)
{
pColData
->
nVal
=
0
;
pColData
->
flag
=
0
;
pColData
->
nData
=
0
;
}
void
tColDataClear
(
void
*
ph
)
{
SColData
*
pColData
=
(
SColData
*
)
ph
;
tFree
(
pColData
->
pBitMap
);
tFree
((
uint8_t
*
)
pColData
->
aOffset
);
tFree
(
pColData
->
pData
);
}
int32_t
tColDataAppendValue
(
SColData
*
pColData
,
SColVal
*
pColVal
)
{
int32_t
code
=
0
;
int64_t
size
;
SValue
value
=
{
0
};
SValue
*
pValue
=
&
value
;
ASSERT
(
pColVal
->
cid
==
pColData
->
cid
);
ASSERT
(
pColVal
->
type
==
pColData
->
type
);
// realloc bitmap
size
=
BIT2_SIZE
(
pColData
->
nVal
+
1
);
code
=
tRealloc
(
&
pColData
->
pBitMap
,
size
);
if
(
code
)
goto
_exit
;
if
((
pColData
->
nVal
&
3
)
==
0
)
{
pColData
->
pBitMap
[
pColData
->
nVal
>>
2
]
=
0
;
}
// put value
if
(
pColVal
->
isNone
)
{
pColData
->
flag
|=
HAS_NONE
;
SET_BIT2
(
pColData
->
pBitMap
,
pColData
->
nVal
,
0
);
}
else
if
(
pColVal
->
isNull
)
{
pColData
->
flag
|=
HAS_NULL
;
SET_BIT2
(
pColData
->
pBitMap
,
pColData
->
nVal
,
1
);
}
else
{
pColData
->
flag
|=
HAS_VALUE
;
SET_BIT2
(
pColData
->
pBitMap
,
pColData
->
nVal
,
2
);
pValue
=
&
pColVal
->
value
;
}
if
(
IS_VAR_DATA_TYPE
(
pColData
->
type
))
{
// offset
code
=
tRealloc
((
uint8_t
**
)
&
pColData
->
aOffset
,
sizeof
(
int32_t
)
*
(
pColData
->
nVal
+
1
));
if
(
code
)
goto
_exit
;
pColData
->
aOffset
[
pColData
->
nVal
]
=
pColData
->
nData
;
// value
if
((
!
pColVal
->
isNone
)
&&
(
!
pColVal
->
isNull
))
{
code
=
tRealloc
(
&
pColData
->
pData
,
pColData
->
nData
+
pColVal
->
value
.
nData
);
if
(
code
)
goto
_exit
;
memcpy
(
pColData
->
pData
+
pColData
->
nData
,
pColVal
->
value
.
pData
,
pColVal
->
value
.
nData
);
pColData
->
nData
+=
pColVal
->
value
.
nData
;
}
}
else
{
code
=
tRealloc
(
&
pColData
->
pData
,
pColData
->
nData
+
tPutValue
(
NULL
,
pValue
,
pColVal
->
type
));
if
(
code
)
goto
_exit
;
pColData
->
nData
+=
tPutValue
(
pColData
->
pData
+
pColData
->
nData
,
pValue
,
pColVal
->
type
);
}
pColData
->
nVal
++
;
_exit:
return
code
;
}
int32_t
tColDataCopy
(
SColData
*
pColDataSrc
,
SColData
*
pColDataDest
)
{
int32_t
code
=
0
;
int32_t
size
;
ASSERT
(
pColDataSrc
->
nVal
>
0
);
ASSERT
(
pColDataDest
->
cid
=
pColDataSrc
->
cid
);
ASSERT
(
pColDataDest
->
type
=
pColDataSrc
->
type
);
pColDataDest
->
smaOn
=
pColDataSrc
->
smaOn
;
pColDataDest
->
nVal
=
pColDataSrc
->
nVal
;
pColDataDest
->
flag
=
pColDataSrc
->
flag
;
// bitmap
if
(
pColDataSrc
->
flag
!=
HAS_NONE
&&
pColDataSrc
->
flag
!=
HAS_NULL
&&
pColDataSrc
->
flag
!=
HAS_VALUE
)
{
size
=
BIT2_SIZE
(
pColDataSrc
->
nVal
);
code
=
tRealloc
(
&
pColDataDest
->
pBitMap
,
size
);
if
(
code
)
goto
_exit
;
memcpy
(
pColDataDest
->
pBitMap
,
pColDataSrc
->
pBitMap
,
size
);
}
// offset
if
(
IS_VAR_DATA_TYPE
(
pColDataDest
->
type
))
{
size
=
sizeof
(
int32_t
)
*
pColDataSrc
->
nVal
;
code
=
tRealloc
((
uint8_t
**
)
&
pColDataDest
->
aOffset
,
size
);
if
(
code
)
goto
_exit
;
memcpy
(
pColDataDest
->
aOffset
,
pColDataSrc
->
aOffset
,
size
);
}
// value
pColDataDest
->
nData
=
pColDataSrc
->
nData
;
code
=
tRealloc
(
&
pColDataDest
->
pData
,
pColDataSrc
->
nData
);
if
(
code
)
goto
_exit
;
memcpy
(
pColDataDest
->
pData
,
pColDataSrc
->
pData
,
pColDataDest
->
nData
);
_exit:
return
code
;
}
int32_t
tColDataGetValue
(
SColData
*
pColData
,
int32_t
iVal
,
SColVal
*
pColVal
)
{
int32_t
code
=
0
;
ASSERT
(
iVal
<
pColData
->
nVal
);
ASSERT
(
pColData
->
flag
);
if
(
pColData
->
flag
==
HAS_NONE
)
{
*
pColVal
=
COL_VAL_NONE
(
pColData
->
cid
,
pColData
->
type
);
goto
_exit
;
}
else
if
(
pColData
->
flag
==
HAS_NULL
)
{
*
pColVal
=
COL_VAL_NULL
(
pColData
->
cid
,
pColData
->
type
);
goto
_exit
;
}
else
if
(
pColData
->
flag
!=
HAS_VALUE
)
{
uint8_t
v
=
GET_BIT2
(
pColData
->
pBitMap
,
iVal
);
if
(
v
==
0
)
{
*
pColVal
=
COL_VAL_NONE
(
pColData
->
cid
,
pColData
->
type
);
goto
_exit
;
}
else
if
(
v
==
1
)
{
*
pColVal
=
COL_VAL_NULL
(
pColData
->
cid
,
pColData
->
type
);
goto
_exit
;
}
}
// get value
SValue
value
;
if
(
IS_VAR_DATA_TYPE
(
pColData
->
type
))
{
if
(
iVal
+
1
<
pColData
->
nVal
)
{
value
.
nData
=
pColData
->
aOffset
[
iVal
+
1
]
-
pColData
->
aOffset
[
iVal
];
}
else
{
value
.
nData
=
pColData
->
nData
-
pColData
->
aOffset
[
iVal
];
}
value
.
pData
=
pColData
->
pData
+
pColData
->
aOffset
[
iVal
];
}
else
{
tGetValue
(
pColData
->
pData
+
tDataTypes
[
pColData
->
type
].
bytes
*
iVal
,
&
value
,
pColData
->
type
);
}
*
pColVal
=
COL_VAL_VALUE
(
pColData
->
cid
,
pColData
->
type
,
value
);
_exit:
return
code
;
}
int32_t
tPutColData
(
uint8_t
*
p
,
SColData
*
pColData
)
{
int32_t
n
=
0
;
n
+=
tPutI16v
(
p
?
p
+
n
:
p
,
pColData
->
cid
);
n
+=
tPutI8
(
p
?
p
+
n
:
p
,
pColData
->
type
);
n
+=
tPutI8
(
p
?
p
+
n
:
p
,
pColData
->
smaOn
);
n
+=
tPutI32v
(
p
?
p
+
n
:
p
,
pColData
->
nVal
);
n
+=
tPutU8
(
p
?
p
+
n
:
p
,
pColData
->
flag
);
if
(
pColData
->
flag
==
HAS_NONE
||
pColData
->
flag
==
HAS_NULL
)
goto
_exit
;
if
(
pColData
->
flag
!=
HAS_VALUE
)
{
// bitmap
int32_t
size
=
BIT2_SIZE
(
pColData
->
nVal
);
if
(
p
)
{
memcpy
(
p
+
n
,
pColData
->
pBitMap
,
size
);
}
n
+=
size
;
}
if
(
IS_VAR_DATA_TYPE
(
pColData
->
type
))
{
// offset
int32_t
size
=
sizeof
(
int32_t
)
*
pColData
->
nVal
;
if
(
p
)
{
memcpy
(
p
+
n
,
pColData
->
aOffset
,
size
);
}
n
+=
size
;
}
n
+=
tPutI32v
(
p
?
p
+
n
:
p
,
pColData
->
nData
);
if
(
p
)
{
memcpy
(
p
+
n
,
pColData
->
pData
,
pColData
->
nData
);
}
n
+=
pColData
->
nData
;
_exit:
return
n
;
}
int32_t
tGetColData
(
uint8_t
*
p
,
SColData
*
pColData
)
{
int32_t
n
=
0
;
n
+=
tGetI16v
(
p
+
n
,
&
pColData
->
cid
);
n
+=
tGetI8
(
p
+
n
,
&
pColData
->
type
);
n
+=
tGetI8
(
p
+
n
,
&
pColData
->
smaOn
);
n
+=
tGetI32v
(
p
+
n
,
&
pColData
->
nVal
);
n
+=
tGetU8
(
p
+
n
,
&
pColData
->
flag
);
if
(
pColData
->
flag
==
HAS_NONE
||
pColData
->
flag
==
HAS_NULL
)
goto
_exit
;
if
(
pColData
->
flag
!=
HAS_VALUE
)
{
// bitmap
int32_t
size
=
BIT2_SIZE
(
pColData
->
nVal
);
pColData
->
pBitMap
=
p
+
n
;
n
+=
size
;
}
if
(
IS_VAR_DATA_TYPE
(
pColData
->
type
))
{
// offset
int32_t
size
=
sizeof
(
int32_t
)
*
pColData
->
nVal
;
pColData
->
aOffset
=
(
int32_t
*
)(
p
+
n
);
n
+=
size
;
}
n
+=
tGetI32v
(
p
+
n
,
&
pColData
->
nData
);
pColData
->
pData
=
p
+
n
;
n
+=
pColData
->
nData
;
_exit:
return
n
;
}
static
FORCE_INLINE
int32_t
tColDataCmprFn
(
const
void
*
p1
,
const
void
*
p2
)
{
SColData
*
pColData1
=
(
SColData
*
)
p1
;
SColData
*
pColData2
=
(
SColData
*
)
p2
;
if
(
pColData1
->
cid
<
pColData2
->
cid
)
{
return
-
1
;
}
else
if
(
pColData1
->
cid
>
pColData2
->
cid
)
{
return
1
;
}
return
0
;
}
// SBlockData ======================================================
int32_t
tBlockDataCreate
(
SBlockData
*
pBlockData
)
{
int32_t
code
=
0
;
...
...
@@ -1182,7 +940,7 @@ void tBlockDataDestroy(SBlockData *pBlockData, int8_t deepClear) {
tFree
((
uint8_t
*
)
pBlockData
->
aVersion
);
tFree
((
uint8_t
*
)
pBlockData
->
aTSKEY
);
taosArrayDestroy
(
pBlockData
->
aIdx
);
taosArrayDestroyEx
(
pBlockData
->
aColData
,
deepClear
?
tColData
Clear
:
NULL
);
taosArrayDestroyEx
(
pBlockData
->
aColData
,
deepClear
?
tColData
Destroy
:
NULL
);
pBlockData
->
aUid
=
NULL
;
pBlockData
->
aVersion
=
NULL
;
pBlockData
->
aTSKEY
=
NULL
;
...
...
@@ -1251,7 +1009,7 @@ void tBlockDataClear(SBlockData *pBlockData) {
pBlockData
->
nRow
=
0
;
for
(
int32_t
iColData
=
0
;
iColData
<
taosArrayGetSize
(
pBlockData
->
aIdx
);
iColData
++
)
{
SColData
*
pColData
=
tBlockDataGetColDataByIdx
(
pBlockData
,
iColData
);
tColData
Reset
(
pColData
);
tColData
Clear
(
pColData
);
}
}
...
...
@@ -1501,7 +1259,7 @@ void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColD
while
(
lidx
<=
ridx
)
{
int32_t
midx
=
(
lidx
+
ridx
)
/
2
;
SColData
*
pColData
=
tBlockDataGetColDataByIdx
(
pBlockData
,
midx
);
int32_t
c
=
tColDataCmprFn
(
pColData
,
&
(
SColData
){.
cid
=
cid
}
);
int32_t
c
=
(
pColData
->
cid
==
cid
)
?
0
:
((
pColData
->
cid
>
cid
)
?
1
:
-
1
);
if
(
c
==
0
)
{
*
ppColData
=
pColData
;
...
...
@@ -1986,47 +1744,16 @@ int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol
int32_t
size
=
0
;
// bitmap
if
(
pColData
->
flag
!=
HAS_VALUE
)
{
uint8_t
*
pBitMap
=
pColData
->
pBitMap
;
int32_t
szBitMap
=
BIT2_SIZE
(
pColData
->
nVal
);
// BIT2 to BIT1
if
(
pColData
->
flag
!=
(
HAS_VALUE
|
HAS_NULL
|
HAS_NONE
))
{
int32_t
szBitMap
;
if
(
pColData
->
flag
==
(
HAS_VALUE
|
HAS_NULL
|
HAS_NONE
))
{
szBitMap
=
BIT2_SIZE
(
pColData
->
nVal
);
}
else
{
szBitMap
=
BIT1_SIZE
(
pColData
->
nVal
);
pBitMap
=
taosMemoryCalloc
(
1
,
szBitMap
);
if
(
pBitMap
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_exit
;
}
for
(
int32_t
iVal
=
0
;
iVal
<
pColData
->
nVal
;
iVal
++
)
{
uint8_t
v
=
GET_BIT2
(
pColData
->
pBitMap
,
iVal
);
switch
(
pColData
->
flag
)
{
case
(
HAS_NULL
|
HAS_NONE
):
SET_BIT1
(
pBitMap
,
iVal
,
v
);
break
;
case
(
HAS_VALUE
|
HAS_NONE
):
if
(
v
)
{
SET_BIT1
(
pBitMap
,
iVal
,
1
);
}
else
{
SET_BIT1
(
pBitMap
,
iVal
,
0
);
}
break
;
case
(
HAS_VALUE
|
HAS_NULL
):
SET_BIT1
(
pBitMap
,
iVal
,
v
-
1
);
break
;
default:
ASSERT
(
0
);
}
}
}
code
=
tsdbCmprData
(
p
BitMap
,
szBitMap
,
TSDB_DATA_TYPE_TINYINT
,
cmprAlg
,
ppOut
,
nOut
+
size
,
&
pBlockCol
->
szBitmap
,
ppBuf
);
code
=
tsdbCmprData
(
p
ColData
->
pBitMap
,
szBitMap
,
TSDB_DATA_TYPE_TINYINT
,
cmprAlg
,
ppOut
,
nOut
+
size
,
&
pBlockCol
->
szBitmap
,
ppBuf
);
if
(
code
)
goto
_exit
;
if
(
pColData
->
flag
!=
(
HAS_VALUE
|
HAS_NULL
|
HAS_NONE
))
{
taosMemoryFree
(
pBitMap
);
}
}
size
+=
pBlockCol
->
szBitmap
;
...
...
@@ -2064,46 +1791,15 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in
uint8_t
*
p
=
pIn
;
// bitmap
if
(
pBlockCol
->
szBitmap
)
{
if
(
pBlockCol
->
flag
!=
(
HAS_VALUE
|
HAS_NULL
|
HAS_NONE
))
{
uint8_t
*
pBitMap
=
NULL
;
code
=
tsdbDecmprData
(
p
,
pBlockCol
->
szBitmap
,
TSDB_DATA_TYPE_TINYINT
,
cmprAlg
,
&
pBitMap
,
BIT1_SIZE
(
pColData
->
nVal
),
ppBuf
);
if
(
code
)
goto
_exit
;
code
=
tRealloc
(
&
pColData
->
pBitMap
,
BIT2_SIZE
(
pColData
->
nVal
));
if
(
code
)
{
tFree
(
pBitMap
);
goto
_exit
;
}
// BIT1 to BIT2
for
(
int32_t
iVal
=
0
;
iVal
<
nVal
;
iVal
++
)
{
uint8_t
v
=
GET_BIT1
(
pBitMap
,
iVal
);
switch
(
pBlockCol
->
flag
)
{
case
(
HAS_NULL
|
HAS_NONE
):
SET_BIT2
(
pColData
->
pBitMap
,
iVal
,
v
);
break
;
case
(
HAS_VALUE
|
HAS_NONE
):
if
(
v
)
{
SET_BIT2
(
pColData
->
pBitMap
,
iVal
,
2
);
}
else
{
SET_BIT2
(
pColData
->
pBitMap
,
iVal
,
0
);
}
break
;
case
(
HAS_VALUE
|
HAS_NULL
):
SET_BIT2
(
pColData
->
pBitMap
,
iVal
,
v
+
1
);
break
;
default:
ASSERT
(
0
);
}
}
tFree
(
pBitMap
);
int32_t
szBitMap
;
if
(
pColData
->
flag
==
(
HAS_VALUE
|
HAS_NULL
|
HAS_NONE
))
{
szBitMap
=
BIT2_SIZE
(
pColData
->
nVal
);
}
else
{
code
=
tsdbDecmprData
(
p
,
pBlockCol
->
szBitmap
,
TSDB_DATA_TYPE_TINYINT
,
cmprAlg
,
&
pColData
->
pBitMap
,
BIT2_SIZE
(
pColData
->
nVal
),
ppBuf
);
if
(
code
)
goto
_exit
;
szBitMap
=
BIT1_SIZE
(
pColData
->
nVal
);
}
code
=
tsdbDecmprData
(
p
,
pBlockCol
->
szBitmap
,
TSDB_DATA_TYPE_TINYINT
,
cmprAlg
,
&
pColData
->
pBitMap
,
szBitMap
,
ppBuf
);
if
(
code
)
goto
_exit
;
}
p
+=
pBlockCol
->
szBitmap
;
...
...
source/libs/executor/src/dataDeleter.c
浏览文件 @
87b59d21
...
...
@@ -79,25 +79,33 @@ static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInp
pEntry
->
dataLen
=
sizeof
(
SDeleterRes
);
ASSERT
(
1
==
pEntry
->
numOfRows
);
ASSERT
(
1
==
pEntry
->
numOfCols
);
ASSERT
(
3
==
pEntry
->
numOfCols
);
pBuf
->
useSize
=
sizeof
(
SDataCacheEntry
);
SColumnInfoData
*
pColRes
=
(
SColumnInfoData
*
)
taosArrayGet
(
pInput
->
pData
->
pDataBlock
,
0
);
SColumnInfoData
*
pColSKey
=
(
SColumnInfoData
*
)
taosArrayGet
(
pInput
->
pData
->
pDataBlock
,
1
);
SColumnInfoData
*
pColEKey
=
(
SColumnInfoData
*
)
taosArrayGet
(
pInput
->
pData
->
pDataBlock
,
2
);
SDeleterRes
*
pRes
=
(
SDeleterRes
*
)
pEntry
->
data
;
pRes
->
suid
=
pHandle
->
pParam
->
suid
;
pRes
->
uidList
=
pHandle
->
pParam
->
pUidList
;
pRes
->
skey
=
pHandle
->
pDeleter
->
deleteTimeRange
.
skey
;
pRes
->
ekey
=
pHandle
->
pDeleter
->
deleteTimeRange
.
ekey
;
strcpy
(
pRes
->
tableName
,
pHandle
->
pDeleter
->
tableFName
);
strcpy
(
pRes
->
tsColName
,
pHandle
->
pDeleter
->
tsColName
);
pRes
->
affectedRows
=
*
(
int64_t
*
)
pColRes
->
pData
;
if
(
pRes
->
affectedRows
)
{
pRes
->
skey
=
*
(
int64_t
*
)
pColSKey
->
pData
;
pRes
->
ekey
=
*
(
int64_t
*
)
pColEKey
->
pData
;
ASSERT
(
pRes
->
skey
<=
pRes
->
ekey
);
}
else
{
pRes
->
skey
=
pHandle
->
pDeleter
->
deleteTimeRange
.
skey
;
pRes
->
ekey
=
pHandle
->
pDeleter
->
deleteTimeRange
.
ekey
;
}
pBuf
->
useSize
+=
pEntry
->
dataLen
;
atomic_add_fetch_64
(
&
pHandle
->
cachedSize
,
pEntry
->
dataLen
);
atomic_add_fetch_64
(
&
gDataSinkStat
.
cachedSize
,
pEntry
->
dataLen
);
atomic_add_fetch_64
(
&
pHandle
->
cachedSize
,
pEntry
->
dataLen
);
atomic_add_fetch_64
(
&
gDataSinkStat
.
cachedSize
,
pEntry
->
dataLen
);
}
static
bool
allocBuf
(
SDataDeleterHandle
*
pDeleter
,
const
SInputData
*
pInput
,
SDataDeleterBuf
*
pBuf
)
{
...
...
@@ -172,7 +180,8 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryE
SDataCacheEntry
*
pEntry
=
(
SDataCacheEntry
*
)
pDeleter
->
nextOutput
.
pData
;
*
pLen
=
pEntry
->
dataLen
;
*
pQueryEnd
=
pDeleter
->
queryEnd
;
qDebug
(
"got data len %"
PRId64
", row num %d in sink"
,
*
pLen
,
((
SDataCacheEntry
*
)(
pDeleter
->
nextOutput
.
pData
))
->
numOfRows
);
qDebug
(
"got data len %"
PRId64
", row num %d in sink"
,
*
pLen
,
((
SDataCacheEntry
*
)(
pDeleter
->
nextOutput
.
pData
))
->
numOfRows
);
}
static
int32_t
getDataBlock
(
SDataSinkHandle
*
pHandle
,
SOutputData
*
pOutput
)
{
...
...
@@ -186,14 +195,14 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
return
TSDB_CODE_SUCCESS
;
}
SDataCacheEntry
*
pEntry
=
(
SDataCacheEntry
*
)(
pDeleter
->
nextOutput
.
pData
);
memcpy
(
pOutput
->
pData
,
pEntry
->
data
,
pEntry
->
dataLen
);
memcpy
(
pOutput
->
pData
,
pEntry
->
data
,
pEntry
->
dataLen
);
pDeleter
->
pParam
->
pUidList
=
NULL
;
pOutput
->
numOfRows
=
pEntry
->
numOfRows
;
pOutput
->
numOfCols
=
pEntry
->
numOfCols
;
pOutput
->
compressed
=
pEntry
->
compressed
;
atomic_sub_fetch_64
(
&
pDeleter
->
cachedSize
,
pEntry
->
dataLen
);
atomic_sub_fetch_64
(
&
gDataSinkStat
.
cachedSize
,
pEntry
->
dataLen
);
atomic_sub_fetch_64
(
&
pDeleter
->
cachedSize
,
pEntry
->
dataLen
);
atomic_sub_fetch_64
(
&
gDataSinkStat
.
cachedSize
,
pEntry
->
dataLen
);
taosMemoryFreeClear
(
pDeleter
->
nextOutput
.
pData
);
// todo persistent
pOutput
->
bufStatus
=
updateStatus
(
pDeleter
);
...
...
@@ -202,7 +211,7 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
pOutput
->
useconds
=
pDeleter
->
useconds
;
pOutput
->
precision
=
pDeleter
->
pSchema
->
precision
;
taosThreadMutexUnlock
(
&
pDeleter
->
mutex
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -211,7 +220,7 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
atomic_sub_fetch_64
(
&
gDataSinkStat
.
cachedSize
,
pDeleter
->
cachedSize
);
taosMemoryFreeClear
(
pDeleter
->
nextOutput
.
pData
);
taosArrayDestroy
(
pDeleter
->
pParam
->
pUidList
);
taosMemoryFree
(
pDeleter
->
pParam
);
taosMemoryFree
(
pDeleter
->
pParam
);
while
(
!
taosQueueEmpty
(
pDeleter
->
pDataBlocks
))
{
SDataDeleterBuf
*
pBuf
=
NULL
;
taosReadQitem
(
pDeleter
->
pDataBlocks
,
(
void
**
)
&
pBuf
);
...
...
@@ -230,14 +239,15 @@ static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
createDataDeleter
(
SDataSinkManager
*
pManager
,
const
SDataSinkNode
*
pDataSink
,
DataSinkHandle
*
pHandle
,
void
*
pParam
)
{
int32_t
createDataDeleter
(
SDataSinkManager
*
pManager
,
const
SDataSinkNode
*
pDataSink
,
DataSinkHandle
*
pHandle
,
void
*
pParam
)
{
SDataDeleterHandle
*
deleter
=
taosMemoryCalloc
(
1
,
sizeof
(
SDataDeleterHandle
));
if
(
NULL
==
deleter
)
{
terrno
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
SDataDeleterNode
*
pDeleterNode
=
(
SDataDeleterNode
*
)
pDataSink
;
SDataDeleterNode
*
pDeleterNode
=
(
SDataDeleterNode
*
)
pDataSink
;
deleter
->
sink
.
fPut
=
putDataBlock
;
deleter
->
sink
.
fEndPut
=
endPut
;
deleter
->
sink
.
fGetLen
=
getDataLength
;
...
...
source/libs/nodes/src/nodesCloneFuncs.c
浏览文件 @
87b59d21
...
...
@@ -399,6 +399,8 @@ static int32_t logicVnodeModifCopy(const SVnodeModifyLogicNode* pSrc, SVnodeModi
COPY_SCALAR_FIELD
(
modifyType
);
COPY_SCALAR_FIELD
(
msgType
);
CLONE_NODE_FIELD
(
pAffectedRows
);
CLONE_NODE_FIELD
(
pStartTs
);
CLONE_NODE_FIELD
(
pEndTs
);
COPY_SCALAR_FIELD
(
tableId
);
COPY_SCALAR_FIELD
(
stableId
);
COPY_SCALAR_FIELD
(
tableType
);
...
...
source/libs/nodes/src/nodesCodeFuncs.c
浏览文件 @
87b59d21
...
...
@@ -2431,6 +2431,8 @@ static const char* jkDeletePhysiPlanTsColName = "TsColName";
static
const
char
*
jkDeletePhysiPlanDeleteTimeRangeStartKey
=
"DeleteTimeRangeStartKey"
;
static
const
char
*
jkDeletePhysiPlanDeleteTimeRangeEndKey
=
"DeleteTimeRangeEndKey"
;
static
const
char
*
jkDeletePhysiPlanAffectedRows
=
"AffectedRows"
;
static
const
char
*
jkDeletePhysiPlanStartTs
=
"StartTs"
;
static
const
char
*
jkDeletePhysiPlanEndTs
=
"EndTs"
;
static
int32_t
physiDeleteNodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
const
SDataDeleterNode
*
pNode
=
(
const
SDataDeleterNode
*
)
pObj
;
...
...
@@ -2457,6 +2459,12 @@ static int32_t physiDeleteNodeToJson(const void* pObj, SJson* pJson) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddObject
(
pJson
,
jkDeletePhysiPlanAffectedRows
,
nodeToJson
,
pNode
->
pAffectedRows
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddObject
(
pJson
,
jkDeletePhysiPlanStartTs
,
nodeToJson
,
pNode
->
pStartTs
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddObject
(
pJson
,
jkDeletePhysiPlanEndTs
,
nodeToJson
,
pNode
->
pEndTs
);
}
return
code
;
}
...
...
@@ -2486,6 +2494,12 @@ static int32_t jsonToPhysiDeleteNode(const SJson* pJson, void* pObj) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeObject
(
pJson
,
jkDeletePhysiPlanAffectedRows
,
&
pNode
->
pAffectedRows
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeObject
(
pJson
,
jkDeletePhysiPlanStartTs
,
&
pNode
->
pStartTs
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeObject
(
pJson
,
jkDeletePhysiPlanEndTs
,
&
pNode
->
pEndTs
);
}
return
code
;
}
...
...
source/libs/nodes/src/nodesMsgFuncs.c
浏览文件 @
87b59d21
...
...
@@ -2665,7 +2665,9 @@ enum {
PHY_DELETER_CODE_TABLE_FNAME
,
PHY_DELETER_CODE_TS_COL_NAME
,
PHY_DELETER_CODE_DELETE_TIME_RANGE
,
PHY_DELETER_CODE_AFFECTED_ROWS
PHY_DELETER_CODE_AFFECTED_ROWS
,
PHY_DELETER_CODE_START_TS
,
PHY_DELETER_CODE_END_TS
};
static
int32_t
physiDeleteNodeToMsg
(
const
void
*
pObj
,
STlvEncoder
*
pEncoder
)
{
...
...
@@ -2690,6 +2692,12 @@ static int32_t physiDeleteNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tlvEncodeObj
(
pEncoder
,
PHY_DELETER_CODE_AFFECTED_ROWS
,
nodeToMsg
,
pNode
->
pAffectedRows
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tlvEncodeObj
(
pEncoder
,
PHY_DELETER_CODE_START_TS
,
nodeToMsg
,
pNode
->
pStartTs
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tlvEncodeObj
(
pEncoder
,
PHY_DELETER_CODE_END_TS
,
nodeToMsg
,
pNode
->
pEndTs
);
}
return
code
;
}
...
...
@@ -2722,6 +2730,12 @@ static int32_t msgToPhysiDeleteNode(STlvDecoder* pDecoder, void* pObj) {
case
PHY_DELETER_CODE_AFFECTED_ROWS
:
code
=
msgToNodeFromTlv
(
pTlv
,
(
void
**
)
&
pNode
->
pAffectedRows
);
break
;
case
PHY_DELETER_CODE_START_TS
:
code
=
msgToNodeFromTlv
(
pTlv
,
(
void
**
)
&
pNode
->
pStartTs
);
break
;
case
PHY_DELETER_CODE_END_TS
:
code
=
msgToNodeFromTlv
(
pTlv
,
(
void
**
)
&
pNode
->
pEndTs
);
break
;
default:
break
;
}
...
...
source/libs/nodes/src/nodesUtilFuncs.c
浏览文件 @
87b59d21
...
...
@@ -727,6 +727,8 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode
(
pStmt
->
pFromTable
);
nodesDestroyNode
(
pStmt
->
pWhere
);
nodesDestroyNode
(
pStmt
->
pCountFunc
);
nodesDestroyNode
(
pStmt
->
pFirstFunc
);
nodesDestroyNode
(
pStmt
->
pLastFunc
);
nodesDestroyNode
(
pStmt
->
pTagCond
);
break
;
}
...
...
@@ -791,6 +793,8 @@ void nodesDestroyNode(SNode* pNode) {
destroyVgDataBlockArray
(
pLogicNode
->
pDataBlocks
);
// pVgDataBlocks is weak reference
nodesDestroyNode
(
pLogicNode
->
pAffectedRows
);
nodesDestroyNode
(
pLogicNode
->
pStartTs
);
nodesDestroyNode
(
pLogicNode
->
pEndTs
);
taosMemoryFreeClear
(
pLogicNode
->
pVgroupList
);
nodesDestroyList
(
pLogicNode
->
pInsertCols
);
break
;
...
...
@@ -997,6 +1001,8 @@ void nodesDestroyNode(SNode* pNode) {
SDataDeleterNode
*
pSink
=
(
SDataDeleterNode
*
)
pNode
;
destroyDataSinkNode
((
SDataSinkNode
*
)
pSink
);
nodesDestroyNode
(
pSink
->
pAffectedRows
);
nodesDestroyNode
(
pSink
->
pStartTs
);
nodesDestroyNode
(
pSink
->
pEndTs
);
break
;
}
case
QUERY_NODE_PHYSICAL_SUBPLAN
:
{
...
...
source/libs/parser/src/parAstCreater.c
浏览文件 @
87b59d21
...
...
@@ -1787,10 +1787,10 @@ SNode* createRevokeStmt(SAstCreateContext* pCxt, int64_t privileges, SToken* pDb
return
(
SNode
*
)
pStmt
;
}
SNode
*
create
CountFuncForDelete
(
SAstCreateContext
*
pCxt
)
{
SNode
*
create
FuncForDelete
(
SAstCreateContext
*
pCxt
,
const
char
*
pFuncName
)
{
SFunctionNode
*
pFunc
=
(
SFunctionNode
*
)
nodesMakeNode
(
QUERY_NODE_FUNCTION
);
CHECK_OUT_OF_MEM
(
pFunc
);
strcpy
(
pFunc
->
functionName
,
"count"
);
strcpy
(
pFunc
->
functionName
,
pFuncName
);
if
(
TSDB_CODE_SUCCESS
!=
nodesListMakeStrictAppend
(
&
pFunc
->
pParameterList
,
createPrimaryKeyCol
(
pCxt
,
NULL
)))
{
nodesDestroyNode
((
SNode
*
)
pFunc
);
CHECK_OUT_OF_MEM
(
NULL
);
...
...
@@ -1804,8 +1804,10 @@ SNode* createDeleteStmt(SAstCreateContext* pCxt, SNode* pTable, SNode* pWhere) {
CHECK_OUT_OF_MEM
(
pStmt
);
pStmt
->
pFromTable
=
pTable
;
pStmt
->
pWhere
=
pWhere
;
pStmt
->
pCountFunc
=
createCountFuncForDelete
(
pCxt
);
if
(
NULL
==
pStmt
->
pCountFunc
)
{
pStmt
->
pCountFunc
=
createFuncForDelete
(
pCxt
,
"count"
);
pStmt
->
pFirstFunc
=
createFuncForDelete
(
pCxt
,
"first"
);
pStmt
->
pLastFunc
=
createFuncForDelete
(
pCxt
,
"last"
);
if
(
NULL
==
pStmt
->
pCountFunc
||
NULL
==
pStmt
->
pFirstFunc
||
NULL
==
pStmt
->
pLastFunc
)
{
nodesDestroyNode
((
SNode
*
)
pStmt
);
CHECK_OUT_OF_MEM
(
NULL
);
}
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
87b59d21
...
...
@@ -3347,10 +3347,16 @@ static int32_t translateDelete(STranslateContext* pCxt, SDeleteStmt* pDelete) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
translateDeleteWhere
(
pCxt
,
pDelete
);
}
pCxt
->
currClause
=
SQL_CLAUSE_SELECT
;
if
(
TSDB_CODE_SUCCESS
==
code
)
{
pCxt
->
currClause
=
SQL_CLAUSE_SELECT
;
code
=
translateExpr
(
pCxt
,
&
pDelete
->
pCountFunc
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
translateExpr
(
pCxt
,
&
pDelete
->
pFirstFunc
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
translateExpr
(
pCxt
,
&
pDelete
->
pLastFunc
);
}
return
code
;
}
...
...
source/libs/planner/src/planLogicCreater.c
浏览文件 @
87b59d21
...
...
@@ -1372,9 +1372,21 @@ static int32_t createDeleteAggLogicNode(SLogicPlanContext* pCxt, SDeleteStmt* pD
}
int32_t
code
=
nodesListMakeStrictAppend
(
&
pAgg
->
pAggFuncs
,
nodesCloneNode
(
pDelete
->
pCountFunc
));
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodesListStrictAppend
(
pAgg
->
pAggFuncs
,
nodesCloneNode
(
pDelete
->
pFirstFunc
));
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodesListStrictAppend
(
pAgg
->
pAggFuncs
,
nodesCloneNode
(
pDelete
->
pLastFunc
));
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
rewriteExpr
(
pAgg
->
pAggFuncs
,
&
pDelete
->
pCountFunc
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
rewriteExpr
(
pAgg
->
pAggFuncs
,
&
pDelete
->
pFirstFunc
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
rewriteExpr
(
pAgg
->
pAggFuncs
,
&
pDelete
->
pLastFunc
);
}
// set the output
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
createColumnByRewriteExprs
(
pAgg
->
pAggFuncs
,
&
pAgg
->
node
.
pTargets
);
...
...
@@ -1405,7 +1417,9 @@ static int32_t createVnodeModifLogicNodeByDelete(SLogicPlanContext* pCxt, SDelet
strcpy
(
pModify
->
tsColName
,
pRealTable
->
pMeta
->
schema
->
name
);
pModify
->
deleteTimeRange
=
pDelete
->
timeRange
;
pModify
->
pAffectedRows
=
nodesCloneNode
(
pDelete
->
pCountFunc
);
if
(
NULL
==
pModify
->
pAffectedRows
)
{
pModify
->
pStartTs
=
nodesCloneNode
(
pDelete
->
pFirstFunc
);
pModify
->
pEndTs
=
nodesCloneNode
(
pDelete
->
pLastFunc
);
if
(
NULL
==
pModify
->
pAffectedRows
||
NULL
==
pModify
->
pStartTs
||
NULL
==
pModify
->
pEndTs
)
{
nodesDestroyNode
((
SNode
*
)
pModify
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
source/libs/planner/src/planPhysiCreater.c
浏览文件 @
87b59d21
...
...
@@ -1323,9 +1323,9 @@ static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
static
int32_t
createPartitionPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pChildren
,
SPartitionLogicNode
*
pPartLogicNode
,
SPhysiNode
**
pPhyNode
)
{
SPartitionPhysiNode
*
pPart
=
(
SPartitionPhysiNode
*
)
makePhysiNode
(
pCxt
,
(
SLogicNode
*
)
pPartLogicNode
,
pCxt
->
pPlanCxt
->
streamQuery
?
QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION
:
QUERY_NODE_PHYSICAL_PLAN_PARTITION
);
SPartitionPhysiNode
*
pPart
=
(
SPartitionPhysiNode
*
)
makePhysiNode
(
pCxt
,
(
SLogicNode
*
)
pPartLogicNode
,
pCxt
->
pPlanCxt
->
streamQuery
?
QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION
:
QUERY_NODE_PHYSICAL_PLAN_PARTITION
);
if
(
NULL
==
pPart
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
@@ -1670,6 +1670,12 @@ static int32_t createDataDeleter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode*
int32_t
code
=
setNodeSlotId
(
pCxt
,
pRoot
->
pOutputDataBlockDesc
->
dataBlockId
,
-
1
,
pModify
->
pAffectedRows
,
&
pDeleter
->
pAffectedRows
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
setNodeSlotId
(
pCxt
,
pRoot
->
pOutputDataBlockDesc
->
dataBlockId
,
-
1
,
pModify
->
pStartTs
,
&
pDeleter
->
pStartTs
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
setNodeSlotId
(
pCxt
,
pRoot
->
pOutputDataBlockDesc
->
dataBlockId
,
-
1
,
pModify
->
pEndTs
,
&
pDeleter
->
pEndTs
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
pDeleter
->
sink
.
pInputDataBlockDesc
=
(
SDataBlockDescNode
*
)
nodesCloneNode
((
SNode
*
)
pRoot
->
pOutputDataBlockDesc
);
if
(
NULL
==
pDeleter
->
sink
.
pInputDataBlockDesc
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录