Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a5706ea7
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
a5706ea7
编写于
5月 05, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feat(stream): insert data
上级
3496381f
变更
17
隐藏空白更改
内联
并排
Showing
17 changed file
with
261 addition
and
70 deletion
+261
-70
example/src/tstream.c
example/src/tstream.c
+4
-3
include/common/tdatablock.h
include/common/tdatablock.h
+13
-12
include/common/tdataformat.h
include/common/tdataformat.h
+7
-8
include/common/tmsg.h
include/common/tmsg.h
+3
-1
include/common/trow.h
include/common/trow.h
+18
-20
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+3
-1
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+83
-19
source/common/src/tdataformat.c
source/common/src/tdataformat.c
+46
-1
source/common/src/tmsg.c
source/common/src/tmsg.c
+19
-0
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+1
-0
source/dnode/mnode/impl/inc/mndOffset.h
source/dnode/mnode/impl/inc/mndOffset.h
+2
-0
source/dnode/mnode/impl/inc/mndSubscribe.h
source/dnode/mnode/impl/inc/mndSubscribe.h
+2
-0
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+7
-1
source/dnode/mnode/impl/src/mndOffset.c
source/dnode/mnode/impl/src/mndOffset.c
+33
-0
source/dnode/mnode/impl/src/mndScheduler.c
source/dnode/mnode/impl/src/mndScheduler.c
+4
-0
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+6
-0
source/libs/stream/src/tstream.c
source/libs/stream/src/tstream.c
+10
-4
未找到文件。
example/src/tstream.c
浏览文件 @
a5706ea7
...
@@ -81,9 +81,10 @@ int32_t create_stream() {
...
@@ -81,9 +81,10 @@ int32_t create_stream() {
/*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/
/*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
pRes
=
taos_query
(
pRes
=
pConn
,
taos_query
(
pConn
,
"create stream stream1 trigger window_close as select min(k), max(k), sum(k) as sum_of_k from tu1 interval(10m)"
);
"create stream stream1 trigger window_close as select _wstartts, min(k), max(k), sum(k) as sum_of_k "
"from tu1 interval(10m)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create stream stream1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
printf
(
"failed to create stream stream1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
return
-
1
;
...
...
include/common/tdatablock.h
浏览文件 @
a5706ea7
...
@@ -54,8 +54,8 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet);
...
@@ -54,8 +54,8 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet);
BMCharPos(bm_, r_) |= (1u << (7u - BitPos(r_))); \
BMCharPos(bm_, r_) |= (1u << (7u - BitPos(r_))); \
} while (0)
} while (0)
#define colDataIsNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] == -1)
#define colDataIsNull_var(pColumnInfoData, row)
(pColumnInfoData->varmeta.offset[row] == -1)
#define colDataSetNull_var(pColumnInfoData, row)
(pColumnInfoData->varmeta.offset[row] = -1)
#define colDataSetNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] = -1)
#define BitmapLen(_n) (((_n) + ((1 << NBIT) - 1)) >> NBIT)
#define BitmapLen(_n) (((_n) + ((1 << NBIT) - 1)) >> NBIT)
...
@@ -63,16 +63,15 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet);
...
@@ -63,16 +63,15 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet);
#define colDataGetNumData(p1_, r_) ((p1_)->pData + ((r_) * (p1_)->info.bytes))
#define colDataGetNumData(p1_, r_) ((p1_)->pData + ((r_) * (p1_)->info.bytes))
// SColumnInfoData, rowNumber
// SColumnInfoData, rowNumber
#define colDataGetData(p1_, r_) \
#define colDataGetData(p1_, r_) \
((IS_VAR_DATA_TYPE((p1_)->info.type)) ? colDataGetVarData(p1_, r_) \
((IS_VAR_DATA_TYPE((p1_)->info.type)) ? colDataGetVarData(p1_, r_) : colDataGetNumData(p1_, r_))
: colDataGetNumData(p1_, r_))
static
FORCE_INLINE
bool
colDataIsNull_s
(
const
SColumnInfoData
*
pColumnInfoData
,
uint32_t
row
)
{
static
FORCE_INLINE
bool
colDataIsNull_s
(
const
SColumnInfoData
*
pColumnInfoData
,
uint32_t
row
)
{
if
(
pColumnInfoData
->
info
.
type
==
TSDB_DATA_TYPE_JSON
){
if
(
pColumnInfoData
->
info
.
type
==
TSDB_DATA_TYPE_JSON
)
{
if
(
colDataIsNull_var
(
pColumnInfoData
,
row
))
{
if
(
colDataIsNull_var
(
pColumnInfoData
,
row
))
{
return
true
;
return
true
;
}
}
char
*
data
=
colDataGetVarData
(
pColumnInfoData
,
row
);
char
*
data
=
colDataGetVarData
(
pColumnInfoData
,
row
);
return
(
*
data
==
TSDB_DATA_TYPE_NULL
);
return
(
*
data
==
TSDB_DATA_TYPE_NULL
);
}
}
...
@@ -80,7 +79,7 @@ static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData,
...
@@ -80,7 +79,7 @@ static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData,
return
false
;
return
false
;
}
}
if
(
pColumnInfoData
->
info
.
type
==
TSDB_DATA_TYPE_VARCHAR
||
pColumnInfoData
->
info
.
type
==
TSDB_DATA_TYPE_NCHAR
)
{
if
(
pColumnInfoData
->
info
.
type
==
TSDB_DATA_TYPE_VARCHAR
||
pColumnInfoData
->
info
.
type
==
TSDB_DATA_TYPE_NCHAR
)
{
return
colDataIsNull_var
(
pColumnInfoData
,
row
);
return
colDataIsNull_var
(
pColumnInfoData
,
row
);
}
else
{
}
else
{
if
(
pColumnInfoData
->
nullbitmap
==
NULL
)
{
if
(
pColumnInfoData
->
nullbitmap
==
NULL
)
{
...
@@ -132,7 +131,7 @@ static FORCE_INLINE void colDataAppendNULL(SColumnInfoData* pColumnInfoData, uin
...
@@ -132,7 +131,7 @@ static FORCE_INLINE void colDataAppendNULL(SColumnInfoData* pColumnInfoData, uin
static
FORCE_INLINE
void
colDataAppendNNULL
(
SColumnInfoData
*
pColumnInfoData
,
uint32_t
start
,
size_t
nRows
)
{
static
FORCE_INLINE
void
colDataAppendNNULL
(
SColumnInfoData
*
pColumnInfoData
,
uint32_t
start
,
size_t
nRows
)
{
if
(
IS_VAR_DATA_TYPE
(
pColumnInfoData
->
info
.
type
))
{
if
(
IS_VAR_DATA_TYPE
(
pColumnInfoData
->
info
.
type
))
{
for
(
int32_t
i
=
start
;
i
<
start
+
nRows
;
++
i
)
{
for
(
int32_t
i
=
start
;
i
<
start
+
nRows
;
++
i
)
{
colDataSetNull_var
(
pColumnInfoData
,
i
);
// it is a null value of VAR type.
colDataSetNull_var
(
pColumnInfoData
,
i
);
// it is a null value of VAR type.
}
}
}
else
{
}
else
{
for
(
int32_t
i
=
start
;
i
<
start
+
nRows
;
++
i
)
{
for
(
int32_t
i
=
start
;
i
<
start
+
nRows
;
++
i
)
{
...
@@ -225,6 +224,8 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
...
@@ -225,6 +224,8 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
void
blockDebugShowData
(
const
SArray
*
dataBlocks
);
void
blockDebugShowData
(
const
SArray
*
dataBlocks
);
SSubmitReq
*
tdBlockToSubmit
(
const
SArray
*
pBlocks
,
const
STSchema
*
pSchema
);
static
FORCE_INLINE
int32_t
blockGetEncodeSize
(
const
SSDataBlock
*
pBlock
)
{
static
FORCE_INLINE
int32_t
blockGetEncodeSize
(
const
SSDataBlock
*
pBlock
)
{
return
blockDataGetSerialMetaSize
(
pBlock
)
+
blockDataGetSize
(
pBlock
);
return
blockDataGetSerialMetaSize
(
pBlock
)
+
blockDataGetSize
(
pBlock
);
}
}
...
@@ -238,10 +239,10 @@ static FORCE_INLINE int32_t blockCompressColData(SColumnInfoData* pColRes, int32
...
@@ -238,10 +239,10 @@ static FORCE_INLINE int32_t blockCompressColData(SColumnInfoData* pColRes, int32
static
FORCE_INLINE
void
blockCompressEncode
(
const
SSDataBlock
*
pBlock
,
char
*
data
,
int32_t
*
dataLen
,
int32_t
numOfCols
,
static
FORCE_INLINE
void
blockCompressEncode
(
const
SSDataBlock
*
pBlock
,
char
*
data
,
int32_t
*
dataLen
,
int32_t
numOfCols
,
int8_t
needCompress
)
{
int8_t
needCompress
)
{
int32_t
*
actualLen
=
(
int32_t
*
)
data
;
int32_t
*
actualLen
=
(
int32_t
*
)
data
;
data
+=
sizeof
(
int32_t
);
data
+=
sizeof
(
int32_t
);
uint64_t
*
groupId
=
(
uint64_t
*
)
data
;
uint64_t
*
groupId
=
(
uint64_t
*
)
data
;
data
+=
sizeof
(
uint64_t
);
data
+=
sizeof
(
uint64_t
);
int32_t
*
colSizes
=
(
int32_t
*
)
data
;
int32_t
*
colSizes
=
(
int32_t
*
)
data
;
...
...
include/common/tdataformat.h
浏览文件 @
a5706ea7
...
@@ -61,11 +61,11 @@ extern "C" {
...
@@ -61,11 +61,11 @@ extern "C" {
// ----------------- TSDB COLUMN DEFINITION
// ----------------- TSDB COLUMN DEFINITION
#pragma pack(push, 1)
#pragma pack(push, 1)
typedef
struct
{
typedef
struct
{
col_id_t
colId
;
// column ID(start from PRIMARYKEY_TIMESTAMP_COL_ID(1))
col_id_t
colId
;
// column ID(start from PRIMARYKEY_TIMESTAMP_COL_ID(1))
int
32_t
type
:
8
;
// column type
int
8_t
type
;
// column type
int
32_t
bytes
:
24
;
// column bytes (0~16M)
int
8_t
sma
;
// block SMA: 0, no SMA, 1, sum/min/max, 2, ...
int32_t
sma
:
8
;
// block SMA: 0, no SMA, 1, sum/min/max, 2, ...
int32_t
bytes
;
// column bytes (0~16M)
int32_t
offset
:
24
;
// point offset in STpRow after the header part.
int32_t
offset
;
// point offset in STpRow after the header part.
}
STColumn
;
}
STColumn
;
#pragma pack(pop)
#pragma pack(pop)
...
@@ -387,8 +387,6 @@ typedef struct SDataCol {
...
@@ -387,8 +387,6 @@ typedef struct SDataCol {
TSKEY
ts
;
// only used in last NULL column
TSKEY
ts
;
// only used in last NULL column
}
SDataCol
;
}
SDataCol
;
#define isAllRowsNull(pCol) ((pCol)->len == 0)
#define isAllRowsNull(pCol) ((pCol)->len == 0)
#define isAllRowsNone(pCol) ((pCol)->len == 0)
#define isAllRowsNone(pCol) ((pCol)->len == 0)
static
FORCE_INLINE
void
dataColReset
(
SDataCol
*
pDataCol
)
{
pDataCol
->
len
=
0
;
}
static
FORCE_INLINE
void
dataColReset
(
SDataCol
*
pDataCol
)
{
pDataCol
->
len
=
0
;
}
...
@@ -482,7 +480,8 @@ void tdResetDataCols(SDataCols *pCols);
...
@@ -482,7 +480,8 @@ void tdResetDataCols(SDataCols *pCols);
int32_t
tdInitDataCols
(
SDataCols
*
pCols
,
STSchema
*
pSchema
);
int32_t
tdInitDataCols
(
SDataCols
*
pCols
,
STSchema
*
pSchema
);
SDataCols
*
tdDupDataCols
(
SDataCols
*
pCols
,
bool
keepData
);
SDataCols
*
tdDupDataCols
(
SDataCols
*
pCols
,
bool
keepData
);
SDataCols
*
tdFreeDataCols
(
SDataCols
*
pCols
);
SDataCols
*
tdFreeDataCols
(
SDataCols
*
pCols
);
int32_t
tdMergeDataCols
(
SDataCols
*
target
,
SDataCols
*
source
,
int32_t
rowsToMerge
,
int32_t
*
pOffset
,
bool
forceSetNull
,
TDRowVerT
maxVer
);
int32_t
tdMergeDataCols
(
SDataCols
*
target
,
SDataCols
*
source
,
int32_t
rowsToMerge
,
int32_t
*
pOffset
,
bool
forceSetNull
,
TDRowVerT
maxVer
);
// ----------------- K-V data row structure
// ----------------- K-V data row structure
/* |<-------------------------------------- len -------------------------------------------->|
/* |<-------------------------------------- len -------------------------------------------->|
...
...
include/common/tmsg.h
浏览文件 @
a5706ea7
...
@@ -245,6 +245,8 @@ int32_t tInitSubmitMsgIter(const SSubmitReq* pMsg, SSubmitMsgIter* pIter);
...
@@ -245,6 +245,8 @@ int32_t tInitSubmitMsgIter(const SSubmitReq* pMsg, SSubmitMsgIter* pIter);
int32_t
tGetSubmitMsgNext
(
SSubmitMsgIter
*
pIter
,
SSubmitBlk
**
pPBlock
);
int32_t
tGetSubmitMsgNext
(
SSubmitMsgIter
*
pIter
,
SSubmitBlk
**
pPBlock
);
int32_t
tInitSubmitBlkIter
(
SSubmitBlk
*
pBlock
,
SSubmitBlkIter
*
pIter
);
int32_t
tInitSubmitBlkIter
(
SSubmitBlk
*
pBlock
,
SSubmitBlkIter
*
pIter
);
STSRow
*
tGetSubmitBlkNext
(
SSubmitBlkIter
*
pIter
);
STSRow
*
tGetSubmitBlkNext
(
SSubmitBlkIter
*
pIter
);
// for debug
int32_t
tPrintFixedSchemaSubmitReq
(
const
SSubmitReq
*
pReq
,
STSchema
*
pSchema
);
// TODO: KEEP one suite of iterator API finally.
// TODO: KEEP one suite of iterator API finally.
// 1) use tInitSubmitMsgIterEx firstly as not decrease the merge conflicts
// 1) use tInitSubmitMsgIterEx firstly as not decrease the merge conflicts
...
@@ -2137,7 +2139,7 @@ static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SCoder* pDecoder, SSchemaWrapp
...
@@ -2137,7 +2139,7 @@ static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SCoder* pDecoder, SSchemaWrapp
if
(
tDecodeI32v
(
pDecoder
,
&
pSW
->
nCols
)
<
0
)
return
-
1
;
if
(
tDecodeI32v
(
pDecoder
,
&
pSW
->
nCols
)
<
0
)
return
-
1
;
if
(
tDecodeI32v
(
pDecoder
,
&
pSW
->
sver
)
<
0
)
return
-
1
;
if
(
tDecodeI32v
(
pDecoder
,
&
pSW
->
sver
)
<
0
)
return
-
1
;
pSW
->
pSchema
=
(
SSchema
*
)
t
CoderMalloc
(
pDecoder
,
sizeof
(
SSchema
)
*
pSW
->
nCols
);
pSW
->
pSchema
=
(
SSchema
*
)
t
aosMemoryCalloc
(
pSW
->
nCols
,
sizeof
(
SSchema
)
);
if
(
pSW
->
pSchema
==
NULL
)
return
-
1
;
if
(
pSW
->
pSchema
==
NULL
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
pSW
->
nCols
;
i
++
)
{
for
(
int32_t
i
=
0
;
i
<
pSW
->
nCols
;
i
++
)
{
if
(
tDecodeSSchema
(
pDecoder
,
&
pSW
->
pSchema
[
i
])
<
0
)
return
-
1
;
if
(
tDecodeSSchema
(
pDecoder
,
&
pSW
->
pSchema
[
i
])
<
0
)
return
-
1
;
...
...
include/common/trow.h
浏览文件 @
a5706ea7
...
@@ -622,7 +622,6 @@ static FORCE_INLINE int32_t tdSRowSetTpInfo(SRowBuilder *pBuilder, int32_t nCols
...
@@ -622,7 +622,6 @@ static FORCE_INLINE int32_t tdSRowSetTpInfo(SRowBuilder *pBuilder, int32_t nCols
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
/**
/**
* @brief To judge row type: STpRow/SKvRow
* @brief To judge row type: STpRow/SKvRow
*
*
...
@@ -758,7 +757,6 @@ static int32_t tdSRowGetBuf(SRowBuilder *pBuilder, void *pBuf) {
...
@@ -758,7 +757,6 @@ static int32_t tdSRowGetBuf(SRowBuilder *pBuilder, void *pBuf) {
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
/**
/**
* @brief 由调用方管理存储空间的分配及释放,一次输入多个参数
* @brief 由调用方管理存储空间的分配及释放,一次输入多个参数
*
*
...
@@ -1250,16 +1248,16 @@ static FORCE_INLINE int32_t tdGetColDataOfRow(SCellVal *pVal, SDataCol *pCol, in
...
@@ -1250,16 +1248,16 @@ static FORCE_INLINE int32_t tdGetColDataOfRow(SCellVal *pVal, SDataCol *pCol, in
}
}
/**
/**
* @brief
* @brief
*
*
* @param pRow
* @param pRow
* @param colId
* @param colId
* @param colType
* @param colType
* @param flen
* @param flen
* @param offset
* @param offset
* @param colIdx start from 0
* @param colIdx start from 0
* @param pVal
* @param pVal
* @return FORCE_INLINE
* @return FORCE_INLINE
*/
*/
static
FORCE_INLINE
bool
tdSTpRowGetVal
(
STSRow
*
pRow
,
col_id_t
colId
,
col_type_t
colType
,
int32_t
flen
,
uint32_t
offset
,
static
FORCE_INLINE
bool
tdSTpRowGetVal
(
STSRow
*
pRow
,
col_id_t
colId
,
col_type_t
colType
,
int32_t
flen
,
uint32_t
offset
,
col_id_t
colIdx
,
SCellVal
*
pVal
)
{
col_id_t
colIdx
,
SCellVal
*
pVal
)
{
...
@@ -1273,14 +1271,14 @@ static FORCE_INLINE bool tdSTpRowGetVal(STSRow *pRow, col_id_t colId, col_type_t
...
@@ -1273,14 +1271,14 @@ static FORCE_INLINE bool tdSTpRowGetVal(STSRow *pRow, col_id_t colId, col_type_t
}
}
/**
/**
* @brief
* @brief
*
*
* @param pRow
* @param pRow
* @param colId
* @param colId
* @param offset
* @param offset
* @param colIdx start from 0
* @param colIdx start from 0
* @param pVal
* @param pVal
* @return FORCE_INLINE
* @return FORCE_INLINE
*/
*/
static
FORCE_INLINE
bool
tdSKvRowGetVal
(
STSRow
*
pRow
,
col_id_t
colId
,
uint32_t
offset
,
col_id_t
colIdx
,
static
FORCE_INLINE
bool
tdSKvRowGetVal
(
STSRow
*
pRow
,
col_id_t
colId
,
uint32_t
offset
,
col_id_t
colIdx
,
SCellVal
*
pVal
)
{
SCellVal
*
pVal
)
{
...
@@ -1397,14 +1395,14 @@ static void tdSCellValPrint(SCellVal *pVal, int8_t colType) {
...
@@ -1397,14 +1395,14 @@ static void tdSCellValPrint(SCellVal *pVal, int8_t colType) {
}
}
}
}
static
void
tdSRowPrint
(
STSRow
*
row
,
STSchema
*
pSchema
,
const
char
*
tag
)
{
static
void
tdSRowPrint
(
STSRow
*
row
,
STSchema
*
pSchema
,
const
char
*
tag
)
{
STSRowIter
iter
=
{
0
};
STSRowIter
iter
=
{
0
};
tdSTSRowIterInit
(
&
iter
,
pSchema
);
tdSTSRowIterInit
(
&
iter
,
pSchema
);
tdSTSRowIterReset
(
&
iter
,
row
);
tdSTSRowIterReset
(
&
iter
,
row
);
printf
(
"%s >>>"
,
tag
);
printf
(
"%s >>>"
,
tag
);
for
(
int
i
=
0
;
i
<
pSchema
->
numOfCols
;
++
i
)
{
for
(
int
i
=
0
;
i
<
pSchema
->
numOfCols
;
++
i
)
{
STColumn
*
stCol
=
pSchema
->
columns
+
i
;
STColumn
*
stCol
=
pSchema
->
columns
+
i
;
SCellVal
sVal
=
{
255
,
NULL
};
SCellVal
sVal
=
{
255
,
NULL
};
if
(
!
tdSTSRowIterNext
(
&
iter
,
stCol
->
colId
,
stCol
->
type
,
&
sVal
))
{
if
(
!
tdSTSRowIterNext
(
&
iter
,
stCol
->
colId
,
stCol
->
type
,
&
sVal
))
{
break
;
break
;
}
}
...
...
include/libs/stream/tstream.h
浏览文件 @
a5706ea7
...
@@ -70,8 +70,10 @@ typedef struct {
...
@@ -70,8 +70,10 @@ typedef struct {
}
STaskDispatcherShuffle
;
}
STaskDispatcherShuffle
;
typedef
struct
{
typedef
struct
{
int8_t
reserved
;
int8_t
reserved
;
SSchemaWrapper
*
pSchemaWrapper
;
// not applicable to encoder and decoder
// not applicable to encoder and decoder
STSchema
*
pTSchema
;
SHashObj
*
pHash
;
// groupId to tbuid
SHashObj
*
pHash
;
// groupId to tbuid
}
STaskSinkTb
;
}
STaskSinkTb
;
...
...
source/common/src/tdatablock.c
浏览文件 @
a5706ea7
...
@@ -194,17 +194,17 @@ static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, c
...
@@ -194,17 +194,17 @@ static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, c
int32_t
i
=
0
;
int32_t
i
=
0
;
uint8_t
*
start
=
(
uint8_t
*
)
&
pColumnInfoData
->
nullbitmap
[
BitmapLen
(
numOfRow1
)];
uint8_t
*
start
=
(
uint8_t
*
)
&
pColumnInfoData
->
nullbitmap
[
BitmapLen
(
numOfRow1
)];
int32_t
overCount
=
BitmapLen
(
total
)
-
BitmapLen
(
numOfRow1
);
int32_t
overCount
=
BitmapLen
(
total
)
-
BitmapLen
(
numOfRow1
);
while
(
i
<
len
)
{
// size limit of pSource->nullbitmap
while
(
i
<
len
)
{
// size limit of pSource->nullbitmap
if
(
i
>=
1
)
{
if
(
i
>=
1
)
{
start
[
i
-
1
]
|=
(
p
[
i
]
>>
remindBits
);
//
copy remind bits
start
[
i
-
1
]
|=
(
p
[
i
]
>>
remindBits
);
//
copy remind bits
}
}
if
(
i
>=
overCount
)
{
// size limit of pColumnInfoData->nullbitmap
if
(
i
>=
overCount
)
{
// size limit of pColumnInfoData->nullbitmap
return
;
return
;
}
}
start
[
i
]
|=
(
p
[
i
]
<<
shiftBits
);
//copy shift bits
start
[
i
]
|=
(
p
[
i
]
<<
shiftBits
);
//
copy shift bits
i
+=
1
;
i
+=
1
;
}
}
}
}
...
@@ -354,7 +354,7 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc, SArray* pInd
...
@@ -354,7 +354,7 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc, SArray* pInd
int32_t
numOfCols
=
pDest
->
info
.
numOfCols
;
int32_t
numOfCols
=
pDest
->
info
.
numOfCols
;
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
int32_t
mapIndex
=
i
;
int32_t
mapIndex
=
i
;
if
(
pIndexMap
)
{
if
(
pIndexMap
)
{
mapIndex
=
*
(
int32_t
*
)
taosArrayGet
(
pIndexMap
,
i
);
mapIndex
=
*
(
int32_t
*
)
taosArrayGet
(
pIndexMap
,
i
);
}
}
SColumnInfoData
*
pCol2
=
taosArrayGet
(
pDest
->
pDataBlock
,
i
);
SColumnInfoData
*
pCol2
=
taosArrayGet
(
pDest
->
pDataBlock
,
i
);
...
@@ -451,7 +451,6 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd
...
@@ -451,7 +451,6 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd
// all fit in
// all fit in
*
stopIndex
=
numOfRows
-
1
;
*
stopIndex
=
numOfRows
-
1
;
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
SSDataBlock
*
blockDataExtractBlock
(
SSDataBlock
*
pBlock
,
int32_t
startIndex
,
int32_t
rowCount
)
{
SSDataBlock
*
blockDataExtractBlock
(
SSDataBlock
*
pBlock
,
int32_t
startIndex
,
int32_t
rowCount
)
{
...
@@ -556,7 +555,7 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
...
@@ -556,7 +555,7 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
if
(
IS_VAR_DATA_TYPE
(
pCol
->
info
.
type
))
{
if
(
IS_VAR_DATA_TYPE
(
pCol
->
info
.
type
))
{
size_t
metaSize
=
pBlock
->
info
.
rows
*
sizeof
(
int32_t
);
size_t
metaSize
=
pBlock
->
info
.
rows
*
sizeof
(
int32_t
);
char
*
tmp
=
taosMemoryRealloc
(
pCol
->
varmeta
.
offset
,
metaSize
);
// preview calloc is too small
char
*
tmp
=
taosMemoryRealloc
(
pCol
->
varmeta
.
offset
,
metaSize
);
// preview calloc is too small
if
(
tmp
==
NULL
)
{
if
(
tmp
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
...
@@ -938,8 +937,9 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
...
@@ -938,8 +937,9 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
copyBackToBlock
(
pDataBlock
,
pCols
);
copyBackToBlock
(
pDataBlock
,
pCols
);
int64_t
p4
=
taosGetTimestampUs
();
int64_t
p4
=
taosGetTimestampUs
();
uDebug
(
"blockDataSort complex sort:%"
PRId64
", create:%"
PRId64
", assign:%"
PRId64
", copyback:%"
PRId64
", rows:%d
\n
"
,
p1
-
p0
,
p2
-
p1
,
uDebug
(
"blockDataSort complex sort:%"
PRId64
", create:%"
PRId64
", assign:%"
PRId64
", copyback:%"
PRId64
p3
-
p2
,
p4
-
p3
,
rows
);
", rows:%d
\n
"
,
p1
-
p0
,
p2
-
p1
,
p3
-
p2
,
p4
-
p3
,
rows
);
destroyTupleIndex
(
index
);
destroyTupleIndex
(
index
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
...
@@ -1176,7 +1176,7 @@ void* blockDataDestroy(SSDataBlock* pBlock) {
...
@@ -1176,7 +1176,7 @@ void* blockDataDestroy(SSDataBlock* pBlock) {
}
}
SSDataBlock
*
createOneDataBlock
(
const
SSDataBlock
*
pDataBlock
,
bool
copyData
)
{
SSDataBlock
*
createOneDataBlock
(
const
SSDataBlock
*
pDataBlock
,
bool
copyData
)
{
if
(
pDataBlock
==
NULL
)
{
if
(
pDataBlock
==
NULL
)
{
return
NULL
;
return
NULL
;
}
}
...
@@ -1187,7 +1187,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
...
@@ -1187,7 +1187,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
pBlock
->
info
.
numOfCols
=
numOfCols
;
pBlock
->
info
.
numOfCols
=
numOfCols
;
pBlock
->
info
.
hasVarCol
=
pDataBlock
->
info
.
hasVarCol
;
pBlock
->
info
.
hasVarCol
=
pDataBlock
->
info
.
hasVarCol
;
pBlock
->
info
.
rowSize
=
pDataBlock
->
info
.
rows
;
pBlock
->
info
.
rowSize
=
pDataBlock
->
info
.
rows
;
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColumnInfoData
colInfo
=
{
0
};
SColumnInfoData
colInfo
=
{
0
};
...
@@ -1217,7 +1217,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
...
@@ -1217,7 +1217,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
}
}
size_t
blockDataGetCapacityInRow
(
const
SSDataBlock
*
pBlock
,
size_t
pageSize
)
{
size_t
blockDataGetCapacityInRow
(
const
SSDataBlock
*
pBlock
,
size_t
pageSize
)
{
return
(
int32_t
)
((
pageSize
-
blockDataGetSerialMetaSize
(
pBlock
))
/
blockDataGetSerialRowSize
(
pBlock
));
return
(
int32_t
)
((
pageSize
-
blockDataGetSerialMetaSize
(
pBlock
))
/
blockDataGetSerialRowSize
(
pBlock
));
}
}
void
colDataDestroy
(
SColumnInfoData
*
pColData
)
{
void
colDataDestroy
(
SColumnInfoData
*
pColData
)
{
...
@@ -1234,14 +1234,14 @@ static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) {
...
@@ -1234,14 +1234,14 @@ static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) {
int32_t
len
=
BitmapLen
(
total
);
int32_t
len
=
BitmapLen
(
total
);
int32_t
newLen
=
BitmapLen
(
total
-
n
);
int32_t
newLen
=
BitmapLen
(
total
-
n
);
if
(
n
%
8
==
0
)
{
if
(
n
%
8
==
0
)
{
memmove
(
nullBitmap
,
nullBitmap
+
n
/
8
,
newLen
);
memmove
(
nullBitmap
,
nullBitmap
+
n
/
8
,
newLen
);
}
else
{
}
else
{
int32_t
tail
=
n
%
8
;
int32_t
tail
=
n
%
8
;
int32_t
i
=
0
;
int32_t
i
=
0
;
uint8_t
*
p
=
(
uint8_t
*
)
nullBitmap
;
uint8_t
*
p
=
(
uint8_t
*
)
nullBitmap
;
while
(
i
<
len
)
{
while
(
i
<
len
)
{
uint8_t
v
=
p
[
i
];
uint8_t
v
=
p
[
i
];
p
[
i
]
=
0
;
p
[
i
]
=
0
;
...
@@ -1268,7 +1268,7 @@ static void colDataTrimFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_
...
@@ -1268,7 +1268,7 @@ static void colDataTrimFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_
}
}
}
}
int32_t
blockDataTrimFirstNRows
(
SSDataBlock
*
pBlock
,
size_t
n
)
{
int32_t
blockDataTrimFirstNRows
(
SSDataBlock
*
pBlock
,
size_t
n
)
{
if
(
n
==
0
)
{
if
(
n
==
0
)
{
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
@@ -1276,7 +1276,7 @@ int32_t blockDataTrimFirstNRows(SSDataBlock *pBlock, size_t n) {
...
@@ -1276,7 +1276,7 @@ int32_t blockDataTrimFirstNRows(SSDataBlock *pBlock, size_t n) {
if
(
pBlock
->
info
.
rows
<=
n
)
{
if
(
pBlock
->
info
.
rows
<=
n
)
{
blockDataCleanup
(
pBlock
);
blockDataCleanup
(
pBlock
);
}
else
{
}
else
{
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
numOfCols
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
numOfCols
;
++
i
)
{
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
i
);
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
i
);
colDataTrimFirstNRows
(
pColInfoData
,
n
,
pBlock
->
info
.
rows
);
colDataTrimFirstNRows
(
pColInfoData
,
n
,
pBlock
->
info
.
rows
);
}
}
...
@@ -1462,3 +1462,67 @@ void blockDebugShowData(const SArray* dataBlocks) {
...
@@ -1462,3 +1462,67 @@ void blockDebugShowData(const SArray* dataBlocks) {
}
}
}
}
SSubmitReq
*
tdBlockToSubmit
(
const
SArray
*
pBlocks
,
const
STSchema
*
pTSchema
)
{
SSubmitReq
*
ret
=
NULL
;
// cal size
int32_t
cap
=
sizeof
(
SSubmitReq
);
int32_t
sz
=
taosArrayGetSize
(
pBlocks
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SSDataBlock
*
pDataBlock
=
taosArrayGet
(
pBlocks
,
i
);
int32_t
rows
=
pDataBlock
->
info
.
rows
;
// TODO min
int32_t
rowSize
=
pDataBlock
->
info
.
rowSize
;
int32_t
maxLen
=
TD_ROW_MAX_BYTES_FROM_SCHEMA
(
pTSchema
);
cap
+=
sizeof
(
SSubmitBlk
)
+
rows
*
maxLen
;
}
// assign data
ret
=
taosMemoryCalloc
(
1
,
cap
);
ret
->
version
=
htonl
(
1
);
ret
->
length
=
htonl
(
cap
-
sizeof
(
SSubmitReq
));
ret
->
numOfBlocks
=
htonl
(
sz
);
void
*
submitBlk
=
POINTER_SHIFT
(
ret
,
sizeof
(
SSubmitReq
));
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SSDataBlock
*
pDataBlock
=
taosArrayGet
(
pBlocks
,
i
);
SSubmitBlk
*
blkHead
=
submitBlk
;
blkHead
->
numOfRows
=
htons
(
pDataBlock
->
info
.
rows
);
blkHead
->
schemaLen
=
0
;
blkHead
->
sversion
=
htonl
(
pTSchema
->
version
);
// TODO
blkHead
->
suid
=
0
;
blkHead
->
uid
=
htobe64
(
pDataBlock
->
info
.
uid
);
int32_t
rows
=
pDataBlock
->
info
.
rows
;
int32_t
maxLen
=
TD_ROW_MAX_BYTES_FROM_SCHEMA
(
pTSchema
);
/*blkHead->dataLen = htonl(rows * maxLen);*/
blkHead
->
dataLen
=
0
;
void
*
blockData
=
POINTER_SHIFT
(
submitBlk
,
sizeof
(
SSubmitBlk
));
STSRow
*
rowData
=
blockData
;
for
(
int32_t
j
=
0
;
j
<
pDataBlock
->
info
.
rows
;
j
++
)
{
SRowBuilder
rb
=
{
0
};
tdSRowInit
(
&
rb
,
pTSchema
->
version
);
tdSRowSetTpInfo
(
&
rb
,
pTSchema
->
numOfCols
,
pTSchema
->
flen
);
tdSRowResetBuf
(
&
rb
,
rowData
);
for
(
int32_t
k
=
0
;
k
<
pTSchema
->
numOfCols
;
k
++
)
{
const
STColumn
*
pColumn
=
&
pTSchema
->
columns
[
k
];
SColumnInfoData
*
pColData
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
k
);
void
*
data
=
colDataGetData
(
pColData
,
j
);
tdAppendColValToRow
(
&
rb
,
pColumn
->
colId
,
pColumn
->
type
,
TD_VTYPE_NORM
,
data
,
true
,
pColumn
->
offset
,
k
);
}
int32_t
rowLen
=
TD_ROW_LEN
(
rowData
);
rowData
=
POINTER_SHIFT
(
rowData
,
rowLen
);
blkHead
->
dataLen
+=
rowLen
;
}
int32_t
len
=
blkHead
->
dataLen
;
blkHead
->
dataLen
=
htonl
(
len
);
blkHead
=
POINTER_SHIFT
(
blkHead
,
len
);
}
return
ret
;
}
source/common/src/tdataformat.c
浏览文件 @
a5706ea7
...
@@ -16,6 +16,7 @@
...
@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE
#define _DEFAULT_SOURCE
#include "tdataformat.h"
#include "tdataformat.h"
#include "tcoding.h"
#include "tcoding.h"
#include "tdatablock.h"
#include "tlog.h"
#include "tlog.h"
static
void
dataColSetNEleNull
(
SDataCol
*
pCol
,
int
nEle
);
static
void
dataColSetNEleNull
(
SDataCol
*
pCol
,
int
nEle
);
...
@@ -128,6 +129,50 @@ void *tdDecodeSchema(void *buf, STSchema **pRSchema) {
...
@@ -128,6 +129,50 @@ void *tdDecodeSchema(void *buf, STSchema **pRSchema) {
return
buf
;
return
buf
;
}
}
#if 0
int32_t tEncodeSTColumn(SCoder *pEncoder, const STColumn *pCol) {
if (tEncodeI16(pEncoder, pCol->colId) < 0) return -1;
if (tEncodeI8(pEncoder, pCol->type) < 0) return -1;
if (tEncodeI8(pEncoder, pCol->sma) < 0) return -1;
if (tEncodeI32(pEncoder, pCol->bytes) < 0) return -1;
if (tEncodeI32(pEncoder, pCol->offset) < 0) return -1;
return pEncoder->pos;
}
int32_t tDecodeSTColumn(SCoder *pDecoder, STColumn *pCol) {
if (tDecodeI16(pDecoder, &pCol->colId) < 0) return -1;
if (tDecodeI8(pDecoder, &pCol->type) < 0) return -1;
if (tDecodeI8(pDecoder, &pCol->sma) < 0) return -1;
if (tDecodeI32(pDecoder, &pCol->bytes) < 0) return -1;
if (tDecodeI32(pDecoder, &pCol->offset) < 0) return -1;
return 0;
}
int32_t tEncodeSchema(SCoder *pEncoder, const STSchema *pSchema) {
if (tEncodeI32(pEncoder, pSchema->numOfCols) < 0) return -1;
if (tEncodeI16(pEncoder, pSchema->version) < 0) return -1;
if (tEncodeU16(pEncoder, pSchema->flen) < 0) return -1;
if (tEncodeI32(pEncoder, pSchema->vlen) < 0) return -1;
if (tEncodeI32(pEncoder, pSchema->tlen) < 0) return -1;
for (int32_t i = 0; i < schemaNCols(pSchema); i++) {
const STColumn *pCol = schemaColAt(pSchema, i);
if (tEncodeSTColumn(pEncoder, pCol) < 0) return -1;
}
return 0;
}
int32_t tDecodeSchema(SCoder *pDecoder, STSchema *pSchema) {
if (tDecodeI32(pDecoder, &pSchema->numOfCols) < 0) return -1;
if (tDecodeI16(pDecoder, &pSchema->version) < 0) return -1;
if (tDecodeU16(pDecoder, &pSchema->flen) < 0) return -1;
if (tDecodeI32(pDecoder, &pSchema->vlen) < 0) return -1;
if (tDecodeI32(pDecoder, &pSchema->tlen) < 0) return -1;
return 0;
}
#endif
int
tdInitTSchemaBuilder
(
STSchemaBuilder
*
pBuilder
,
schema_ver_t
version
)
{
int
tdInitTSchemaBuilder
(
STSchemaBuilder
*
pBuilder
,
schema_ver_t
version
)
{
if
(
pBuilder
==
NULL
)
return
-
1
;
if
(
pBuilder
==
NULL
)
return
-
1
;
...
@@ -908,4 +953,4 @@ SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSch
...
@@ -908,4 +953,4 @@ SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSch
taosArrayDestroy(stashRow);
taosArrayDestroy(stashRow);
return buffer;
return buffer;
}
}
#endif
#endif
\ No newline at end of file
source/common/src/tmsg.c
浏览文件 @
a5706ea7
...
@@ -174,6 +174,25 @@ STSRow *tGetSubmitBlkNextEx(SSubmitBlkIter *pIter) {
...
@@ -174,6 +174,25 @@ STSRow *tGetSubmitBlkNextEx(SSubmitBlkIter *pIter) {
}
}
}
}
int32_t
tPrintFixedSchemaSubmitReq
(
const
SSubmitReq
*
pReq
,
STSchema
*
pTschema
)
{
SSubmitMsgIter
msgIter
=
{
0
};
if
(
tInitSubmitMsgIterEx
(
pReq
,
&
msgIter
)
<
0
)
return
-
1
;
while
(
true
)
{
SSubmitBlk
*
pBlock
=
NULL
;
if
(
tGetSubmitMsgNextEx
(
&
msgIter
,
&
pBlock
)
<
0
)
return
-
1
;
if
(
pBlock
==
NULL
)
break
;
SSubmitBlkIter
blkIter
=
{
0
};
tInitSubmitBlkIterEx
(
&
msgIter
,
pBlock
,
&
blkIter
);
STSRowIter
rowIter
=
{
0
};
tdSTSRowIterInit
(
&
rowIter
,
pTschema
);
STSRow
*
row
;
while
((
row
=
tGetSubmitBlkNextEx
(
&
blkIter
))
!=
NULL
)
{
tdSRowPrint
(
row
,
pTschema
,
"stream"
);
}
}
return
0
;
}
int32_t
tEncodeSEpSet
(
SCoder
*
pEncoder
,
const
SEpSet
*
pEp
)
{
int32_t
tEncodeSEpSet
(
SCoder
*
pEncoder
,
const
SEpSet
*
pEp
)
{
if
(
tEncodeI8
(
pEncoder
,
pEp
->
inUse
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pEp
->
inUse
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pEp
->
numOfEps
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pEp
->
numOfEps
)
<
0
)
return
-
1
;
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
a5706ea7
...
@@ -415,6 +415,7 @@ typedef struct {
...
@@ -415,6 +415,7 @@ typedef struct {
typedef
struct
{
typedef
struct
{
char
key
[
TSDB_PARTITION_KEY_LEN
];
char
key
[
TSDB_PARTITION_KEY_LEN
];
int64_t
dbUid
;
int64_t
offset
;
int64_t
offset
;
}
SMqOffsetObj
;
}
SMqOffsetObj
;
...
...
source/dnode/mnode/impl/inc/mndOffset.h
浏览文件 @
a5706ea7
...
@@ -37,6 +37,8 @@ static FORCE_INLINE int32_t mndMakePartitionKey(char *key, const char *cgroup, c
...
@@ -37,6 +37,8 @@ static FORCE_INLINE int32_t mndMakePartitionKey(char *key, const char *cgroup, c
return
snprintf
(
key
,
TSDB_PARTITION_KEY_LEN
,
"%d:%s:%s"
,
vgId
,
cgroup
,
topicName
);
return
snprintf
(
key
,
TSDB_PARTITION_KEY_LEN
,
"%d:%s:%s"
,
vgId
,
cgroup
,
topicName
);
}
}
int32_t
mndDropOffsetByDB
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
#endif
#endif
...
...
source/dnode/mnode/impl/inc/mndSubscribe.h
浏览文件 @
a5706ea7
...
@@ -31,6 +31,8 @@ void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub);
...
@@ -31,6 +31,8 @@ void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub);
int32_t
mndMakeSubscribeKey
(
char
*
key
,
const
char
*
cgroup
,
const
char
*
topicName
);
int32_t
mndMakeSubscribeKey
(
char
*
key
,
const
char
*
cgroup
,
const
char
*
topicName
);
int32_t
mndDropSubByDB
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
#endif
#endif
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
a5706ea7
...
@@ -17,9 +17,12 @@
...
@@ -17,9 +17,12 @@
#include "mndDb.h"
#include "mndDb.h"
#include "mndAuth.h"
#include "mndAuth.h"
#include "mndDnode.h"
#include "mndDnode.h"
#include "mndOffset.h"
#include "mndShow.h"
#include "mndShow.h"
#include "mndSma.h"
#include "mndSma.h"
#include "mndStb.h"
#include "mndStb.h"
#include "mndSubscribe.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "mndVgroup.h"
...
@@ -1027,6 +1030,9 @@ static int32_t mndDropDb(SMnode *pMnode, SNodeMsg *pReq, SDbObj *pDb) {
...
@@ -1027,6 +1030,9 @@ static int32_t mndDropDb(SMnode *pMnode, SNodeMsg *pReq, SDbObj *pDb) {
if
(
mndSetDropDbRedoLogs
(
pMnode
,
pTrans
,
pDb
)
!=
0
)
goto
_OVER
;
if
(
mndSetDropDbRedoLogs
(
pMnode
,
pTrans
,
pDb
)
!=
0
)
goto
_OVER
;
if
(
mndSetDropDbCommitLogs
(
pMnode
,
pTrans
,
pDb
)
!=
0
)
goto
_OVER
;
if
(
mndSetDropDbCommitLogs
(
pMnode
,
pTrans
,
pDb
)
!=
0
)
goto
_OVER
;
/*if (mndDropOffsetByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/
/*if (mndDropSubByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/
/*if (mndDropTopicByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/
if
(
mndSetDropDbRedoActions
(
pMnode
,
pTrans
,
pDb
)
!=
0
)
goto
_OVER
;
if
(
mndSetDropDbRedoActions
(
pMnode
,
pTrans
,
pDb
)
!=
0
)
goto
_OVER
;
int32_t
rspLen
=
0
;
int32_t
rspLen
=
0
;
...
@@ -1387,7 +1393,7 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
...
@@ -1387,7 +1393,7 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
bool
sysDb
)
{
bool
sysDb
)
{
int32_t
cols
=
0
;
int32_t
cols
=
0
;
int32_t
bytes
=
pShow
->
pMeta
->
pSchemas
[
cols
].
bytes
;
int32_t
bytes
=
pShow
->
pMeta
->
pSchemas
[
cols
].
bytes
;
char
*
buf
=
taosMemoryMalloc
(
bytes
);
char
*
buf
=
taosMemoryMalloc
(
bytes
);
const
char
*
name
=
mndGetDbStr
(
pDb
->
name
);
const
char
*
name
=
mndGetDbStr
(
pDb
->
name
);
if
(
name
!=
NULL
)
{
if
(
name
!=
NULL
)
{
...
...
source/dnode/mnode/impl/src/mndOffset.c
浏览文件 @
a5706ea7
...
@@ -231,3 +231,36 @@ static void mndCancelGetNextOffset(SMnode *pMnode, void *pIter) {
...
@@ -231,3 +231,36 @@ static void mndCancelGetNextOffset(SMnode *pMnode, void *pIter) {
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbCancelFetch
(
pSdb
,
pIter
);
sdbCancelFetch
(
pSdb
,
pIter
);
}
}
static
int32_t
mndSetDropOffsetCommitLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SMqOffsetObj
*
pOffset
)
{
SSdbRaw
*
pCommitRaw
=
mndOffsetActionEncode
(
pOffset
);
if
(
pCommitRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendCommitlog
(
pTrans
,
pCommitRaw
)
!=
0
)
return
-
1
;
if
(
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_DROPPED
)
!=
0
)
return
-
1
;
return
0
;
}
int32_t
mndDropOffsetByDB
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
)
{
int32_t
code
=
-
1
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
void
*
pIter
=
NULL
;
SMqOffsetObj
*
pOffset
=
NULL
;
while
(
1
)
{
pIter
=
sdbFetch
(
pSdb
,
SDB_SUBSCRIBE
,
pIter
,
(
void
**
)
&
pOffset
);
if
(
pIter
==
NULL
)
break
;
if
(
pOffset
->
dbUid
!=
pDb
->
uid
)
{
sdbRelease
(
pSdb
,
pOffset
);
continue
;
}
if
(
mndSetDropOffsetCommitLogs
(
pMnode
,
pTrans
,
pOffset
)
<
0
)
{
goto
END
;
}
}
code
=
0
;
END:
return
code
;
}
source/dnode/mnode/impl/src/mndScheduler.c
浏览文件 @
a5706ea7
...
@@ -204,6 +204,8 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p
...
@@ -204,6 +204,8 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p
pTask
->
smaSink
.
smaId
=
pStream
->
smaId
;
pTask
->
smaSink
.
smaId
=
pStream
->
smaId
;
}
else
{
}
else
{
pTask
->
sinkType
=
TASK_SINK__TABLE
;
pTask
->
sinkType
=
TASK_SINK__TABLE
;
pTask
->
tbSink
.
pSchemaWrapper
=
tCloneSSchemaWrapper
(
&
pStream
->
outputSchema
);
ASSERT
(
pTask
->
tbSink
.
pSchemaWrapper
);
}
}
// dispatch
// dispatch
...
@@ -242,6 +244,7 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr
...
@@ -242,6 +244,7 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr
pTask
->
smaSink
.
smaId
=
pStream
->
smaId
;
pTask
->
smaSink
.
smaId
=
pStream
->
smaId
;
}
else
{
}
else
{
pTask
->
sinkType
=
TASK_SINK__TABLE
;
pTask
->
sinkType
=
TASK_SINK__TABLE
;
pTask
->
tbSink
.
pSchemaWrapper
=
tCloneSSchemaWrapper
(
&
pStream
->
outputSchema
);
}
}
//
//
// dispatch
// dispatch
...
@@ -316,6 +319,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
...
@@ -316,6 +319,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
pTask
->
smaSink
.
smaId
=
pStream
->
smaId
;
pTask
->
smaSink
.
smaId
=
pStream
->
smaId
;
}
else
{
}
else
{
pTask
->
sinkType
=
TASK_SINK__TABLE
;
pTask
->
sinkType
=
TASK_SINK__TABLE
;
pTask
->
tbSink
.
pSchemaWrapper
=
tCloneSSchemaWrapper
(
&
pStream
->
outputSchema
);
}
}
#endif
#endif
}
}
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
a5706ea7
...
@@ -926,6 +926,12 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
...
@@ -926,6 +926,12 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
pTask
->
ahandle
=
pTq
->
pVnode
;
pTask
->
ahandle
=
pTq
->
pVnode
;
if
(
pTask
->
sinkType
==
TASK_SINK__SMA
)
{
if
(
pTask
->
sinkType
==
TASK_SINK__SMA
)
{
pTask
->
smaSink
.
smaHandle
=
smaHandleRes
;
pTask
->
smaSink
.
smaHandle
=
smaHandleRes
;
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__TABLE
)
{
ASSERT
(
pTask
->
tbSink
.
pSchemaWrapper
);
ASSERT
(
pTask
->
tbSink
.
pSchemaWrapper
->
pSchema
);
pTask
->
tbSink
.
pTSchema
=
tdGetSTSChemaFromSSChema
(
&
pTask
->
tbSink
.
pSchemaWrapper
->
pSchema
,
pTask
->
tbSink
.
pSchemaWrapper
->
nCols
);
ASSERT
(
pTask
->
tbSink
.
pTSchema
);
}
}
taosHashPut
(
pTq
->
pStreamTasks
,
&
pTask
->
taskId
,
sizeof
(
int32_t
),
pTask
,
sizeof
(
SStreamTask
));
taosHashPut
(
pTq
->
pStreamTasks
,
&
pTask
->
taskId
,
sizeof
(
int32_t
),
pTask
,
sizeof
(
SStreamTask
));
...
...
source/libs/stream/src/tstream.c
浏览文件 @
a5706ea7
...
@@ -152,8 +152,10 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
...
@@ -152,8 +152,10 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
// sink
// sink
if
(
pTask
->
sinkType
==
TASK_SINK__TABLE
)
{
if
(
pTask
->
sinkType
==
TASK_SINK__TABLE
)
{
//
/*blockDebugShowData(pRes);*/
blockDebugShowData
(
pRes
);
ASSERT
(
pTask
->
tbSink
.
pTSchema
);
SSubmitReq
*
pReq
=
tdBlockToSubmit
(
pRes
,
pTask
->
tbSink
.
pTSchema
);
tPrintFixedSchemaSubmitReq
(
pReq
,
pTask
->
tbSink
.
pTSchema
);
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__SMA
)
{
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__SMA
)
{
pTask
->
smaSink
.
smaHandle
(
pTask
->
ahandle
,
pTask
->
smaSink
.
smaId
,
pRes
);
pTask
->
smaSink
.
smaHandle
(
pTask
->
ahandle
,
pTask
->
smaSink
.
smaId
,
pRes
);
//
//
...
@@ -274,7 +276,8 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) {
...
@@ -274,7 +276,8 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) {
}
}
if
(
pTask
->
sinkType
==
TASK_SINK__TABLE
)
{
if
(
pTask
->
sinkType
==
TASK_SINK__TABLE
)
{
if
(
tEncodeI8
(
pEncoder
,
pTask
->
tbSink
.
reserved
)
<
0
)
return
-
1
;
/*if (tEncodeI8(pEncoder, pTask->tbSink.reserved) < 0) return -1;*/
if
(
tEncodeSSchemaWrapper
(
pEncoder
,
pTask
->
tbSink
.
pSchemaWrapper
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__SMA
)
{
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__SMA
)
{
if
(
tEncodeI64
(
pEncoder
,
pTask
->
smaSink
.
smaId
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pTask
->
smaSink
.
smaId
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__FETCH
)
{
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__FETCH
)
{
...
@@ -318,7 +321,10 @@ int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) {
...
@@ -318,7 +321,10 @@ int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) {
}
}
if
(
pTask
->
sinkType
==
TASK_SINK__TABLE
)
{
if
(
pTask
->
sinkType
==
TASK_SINK__TABLE
)
{
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
tbSink
.
reserved
)
<
0
)
return
-
1
;
/*if (tDecodeI8(pDecoder, &pTask->tbSink.reserved) < 0) return -1;*/
pTask
->
tbSink
.
pSchemaWrapper
=
taosMemoryCalloc
(
1
,
sizeof
(
SSchemaWrapper
));
if
(
pTask
->
tbSink
.
pSchemaWrapper
==
NULL
)
return
-
1
;
if
(
tDecodeSSchemaWrapper
(
pDecoder
,
pTask
->
tbSink
.
pSchemaWrapper
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__SMA
)
{
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__SMA
)
{
if
(
tDecodeI64
(
pDecoder
,
&
pTask
->
smaSink
.
smaId
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pTask
->
smaSink
.
smaId
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__FETCH
)
{
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__FETCH
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录