Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
1c40afc6
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1191
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
1c40afc6
编写于
3月 17, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add tmq_get_row
上级
7f4c88b7
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
252 addition
and
39 deletion
+252
-39
include/client/taos.h
include/client/taos.h
+28
-22
include/common/tdatablock.h
include/common/tdatablock.h
+19
-4
include/common/tmsg.h
include/common/tmsg.h
+25
-2
include/common/tmsgdef.h
include/common/tmsgdef.h
+1
-0
source/client/src/tmq.c
source/client/src/tmq.c
+34
-2
source/dnode/mnode/impl/src/mndDef.c
source/dnode/mnode/impl/src/mndDef.c
+34
-0
source/dnode/mnode/impl/src/mndScheduler.c
source/dnode/mnode/impl/src/mndScheduler.c
+78
-0
source/dnode/mnode/impl/src/mndStream.c
source/dnode/mnode/impl/src/mndStream.c
+6
-6
source/dnode/vnode/inc/tq.h
source/dnode/vnode/inc/tq.h
+1
-0
source/dnode/vnode/src/inc/tqInt.h
source/dnode/vnode/src/inc/tqInt.h
+1
-0
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+17
-0
source/dnode/vnode/src/vnd/vnodeWrite.c
source/dnode/vnode/src/vnd/vnodeWrite.c
+8
-3
未找到文件。
include/client/taos.h
浏览文件 @
1c40afc6
...
@@ -31,27 +31,27 @@ typedef void TAOS_SUB;
...
@@ -31,27 +31,27 @@ typedef void TAOS_SUB;
typedef
void
**
TAOS_ROW
;
typedef
void
**
TAOS_ROW
;
// Data type definition
// Data type definition
#define TSDB_DATA_TYPE_NULL 0
// 1 bytes
#define TSDB_DATA_TYPE_NULL 0 // 1 bytes
#define TSDB_DATA_TYPE_BOOL 1
// 1 bytes
#define TSDB_DATA_TYPE_BOOL 1 // 1 bytes
#define TSDB_DATA_TYPE_TINYINT 2
// 1 byte
#define TSDB_DATA_TYPE_TINYINT 2 // 1 byte
#define TSDB_DATA_TYPE_SMALLINT 3
// 2 bytes
#define TSDB_DATA_TYPE_SMALLINT 3 // 2 bytes
#define TSDB_DATA_TYPE_INT 4
// 4 bytes
#define TSDB_DATA_TYPE_INT 4 // 4 bytes
#define TSDB_DATA_TYPE_BIGINT 5
// 8 bytes
#define TSDB_DATA_TYPE_BIGINT 5 // 8 bytes
#define TSDB_DATA_TYPE_FLOAT 6
// 4 bytes
#define TSDB_DATA_TYPE_FLOAT 6 // 4 bytes
#define TSDB_DATA_TYPE_DOUBLE 7
// 8 bytes
#define TSDB_DATA_TYPE_DOUBLE 7 // 8 bytes
#define TSDB_DATA_TYPE_VARCHAR 8
// string, alias for varchar
#define TSDB_DATA_TYPE_VARCHAR 8 // string, alias for varchar
#define TSDB_DATA_TYPE_TIMESTAMP 9
// 8 bytes
#define TSDB_DATA_TYPE_TIMESTAMP 9 // 8 bytes
#define TSDB_DATA_TYPE_NCHAR 10
// unicode string
#define TSDB_DATA_TYPE_NCHAR 10 // unicode string
#define TSDB_DATA_TYPE_UTINYINT 11
// 1 byte
#define TSDB_DATA_TYPE_UTINYINT 11 // 1 byte
#define TSDB_DATA_TYPE_USMALLINT 12
// 2 bytes
#define TSDB_DATA_TYPE_USMALLINT 12 // 2 bytes
#define TSDB_DATA_TYPE_UINT 13
// 4 bytes
#define TSDB_DATA_TYPE_UINT 13 // 4 bytes
#define TSDB_DATA_TYPE_UBIGINT 14
// 8 bytes
#define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes
#define TSDB_DATA_TYPE_JSON 15
// json string
#define TSDB_DATA_TYPE_JSON 15 // json string
#define TSDB_DATA_TYPE_VARBINARY 16
// binary
#define TSDB_DATA_TYPE_VARBINARY 16 // binary
#define TSDB_DATA_TYPE_DECIMAL 17
// decimal
#define TSDB_DATA_TYPE_DECIMAL 17 // decimal
#define TSDB_DATA_TYPE_BLOB 18
// binary
#define TSDB_DATA_TYPE_BLOB 18 // binary
#define TSDB_DATA_TYPE_MEDIUMBLOB 19
#define TSDB_DATA_TYPE_MEDIUMBLOB 19
#define TSDB_DATA_TYPE_BINARY TSDB_DATA_TYPE_VARCHAR
// string
#define TSDB_DATA_TYPE_BINARY TSDB_DATA_TYPE_VARCHAR // string
typedef
enum
{
typedef
enum
{
TSDB_OPTION_LOCALE
,
TSDB_OPTION_LOCALE
,
...
@@ -257,9 +257,15 @@ DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_co
...
@@ -257,9 +257,15 @@ DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_co
void
tmqShowMsg
(
tmq_message_t
*
tmq_message
);
void
tmqShowMsg
(
tmq_message_t
*
tmq_message
);
int32_t
tmqGetSkipLogNum
(
tmq_message_t
*
tmq_message
);
int32_t
tmqGetSkipLogNum
(
tmq_message_t
*
tmq_message
);
typedef
void
(
*
TAOS_SUBSCRIBE_CALLBACK
)(
TAOS_SUB
*
tsub
,
TAOS_RES
*
res
,
void
*
param
,
int
code
);
/* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */
DLL_EXPORT
int
taos_stmt_affected_rows
(
TAOS_STMT
*
stmt
);
DLL_EXPORT
TAOS_ROW
tmq_get_row
(
tmq_message_t
*
message
);
DLL_EXPORT
char
*
tmq_get_topic_name
(
tmq_message_t
*
message
);
/* ---------------------- OTHER ---------------------------- */
typedef
void
(
*
TAOS_SUBSCRIBE_CALLBACK
)(
TAOS_SUB
*
tsub
,
TAOS_RES
*
res
,
void
*
param
,
int
code
);
DLL_EXPORT
int
taos_stmt_affected_rows
(
TAOS_STMT
*
stmt
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
include/common/tdatablock.h
浏览文件 @
1c40afc6
...
@@ -52,6 +52,21 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet);
...
@@ -52,6 +52,21 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet);
BMCharPos(bm_, r_) |= (1u << (7u - BitPos(r_))); \
BMCharPos(bm_, r_) |= (1u << (7u - BitPos(r_))); \
} while (0)
} while (0)
static
FORCE_INLINE
bool
colDataIsNull_s
(
const
SColumnInfoData
*
pColumnInfoData
,
uint32_t
row
)
{
if
(
!
pColumnInfoData
->
hasNull
)
{
return
false
;
}
if
(
IS_VAR_DATA_TYPE
(
pColumnInfoData
->
info
.
type
))
{
return
pColumnInfoData
->
varmeta
.
offset
[
row
]
==
-
1
;
}
else
{
if
(
pColumnInfoData
->
nullbitmap
==
NULL
)
{
return
false
;
}
return
colDataIsNull_f
(
pColumnInfoData
->
nullbitmap
,
row
);
}
}
static
FORCE_INLINE
bool
colDataIsNull
(
const
SColumnInfoData
*
pColumnInfoData
,
uint32_t
totalRows
,
uint32_t
row
,
static
FORCE_INLINE
bool
colDataIsNull
(
const
SColumnInfoData
*
pColumnInfoData
,
uint32_t
totalRows
,
uint32_t
row
,
SColumnDataAgg
*
pColAgg
)
{
SColumnDataAgg
*
pColAgg
)
{
if
(
!
pColumnInfoData
->
hasNull
)
{
if
(
!
pColumnInfoData
->
hasNull
)
{
...
@@ -79,10 +94,10 @@ static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, u
...
@@ -79,10 +94,10 @@ static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, u
}
}
}
}
#define BitmapLen(_n) (((_n) + ((1<<NBIT)-1)) >> NBIT)
#define BitmapLen(_n) (((_n) + ((1 << NBIT) - 1)) >> NBIT)
#define colDataGetData(p1_, r_) \
// SColumnInfoData, rowNumber
#define colDataGetData(p1_, r_) \
((IS_VAR_DATA_TYPE((p1_)->info.type)) ? ((p1_)->pData + (p1_)->varmeta.offset[(r_)]) \
((IS_VAR_DATA_TYPE((p1_)->info.type)) ? ((p1_)->pData + (p1_)->varmeta.offset[(r_)]) \
: ((p1_)->pData + ((r_) * (p1_)->info.bytes)))
: ((p1_)->pData + ((r_) * (p1_)->info.bytes)))
...
@@ -126,4 +141,4 @@ void* blockDataDestroy(SSDataBlock* pBlock);
...
@@ -126,4 +141,4 @@ void* blockDataDestroy(SSDataBlock* pBlock);
}
}
#endif
#endif
#endif
/*_TD_COMMON_EP_H_*/
#endif
/*_TD_COMMON_EP_H_*/
include/common/tmsg.h
浏览文件 @
1c40afc6
...
@@ -419,7 +419,7 @@ typedef struct {
...
@@ -419,7 +419,7 @@ typedef struct {
};
};
}
SColumnFilterList
;
}
SColumnFilterList
;
/*
/*
* for client side struct,
we only need the column id, type, bytes are not
necessary
* for client side struct,
only column id, type, bytes are
necessary
* But for data in vnode side, we need all the following information.
* But for data in vnode side, we need all the following information.
*/
*/
typedef
struct
{
typedef
struct
{
...
@@ -2173,13 +2173,36 @@ typedef struct {
...
@@ -2173,13 +2173,36 @@ typedef struct {
SArray
*
topics
;
// SArray<SMqSubTopicEp>
SArray
*
topics
;
// SArray<SMqSubTopicEp>
}
SMqCMGetSubEpRsp
;
}
SMqCMGetSubEpRsp
;
struct
tmq_message_
t
{
typedef
struc
t
{
SMqRspHead
head
;
SMqRspHead
head
;
union
{
union
{
SMqPollRsp
consumeRsp
;
SMqPollRsp
consumeRsp
;
SMqCMGetSubEpRsp
getEpRsp
;
SMqCMGetSubEpRsp
getEpRsp
;
};
};
void
*
extra
;
void
*
extra
;
}
SMqMsgWrapper
;
typedef
struct
{
int32_t
curBlock
;
int32_t
curRow
;
void
**
uData
;
}
SMqRowIter
;
struct
tmq_message_t_v1
{
SMqPollRsp
rsp
;
SMqRowIter
iter
;
};
struct
tmq_message_t
{
SMqRspHead
head
;
union
{
SMqPollRsp
consumeRsp
;
SMqCMGetSubEpRsp
getEpRsp
;
};
void
*
extra
;
int32_t
curBlock
;
int32_t
curRow
;
void
**
uData
;
};
};
static
FORCE_INLINE
void
tDeleteSMqSubTopicEp
(
SMqSubTopicEp
*
pSubTopicEp
)
{
taosArrayDestroy
(
pSubTopicEp
->
vgs
);
}
static
FORCE_INLINE
void
tDeleteSMqSubTopicEp
(
SMqSubTopicEp
*
pSubTopicEp
)
{
taosArrayDestroy
(
pSubTopicEp
->
vgs
);
}
...
...
include/common/tmsgdef.h
浏览文件 @
1c40afc6
...
@@ -189,6 +189,7 @@ enum {
...
@@ -189,6 +189,7 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_VND_SUBSCRIBE
,
"vnode-subscribe"
,
SMVSubscribeReq
,
SMVSubscribeRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_SUBSCRIBE
,
"vnode-subscribe"
,
SMVSubscribeReq
,
SMVSubscribeRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_CONSUME
,
"vnode-consume"
,
SMqCVConsumeReq
,
SMqCVConsumeRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_CONSUME
,
"vnode-consume"
,
SMqCVConsumeReq
,
SMqCVConsumeRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_TASK_DEPLOY
,
"vnode-task-deploy"
,
SStreamTaskDeployReq
,
SStreamTaskDeployRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_CREATE_SMA
,
"vnode-create-sma"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_CREATE_SMA
,
"vnode-create-sma"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_CANCEL_SMA
,
"vnode-cancel-sma"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_CANCEL_SMA
,
"vnode-cancel-sma"
,
NULL
,
NULL
)
...
...
source/client/src/tmq.c
浏览文件 @
1c40afc6
...
@@ -700,6 +700,10 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
...
@@ -700,6 +700,10 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
}
}
memcpy
(
pRsp
,
pMsg
->
pData
,
sizeof
(
SMqRspHead
));
memcpy
(
pRsp
,
pMsg
->
pData
,
sizeof
(
SMqRspHead
));
tDecodeSMqPollRsp
(
POINTER_SHIFT
(
pMsg
->
pData
,
sizeof
(
SMqRspHead
)),
&
pRsp
->
consumeRsp
);
tDecodeSMqPollRsp
(
POINTER_SHIFT
(
pMsg
->
pData
,
sizeof
(
SMqRspHead
)),
&
pRsp
->
consumeRsp
);
pRsp
->
curBlock
=
0
;
pRsp
->
curRow
=
0
;
// TODO: alloc mem
/*pRsp->*/
/*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/
/*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/
if
(
pRsp
->
consumeRsp
.
numOfTopics
==
0
)
{
if
(
pRsp
->
consumeRsp
.
numOfTopics
==
0
)
{
/*printf("no data\n");*/
/*printf("no data\n");*/
...
@@ -758,9 +762,9 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
...
@@ -758,9 +762,9 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
goto
END
;
goto
END
;
}
}
// tmq's epoch is monoto
m
ically increase,
// tmq's epoch is monoto
n
ically increase,
// so it's safe to discard any old epoch msg.
// so it's safe to discard any old epoch msg.
//
e
poch will only increase when received newer epoch ep msg
//
E
poch will only increase when received newer epoch ep msg
SMqRspHead
*
head
=
pMsg
->
pData
;
SMqRspHead
*
head
=
pMsg
->
pData
;
int32_t
epoch
=
atomic_load_32
(
&
tmq
->
epoch
);
int32_t
epoch
=
atomic_load_32
(
&
tmq
->
epoch
);
if
(
head
->
epoch
<=
epoch
)
{
if
(
head
->
epoch
<=
epoch
)
{
...
@@ -1282,6 +1286,34 @@ const char* tmq_err2str(tmq_resp_err_t err) {
...
@@ -1282,6 +1286,34 @@ const char* tmq_err2str(tmq_resp_err_t err) {
return
"fail"
;
return
"fail"
;
}
}
TAOS_ROW
tmq_get_row
(
tmq_message_t
*
message
)
{
SMqPollRsp
*
rsp
=
&
message
->
consumeRsp
;
while
(
1
)
{
if
(
message
->
curBlock
<
taosArrayGetSize
(
rsp
->
pBlockData
))
{
SSDataBlock
*
pBlock
=
taosArrayGet
(
rsp
->
pBlockData
,
message
->
curBlock
);
if
(
message
->
curRow
<
pBlock
->
info
.
rows
)
{
for
(
int
i
=
0
;
i
<
pBlock
->
info
.
numOfCols
;
i
++
)
{
SColumnInfoData
*
pData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
i
);
if
(
colDataIsNull_s
(
pData
,
message
->
curRow
))
message
->
uData
[
i
]
=
NULL
;
else
{
message
->
uData
[
i
]
=
colDataGetData
(
pData
,
message
->
curRow
);
}
}
message
->
curRow
++
;
return
message
->
uData
;
}
else
{
message
->
curBlock
++
;
message
->
curRow
=
0
;
continue
;
}
}
return
NULL
;
}
}
char
*
tmq_get_topic_name
(
tmq_message_t
*
message
)
{
return
"not implemented yet"
;
}
#if 0
#if 0
tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) {
tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) {
tmq_t* pTmq = malloc(sizeof(tmq_t));
tmq_t* pTmq = malloc(sizeof(tmq_t));
...
...
source/dnode/mnode/impl/src/mndDef.c
浏览文件 @
1c40afc6
...
@@ -27,6 +27,22 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
...
@@ -27,6 +27,22 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
if
(
tEncodeCStr
(
pEncoder
,
pObj
->
sql
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pObj
->
sql
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pObj
->
logicalPlan
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pObj
->
logicalPlan
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pObj
->
physicalPlan
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pObj
->
physicalPlan
)
<
0
)
return
-
1
;
// TODO encode tasks
if
(
pObj
->
tasks
)
{
int32_t
sz
=
taosArrayGetSize
(
pObj
->
tasks
);
tEncodeI32
(
pEncoder
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SArray
*
pArray
=
taosArrayGet
(
pObj
->
tasks
,
i
);
int32_t
innerSz
=
taosArrayGetSize
(
pArray
);
tEncodeI32
(
pEncoder
,
innerSz
);
for
(
int32_t
j
=
0
;
j
<
innerSz
;
j
++
)
{
SStreamTask
*
pTask
=
taosArrayGet
(
pArray
,
j
);
tEncodeSStreamTask
(
pEncoder
,
pTask
);
}
}
}
else
{
tEncodeI32
(
pEncoder
,
0
);
}
return
pEncoder
->
pos
;
return
pEncoder
->
pos
;
}
}
...
@@ -42,5 +58,23 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
...
@@ -42,5 +58,23 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
if
(
tDecodeCStrAlloc
(
pDecoder
,
&
pObj
->
sql
)
<
0
)
return
-
1
;
if
(
tDecodeCStrAlloc
(
pDecoder
,
&
pObj
->
sql
)
<
0
)
return
-
1
;
if
(
tDecodeCStrAlloc
(
pDecoder
,
&
pObj
->
logicalPlan
)
<
0
)
return
-
1
;
if
(
tDecodeCStrAlloc
(
pDecoder
,
&
pObj
->
logicalPlan
)
<
0
)
return
-
1
;
if
(
tDecodeCStrAlloc
(
pDecoder
,
&
pObj
->
physicalPlan
)
<
0
)
return
-
1
;
if
(
tDecodeCStrAlloc
(
pDecoder
,
&
pObj
->
physicalPlan
)
<
0
)
return
-
1
;
int32_t
sz
;
if
(
tDecodeI32
(
pDecoder
,
&
sz
)
<
0
)
return
-
1
;
if
(
sz
!=
0
)
{
pObj
->
tasks
=
taosArrayInit
(
sz
,
sizeof
(
SArray
));
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
int32_t
innerSz
;
if
(
tDecodeI32
(
pDecoder
,
&
innerSz
)
<
0
)
return
-
1
;
SArray
*
pArray
=
taosArrayInit
(
innerSz
,
sizeof
(
SStreamTask
));
for
(
int32_t
j
=
0
;
j
<
innerSz
;
j
++
)
{
SStreamTask
task
;
if
(
tDecodeSStreamTask
(
pDecoder
,
&
task
)
<
0
)
return
-
1
;
taosArrayPush
(
pArray
,
&
task
);
}
taosArrayPush
(
pObj
->
tasks
,
pArray
);
}
}
else
{
pObj
->
tasks
=
NULL
;
}
return
0
;
return
0
;
}
}
source/dnode/mnode/impl/src/mndScheduler.c
浏览文件 @
1c40afc6
...
@@ -77,6 +77,32 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
...
@@ -77,6 +77,32 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
return
-
1
;
return
-
1
;
}
}
taosArrayPush
(
taskOneLevel
,
pTask
);
taosArrayPush
(
taskOneLevel
,
pTask
);
SCoder
encoder
;
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
NULL
,
0
,
TD_ENCODER
);
tEncodeSStreamTask
(
&
encoder
,
pTask
);
int32_t
tlen
=
sizeof
(
SMsgHead
)
+
encoder
.
pos
;
tCoderClear
(
&
encoder
);
void
*
buf
=
rpcMallocCont
(
tlen
);
if
(
buf
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
((
SMsgHead
*
)
buf
)
->
streamTaskId
=
pTask
->
taskId
;
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMsgHead
));
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
abuf
,
tlen
,
TD_ENCODER
);
tEncodeSStreamTask
(
&
encoder
,
pTask
);
tCoderClear
(
&
encoder
);
STransAction
action
=
{
0
};
action
.
epSet
=
plan
->
execNode
.
epSet
;
action
.
pCont
=
buf
;
action
.
contLen
=
tlen
;
action
.
msgType
=
TDMT_VND_TASK_DEPLOY
;
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
rpcFreeCont
(
buf
);
return
-
1
;
}
}
}
}
else
if
(
plan
->
subplanType
==
SUBPLAN_TYPE_SCAN
)
{
}
else
if
(
plan
->
subplanType
==
SUBPLAN_TYPE_SCAN
)
{
// duplicatable
// duplicatable
...
@@ -101,6 +127,32 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
...
@@ -101,6 +127,32 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
}
}
taosArrayPush
(
taskOneLevel
,
pTask
);
taosArrayPush
(
taskOneLevel
,
pTask
);
SCoder
encoder
;
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
NULL
,
0
,
TD_ENCODER
);
tEncodeSStreamTask
(
&
encoder
,
pTask
);
int32_t
tlen
=
sizeof
(
SMsgHead
)
+
encoder
.
pos
;
tCoderClear
(
&
encoder
);
void
*
buf
=
rpcMallocCont
(
tlen
);
if
(
buf
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
((
SMsgHead
*
)
buf
)
->
streamTaskId
=
pTask
->
taskId
;
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMsgHead
));
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
abuf
,
tlen
,
TD_ENCODER
);
tEncodeSStreamTask
(
&
encoder
,
pTask
);
tCoderClear
(
&
encoder
);
STransAction
action
=
{
0
};
action
.
epSet
=
plan
->
execNode
.
epSet
;
action
.
pCont
=
buf
;
action
.
contLen
=
tlen
;
action
.
msgType
=
TDMT_SND_TASK_DEPLOY
;
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
rpcFreeCont
(
buf
);
return
-
1
;
}
}
}
}
else
{
}
else
{
// not duplicatable
// not duplicatable
...
@@ -117,6 +169,32 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
...
@@ -117,6 +169,32 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
return
-
1
;
return
-
1
;
}
}
taosArrayPush
(
taskOneLevel
,
pTask
);
taosArrayPush
(
taskOneLevel
,
pTask
);
SCoder
encoder
;
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
NULL
,
0
,
TD_ENCODER
);
tEncodeSStreamTask
(
&
encoder
,
pTask
);
int32_t
tlen
=
sizeof
(
SMsgHead
)
+
encoder
.
pos
;
tCoderClear
(
&
encoder
);
void
*
buf
=
rpcMallocCont
(
tlen
);
if
(
buf
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
((
SMsgHead
*
)
buf
)
->
streamTaskId
=
pTask
->
taskId
;
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMsgHead
));
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
abuf
,
tlen
,
TD_ENCODER
);
tEncodeSStreamTask
(
&
encoder
,
pTask
);
tCoderClear
(
&
encoder
);
STransAction
action
=
{
0
};
action
.
epSet
=
plan
->
execNode
.
epSet
;
action
.
pCont
=
buf
;
action
.
contLen
=
tlen
;
action
.
msgType
=
TDMT_SND_TASK_DEPLOY
;
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
rpcFreeCont
(
buf
);
return
-
1
;
}
}
}
taosArrayPush
(
pStream
->
tasks
,
taskOneLevel
);
taosArrayPush
(
pStream
->
tasks
,
taskOneLevel
);
}
}
...
...
source/dnode/mnode/impl/src/mndStream.c
浏览文件 @
1c40afc6
...
@@ -230,19 +230,19 @@ static int32_t mndCreateStream(SMnode *pMnode, SMnodeMsg *pReq, SCMCreateStreamR
...
@@ -230,19 +230,19 @@ static int32_t mndCreateStream(SMnode *pMnode, SMnodeMsg *pReq, SCMCreateStreamR
}
}
mDebug
(
"trans:%d, used to create stream:%s"
,
pTrans
->
id
,
pCreate
->
name
);
mDebug
(
"trans:%d, used to create stream:%s"
,
pTrans
->
id
,
pCreate
->
name
);
SSdbRaw
*
pRedoRaw
=
mndStreamActionEncode
(
&
streamObj
);
if
(
mndScheduleStream
(
pMnode
,
pTrans
,
&
streamObj
)
<
0
)
{
if
(
pRedoRaw
==
NULL
||
mndTransAppendRedolog
(
pTrans
,
pRedoRaw
)
!=
0
)
{
mError
(
"stream:%ld, schedule stream since %s"
,
streamObj
.
uid
,
terrstr
());
mError
(
"trans:%d, failed to append redo log since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
mndTransDrop
(
pTrans
);
return
-
1
;
return
-
1
;
}
}
sdbSetRawStatus
(
pRedoRaw
,
SDB_STATUS_READY
);
if
(
mndScheduleStream
(
pMnode
,
pTrans
,
&
streamObj
)
<
0
)
{
SSdbRaw
*
pRedoRaw
=
mndStreamActionEncode
(
&
streamObj
);
mError
(
"stream:%ld, schedule stream since %s"
,
streamObj
.
uid
,
terrstr
());
if
(
pRedoRaw
==
NULL
||
mndTransAppendRedolog
(
pTrans
,
pRedoRaw
)
!=
0
)
{
mError
(
"trans:%d, failed to append redo log since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
mndTransDrop
(
pTrans
);
return
-
1
;
return
-
1
;
}
}
sdbSetRawStatus
(
pRedoRaw
,
SDB_STATUS_READY
);
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
{
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
...
...
source/dnode/vnode/inc/tq.h
浏览文件 @
1c40afc6
...
@@ -55,6 +55,7 @@ int tqCommit(STQ*);
...
@@ -55,6 +55,7 @@ int tqCommit(STQ*);
int32_t
tqProcessPollReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
);
int32_t
tqProcessPollReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
);
int32_t
tqProcessSetConnReq
(
STQ
*
pTq
,
char
*
msg
);
int32_t
tqProcessSetConnReq
(
STQ
*
pTq
,
char
*
msg
);
int32_t
tqProcessRebReq
(
STQ
*
pTq
,
char
*
msg
);
int32_t
tqProcessRebReq
(
STQ
*
pTq
,
char
*
msg
);
int32_t
tqProcessTaskDeploy
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
source/dnode/vnode/src/inc/tqInt.h
浏览文件 @
1c40afc6
...
@@ -161,6 +161,7 @@ struct STQ {
...
@@ -161,6 +161,7 @@ struct STQ {
STqMemRef
tqMemRef
;
STqMemRef
tqMemRef
;
STqMetaStore
*
tqMeta
;
STqMetaStore
*
tqMeta
;
STqPushMgr
*
tqPushMgr
;
STqPushMgr
*
tqPushMgr
;
SHashObj
*
pStreamTasks
;
SWal
*
pWal
;
SWal
*
pWal
;
SMeta
*
pVnodeMeta
;
SMeta
*
pVnodeMeta
;
};
};
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
1c40afc6
...
@@ -55,6 +55,8 @@ STQ* tqOpen(const char* path, SWal* pWal, SMeta* pVnodeMeta, STqCfg* tqConfig, S
...
@@ -55,6 +55,8 @@ STQ* tqOpen(const char* path, SWal* pWal, SMeta* pVnodeMeta, STqCfg* tqConfig, S
return
NULL
;
return
NULL
;
}
}
pTq
->
pStreamTasks
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_NO_LOCK
);
return
pTq
;
return
pTq
;
}
}
...
@@ -416,3 +418,18 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
...
@@ -416,3 +418,18 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
terrno
=
TSDB_CODE_SUCCESS
;
terrno
=
TSDB_CODE_SUCCESS
;
return
0
;
return
0
;
}
}
int32_t
tqProcessTaskDeploy
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
)
{
SStreamTask
*
pTask
=
malloc
(
sizeof
(
SStreamTask
));
if
(
pTask
==
NULL
)
{
return
-
1
;
}
SCoder
decoder
;
tCoderInit
(
&
decoder
,
TD_LITTLE_ENDIAN
,
(
uint8_t
*
)
msg
,
msgLen
,
TD_DECODER
);
tDecodeSStreamTask
(
&
decoder
,
pTask
);
tCoderClear
(
&
decoder
);
taosHashPut
(
pTq
->
pStreamTasks
,
&
pTask
->
taskId
,
sizeof
(
int32_t
),
pTask
,
sizeof
(
SStreamTask
));
return
0
;
}
source/dnode/vnode/src/vnd/vnodeWrite.c
浏览文件 @
1c40afc6
...
@@ -41,7 +41,7 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
...
@@ -41,7 +41,7 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
return
0
;
return
0
;
}
}
int
vnodeApplyWMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
)
{
int
vnodeApplyWMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
)
{
void
*
ptr
=
NULL
;
void
*
ptr
=
NULL
;
if
(
pVnode
->
config
.
streamMode
==
0
)
{
if
(
pVnode
->
config
.
streamMode
==
0
)
{
...
@@ -63,7 +63,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
...
@@ -63,7 +63,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
switch
(
pMsg
->
msgType
)
{
switch
(
pMsg
->
msgType
)
{
case
TDMT_VND_CREATE_STB
:
{
case
TDMT_VND_CREATE_STB
:
{
SVCreateTbReq
vCreateTbReq
=
{
0
};
SVCreateTbReq
vCreateTbReq
=
{
0
};
tDeserializeSVCreateTbReq
(
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)),
&
vCreateTbReq
);
tDeserializeSVCreateTbReq
(
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)),
&
vCreateTbReq
);
if
(
metaCreateTable
(
pVnode
->
pMeta
,
&
(
vCreateTbReq
))
<
0
)
{
if
(
metaCreateTable
(
pVnode
->
pMeta
,
&
(
vCreateTbReq
))
<
0
)
{
// TODO: handle error
// TODO: handle error
...
@@ -100,7 +100,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
...
@@ -100,7 +100,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
break
;
break
;
}
}
case
TDMT_VND_ALTER_STB
:
{
case
TDMT_VND_ALTER_STB
:
{
SVCreateTbReq
vAlterTbReq
=
{
0
};
SVCreateTbReq
vAlterTbReq
=
{
0
};
vTrace
(
"vgId:%d, process alter stb req"
,
pVnode
->
vgId
);
vTrace
(
"vgId:%d, process alter stb req"
,
pVnode
->
vgId
);
tDeserializeSVCreateTbReq
(
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)),
&
vAlterTbReq
);
tDeserializeSVCreateTbReq
(
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)),
&
vAlterTbReq
);
free
(
vAlterTbReq
.
stbCfg
.
pSchema
);
free
(
vAlterTbReq
.
stbCfg
.
pSchema
);
...
@@ -132,6 +132,11 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
...
@@ -132,6 +132,11 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
if
(
tqProcessRebReq
(
pVnode
->
pTq
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)))
<
0
)
{
if
(
tqProcessRebReq
(
pVnode
->
pTq
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)))
<
0
)
{
}
}
}
break
;
}
break
;
case
TDMT_VND_TASK_DEPLOY
:
{
if
(
tqProcessTaskDeploy
(
pVnode
->
pTq
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)),
pMsg
->
contLen
-
sizeof
(
SMsgHead
))
<
0
)
{
}
}
break
;
case
TDMT_VND_CREATE_SMA
:
{
// timeRangeSMA
case
TDMT_VND_CREATE_SMA
:
{
// timeRangeSMA
SSmaCfg
vCreateSmaReq
=
{
0
};
SSmaCfg
vCreateSmaReq
=
{
0
};
if
(
tDeserializeSVCreateTSmaReq
(
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)),
&
vCreateSmaReq
)
==
NULL
)
{
if
(
tDeserializeSVCreateTSmaReq
(
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)),
&
vCreateSmaReq
)
==
NULL
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录