Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a038c466
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
a038c466
编写于
2月 15, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor tq meta store
上级
4f81a338
变更
14
隐藏空白更改
内联
并排
Showing
14 changed file
with
386 addition
and
222 deletion
+386
-222
include/common/common.h
include/common/common.h
+64
-65
include/common/taosdef.h
include/common/taosdef.h
+7
-7
include/util/tcoding.h
include/util/tcoding.h
+25
-27
source/dnode/mnode/impl/src/mndTopic.c
source/dnode/mnode/impl/src/mndTopic.c
+16
-7
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+8
-8
source/dnode/vnode/src/inc/tqInt.h
source/dnode/vnode/src/inc/tqInt.h
+11
-8
source/dnode/vnode/src/inc/tqMetaStore.h
source/dnode/vnode/src/inc/tqMetaStore.h
+2
-2
source/dnode/vnode/src/inc/tqOffset.h
source/dnode/vnode/src/inc/tqOffset.h
+40
-0
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+119
-46
source/dnode/vnode/src/tq/tqMetaStore.c
source/dnode/vnode/src/tq/tqMetaStore.c
+15
-17
source/dnode/vnode/src/tq/tqOffset.c
source/dnode/vnode/src/tq/tqOffset.c
+42
-0
source/dnode/vnode/src/tq/tqRead.c
source/dnode/vnode/src/tq/tqRead.c
+1
-1
source/dnode/vnode/src/tsdb/tsdbCommit.c
source/dnode/vnode/src/tsdb/tsdbCommit.c
+35
-34
source/dnode/vnode/src/vnd/vnodeWrite.c
source/dnode/vnode/src/vnd/vnodeWrite.c
+1
-0
未找到文件。
include/common/common.h
浏览文件 @
a038c466
...
...
@@ -17,35 +17,35 @@
#define TDENGINE_COMMON_H
#include "taosdef.h"
#include "tmsg.h"
#include "tarray.h"
#include "tmsg.h"
#include "tvariant.h"
//typedef struct STimeWindow {
// TSKEY skey;
// TSKEY ekey;
//} STimeWindow;
//typedef struct {
// int32_t dataLen;
// char name[TSDB_TABLE_FNAME_LEN];
// char *data;
//} STagData;
//typedef struct SSchema {
// uint8_t type;
// char name[TSDB_COL_NAME_LEN];
// int16_t colId;
// int16_t bytes;
//} SSchema;
#define TMQ_REQ_TYPE_COMMIT_ONLY
0
#define TMQ_REQ_TYPE_CONSUME_ONLY
1
//
typedef struct STimeWindow {
//
TSKEY skey;
//
TSKEY ekey;
//
} STimeWindow;
//
typedef struct {
//
int32_t dataLen;
//
char name[TSDB_TABLE_FNAME_LEN];
//
char *data;
//
} STagData;
//
typedef struct SSchema {
//
uint8_t type;
//
char name[TSDB_COL_NAME_LEN];
//
int16_t colId;
//
int16_t bytes;
//
} SSchema;
#define TMQ_REQ_TYPE_COMMIT_ONLY 0
#define TMQ_REQ_TYPE_CONSUME_ONLY 1
#define TMQ_REQ_TYPE_CONSUME_AND_COMMIT 2
typedef
struct
{
uint32_t
numOfTables
;
SArray
*
pGroupList
;
SHashObj
*
map
;
// speedup acquire the tableQueryInfo by table uid
SArray
*
pGroupList
;
SHashObj
*
map
;
// speedup acquire the tableQueryInfo by table uid
}
STableGroupInfo
;
typedef
struct
SColumnDataAgg
{
...
...
@@ -67,25 +67,25 @@ typedef struct SDataBlockInfo {
typedef
struct
SConstantItem
{
SColumnInfo
info
;
int32_t
startRow
;
// run-length-encoding to save the space for multiple rows
int32_t
startRow
;
// run-length-encoding to save the space for multiple rows
int32_t
endRow
;
SVariant
value
;
}
SConstantItem
;
// info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList);
typedef
struct
SSDataBlock
{
SColumnDataAgg
*
pBlockAgg
;
SArray
*
pDataBlock
;
// SArray<SColumnInfoData>
SArray
*
pConstantList
;
// SArray<SConstantItem>, it is a constant/tags value of the corresponding result value.
SDataBlockInfo
info
;
SColumnDataAgg
*
pBlockAgg
;
SArray
*
pDataBlock
;
// SArray<SColumnInfoData>
SArray
*
pConstantList
;
// SArray<SConstantItem>, it is a constant/tags value of the corresponding result value.
SDataBlockInfo
info
;
}
SSDataBlock
;
// pBlockAgg->numOfNull == info.rows, all data are null
// pBlockAgg->numOfNull == 0, no data are null.
typedef
struct
SColumnInfoData
{
SColumnInfo
info
;
// TODO filter info needs to be removed
char
*
nullbitmap
;
//
char
*
pData
;
// the corresponding block data in memory
SColumnInfo
info
;
// TODO filter info needs to be removed
char
*
nullbitmap
;
//
char
*
pData
;
// the corresponding block data in memory
}
SColumnInfoData
;
static
FORCE_INLINE
int32_t
tEncodeDataBlock
(
void
**
buf
,
const
SSDataBlock
*
pBlock
)
{
...
...
@@ -110,7 +110,7 @@ static FORCE_INLINE int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlo
return
tlen
;
}
static
FORCE_INLINE
void
*
tDecodeDataBlock
(
void
*
buf
,
SSDataBlock
*
pBlock
)
{
static
FORCE_INLINE
void
*
tDecodeDataBlock
(
const
void
*
buf
,
SSDataBlock
*
pBlock
)
{
int32_t
sz
;
buf
=
taosDecodeFixedI64
(
buf
,
&
pBlock
->
info
.
uid
);
...
...
@@ -127,7 +127,7 @@ static FORCE_INLINE void* tDecodeDataBlock(void* buf, SSDataBlock* pBlock) {
buf
=
taosDecodeBinary
(
buf
,
(
void
**
)
&
data
.
pData
,
colSz
);
taosArrayPush
(
pBlock
->
pDataBlock
,
&
data
);
}
return
buf
;
return
(
void
*
)
buf
;
}
static
FORCE_INLINE
int32_t
tEncodeSMqConsumeRsp
(
void
**
buf
,
const
SMqConsumeRsp
*
pRsp
)
{
...
...
@@ -146,7 +146,7 @@ static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp
}
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SSDataBlock
*
pBlock
=
(
SSDataBlock
*
)
taosArrayGet
(
pRsp
->
pBlockData
,
i
);
SSDataBlock
*
pBlock
=
(
SSDataBlock
*
)
taosArrayGet
(
pRsp
->
pBlockData
,
i
);
tlen
+=
tEncodeDataBlock
(
buf
,
pBlock
);
}
return
tlen
;
...
...
@@ -179,19 +179,18 @@ static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) {
return
;
}
//int32_t numOfOutput = pBlock->info.numOfCols;
//
int32_t numOfOutput = pBlock->info.numOfCols;
int32_t
sz
=
taosArrayGetSize
(
pBlock
->
pDataBlock
);
for
(
int32_t
i
=
0
;
i
<
sz
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
sz
;
++
i
)
{
SColumnInfoData
*
pColInfoData
=
(
SColumnInfoData
*
)
taosArrayGet
(
pBlock
->
pDataBlock
,
i
);
tfree
(
pColInfoData
->
pData
);
}
taosArrayDestroy
(
pBlock
->
pDataBlock
);
tfree
(
pBlock
->
pBlockAgg
);
//tfree(pBlock);
//
tfree(pBlock);
}
static
FORCE_INLINE
void
tDeleteSMqConsumeRsp
(
SMqConsumeRsp
*
pRsp
)
{
if
(
pRsp
->
schemas
)
{
if
(
pRsp
->
schemas
->
nCols
)
{
...
...
@@ -199,53 +198,53 @@ static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqConsumeRsp* pRsp) {
}
free
(
pRsp
->
schemas
);
}
taosArrayDestroyEx
(
pRsp
->
pBlockData
,
(
void
(
*
)(
void
*
))
tDeleteSSDataBlock
);
pRsp
->
pBlockData
=
NULL
;
//for (int i = 0; i < taosArrayGetSize(pRsp->pBlockData); i++) {
//
SSDataBlock* pDataBlock = (SSDataBlock*)taosArrayGet(pRsp->pBlockData, i);
//
tDeleteSSDataBlock(pDataBlock);
taosArrayDestroyEx
(
pRsp
->
pBlockData
,
(
void
(
*
)(
void
*
))
tDeleteSSDataBlock
);
pRsp
->
pBlockData
=
NULL
;
//
for (int i = 0; i < taosArrayGetSize(pRsp->pBlockData); i++) {
//
SSDataBlock* pDataBlock = (SSDataBlock*)taosArrayGet(pRsp->pBlockData, i);
//
tDeleteSSDataBlock(pDataBlock);
//}
}
//======================================================================================================================
// the following structure shared by parser and executor
typedef
struct
SColumn
{
uint64_t
uid
;
char
name
[
TSDB_COL_NAME_LEN
];
int8_t
flag
;
// column type: normal column, tag, or user-input column (integer/float/string)
SColumnInfo
info
;
uint64_t
uid
;
char
name
[
TSDB_COL_NAME_LEN
];
int8_t
flag
;
// column type: normal column, tag, or user-input column (integer/float/string)
SColumnInfo
info
;
}
SColumn
;
typedef
struct
SLimit
{
int64_t
limit
;
int64_t
offset
;
int64_t
limit
;
int64_t
offset
;
}
SLimit
;
typedef
struct
SOrder
{
uint32_t
order
;
SColumn
col
;
uint32_t
order
;
SColumn
col
;
}
SOrder
;
typedef
struct
SGroupbyExpr
{
SArray
*
columnInfo
;
// SArray<SColIndex>, group by columns information
bool
groupbyTag
;
// group by tag or column
SArray
*
columnInfo
;
// SArray<SColIndex>, group by columns information
bool
groupbyTag
;
// group by tag or column
}
SGroupbyExpr
;
// the structure for sql function in select clause
typedef
struct
SSqlExpr
{
char
token
[
TSDB_COL_NAME_LEN
];
// original token
SSchema
resSchema
;
int32_t
numOfCols
;
SColumn
*
pColumns
;
// data columns that are required by query
int32_t
interBytes
;
// inter result buffer size
int16_t
numOfParams
;
// argument value of each function
SVariant
param
[
3
];
// parameters are not more than 3
char
token
[
TSDB_COL_NAME_LEN
];
// original token
SSchema
resSchema
;
int32_t
numOfCols
;
SColumn
*
pColumns
;
// data columns that are required by query
int32_t
interBytes
;
// inter result buffer size
int16_t
numOfParams
;
// argument value of each function
SVariant
param
[
3
];
// parameters are not more than 3
}
SSqlExpr
;
typedef
struct
SExprInfo
{
struct
SSqlExpr
base
;
struct
tExprNode
*
pExpr
;
struct
SSqlExpr
base
;
struct
tExprNode
*
pExpr
;
}
SExprInfo
;
typedef
struct
SStateWindow
{
...
...
@@ -253,11 +252,11 @@ typedef struct SStateWindow {
}
SStateWindow
;
typedef
struct
SSessionWindow
{
int64_t
gap
;
// gap between two session window(in microseconds)
int64_t
gap
;
// gap between two session window(in microseconds)
SColumn
col
;
}
SSessionWindow
;
#define QUERY_ASC_FORWARD_STEP
1
#define QUERY_ASC_FORWARD_STEP 1
#define QUERY_DESC_FORWARD_STEP -1
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
...
...
include/common/taosdef.h
浏览文件 @
a038c466
...
...
@@ -23,7 +23,7 @@ extern "C" {
#include "taos.h"
#include "tdef.h"
typedef
u
int64_t
tb_uid_t
;
typedef
int64_t
tb_uid_t
;
#define TSWINDOW_INITIALIZER ((STimeWindow){INT64_MIN, INT64_MAX})
#define TSWINDOW_DESC_INITIALIZER ((STimeWindow){INT64_MAX, INT64_MIN})
...
...
@@ -38,12 +38,12 @@ typedef enum {
}
EQType
;
typedef
enum
{
TSDB_SUPER_TABLE
=
1
,
// super table
TSDB_CHILD_TABLE
=
2
,
// table created from super table
TSDB_NORMAL_TABLE
=
3
,
// ordinary table
TSDB_STREAM_TABLE
=
4
,
// table created from stream computing
TSDB_TEMP_TABLE
=
5
,
// temp table created by nest query
TSDB_TABLE_MAX
=
6
TSDB_SUPER_TABLE
=
1
,
// super table
TSDB_CHILD_TABLE
=
2
,
// table created from super table
TSDB_NORMAL_TABLE
=
3
,
// ordinary table
TSDB_STREAM_TABLE
=
4
,
// table created from stream computing
TSDB_TEMP_TABLE
=
5
,
// temp table created by nest query
TSDB_TABLE_MAX
=
6
}
ETableType
;
typedef
enum
{
...
...
include/util/tcoding.h
浏览文件 @
a038c466
...
...
@@ -37,7 +37,7 @@ static FORCE_INLINE int taosEncodeFixedU8(void **buf, uint8_t value) {
return
(
int
)
sizeof
(
value
);
}
static
FORCE_INLINE
void
*
taosDecodeFixedU8
(
void
*
buf
,
uint8_t
*
value
)
{
static
FORCE_INLINE
void
*
taosDecodeFixedU8
(
const
void
*
buf
,
uint8_t
*
value
)
{
*
value
=
((
uint8_t
*
)
buf
)[
0
];
return
POINTER_SHIFT
(
buf
,
sizeof
(
*
value
));
}
...
...
@@ -51,7 +51,7 @@ static FORCE_INLINE int taosEncodeFixedI8(void **buf, int8_t value) {
return
(
int
)
sizeof
(
value
);
}
static
FORCE_INLINE
void
*
taosDecodeFixedI8
(
void
*
buf
,
int8_t
*
value
)
{
static
FORCE_INLINE
void
*
taosDecodeFixedI8
(
const
void
*
buf
,
int8_t
*
value
)
{
*
value
=
((
int8_t
*
)
buf
)[
0
];
return
POINTER_SHIFT
(
buf
,
sizeof
(
*
value
));
}
...
...
@@ -71,7 +71,7 @@ static FORCE_INLINE int taosEncodeFixedU16(void **buf, uint16_t value) {
return
(
int
)
sizeof
(
value
);
}
static
FORCE_INLINE
void
*
taosDecodeFixedU16
(
void
*
buf
,
uint16_t
*
value
)
{
static
FORCE_INLINE
void
*
taosDecodeFixedU16
(
const
void
*
buf
,
uint16_t
*
value
)
{
if
(
IS_LITTLE_ENDIAN
())
{
memcpy
(
value
,
buf
,
sizeof
(
*
value
));
}
else
{
...
...
@@ -87,9 +87,9 @@ static FORCE_INLINE int taosEncodeFixedI16(void **buf, int16_t value) {
return
taosEncodeFixedU16
(
buf
,
ZIGZAGE
(
int16_t
,
value
));
}
static
FORCE_INLINE
void
*
taosDecodeFixedI16
(
void
*
buf
,
int16_t
*
value
)
{
static
FORCE_INLINE
void
*
taosDecodeFixedI16
(
const
void
*
buf
,
int16_t
*
value
)
{
uint16_t
tvalue
=
0
;
void
*
ret
=
taosDecodeFixedU16
(
buf
,
&
tvalue
);
void
*
ret
=
taosDecodeFixedU16
(
buf
,
&
tvalue
);
*
value
=
ZIGZAGD
(
int16_t
,
tvalue
);
return
ret
;
}
...
...
@@ -111,7 +111,7 @@ static FORCE_INLINE int taosEncodeFixedU32(void **buf, uint32_t value) {
return
(
int
)
sizeof
(
value
);
}
static
FORCE_INLINE
void
*
taosDecodeFixedU32
(
void
*
buf
,
uint32_t
*
value
)
{
static
FORCE_INLINE
void
*
taosDecodeFixedU32
(
const
void
*
buf
,
uint32_t
*
value
)
{
if
(
IS_LITTLE_ENDIAN
())
{
memcpy
(
value
,
buf
,
sizeof
(
*
value
));
}
else
{
...
...
@@ -129,9 +129,9 @@ static FORCE_INLINE int taosEncodeFixedI32(void **buf, int32_t value) {
return
taosEncodeFixedU32
(
buf
,
ZIGZAGE
(
int32_t
,
value
));
}
static
FORCE_INLINE
void
*
taosDecodeFixedI32
(
void
*
buf
,
int32_t
*
value
)
{
static
FORCE_INLINE
void
*
taosDecodeFixedI32
(
const
void
*
buf
,
int32_t
*
value
)
{
uint32_t
tvalue
=
0
;
void
*
ret
=
taosDecodeFixedU32
(
buf
,
&
tvalue
);
void
*
ret
=
taosDecodeFixedU32
(
buf
,
&
tvalue
);
*
value
=
ZIGZAGD
(
int32_t
,
tvalue
);
return
ret
;
}
...
...
@@ -158,7 +158,7 @@ static FORCE_INLINE int taosEncodeFixedU64(void **buf, uint64_t value) {
return
(
int
)
sizeof
(
value
);
}
static
FORCE_INLINE
void
*
taosDecodeFixedU64
(
void
*
buf
,
uint64_t
*
value
)
{
static
FORCE_INLINE
void
*
taosDecodeFixedU64
(
const
void
*
buf
,
uint64_t
*
value
)
{
if
(
IS_LITTLE_ENDIAN
())
{
memcpy
(
value
,
buf
,
sizeof
(
*
value
));
}
else
{
...
...
@@ -180,9 +180,9 @@ static FORCE_INLINE int taosEncodeFixedI64(void **buf, int64_t value) {
return
taosEncodeFixedU64
(
buf
,
ZIGZAGE
(
int64_t
,
value
));
}
static
FORCE_INLINE
void
*
taosDecodeFixedI64
(
void
*
buf
,
int64_t
*
value
)
{
static
FORCE_INLINE
void
*
taosDecodeFixedI64
(
const
void
*
buf
,
int64_t
*
value
)
{
uint64_t
tvalue
=
0
;
void
*
ret
=
taosDecodeFixedU64
(
buf
,
&
tvalue
);
void
*
ret
=
taosDecodeFixedU64
(
buf
,
&
tvalue
);
*
value
=
ZIGZAGD
(
int64_t
,
tvalue
);
return
ret
;
}
...
...
@@ -205,7 +205,7 @@ static FORCE_INLINE int taosEncodeVariantU16(void **buf, uint16_t value) {
return
i
+
1
;
}
static
FORCE_INLINE
void
*
taosDecodeVariantU16
(
void
*
buf
,
uint16_t
*
value
)
{
static
FORCE_INLINE
void
*
taosDecodeVariantU16
(
const
void
*
buf
,
uint16_t
*
value
)
{
int
i
=
0
;
uint16_t
tval
=
0
;
*
value
=
0
;
...
...
@@ -228,9 +228,9 @@ static FORCE_INLINE int taosEncodeVariantI16(void **buf, int16_t value) {
return
taosEncodeVariantU16
(
buf
,
ZIGZAGE
(
int16_t
,
value
));
}
static
FORCE_INLINE
void
*
taosDecodeVariantI16
(
void
*
buf
,
int16_t
*
value
)
{
static
FORCE_INLINE
void
*
taosDecodeVariantI16
(
const
void
*
buf
,
int16_t
*
value
)
{
uint16_t
tvalue
=
0
;
void
*
ret
=
taosDecodeVariantU16
(
buf
,
&
tvalue
);
void
*
ret
=
taosDecodeVariantU16
(
buf
,
&
tvalue
);
*
value
=
ZIGZAGD
(
int16_t
,
tvalue
);
return
ret
;
}
...
...
@@ -253,7 +253,7 @@ static FORCE_INLINE int taosEncodeVariantU32(void **buf, uint32_t value) {
return
i
+
1
;
}
static
FORCE_INLINE
void
*
taosDecodeVariantU32
(
void
*
buf
,
uint32_t
*
value
)
{
static
FORCE_INLINE
void
*
taosDecodeVariantU32
(
const
void
*
buf
,
uint32_t
*
value
)
{
int
i
=
0
;
uint32_t
tval
=
0
;
*
value
=
0
;
...
...
@@ -276,9 +276,9 @@ static FORCE_INLINE int taosEncodeVariantI32(void **buf, int32_t value) {
return
taosEncodeVariantU32
(
buf
,
ZIGZAGE
(
int32_t
,
value
));
}
static
FORCE_INLINE
void
*
taosDecodeVariantI32
(
void
*
buf
,
int32_t
*
value
)
{
static
FORCE_INLINE
void
*
taosDecodeVariantI32
(
const
void
*
buf
,
int32_t
*
value
)
{
uint32_t
tvalue
=
0
;
void
*
ret
=
taosDecodeVariantU32
(
buf
,
&
tvalue
);
void
*
ret
=
taosDecodeVariantU32
(
buf
,
&
tvalue
);
*
value
=
ZIGZAGD
(
int32_t
,
tvalue
);
return
ret
;
}
...
...
@@ -301,7 +301,7 @@ static FORCE_INLINE int taosEncodeVariantU64(void **buf, uint64_t value) {
return
i
+
1
;
}
static
FORCE_INLINE
void
*
taosDecodeVariantU64
(
void
*
buf
,
uint64_t
*
value
)
{
static
FORCE_INLINE
void
*
taosDecodeVariantU64
(
const
void
*
buf
,
uint64_t
*
value
)
{
int
i
=
0
;
uint64_t
tval
=
0
;
*
value
=
0
;
...
...
@@ -324,9 +324,9 @@ static FORCE_INLINE int taosEncodeVariantI64(void **buf, int64_t value) {
return
taosEncodeVariantU64
(
buf
,
ZIGZAGE
(
int64_t
,
value
));
}
static
FORCE_INLINE
void
*
taosDecodeVariantI64
(
void
*
buf
,
int64_t
*
value
)
{
static
FORCE_INLINE
void
*
taosDecodeVariantI64
(
const
void
*
buf
,
int64_t
*
value
)
{
uint64_t
tvalue
=
0
;
void
*
ret
=
taosDecodeVariantU64
(
buf
,
&
tvalue
);
void
*
ret
=
taosDecodeVariantU64
(
buf
,
&
tvalue
);
*
value
=
ZIGZAGD
(
int64_t
,
tvalue
);
return
ret
;
}
...
...
@@ -346,7 +346,7 @@ static FORCE_INLINE int taosEncodeString(void **buf, const char *value) {
return
tlen
;
}
static
FORCE_INLINE
void
*
taosDecodeString
(
void
*
buf
,
char
**
value
)
{
static
FORCE_INLINE
void
*
taosDecodeString
(
const
void
*
buf
,
char
**
value
)
{
uint64_t
size
=
0
;
buf
=
taosDecodeVariantU64
(
buf
,
&
size
);
...
...
@@ -360,7 +360,7 @@ static FORCE_INLINE void *taosDecodeString(void *buf, char **value) {
return
POINTER_SHIFT
(
buf
,
size
);
}
static
FORCE_INLINE
void
*
taosDecodeStringTo
(
void
*
buf
,
char
*
value
)
{
static
FORCE_INLINE
void
*
taosDecodeStringTo
(
const
void
*
buf
,
char
*
value
)
{
uint64_t
size
=
0
;
buf
=
taosDecodeVariantU64
(
buf
,
&
size
);
...
...
@@ -373,7 +373,7 @@ static FORCE_INLINE void *taosDecodeStringTo(void *buf, char *value) {
// ---- binary
static
FORCE_INLINE
int
taosEncodeBinary
(
void
**
buf
,
const
void
*
value
,
int32_t
valueLen
)
{
int
tlen
=
0
;
int
tlen
=
0
;
if
(
buf
!=
NULL
)
{
memcpy
(
*
buf
,
value
,
valueLen
);
...
...
@@ -384,8 +384,7 @@ static FORCE_INLINE int taosEncodeBinary(void **buf, const void *value, int32_t
return
tlen
;
}
static
FORCE_INLINE
void
*
taosDecodeBinary
(
void
*
buf
,
void
**
value
,
int32_t
valueLen
)
{
static
FORCE_INLINE
void
*
taosDecodeBinary
(
const
void
*
buf
,
void
**
value
,
int32_t
valueLen
)
{
*
value
=
malloc
((
size_t
)
valueLen
);
if
(
*
value
==
NULL
)
return
NULL
;
memcpy
(
*
value
,
buf
,
(
size_t
)
valueLen
);
...
...
@@ -393,8 +392,7 @@ static FORCE_INLINE void *taosDecodeBinary(void *buf, void **value, int32_t valu
return
POINTER_SHIFT
(
buf
,
valueLen
);
}
static
FORCE_INLINE
void
*
taosDecodeBinaryTo
(
void
*
buf
,
void
*
value
,
int32_t
valueLen
)
{
static
FORCE_INLINE
void
*
taosDecodeBinaryTo
(
const
void
*
buf
,
void
*
value
,
int32_t
valueLen
)
{
memcpy
(
value
,
buf
,
(
size_t
)
valueLen
);
return
POINTER_SHIFT
(
buf
,
valueLen
);
}
...
...
source/dnode/mnode/impl/src/mndTopic.c
浏览文件 @
a038c466
...
...
@@ -60,8 +60,8 @@ void mndCleanupTopic(SMnode *pMnode) {}
SSdbRaw
*
mndTopicActionEncode
(
SMqTopicObj
*
pTopic
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
int32_t
logicalPlanLen
=
strlen
(
pTopic
->
logicalPlan
)
+
1
;
int32_t
physicalPlanLen
=
strlen
(
pTopic
->
physicalPlan
)
+
1
;
int32_t
logicalPlanLen
=
strlen
(
pTopic
->
logicalPlan
)
+
1
;
int32_t
physicalPlanLen
=
strlen
(
pTopic
->
physicalPlan
)
+
1
;
int32_t
size
=
sizeof
(
SMqTopicObj
)
+
logicalPlanLen
+
physicalPlanLen
+
pTopic
->
sqlLen
+
MND_TOPIC_RESERVE_SIZE
;
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_TOPIC
,
MND_TOPIC_VER_NUMBER
,
size
);
if
(
pRaw
==
NULL
)
goto
TOPIC_ENCODE_OVER
;
...
...
@@ -127,7 +127,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pTopic
->
sqlLen
,
TOPIC_DECODE_OVER
);
pTopic
->
sql
=
calloc
(
pTopic
->
sqlLen
+
1
,
sizeof
(
char
));
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
sql
,
pTopic
->
sqlLen
,
TOPIC_DECODE_OVER
);
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
sql
,
pTopic
->
sqlLen
,
TOPIC_DECODE_OVER
);
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
len
,
TOPIC_DECODE_OVER
);
pTopic
->
logicalPlan
=
calloc
(
len
+
1
,
sizeof
(
char
));
...
...
@@ -248,12 +248,21 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq
SSdbRaw
*
pTopicRaw
=
mndTopicActionEncode
(
&
topicObj
);
if
(
pTopicRaw
==
NULL
)
return
-
1
;
if
(
sdbSetRawStatus
(
pTopicRaw
,
SDB_STATUS_READY
)
!=
0
)
return
-
1
;
/*STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);*/
/*mndTransAppendRedolog(pTrans, pTopicRaw);*/
/*if (mndTransPrepare(pMnode, pTrans) != 0) {*/
/*mError("mq-createTopic-trans:%d, failed to prepare since %s", pTrans->id, terrstr());*/
/*mndTransDrop(pTrans);*/
/*return -1;*/
/*}*/
/*mndTransDrop(pTrans);*/
/*return 0;*/
return
sdbWrite
(
pMnode
->
pSdb
,
pTopicRaw
);
}
static
int32_t
mndProcessCreateTopicMsg
(
SMnodeMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
char
*
msgStr
=
pMsg
->
rpcMsg
.
pCont
;
SMnode
*
pMnode
=
pMsg
->
pMnode
;
char
*
msgStr
=
pMsg
->
rpcMsg
.
pCont
;
SCMCreateTopicReq
createTopicReq
=
{
0
};
tDeserializeSCMCreateTopicReq
(
msgStr
,
&
createTopicReq
);
...
...
@@ -288,13 +297,13 @@ static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) {
int32_t
code
=
mndCreateTopic
(
pMnode
,
pMsg
,
&
createTopicReq
,
pDb
);
mndReleaseDb
(
pMnode
,
pDb
);
if
(
code
!=
0
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
code
;
mError
(
"topic:%s, failed to create since %s"
,
createTopicReq
.
name
,
terrstr
());
return
-
1
;
}
return
TSDB_CODE_SUCCESS
;
return
0
;
}
static
int32_t
mndDropTopic
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
,
SMqTopicObj
*
pTopic
)
{
return
0
;
}
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
a038c466
...
...
@@ -35,12 +35,12 @@ typedef struct SDnode SDnode;
typedef
int32_t
(
*
PutReqToVQueryQFp
)(
SDnode
*
pDnode
,
struct
SRpcMsg
*
pReq
);
typedef
int32_t
(
*
SendReqToDnodeFp
)(
SDnode
*
pDnode
,
struct
SEpSet
*
epSet
,
struct
SRpcMsg
*
rpcMsg
);
typedef
struct
STqCfg
{
typedef
struct
{
// TODO
int32_t
reserved
;
}
STqCfg
;
typedef
struct
SVnodeCfg
{
typedef
struct
{
int32_t
vgId
;
SDnode
*
pDnode
;
STfs
*
pTfs
;
...
...
@@ -67,9 +67,9 @@ typedef struct {
SendReqToDnodeFp
sendReqToDnodeFp
;
}
SVnodeOpt
;
typedef
struct
STqReadHandle
{
typedef
struct
{
int64_t
ver
;
uint64_t
tbUid
;
int64_t
tbUid
;
SHashObj
*
tbIdHash
;
const
SSubmitMsg
*
pMsg
;
SSubmitBlk
*
pBlock
;
...
...
@@ -199,7 +199,7 @@ int32_t vnodeCompact(SVnode *pVnode);
int32_t
vnodeSync
(
SVnode
*
pVnode
);
int32_t
vnodeGetLoad
(
SVnode
*
pVnode
,
SVnodeLoad
*
pLoad
);
/* ------------------------- TQ
QUERY
-------------------------- */
/* ------------------------- TQ
READ -
-------------------------- */
STqReadHandle
*
tqInitSubmitMsgScanner
(
SMeta
*
pMeta
);
...
...
@@ -207,12 +207,12 @@ static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SA
pReadHandle
->
pColIdList
=
pColIdList
;
}
// static FORCE_INLINE void tqReadHandleSetTbUid(STqReadHandle* pHandle,
const SArray* pTableIdList
) {
// pHandle->tbUid =
pTableIdList
;
// static FORCE_INLINE void tqReadHandleSetTbUid(STqReadHandle* pHandle,
int64_t tbUid
) {
// pHandle->tbUid =
tbUid
;
//}
static
FORCE_INLINE
int
tqReadHandleSetTbUidList
(
STqReadHandle
*
pHandle
,
const
SArray
*
tbUidList
)
{
pHandle
->
tbIdHash
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_
U
BIGINT
),
true
,
HASH_NO_LOCK
);
pHandle
->
tbIdHash
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
HASH_NO_LOCK
);
if
(
pHandle
->
tbIdHash
==
NULL
)
{
return
-
1
;
}
...
...
source/dnode/vnode/src/inc/tqInt.h
浏览文件 @
a038c466
...
...
@@ -111,10 +111,11 @@ typedef struct {
char
content
[];
}
STqSerializedHead
;
typedef
int
(
*
FTqSerialize
)(
const
void
*
pObj
,
STqSerializedHead
**
ppHead
);
typedef
const
void
*
(
*
FTqDeserialize
)(
const
STqSerializedHead
*
pHead
,
void
**
ppObj
);
typedef
int
32_t
(
*
FTqSerialize
)(
const
void
*
pObj
,
STqSerializedHead
**
ppHead
);
typedef
int32_t
(
*
FTqDeserialize
)(
void
*
self
,
const
STqSerializedHead
*
pHead
,
void
**
ppObj
);
typedef
void
(
*
FTqDelete
)(
void
*
);
typedef
struct
STqMetaHandle
{
typedef
struct
{
int64_t
key
;
int64_t
offset
;
int64_t
serializedSize
;
...
...
@@ -132,6 +133,7 @@ typedef struct STqMetaList {
}
STqMetaList
;
typedef
struct
{
STQ
*
pTq
;
STqMetaList
*
bucket
[
TQ_BUCKET_SIZE
];
// a table head
STqMetaList
*
unpersistHead
;
...
...
@@ -188,21 +190,22 @@ typedef struct {
char
*
logicalPlan
;
char
*
physicalPlan
;
char
*
qmsg
;
int64_t
persistedOffset
;
int64_t
committedOffset
;
int64_t
currentOffset
;
STqBuffer
buffer
;
SWalReadHandle
*
pReadhandle
;
}
STqTopic
Handle
;
}
STqTopic
;
typedef
struct
{
int64_t
consumerId
;
int64_t
epoch
;
char
cgroup
[
TSDB_TOPIC_FNAME_LEN
];
SArray
*
topics
;
// SArray<STq
ClientTopic
>
}
STqConsumer
Handle
;
SArray
*
topics
;
// SArray<STq
TopicHandle
>
}
STqConsumer
;
int
tqSerializeConsumer
(
const
STqConsumerHandle
*
,
STqSerializedHead
**
);
const
void
*
tqDeserializeConsumer
(
const
STqSerializedHead
*
pHead
,
STqConsumerHandle
**
);
int
32_t
tqSerializeConsumer
(
const
STqConsumer
*
,
STqSerializedHead
**
);
int32_t
tqDeserializeConsumer
(
STQ
*
,
const
STqSerializedHead
*
,
STqConsumer
**
);
static
int
FORCE_INLINE
tqQueryExecuting
(
int32_t
status
)
{
return
status
;
}
...
...
source/dnode/vnode/src/inc/tqMetaStore.h
浏览文件 @
a038c466
...
...
@@ -23,8 +23,8 @@
extern
"C"
{
#endif
STqMetaStore
*
tqStoreOpen
(
const
char
*
path
,
FTqSerialize
pSerializer
,
FTqDeserialize
pDeserializer
,
FTqDelete
pDelet
er
,
int32_t
tqConfigFlag
);
STqMetaStore
*
tqStoreOpen
(
STQ
*
pTq
,
const
char
*
path
,
FTqSerialize
pSerializer
,
FTqDeserialize
pDeserializ
er
,
FTqDelete
pDeleter
,
int32_t
tqConfigFlag
);
int32_t
tqStoreClose
(
STqMetaStore
*
);
// int32_t tqStoreDelete(TqMetaStore*);
// int32_t tqStoreCommitAll(TqMetaStore*);
...
...
source/dnode/vnode/src/inc/tqOffset.h
0 → 100644
浏览文件 @
a038c466
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TQ_OFFSET_H_
#define _TD_TQ_OFFSET_H_
#include "tqInt.h"
#ifdef __cplusplus
extern
"C"
{
#endif
typedef
struct
STqOffsetCfg
STqOffsetCfg
;
typedef
struct
STqOffsetStore
STqOffsetStore
;
STqOffsetStore
*
STqOffsetOpen
(
STqOffsetCfg
*
);
void
STqOffsetClose
(
STqOffsetStore
*
);
int64_t
tqOffsetFetch
(
STqOffsetStore
*
pStore
,
const
char
*
subscribeKey
);
int32_t
tqOffsetCommit
(
STqOffsetStore
*
pStore
,
const
char
*
subscribeKey
,
int64_t
offset
);
int32_t
tqOffsetPersist
(
STqOffsetStore
*
pStore
,
const
char
*
subscribeKey
);
int32_t
tqOffsetPersistAll
(
STqOffsetStore
*
pStore
);
#ifdef __cplusplus
}
#endif
#endif
/*_TD_TQ_OFFSET_H_*/
source/dnode/vnode/src/tq/tq.c
浏览文件 @
a038c466
...
...
@@ -50,7 +50,8 @@ STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAl
// TODO: error code of buffer pool
}
#endif
pTq
->
tqMeta
=
tqStoreOpen
(
path
,
(
FTqSerialize
)
tqSerializeConsumer
,
(
FTqDeserialize
)
tqDeserializeConsumer
,
free
,
0
);
pTq
->
tqMeta
=
tqStoreOpen
(
pTq
,
path
,
(
FTqSerialize
)
tqSerializeConsumer
,
(
FTqDeserialize
)
tqDeserializeConsumer
,
free
,
0
);
if
(
pTq
->
tqMeta
==
NULL
)
{
free
(
pTq
);
#if 0
...
...
@@ -76,19 +77,89 @@ int tqPushMsg(STQ* pTq, void* p, int64_t version) {
return
0
;
}
int
tqCommit
(
STQ
*
pTq
)
{
// do nothing
return
0
;
int
tqCommit
(
STQ
*
pTq
)
{
return
tqStorePersist
(
pTq
->
tqMeta
);
}
int32_t
tqGetTopicHandleSize
(
const
STqTopic
*
pTopic
)
{
return
strlen
(
pTopic
->
topicName
)
+
strlen
(
pTopic
->
sql
)
+
strlen
(
pTopic
->
logicalPlan
)
+
strlen
(
pTopic
->
physicalPlan
)
+
strlen
(
pTopic
->
qmsg
)
+
sizeof
(
int64_t
)
*
3
;
}
int32_t
tqGetConsumerHandleSize
(
const
STqConsumer
*
pConsumer
)
{
int
num
=
taosArrayGetSize
(
pConsumer
->
topics
);
int32_t
sz
=
0
;
for
(
int
i
=
0
;
i
<
num
;
i
++
)
{
STqTopic
*
pTopic
=
taosArrayGet
(
pConsumer
->
topics
,
i
);
sz
+=
tqGetTopicHandleSize
(
pTopic
);
}
return
sz
;
}
static
FORCE_INLINE
int32_t
tEncodeSTqTopic
(
void
**
buf
,
const
STqTopic
*
pTopic
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeString
(
buf
,
pTopic
->
topicName
);
/*tlen += taosEncodeString(buf, pTopic->sql);*/
/*tlen += taosEncodeString(buf, pTopic->logicalPlan);*/
/*tlen += taosEncodeString(buf, pTopic->physicalPlan);*/
tlen
+=
taosEncodeString
(
buf
,
pTopic
->
qmsg
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pTopic
->
persistedOffset
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pTopic
->
committedOffset
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pTopic
->
currentOffset
);
return
tlen
;
}
static
FORCE_INLINE
const
void
*
tDecodeSTqTopic
(
const
void
*
buf
,
STqTopic
*
pTopic
)
{
buf
=
taosDecodeStringTo
(
buf
,
pTopic
->
topicName
);
/*buf = taosDecodeString(buf, &pTopic->sql);*/
/*buf = taosDecodeString(buf, &pTopic->logicalPlan);*/
/*buf = taosDecodeString(buf, &pTopic->physicalPlan);*/
buf
=
taosDecodeString
(
buf
,
&
pTopic
->
qmsg
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pTopic
->
persistedOffset
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pTopic
->
committedOffset
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pTopic
->
currentOffset
);
return
buf
;
}
static
FORCE_INLINE
int32_t
tEncodeSTqConsumer
(
void
**
buf
,
const
STqConsumer
*
pConsumer
)
{
int32_t
sz
;
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumer
->
consumerId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumer
->
epoch
);
tlen
+=
taosEncodeString
(
buf
,
pConsumer
->
cgroup
);
sz
=
taosArrayGetSize
(
pConsumer
->
topics
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
STqTopic
*
pTopic
=
taosArrayGet
(
pConsumer
->
topics
,
i
);
tlen
+=
tEncodeSTqTopic
(
buf
,
pTopic
);
}
return
tlen
;
}
int
tqSerializeConsumer
(
const
STqConsumerHandle
*
pConsumer
,
STqSerializedHead
**
ppHead
)
{
int32_t
num
=
taosArrayGetSize
(
pConsumer
->
topics
);
int32_t
sz
=
sizeof
(
STqSerializedHead
)
+
sizeof
(
int64_t
)
*
2
+
TSDB_TOPIC_FNAME_LEN
+
num
*
(
sizeof
(
int64_t
)
+
TSDB_TOPIC_FNAME_LEN
);
static
FORCE_INLINE
const
void
*
tDecodeSTqConsumer
(
const
void
*
buf
,
STqConsumer
*
pConsumer
)
{
int32_t
sz
;
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumer
->
consumerId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumer
->
epoch
);
buf
=
taosDecodeStringTo
(
buf
,
pConsumer
->
cgroup
);
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pConsumer
->
topics
=
taosArrayInit
(
sz
,
sizeof
(
STqTopic
));
if
(
pConsumer
->
topics
==
NULL
)
return
NULL
;
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
STqTopic
pTopic
;
buf
=
tDecodeSTqTopic
(
buf
,
&
pTopic
);
taosArrayPush
(
pConsumer
->
topics
,
&
pTopic
);
}
return
buf
;
}
int
tqSerializeConsumer
(
const
STqConsumer
*
pConsumer
,
STqSerializedHead
**
ppHead
)
{
int32_t
sz
=
tEncodeSTqConsumer
(
NULL
,
pConsumer
);
if
(
sz
>
(
*
ppHead
)
->
ssize
)
{
void
*
tmpPtr
=
realloc
(
*
ppHead
,
sz
);
void
*
tmpPtr
=
realloc
(
*
ppHead
,
s
izeof
(
STqSerializedHead
)
+
s
z
);
if
(
tmpPtr
==
NULL
)
{
free
(
*
ppHead
);
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
return
-
1
;
}
*
ppHead
=
tmpPtr
;
...
...
@@ -96,42 +167,41 @@ int tqSerializeConsumer(const STqConsumerHandle* pConsumer, STqSerializedHead**
}
void
*
ptr
=
(
*
ppHead
)
->
content
;
*
(
int64_t
*
)
ptr
=
pConsumer
->
consumerId
;
ptr
=
POINTER_SHIFT
(
ptr
,
sizeof
(
int64_t
));
*
(
int64_t
*
)
ptr
=
pConsumer
->
epoch
;
ptr
=
POINTER_SHIFT
(
ptr
,
sizeof
(
int64_t
));
memcpy
(
ptr
,
pConsumer
->
topics
,
TSDB_TOPIC_FNAME_LEN
);
ptr
=
POINTER_SHIFT
(
ptr
,
TSDB_TOPIC_FNAME_LEN
);
*
(
int32_t
*
)
ptr
=
num
;
ptr
=
POINTER_SHIFT
(
ptr
,
sizeof
(
int32_t
));
for
(
int32_t
i
=
0
;
i
<
num
;
i
++
)
{
STqTopicHandle
*
pTopic
=
taosArrayGet
(
pConsumer
->
topics
,
i
);
memcpy
(
ptr
,
pTopic
->
topicName
,
TSDB_TOPIC_FNAME_LEN
);
ptr
=
POINTER_SHIFT
(
ptr
,
TSDB_TOPIC_FNAME_LEN
);
*
(
int64_t
*
)
ptr
=
pTopic
->
committedOffset
;
POINTER_SHIFT
(
ptr
,
sizeof
(
int64_t
));
}
void
*
abuf
=
ptr
;
tEncodeSTqConsumer
(
&
abuf
,
pConsumer
);
return
0
;
}
const
void
*
tqDeserializeConsumer
(
const
STqSerializedHead
*
pHead
,
STqConsumerHandle
**
ppConsumer
)
{
STqConsumerHandle
*
pConsumer
=
*
ppConsumer
;
const
void
*
ptr
=
pHead
->
content
;
pConsumer
->
consumerId
=
*
(
int64_t
*
)
ptr
;
ptr
=
POINTER_SHIFT
(
ptr
,
sizeof
(
int64_t
));
pConsumer
->
epoch
=
*
(
int64_t
*
)
ptr
;
ptr
=
POINTER_SHIFT
(
ptr
,
sizeof
(
int64_t
));
memcpy
(
pConsumer
->
cgroup
,
ptr
,
TSDB_TOPIC_FNAME_LEN
);
ptr
=
POINTER_SHIFT
(
ptr
,
TSDB_TOPIC_FNAME_LEN
);
int32_t
sz
=
*
(
int32_t
*
)
ptr
;
ptr
=
POINTER_SHIFT
(
ptr
,
sizeof
(
int32_t
));
pConsumer
->
topics
=
taosArrayInit
(
sz
,
sizeof
(
STqTopicHandle
));
int32_t
tqDeserializeConsumer
(
STQ
*
pTq
,
const
STqSerializedHead
*
pHead
,
STqConsumer
**
ppConsumer
)
{
const
void
*
str
=
pHead
->
content
;
*
ppConsumer
=
calloc
(
1
,
sizeof
(
STqConsumer
));
if
(
*
ppConsumer
==
NULL
)
{
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
return
-
1
;
}
if
(
tDecodeSTqConsumer
(
str
,
*
ppConsumer
)
==
NULL
)
{
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
return
-
1
;
}
STqConsumer
*
pConsumer
=
*
ppConsumer
;
int32_t
sz
=
taosArrayGetSize
(
pConsumer
->
topics
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
/*STqTopicHandle* topicHandle = */
/*taosArrayPush(pConsumer->topics, );*/
STqTopic
*
pTopic
=
taosArrayGet
(
pConsumer
->
topics
,
i
);
pTopic
->
pReadhandle
=
walOpenReadHandle
(
pTq
->
pWal
);
if
(
pTopic
->
pReadhandle
==
NULL
)
{
ASSERT
(
false
);
}
for
(
int
i
=
0
;
i
<
TQ_BUFFER_SIZE
;
i
++
)
{
pTopic
->
buffer
.
output
[
i
].
status
=
0
;
STqReadHandle
*
pReadHandle
=
tqInitSubmitMsgScanner
(
pTq
->
pMeta
);
SReadHandle
handle
=
{.
reader
=
pReadHandle
,
.
meta
=
pTq
->
pMeta
};
pTopic
->
buffer
.
output
[
i
].
pReadHandle
=
pReadHandle
;
pTopic
->
buffer
.
output
[
i
].
task
=
qCreateStreamExecTaskInfo
(
pTopic
->
qmsg
,
&
handle
);
}
}
return
NULL
;
return
0
;
}
int32_t
tqProcessConsumeReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
)
{
...
...
@@ -145,7 +215,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
/*printf("vg %d get consume req\n", pReq->head.vgId);*/
STqConsumer
Handle
*
pConsumer
=
tqHandleGet
(
pTq
->
tqMeta
,
consumerId
);
STqConsumer
*
pConsumer
=
tqHandleGet
(
pTq
->
tqMeta
,
consumerId
);
if
(
pConsumer
==
NULL
)
{
pMsg
->
pCont
=
NULL
;
pMsg
->
contLen
=
0
;
...
...
@@ -156,10 +226,10 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
int
sz
=
taosArrayGetSize
(
pConsumer
->
topics
);
for
(
int
i
=
0
;
i
<
sz
;
i
++
)
{
STqTopic
Handle
*
pTopic
=
taosArrayGet
(
pConsumer
->
topics
,
i
);
STqTopic
*
pTopic
=
taosArrayGet
(
pConsumer
->
topics
,
i
);
// TODO: support multiple topic in one req
if
(
strcmp
(
pTopic
->
topicName
,
pReq
->
topic
)
!=
0
)
{
/*ASSERT(false);*/
ASSERT
(
false
);
continue
;
}
...
...
@@ -285,8 +355,9 @@ int32_t tqProcessRebReq(STQ* pTq, char* msg) {
SMqMVRebReq
req
=
{
0
};
tDecodeSMqMVRebReq
(
msg
,
&
req
);
STqConsumer
Handle
*
pConsumer
=
tqHandleGet
(
pTq
->
tqMeta
,
req
.
oldConsumerId
);
STqConsumer
*
pConsumer
=
tqHandleGet
(
pTq
->
tqMeta
,
req
.
oldConsumerId
);
ASSERT
(
pConsumer
);
pConsumer
->
consumerId
=
req
.
newConsumerId
;
tqHandleMovePut
(
pTq
->
tqMeta
,
req
.
newConsumerId
,
pConsumer
);
tqHandleCommit
(
pTq
->
tqMeta
,
req
.
newConsumerId
);
tqHandlePurge
(
pTq
->
tqMeta
,
req
.
oldConsumerId
);
...
...
@@ -299,19 +370,20 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
tDecodeSMqSetCVgReq
(
msg
,
&
req
);
/*printf("vg %d set to consumer from %ld to %ld\n", req.vgId, req.oldConsumerId, req.newConsumerId);*/
STqConsumer
Handle
*
pConsumer
=
calloc
(
1
,
sizeof
(
STqConsumerHandle
));
STqConsumer
*
pConsumer
=
calloc
(
1
,
sizeof
(
STqConsumer
));
if
(
pConsumer
==
NULL
)
{
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
return
-
1
;
}
strcpy
(
pConsumer
->
cgroup
,
req
.
cgroup
);
pConsumer
->
topics
=
taosArrayInit
(
0
,
sizeof
(
STqTopic
Handle
));
pConsumer
->
topics
=
taosArrayInit
(
0
,
sizeof
(
STqTopic
));
pConsumer
->
consumerId
=
req
.
consumerId
;
pConsumer
->
epoch
=
0
;
STqTopic
Handle
*
pTopic
=
calloc
(
1
,
sizeof
(
STqTopicHandle
));
STqTopic
*
pTopic
=
calloc
(
1
,
sizeof
(
STqTopic
));
if
(
pTopic
==
NULL
)
{
taosArrayDestroy
(
pConsumer
->
topics
);
free
(
pConsumer
);
return
-
1
;
}
...
...
@@ -327,6 +399,7 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
pTopic
->
buffer
.
lastOffset
=
-
1
;
pTopic
->
pReadhandle
=
walOpenReadHandle
(
pTq
->
pWal
);
if
(
pTopic
->
pReadhandle
==
NULL
)
{
ASSERT
(
false
);
}
for
(
int
i
=
0
;
i
<
TQ_BUFFER_SIZE
;
i
++
)
{
pTopic
->
buffer
.
output
[
i
].
status
=
0
;
...
...
source/dnode/vnode/src/tq/tqMetaStore.c
浏览文件 @
a038c466
...
...
@@ -68,20 +68,21 @@ static inline int tqReadLastPage(int fd, STqIdxPageBuf* pBuf) {
return
lseek
(
fd
,
offset
,
SEEK_SET
);
}
STqMetaStore
*
tqStoreOpen
(
const
char
*
path
,
FTqSerialize
serializer
,
FTqDeserialize
deserializer
,
FTqDelete
delet
er
,
int32_t
tqConfigFlag
)
{
STqMetaStore
*
pMeta
=
malloc
(
sizeof
(
STqMetaStore
));
STqMetaStore
*
tqStoreOpen
(
STQ
*
pTq
,
const
char
*
path
,
FTqSerialize
serializer
,
FTqDeserialize
deserializ
er
,
FTqDelete
deleter
,
int32_t
tqConfigFlag
)
{
STqMetaStore
*
pMeta
=
calloc
(
1
,
sizeof
(
STqMetaStore
));
if
(
pMeta
==
NULL
)
{
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
return
NULL
;
}
memset
(
pMeta
,
0
,
sizeof
(
STqMetaStore
))
;
pMeta
->
pTq
=
pTq
;
// concat data file name and index file name
size_t
pathLen
=
strlen
(
path
);
pMeta
->
dirPath
=
malloc
(
pathLen
+
1
);
if
(
pMeta
->
dirPath
==
NULL
)
{
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
free
(
pMeta
);
return
NULL
;
}
strcpy
(
pMeta
->
dirPath
,
path
);
...
...
@@ -103,12 +104,11 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial
}
pMeta
->
idxFd
=
idxFd
;
pMeta
->
unpersistHead
=
malloc
(
sizeof
(
STqMetaList
));
pMeta
->
unpersistHead
=
calloc
(
1
,
sizeof
(
STqMetaList
));
if
(
pMeta
->
unpersistHead
==
NULL
)
{
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
return
NULL
;
}
memset
(
pMeta
->
unpersistHead
,
0
,
sizeof
(
STqMetaList
));
pMeta
->
unpersistHead
->
unpersistNext
=
pMeta
->
unpersistHead
->
unpersistPrev
=
pMeta
->
unpersistHead
;
strcpy
(
name
,
path
);
...
...
@@ -145,12 +145,11 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial
ASSERT
(
idxBuf
.
head
.
writeOffset
==
idxRead
);
// loop read every entry
for
(
int
i
=
0
;
i
<
idxBuf
.
head
.
writeOffset
-
TQ_IDX_PAGE_HEAD_SIZE
;
i
+=
TQ_IDX_SIZE
)
{
STqMetaList
*
pNode
=
malloc
(
sizeof
(
STqMetaList
));
STqMetaList
*
pNode
=
calloc
(
1
,
sizeof
(
STqMetaList
));
if
(
pNode
==
NULL
)
{
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
// TODO: free memory
}
memset
(
pNode
,
0
,
sizeof
(
STqMetaList
));
memcpy
(
&
pNode
->
handle
,
&
idxBuf
.
buffer
[
i
],
TQ_IDX_SIZE
);
lseek
(
fileFd
,
pNode
->
handle
.
offset
,
SEEK_SET
);
...
...
@@ -169,25 +168,25 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial
}
if
(
serializedObj
->
action
==
TQ_ACTION_INUSE
)
{
if
(
serializedObj
->
ssize
!=
sizeof
(
STqSerializedHead
))
{
pMeta
->
pDeserializer
(
serializedObj
,
&
pNode
->
handle
.
valueInUse
);
pMeta
->
pDeserializer
(
pTq
,
serializedObj
,
&
pNode
->
handle
.
valueInUse
);
}
else
{
pNode
->
handle
.
valueInUse
=
TQ_DELETE_TOKEN
;
}
}
else
if
(
serializedObj
->
action
==
TQ_ACTION_INTXN
)
{
if
(
serializedObj
->
ssize
!=
sizeof
(
STqSerializedHead
))
{
pMeta
->
pDeserializer
(
serializedObj
,
&
pNode
->
handle
.
valueInTxn
);
pMeta
->
pDeserializer
(
pTq
,
serializedObj
,
&
pNode
->
handle
.
valueInTxn
);
}
else
{
pNode
->
handle
.
valueInTxn
=
TQ_DELETE_TOKEN
;
}
}
else
if
(
serializedObj
->
action
==
TQ_ACTION_INUSE_CONT
)
{
if
(
serializedObj
->
ssize
!=
sizeof
(
STqSerializedHead
))
{
pMeta
->
pDeserializer
(
serializedObj
,
&
pNode
->
handle
.
valueInUse
);
pMeta
->
pDeserializer
(
pTq
,
serializedObj
,
&
pNode
->
handle
.
valueInUse
);
}
else
{
pNode
->
handle
.
valueInUse
=
TQ_DELETE_TOKEN
;
}
STqSerializedHead
*
ptr
=
POINTER_SHIFT
(
serializedObj
,
serializedObj
->
ssize
);
if
(
ptr
->
ssize
!=
sizeof
(
STqSerializedHead
))
{
pMeta
->
pDeserializer
(
ptr
,
&
pNode
->
handle
.
valueInTxn
);
pMeta
->
pDeserializer
(
p
Tq
,
p
tr
,
&
pNode
->
handle
.
valueInTxn
);
}
else
{
pNode
->
handle
.
valueInTxn
=
TQ_DELETE_TOKEN
;
}
...
...
@@ -302,7 +301,7 @@ int32_t tqStorePersist(STqMetaStore* pMeta) {
pSHead
->
ver
=
TQ_SVER
;
pSHead
->
checksum
=
0
;
pSHead
->
ssize
=
sizeof
(
STqSerializedHead
);
int
allocatedSize
=
sizeof
(
STqSerializedHead
);
/*int allocatedSize = sizeof(STqSerializedHead);*/
int
offset
=
lseek
(
pMeta
->
fileFd
,
0
,
SEEK_CUR
);
tqReadLastPage
(
pMeta
->
idxFd
,
&
idxBuf
);
...
...
@@ -417,14 +416,14 @@ static int32_t tqHandlePutCommitted(STqMetaStore* pMeta, int64_t key, void* valu
pNode
=
pNode
->
next
;
}
}
STqMetaList
*
pNewNode
=
malloc
(
sizeof
(
STqMetaList
));
STqMetaList
*
pNewNode
=
calloc
(
1
,
sizeof
(
STqMetaList
));
if
(
pNewNode
==
NULL
)
{
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
return
-
1
;
}
memset
(
pNewNode
,
0
,
sizeof
(
STqMetaList
));
pNewNode
->
handle
.
key
=
key
;
pNewNode
->
handle
.
valueInUse
=
value
;
pNewNode
->
next
=
pMeta
->
bucket
[
bucketKey
];
// put into unpersist list
pNewNode
->
unpersistPrev
=
pMeta
->
unpersistHead
;
pNewNode
->
unpersistNext
=
pMeta
->
unpersistHead
->
unpersistNext
;
...
...
@@ -489,12 +488,11 @@ static inline int32_t tqHandlePutImpl(STqMetaStore* pMeta, int64_t key, void* va
pNode
=
pNode
->
next
;
}
}
STqMetaList
*
pNewNode
=
malloc
(
sizeof
(
STqMetaList
));
STqMetaList
*
pNewNode
=
calloc
(
1
,
sizeof
(
STqMetaList
));
if
(
pNewNode
==
NULL
)
{
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
return
-
1
;
}
memset
(
pNewNode
,
0
,
sizeof
(
STqMetaList
));
pNewNode
->
handle
.
key
=
key
;
pNewNode
->
handle
.
valueInTxn
=
value
;
pNewNode
->
next
=
pMeta
->
bucket
[
bucketKey
];
...
...
source/dnode/vnode/src/tq/tqOffset.c
0 → 100644
浏览文件 @
a038c466
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "tqOffset.h"
enum
ETqOffsetPersist
{
TQ_OFFSET_PERSIST__LAZY
=
1
,
TQ_OFFSET_PERSIST__EAGER
,
};
struct
STqOffsetCfg
{
int8_t
persistPolicy
;
};
struct
STqOffsetStore
{
STqOffsetCfg
cfg
;
SHashObj
*
pHash
;
// SHashObj<subscribeKey, offset>
};
STqOffsetStore
*
STqOffsetOpen
(
STqOffsetCfg
*
pCfg
)
{
STqOffsetStore
*
pStore
=
malloc
(
sizeof
(
STqOffsetStore
));
if
(
pStore
==
NULL
)
{
return
NULL
;
}
memcpy
(
&
pStore
->
cfg
,
pCfg
,
sizeof
(
STqOffsetCfg
));
pStore
->
pHash
=
taosHashInit
(
64
,
MurmurHash3_32
,
true
,
HASH_NO_LOCK
);
return
pStore
;
}
source/dnode/vnode/src/tq/tqRead.c
浏览文件 @
a038c466
...
...
@@ -14,7 +14,7 @@
*/
#define _DEFAULT_SOURCE
#include "
tqInt
.h"
#include "
vnode
.h"
STqReadHandle
*
tqInitSubmitMsgScanner
(
SMeta
*
pMeta
)
{
STqReadHandle
*
pReadHandle
=
malloc
(
sizeof
(
STqReadHandle
));
...
...
source/dnode/vnode/src/tsdb/tsdbCommit.c
浏览文件 @
a038c466
...
...
@@ -18,7 +18,7 @@
#define TSDB_MAX_SUBBLOCKS 8
typedef
struct
{
STable
*
pTable
;
STable
*
pTable
;
SSkipListIterator
*
pIter
;
}
SCommitIter
;
...
...
@@ -34,11 +34,11 @@ typedef struct {
bool
isLFileSame
;
TSKEY
minKey
;
TSKEY
maxKey
;
SArray
*
aBlkIdx
;
// SBlockIdx array
STable
*
pTable
;
SArray
*
aSupBlk
;
// Table super-block array
SArray
*
aSubBlk
;
// table sub-block array
SDataCols
*
pDataCols
;
SArray
*
aBlkIdx
;
// SBlockIdx array
STable
*
pTable
;
SArray
*
aSupBlk
;
// Table super-block array
SArray
*
aSubBlk
;
// table sub-block array
SDataCols
*
pDataCols
;
}
SCommitH
;
#define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5)
...
...
@@ -90,7 +90,7 @@ int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf);
int
tsdbApplyRtnOnFSet
(
STsdb
*
pRepo
,
SDFileSet
*
pSet
,
SRtn
*
pRtn
)
{
SDiskID
did
;
SDFileSet
nSet
=
{
0
};
STsdbFS
*
pfs
=
REPO_FS
(
pRepo
);
STsdbFS
*
pfs
=
REPO_FS
(
pRepo
);
int
level
;
ASSERT
(
pSet
->
fid
>=
pRtn
->
minFid
);
...
...
@@ -135,12 +135,13 @@ int tsdbPrepareCommit(STsdb *pTsdb) {
pTsdb
->
imem
=
pTsdb
->
mem
;
pTsdb
->
mem
=
NULL
;
return
0
;
}
int
tsdbCommit
(
STsdb
*
pRepo
)
{
STsdbMemTable
*
pMem
=
pRepo
->
imem
;
SCommitH
commith
=
{
0
};
SDFileSet
*
pSet
=
NULL
;
SDFileSet
*
pSet
=
NULL
;
int
fid
;
if
(
pRepo
->
imem
==
NULL
)
return
0
;
...
...
@@ -303,7 +304,7 @@ static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key) {
}
static
int
tsdbNextCommitFid
(
SCommitH
*
pCommith
)
{
STsdb
*
pRepo
=
TSDB_COMMIT_REPO
(
pCommith
);
STsdb
*
pRepo
=
TSDB_COMMIT_REPO
(
pCommith
);
STsdbCfg
*
pCfg
=
REPO_CFG
(
pRepo
);
int
fid
=
TSDB_IVLD_FID
;
...
...
@@ -336,7 +337,7 @@ static void tsdbDestroyCommitH(SCommitH *pCommith) {
}
static
int
tsdbCommitToFile
(
SCommitH
*
pCommith
,
SDFileSet
*
pSet
,
int
fid
)
{
STsdb
*
pRepo
=
TSDB_COMMIT_REPO
(
pCommith
);
STsdb
*
pRepo
=
TSDB_COMMIT_REPO
(
pCommith
);
STsdbCfg
*
pCfg
=
REPO_CFG
(
pRepo
);
ASSERT
(
pSet
==
NULL
||
pSet
->
fid
==
fid
);
...
...
@@ -391,12 +392,12 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
}
static
int
tsdbCreateCommitIters
(
SCommitH
*
pCommith
)
{
STsdb
*
pRepo
=
TSDB_COMMIT_REPO
(
pCommith
);
STsdbMemTable
*
pMem
=
pRepo
->
imem
;
STsdb
*
pRepo
=
TSDB_COMMIT_REPO
(
pCommith
);
STsdbMemTable
*
pMem
=
pRepo
->
imem
;
SSkipListIterator
*
pSlIter
;
SCommitIter
*
pCommitIter
;
SSkipListNode
*
pNode
;
STbData
*
pTbData
;
SCommitIter
*
pCommitIter
;
SSkipListNode
*
pNode
;
STbData
*
pTbData
;
pCommith
->
niters
=
SL_SIZE
(
pMem
->
pSlIdx
);
pCommith
->
iters
=
(
SCommitIter
*
)
calloc
(
pCommith
->
niters
,
sizeof
(
SCommitIter
));
...
...
@@ -452,7 +453,7 @@ static void tsdbResetCommitFile(SCommitH *pCommith) {
static
int
tsdbSetAndOpenCommitFile
(
SCommitH
*
pCommith
,
SDFileSet
*
pSet
,
int
fid
)
{
SDiskID
did
;
STsdb
*
pRepo
=
TSDB_COMMIT_REPO
(
pCommith
);
STsdb
*
pRepo
=
TSDB_COMMIT_REPO
(
pCommith
);
SDFileSet
*
pWSet
=
TSDB_COMMIT_WRITE_FSET
(
pCommith
);
if
(
tfsAllocDisk
(
pRepo
->
pTfs
,
tsdbGetFidLevel
(
fid
,
&
(
pCommith
->
rtn
)),
&
did
)
<
0
)
{
...
...
@@ -583,7 +584,7 @@ int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray
uint32_t
tlen
;
SBlockInfo
*
pBlkInfo
;
int64_t
offset
;
SBlock
*
pBlock
;
SBlock
*
pBlock
;
memset
(
pIdx
,
0
,
sizeof
(
*
pIdx
));
...
...
@@ -1130,7 +1131,7 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) {
int
tsdbWriteBlockImpl
(
STsdb
*
pRepo
,
STable
*
pTable
,
SDFile
*
pDFile
,
SDataCols
*
pDataCols
,
SBlock
*
pBlock
,
bool
isLast
,
bool
isSuper
,
void
**
ppBuf
,
void
**
ppCBuf
)
{
STsdbCfg
*
pCfg
=
REPO_CFG
(
pRepo
);
STsdbCfg
*
pCfg
=
REPO_CFG
(
pRepo
);
SBlockData
*
pBlockData
;
int64_t
offset
=
0
;
int
rowsToWrite
=
pDataCols
->
numOfRows
;
...
...
@@ -1147,7 +1148,7 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
// Get # of cols not all NULL(not including key column)
int
nColsNotAllNull
=
0
;
for
(
int
ncol
=
1
;
ncol
<
pDataCols
->
numOfCols
;
ncol
++
)
{
// ncol from 1, we skip the timestamp column
SDataCol
*
pDataCol
=
pDataCols
->
cols
+
ncol
;
SDataCol
*
pDataCol
=
pDataCols
->
cols
+
ncol
;
SBlockCol
*
pBlockCol
=
pBlockData
->
cols
+
nColsNotAllNull
;
if
(
isAllRowsNull
(
pDataCol
))
{
// all data to commit are NULL, just ignore it
...
...
@@ -1188,7 +1189,7 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
// All not NULL columns finish
if
(
ncol
!=
0
&&
tcol
>=
nColsNotAllNull
)
break
;
SDataCol
*
pDataCol
=
pDataCols
->
cols
+
ncol
;
SDataCol
*
pDataCol
=
pDataCols
->
cols
+
ncol
;
SBlockCol
*
pBlockCol
=
pBlockData
->
cols
+
tcol
;
if
(
ncol
!=
0
&&
(
pDataCol
->
colId
!=
pBlockCol
->
colId
))
continue
;
...
...
@@ -1212,7 +1213,7 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
}
#endif
void
*
tptr
;
void
*
tptr
;
// Make room
if
(
tsdbMakeRoom
(
ppBuf
,
lsize
+
tlen
+
COMP_OVERFLOW_BYTES
+
sizeof
(
TSCKSUM
))
<
0
)
{
...
...
@@ -1278,7 +1279,7 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
pBlock
->
keyFirst
=
dataColsKeyFirst
(
pDataCols
);
pBlock
->
keyLast
=
dataColsKeyLast
(
pDataCols
);
tsdbDebug
(
"vgId:%d uid:%"
PRId64
" a block of data is written to file %s, offset %"
PRId64
tsdbDebug
(
"vgId:%d uid:%"
PRId64
" a block of data is written to file %s, offset %"
PRId64
" numOfRows %d len %d numOfCols %"
PRId16
" keyFirst %"
PRId64
" keyLast %"
PRId64
,
REPO_ID
(
pRepo
),
TABLE_TID
(
pTable
),
TSDB_FILE_FULL_NAME
(
pDFile
),
offset
,
rowsToWrite
,
pBlock
->
len
,
pBlock
->
numOfCols
,
pBlock
->
keyFirst
,
pBlock
->
keyLast
);
...
...
@@ -1294,9 +1295,9 @@ static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCo
}
static
int
tsdbWriteBlockInfo
(
SCommitH
*
pCommih
)
{
SDFile
*
pHeadf
=
TSDB_COMMIT_HEAD_FILE
(
pCommih
);
SDFile
*
pHeadf
=
TSDB_COMMIT_HEAD_FILE
(
pCommih
);
SBlockIdx
blkIdx
;
STable
*
pTable
=
TSDB_COMMIT_TABLE
(
pCommih
);
STable
*
pTable
=
TSDB_COMMIT_TABLE
(
pCommih
);
if
(
tsdbWriteBlockInfoImpl
(
pHeadf
,
pTable
,
pCommih
->
aSupBlk
,
pCommih
->
aSubBlk
,
(
void
**
)(
&
(
TSDB_COMMIT_BUF
(
pCommih
))),
&
blkIdx
)
<
0
)
{
...
...
@@ -1316,11 +1317,11 @@ static int tsdbWriteBlockInfo(SCommitH *pCommih) {
}
static
int
tsdbCommitMemData
(
SCommitH
*
pCommith
,
SCommitIter
*
pIter
,
TSKEY
keyLimit
,
bool
toData
)
{
STsdb
*
pRepo
=
TSDB_COMMIT_REPO
(
pCommith
);
STsdbCfg
*
pCfg
=
REPO_CFG
(
pRepo
);
STsdb
*
pRepo
=
TSDB_COMMIT_REPO
(
pCommith
);
STsdbCfg
*
pCfg
=
REPO_CFG
(
pRepo
);
SMergeInfo
mInfo
;
int32_t
defaultRows
=
TSDB_COMMIT_DEFAULT_ROWS
(
pCommith
);
SDFile
*
pDFile
;
SDFile
*
pDFile
;
bool
isLast
;
SBlock
block
;
...
...
@@ -1349,16 +1350,16 @@ static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLi
}
static
int
tsdbMergeMemData
(
SCommitH
*
pCommith
,
SCommitIter
*
pIter
,
int
bidx
)
{
STsdb
*
pRepo
=
TSDB_COMMIT_REPO
(
pCommith
);
STsdbCfg
*
pCfg
=
REPO_CFG
(
pRepo
);
STsdb
*
pRepo
=
TSDB_COMMIT_REPO
(
pCommith
);
STsdbCfg
*
pCfg
=
REPO_CFG
(
pRepo
);
int
nBlocks
=
pCommith
->
readh
.
pBlkIdx
->
numOfBlocks
;
SBlock
*
pBlock
=
pCommith
->
readh
.
pBlkInfo
->
blocks
+
bidx
;
SBlock
*
pBlock
=
pCommith
->
readh
.
pBlkInfo
->
blocks
+
bidx
;
TSKEY
keyLimit
;
int16_t
colId
=
PRIMARYKEY_TIMESTAMP_COL_ID
;
SMergeInfo
mInfo
;
SBlock
subBlocks
[
TSDB_MAX_SUBBLOCKS
];
SBlock
block
,
supBlock
;
SDFile
*
pDFile
;
SDFile
*
pDFile
;
if
(
bidx
==
nBlocks
-
1
)
{
keyLimit
=
pCommith
->
maxKey
;
...
...
@@ -1474,10 +1475,10 @@ static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const
static
int
tsdbMergeBlockData
(
SCommitH
*
pCommith
,
SCommitIter
*
pIter
,
SDataCols
*
pDataCols
,
TSKEY
keyLimit
,
bool
isLastOneBlock
)
{
STsdb
*
pRepo
=
TSDB_COMMIT_REPO
(
pCommith
);
STsdb
*
pRepo
=
TSDB_COMMIT_REPO
(
pCommith
);
STsdbCfg
*
pCfg
=
REPO_CFG
(
pRepo
);
SBlock
block
;
SDFile
*
pDFile
;
SDFile
*
pDFile
;
bool
isLast
;
int32_t
defaultRows
=
TSDB_COMMIT_DEFAULT_ROWS
(
pCommith
);
...
...
@@ -1598,7 +1599,7 @@ static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError) {
}
static
bool
tsdbCanAddSubBlock
(
SCommitH
*
pCommith
,
SBlock
*
pBlock
,
SMergeInfo
*
pInfo
)
{
STsdb
*
pRepo
=
TSDB_COMMIT_REPO
(
pCommith
);
STsdb
*
pRepo
=
TSDB_COMMIT_REPO
(
pCommith
);
STsdbCfg
*
pCfg
=
REPO_CFG
(
pRepo
);
int
mergeRows
=
pBlock
->
numOfRows
+
pInfo
->
rowsInserted
-
pInfo
->
rowsDeleteSucceed
;
...
...
source/dnode/vnode/src/vnd/vnodeWrite.c
浏览文件 @
a038c466
...
...
@@ -28,6 +28,7 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
taosEncodeFixedI64
(
&
pBuf
,
ver
);
if
(
walWrite
(
pVnode
->
pWal
,
ver
,
pMsg
->
msgType
,
pMsg
->
pCont
,
pMsg
->
contLen
)
<
0
)
{
/*ASSERT(false);*/
// TODO: handle error
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录