Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
61e71159
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
61e71159
编写于
4月 07, 2022
作者:
C
cpwu
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into cpwu/3.0
上级
30def824
628096b3
变更
13
显示空白变更内容
内联
并排
Showing
13 changed file
with
265 addition
and
249 deletion
+265
-249
include/client/taos.h
include/client/taos.h
+33
-30
include/common/tmsg.h
include/common/tmsg.h
+6
-0
source/client/src/clientMain.c
source/client/src/clientMain.c
+55
-4
source/client/src/tmq.c
source/client/src/tmq.c
+10
-7
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+3
-0
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+2
-0
source/dnode/vnode/src/inc/tqInt.h
source/dnode/vnode/src/inc/tqInt.h
+1
-1
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+53
-20
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+6
-19
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+26
-26
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+39
-136
source/os/src/osShm.c
source/os/src/osShm.c
+28
-3
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+3
-3
未找到文件。
include/client/taos.h
浏览文件 @
61e71159
...
@@ -188,6 +188,9 @@ DLL_EXPORT void taos_stop_query(TAOS_RES *res);
...
@@ -188,6 +188,9 @@ DLL_EXPORT void taos_stop_query(TAOS_RES *res);
DLL_EXPORT
bool
taos_is_null
(
TAOS_RES
*
res
,
int32_t
row
,
int32_t
col
);
DLL_EXPORT
bool
taos_is_null
(
TAOS_RES
*
res
,
int32_t
row
,
int32_t
col
);
DLL_EXPORT
bool
taos_is_update_query
(
TAOS_RES
*
res
);
DLL_EXPORT
bool
taos_is_update_query
(
TAOS_RES
*
res
);
DLL_EXPORT
int
taos_fetch_block
(
TAOS_RES
*
res
,
TAOS_ROW
*
rows
);
DLL_EXPORT
int
taos_fetch_block
(
TAOS_RES
*
res
,
TAOS_ROW
*
rows
);
DLL_EXPORT
int
taos_fetch_block_s
(
TAOS_RES
*
res
,
int
*
numOfRows
,
TAOS_ROW
*
rows
);
DLL_EXPORT
int
taos_fetch_raw_block
(
TAOS_RES
*
res
,
int
*
numOfRows
,
void
**
pData
);
DLL_EXPORT
int
*
taos_get_column_data_offset
(
TAOS_RES
*
res
,
int
columnIndex
);
DLL_EXPORT
int
taos_validate_sql
(
TAOS
*
taos
,
const
char
*
sql
);
DLL_EXPORT
int
taos_validate_sql
(
TAOS
*
taos
,
const
char
*
sql
);
DLL_EXPORT
void
taos_reset_current_db
(
TAOS
*
taos
);
DLL_EXPORT
void
taos_reset_current_db
(
TAOS
*
taos
);
...
...
include/common/tmsg.h
浏览文件 @
61e71159
...
@@ -1827,6 +1827,7 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) {
...
@@ -1827,6 +1827,7 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) {
typedef
struct
{
typedef
struct
{
int64_t
leftForVer
;
int64_t
leftForVer
;
int32_t
vgId
;
int32_t
vgId
;
int32_t
epoch
;
int64_t
consumerId
;
int64_t
consumerId
;
char
topicName
[
TSDB_TOPIC_FNAME_LEN
];
char
topicName
[
TSDB_TOPIC_FNAME_LEN
];
char
cgroup
[
TSDB_CGROUP_LEN
];
char
cgroup
[
TSDB_CGROUP_LEN
];
...
@@ -1840,6 +1841,7 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq*
...
@@ -1840,6 +1841,7 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq*
int32_t
tlen
=
0
;
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pReq
->
leftForVer
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pReq
->
leftForVer
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pReq
->
vgId
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pReq
->
vgId
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pReq
->
epoch
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pReq
->
consumerId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pReq
->
consumerId
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
topicName
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
topicName
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
cgroup
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
cgroup
);
...
@@ -1853,6 +1855,7 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq*
...
@@ -1853,6 +1855,7 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq*
static
FORCE_INLINE
void
*
tDecodeSMqSetCVgReq
(
void
*
buf
,
SMqSetCVgReq
*
pReq
)
{
static
FORCE_INLINE
void
*
tDecodeSMqSetCVgReq
(
void
*
buf
,
SMqSetCVgReq
*
pReq
)
{
buf
=
taosDecodeFixedI64
(
buf
,
&
pReq
->
leftForVer
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pReq
->
leftForVer
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pReq
->
vgId
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pReq
->
vgId
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pReq
->
epoch
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pReq
->
consumerId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pReq
->
consumerId
);
buf
=
taosDecodeStringTo
(
buf
,
pReq
->
topicName
);
buf
=
taosDecodeStringTo
(
buf
,
pReq
->
topicName
);
buf
=
taosDecodeStringTo
(
buf
,
pReq
->
cgroup
);
buf
=
taosDecodeStringTo
(
buf
,
pReq
->
cgroup
);
...
@@ -1868,6 +1871,7 @@ typedef struct {
...
@@ -1868,6 +1871,7 @@ typedef struct {
int32_t
vgId
;
int32_t
vgId
;
int64_t
oldConsumerId
;
int64_t
oldConsumerId
;
int64_t
newConsumerId
;
int64_t
newConsumerId
;
char
*
topic
;
}
SMqMVRebReq
;
}
SMqMVRebReq
;
static
FORCE_INLINE
int32_t
tEncodeSMqMVRebReq
(
void
**
buf
,
const
SMqMVRebReq
*
pReq
)
{
static
FORCE_INLINE
int32_t
tEncodeSMqMVRebReq
(
void
**
buf
,
const
SMqMVRebReq
*
pReq
)
{
...
@@ -1876,6 +1880,7 @@ static FORCE_INLINE int32_t tEncodeSMqMVRebReq(void** buf, const SMqMVRebReq* pR
...
@@ -1876,6 +1880,7 @@ static FORCE_INLINE int32_t tEncodeSMqMVRebReq(void** buf, const SMqMVRebReq* pR
tlen
+=
taosEncodeFixedI32
(
buf
,
pReq
->
vgId
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pReq
->
vgId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pReq
->
oldConsumerId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pReq
->
oldConsumerId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pReq
->
newConsumerId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pReq
->
newConsumerId
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
topic
);
return
tlen
;
return
tlen
;
}
}
...
@@ -1884,6 +1889,7 @@ static FORCE_INLINE void* tDecodeSMqMVRebReq(void* buf, SMqMVRebReq* pReq) {
...
@@ -1884,6 +1889,7 @@ static FORCE_INLINE void* tDecodeSMqMVRebReq(void* buf, SMqMVRebReq* pReq) {
buf
=
taosDecodeFixedI32
(
buf
,
&
pReq
->
vgId
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pReq
->
vgId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pReq
->
oldConsumerId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pReq
->
oldConsumerId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pReq
->
newConsumerId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pReq
->
newConsumerId
);
buf
=
taosDecodeString
(
buf
,
&
pReq
->
topic
);
return
buf
;
return
buf
;
}
}
...
...
source/client/src/clientMain.c
浏览文件 @
61e71159
...
@@ -385,11 +385,20 @@ bool taos_is_update_query(TAOS_RES *res) {
...
@@ -385,11 +385,20 @@ bool taos_is_update_query(TAOS_RES *res) {
}
}
int
taos_fetch_block
(
TAOS_RES
*
res
,
TAOS_ROW
*
rows
)
{
int
taos_fetch_block
(
TAOS_RES
*
res
,
TAOS_ROW
*
rows
)
{
if
(
res
==
NULL
)
{
int32_t
numOfRows
=
0
;
/*int32_t code = */
taos_fetch_block_s
(
res
,
&
numOfRows
,
rows
);
return
numOfRows
;
}
int
taos_fetch_block_s
(
TAOS_RES
*
res
,
int
*
numOfRows
,
TAOS_ROW
*
rows
)
{
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
if
(
pRequest
==
NULL
)
{
return
0
;
return
0
;
}
}
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
(
*
rows
)
=
NULL
;
(
*
numOfRows
)
=
0
;
if
(
pRequest
->
type
==
TSDB_SQL_RETRIEVE_EMPTY_RESULT
||
pRequest
->
type
==
TSDB_SQL_INSERT
||
if
(
pRequest
->
type
==
TSDB_SQL_RETRIEVE_EMPTY_RESULT
||
pRequest
->
type
==
TSDB_SQL_INSERT
||
pRequest
->
code
!=
TSDB_CODE_SUCCESS
||
taos_num_fields
(
res
)
==
0
)
{
pRequest
->
code
!=
TSDB_CODE_SUCCESS
||
taos_num_fields
(
res
)
==
0
)
{
return
0
;
return
0
;
...
@@ -400,9 +409,51 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
...
@@ -400,9 +409,51 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
// TODO refactor
// TODO refactor
SReqResultInfo
*
pResultInfo
=
&
pRequest
->
body
.
resInfo
;
SReqResultInfo
*
pResultInfo
=
&
pRequest
->
body
.
resInfo
;
pResultInfo
->
current
=
pResultInfo
->
numOfRows
;
pResultInfo
->
current
=
pResultInfo
->
numOfRows
;
*
rows
=
pResultInfo
->
row
;
return
pResultInfo
->
numOfRows
;
(
*
rows
)
=
pResultInfo
->
row
;
(
*
numOfRows
)
=
pResultInfo
->
numOfRows
;
return
pRequest
->
code
;
}
int
taos_fetch_raw_block
(
TAOS_RES
*
res
,
int
*
numOfRows
,
void
**
pData
)
{
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
if
(
pRequest
==
NULL
)
{
return
0
;
}
if
(
pRequest
->
type
==
TSDB_SQL_RETRIEVE_EMPTY_RESULT
||
pRequest
->
type
==
TSDB_SQL_INSERT
||
pRequest
->
code
!=
TSDB_CODE_SUCCESS
||
taos_num_fields
(
res
)
==
0
)
{
return
0
;
}
doFetchRow
(
pRequest
,
false
);
SReqResultInfo
*
pResultInfo
=
&
pRequest
->
body
.
resInfo
;
pResultInfo
->
current
=
pResultInfo
->
numOfRows
;
(
*
numOfRows
)
=
pResultInfo
->
numOfRows
;
(
*
pData
)
=
(
void
*
)
pResultInfo
->
pData
;
return
0
;
}
int
*
taos_get_column_data_offset
(
TAOS_RES
*
res
,
int
columnIndex
)
{
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
if
(
pRequest
==
NULL
)
{
return
0
;
}
int32_t
numOfFields
=
taos_num_fields
(
pRequest
);
if
(
columnIndex
<
0
||
columnIndex
>=
numOfFields
||
numOfFields
==
0
)
{
return
0
;
}
TAOS_FIELD
*
pField
=
&
pRequest
->
body
.
resInfo
.
userFields
[
columnIndex
];
if
(
!
IS_VAR_DATA_TYPE
(
pField
->
type
))
{
return
0
;
}
return
pRequest
->
body
.
resInfo
.
pCol
[
columnIndex
].
offset
;
}
}
int
taos_validate_sql
(
TAOS
*
taos
,
const
char
*
sql
)
{
return
true
;
}
int
taos_validate_sql
(
TAOS
*
taos
,
const
char
*
sql
)
{
return
true
;
}
...
...
source/client/src/tmq.c
浏览文件 @
61e71159
...
@@ -108,7 +108,7 @@ typedef struct {
...
@@ -108,7 +108,7 @@ typedef struct {
// connection info
// connection info
int32_t
vgId
;
int32_t
vgId
;
int32_t
vgStatus
;
int32_t
vgStatus
;
int
64_t
s
kipCnt
;
int
32_t
vgS
kipCnt
;
SEpSet
epSet
;
SEpSet
epSet
;
}
SMqClientVg
;
}
SMqClientVg
;
...
@@ -849,7 +849,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
...
@@ -849,7 +849,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
if
(
msgEpoch
<
tmqEpoch
)
{
if
(
msgEpoch
<
tmqEpoch
)
{
/*printf("discard rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch);*/
/*printf("discard rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch);*/
/*tsem_post(&tmq->rspSem);*/
/*tsem_post(&tmq->rspSem);*/
tscWarn
(
"
discard rsp from vg %d,
epoch %d, current epoch %d"
,
pParam
->
vgId
,
msgEpoch
,
tmqEpoch
);
tscWarn
(
"
msg discard from vg %d since from earlier epoch, rsp
epoch %d, current epoch %d"
,
pParam
->
vgId
,
msgEpoch
,
tmqEpoch
);
return
0
;
return
0
;
}
}
...
@@ -881,6 +881,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
...
@@ -881,6 +881,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
/*SMqConsumeRsp* pRsp = taosMemoryCalloc(1, sizeof(SMqConsumeRsp));*/
/*SMqConsumeRsp* pRsp = taosMemoryCalloc(1, sizeof(SMqConsumeRsp));*/
tmq_message_t
*
pRsp
=
taosAllocateQitem
(
sizeof
(
tmq_message_t
));
tmq_message_t
*
pRsp
=
taosAllocateQitem
(
sizeof
(
tmq_message_t
));
if
(
pRsp
==
NULL
)
{
if
(
pRsp
==
NULL
)
{
tscWarn
(
"msg discard from vg %d, epoch %d since out of memory"
,
pParam
->
vgId
,
pParam
->
epoch
);
goto
CREATE_MSG_FAIL
;
goto
CREATE_MSG_FAIL
;
}
}
memcpy
(
pRsp
,
pMsg
->
pData
,
sizeof
(
SMqRspHead
));
memcpy
(
pRsp
,
pMsg
->
pData
,
sizeof
(
SMqRspHead
));
...
@@ -969,14 +970,14 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
...
@@ -969,14 +970,14 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
offset
=
*
pOffset
;
offset
=
*
pOffset
;
tscDebug
(
"consumer %ld epoch %d vg %d found %s"
,
tmq
->
consumerId
,
epoch
,
pVgEp
->
vgId
,
vgKey
);
tscDebug
(
"consumer %ld epoch %d vg %d found %s"
,
tmq
->
consumerId
,
epoch
,
pVgEp
->
vgId
,
vgKey
);
}
}
tscDebug
(
"consumer %ld epoch %d vg %d offset set to %ld
\n
"
,
tmq
->
consumerId
,
epoch
,
pVgEp
->
vgId
,
offset
);
tscDebug
(
"consumer %ld epoch %d vg %d offset set to %ld"
,
tmq
->
consumerId
,
epoch
,
pVgEp
->
vgId
,
offset
);
SMqClientVg
clientVg
=
{
SMqClientVg
clientVg
=
{
.
pollCnt
=
0
,
.
pollCnt
=
0
,
.
currentOffset
=
offset
,
.
currentOffset
=
offset
,
.
vgId
=
pVgEp
->
vgId
,
.
vgId
=
pVgEp
->
vgId
,
.
epSet
=
pVgEp
->
epSet
,
.
epSet
=
pVgEp
->
epSet
,
.
vgStatus
=
TMQ_VG_STATUS__IDLE
,
.
vgStatus
=
TMQ_VG_STATUS__IDLE
,
.
s
kipCnt
=
0
,
.
vgS
kipCnt
=
0
,
};
};
taosArrayPush
(
topic
.
vgs
,
&
clientVg
);
taosArrayPush
(
topic
.
vgs
,
&
clientVg
);
set
=
true
;
set
=
true
;
...
@@ -1232,9 +1233,10 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
...
@@ -1232,9 +1233,10 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
SMqClientVg
*
pVg
=
taosArrayGet
(
pTopic
->
vgs
,
j
);
SMqClientVg
*
pVg
=
taosArrayGet
(
pTopic
->
vgs
,
j
);
int32_t
vgStatus
=
atomic_val_compare_exchange_32
(
&
pVg
->
vgStatus
,
TMQ_VG_STATUS__IDLE
,
TMQ_VG_STATUS__WAIT
);
int32_t
vgStatus
=
atomic_val_compare_exchange_32
(
&
pVg
->
vgStatus
,
TMQ_VG_STATUS__IDLE
,
TMQ_VG_STATUS__WAIT
);
if
(
vgStatus
!=
TMQ_VG_STATUS__IDLE
)
{
if
(
vgStatus
!=
TMQ_VG_STATUS__IDLE
)
{
int
64_t
skipCnt
=
atomic_add_fetch_64
(
&
pVg
->
s
kipCnt
,
1
);
int
32_t
vgSkipCnt
=
atomic_add_fetch_32
(
&
pVg
->
vgS
kipCnt
,
1
);
tscDebug
(
"consumer %ld epoch %d skip vg %d skip cnt %
ld"
,
tmq
->
consumerId
,
tmq
->
epoch
,
pVg
->
vgId
,
s
kipCnt
);
tscDebug
(
"consumer %ld epoch %d skip vg %d skip cnt %
d"
,
tmq
->
consumerId
,
tmq
->
epoch
,
pVg
->
vgId
,
vgS
kipCnt
);
continue
;
continue
;
/*if (vgSkipCnt < 10000) continue;*/
#if 0
#if 0
if (skipCnt < 30000) {
if (skipCnt < 30000) {
continue;
continue;
...
@@ -1243,7 +1245,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
...
@@ -1243,7 +1245,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
}
}
#endif
#endif
}
}
atomic_store_
64
(
&
pVg
->
s
kipCnt
,
0
);
atomic_store_
32
(
&
pVg
->
vgS
kipCnt
,
0
);
SMqPollReq
*
pReq
=
tmqBuildConsumeReqImpl
(
tmq
,
blockingTime
,
pTopic
,
pVg
);
SMqPollReq
*
pReq
=
tmqBuildConsumeReqImpl
(
tmq
,
blockingTime
,
pTopic
,
pVg
);
if
(
pReq
==
NULL
)
{
if
(
pReq
==
NULL
)
{
atomic_store_32
(
&
pVg
->
vgStatus
,
TMQ_VG_STATUS__IDLE
);
atomic_store_32
(
&
pVg
->
vgStatus
,
TMQ_VG_STATUS__IDLE
);
...
@@ -1409,6 +1411,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
...
@@ -1409,6 +1411,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
if
(
blocking_time
!=
0
)
{
if
(
blocking_time
!=
0
)
{
int64_t
endTime
=
taosGetTimestampMs
();
int64_t
endTime
=
taosGetTimestampMs
();
if
(
endTime
-
startTime
>
blocking_time
)
{
if
(
endTime
-
startTime
>
blocking_time
)
{
tscDebug
(
"consumer %ld (epoch %d) timeout, no rsp"
,
tmq
->
consumerId
,
tmq
->
epoch
);
return
NULL
;
return
NULL
;
}
}
}
}
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
61e71159
...
@@ -413,6 +413,7 @@ typedef struct {
...
@@ -413,6 +413,7 @@ typedef struct {
typedef
struct
{
typedef
struct
{
int32_t
vgId
;
// -1 for unassigned
int32_t
vgId
;
// -1 for unassigned
int32_t
status
;
int32_t
status
;
int32_t
epoch
;
SEpSet
epSet
;
SEpSet
epSet
;
int64_t
oldConsumerId
;
int64_t
oldConsumerId
;
int64_t
consumerId
;
// -1 for unassigned
int64_t
consumerId
;
// -1 for unassigned
...
@@ -423,6 +424,7 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, const SMqConsumerEp
...
@@ -423,6 +424,7 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, const SMqConsumerEp
int32_t
tlen
=
0
;
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI32
(
buf
,
pConsumerEp
->
vgId
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pConsumerEp
->
vgId
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pConsumerEp
->
status
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pConsumerEp
->
status
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pConsumerEp
->
epoch
);
tlen
+=
taosEncodeSEpSet
(
buf
,
&
pConsumerEp
->
epSet
);
tlen
+=
taosEncodeSEpSet
(
buf
,
&
pConsumerEp
->
epSet
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumerEp
->
oldConsumerId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumerEp
->
oldConsumerId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumerEp
->
consumerId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumerEp
->
consumerId
);
...
@@ -433,6 +435,7 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, const SMqConsumerEp
...
@@ -433,6 +435,7 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, const SMqConsumerEp
static
FORCE_INLINE
void
*
tDecodeSMqConsumerEp
(
void
**
buf
,
SMqConsumerEp
*
pConsumerEp
)
{
static
FORCE_INLINE
void
*
tDecodeSMqConsumerEp
(
void
**
buf
,
SMqConsumerEp
*
pConsumerEp
)
{
buf
=
taosDecodeFixedI32
(
buf
,
&
pConsumerEp
->
vgId
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pConsumerEp
->
vgId
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pConsumerEp
->
status
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pConsumerEp
->
status
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pConsumerEp
->
epoch
);
buf
=
taosDecodeSEpSet
(
buf
,
&
pConsumerEp
->
epSet
);
buf
=
taosDecodeSEpSet
(
buf
,
&
pConsumerEp
->
epSet
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumerEp
->
oldConsumerId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumerEp
->
oldConsumerId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumerEp
->
consumerId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumerEp
->
consumerId
);
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
61e71159
...
@@ -493,6 +493,8 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
...
@@ -493,6 +493,8 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
pConsumerEp
->
oldConsumerId
=
pConsumerEp
->
consumerId
;
pConsumerEp
->
oldConsumerId
=
pConsumerEp
->
consumerId
;
pConsumerEp
->
consumerId
=
pSubConsumer
->
consumerId
;
pConsumerEp
->
consumerId
=
pSubConsumer
->
consumerId
;
//TODO
pConsumerEp
->
epoch
=
0
;
taosArrayPush
(
pSubConsumer
->
vgInfo
,
pConsumerEp
);
taosArrayPush
(
pSubConsumer
->
vgInfo
,
pConsumerEp
);
if
(
pConsumerEp
->
oldConsumerId
==
-
1
)
{
if
(
pConsumerEp
->
oldConsumerId
==
-
1
)
{
...
...
source/dnode/vnode/src/inc/tqInt.h
浏览文件 @
61e71159
...
@@ -206,7 +206,7 @@ typedef struct {
...
@@ -206,7 +206,7 @@ typedef struct {
typedef
struct
{
typedef
struct
{
int64_t
consumerId
;
int64_t
consumerId
;
int
64
_t
epoch
;
int
32
_t
epoch
;
char
cgroup
[
TSDB_TOPIC_FNAME_LEN
];
char
cgroup
[
TSDB_TOPIC_FNAME_LEN
];
SArray
*
topics
;
// SArray<STqTopic>
SArray
*
topics
;
// SArray<STqTopic>
}
STqConsumer
;
}
STqConsumer
;
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
61e71159
...
@@ -167,7 +167,7 @@ static FORCE_INLINE int32_t tEncodeSTqConsumer(void** buf, const STqConsumer* pC
...
@@ -167,7 +167,7 @@ static FORCE_INLINE int32_t tEncodeSTqConsumer(void** buf, const STqConsumer* pC
int32_t
tlen
=
0
;
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumer
->
consumerId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumer
->
consumerId
);
tlen
+=
taosEncodeFixedI
64
(
buf
,
pConsumer
->
epoch
);
tlen
+=
taosEncodeFixedI
32
(
buf
,
pConsumer
->
epoch
);
tlen
+=
taosEncodeString
(
buf
,
pConsumer
->
cgroup
);
tlen
+=
taosEncodeString
(
buf
,
pConsumer
->
cgroup
);
sz
=
taosArrayGetSize
(
pConsumer
->
topics
);
sz
=
taosArrayGetSize
(
pConsumer
->
topics
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
...
@@ -182,7 +182,7 @@ static FORCE_INLINE const void* tDecodeSTqConsumer(const void* buf, STqConsumer*
...
@@ -182,7 +182,7 @@ static FORCE_INLINE const void* tDecodeSTqConsumer(const void* buf, STqConsumer*
int32_t
sz
;
int32_t
sz
;
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumer
->
consumerId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumer
->
consumerId
);
buf
=
taosDecodeFixedI
64
(
buf
,
&
pConsumer
->
epoch
);
buf
=
taosDecodeFixedI
32
(
buf
,
&
pConsumer
->
epoch
);
buf
=
taosDecodeStringTo
(
buf
,
pConsumer
->
cgroup
);
buf
=
taosDecodeStringTo
(
buf
,
pConsumer
->
cgroup
);
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pConsumer
->
topics
=
taosArrayInit
(
sz
,
sizeof
(
STqTopic
));
pConsumer
->
topics
=
taosArrayInit
(
sz
,
sizeof
(
STqTopic
));
...
@@ -255,6 +255,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
...
@@ -255,6 +255,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
int64_t
consumerId
=
pReq
->
consumerId
;
int64_t
consumerId
=
pReq
->
consumerId
;
int64_t
fetchOffset
;
int64_t
fetchOffset
;
int64_t
blockingTime
=
pReq
->
blockingTime
;
int64_t
blockingTime
=
pReq
->
blockingTime
;
int32_t
reqEpoch
=
pReq
->
epoch
;
if
(
pReq
->
currentOffset
==
TMQ_CONF__RESET_OFFSET__EARLIEAST
)
{
if
(
pReq
->
currentOffset
==
TMQ_CONF__RESET_OFFSET__EARLIEAST
)
{
fetchOffset
=
0
;
fetchOffset
=
0
;
...
@@ -264,7 +265,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
...
@@ -264,7 +265,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
fetchOffset
=
pReq
->
currentOffset
+
1
;
fetchOffset
=
pReq
->
currentOffset
+
1
;
}
}
/*printf("tmq poll vg %d req %ld %ld\n", pTq->pVnode->vgId, pReq->currentOffset, fetchOffset);*/
vDebug
(
"tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld"
,
consumerId
,
pReq
->
epoch
,
pTq
->
pVnode
->
vgId
,
pReq
->
currentOffset
,
fetchOffset
);
SMqPollRsp
rsp
=
{
SMqPollRsp
rsp
=
{
/*.consumerId = consumerId,*/
/*.consumerId = consumerId,*/
...
@@ -274,6 +275,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
...
@@ -274,6 +275,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
STqConsumer
*
pConsumer
=
tqHandleGet
(
pTq
->
tqMeta
,
consumerId
);
STqConsumer
*
pConsumer
=
tqHandleGet
(
pTq
->
tqMeta
,
consumerId
);
if
(
pConsumer
==
NULL
)
{
if
(
pConsumer
==
NULL
)
{
vWarn
(
"tmq poll: consumer %ld (epoch %d) not found in vg %d"
,
consumerId
,
pReq
->
epoch
,
pTq
->
pVnode
->
vgId
);
pMsg
->
pCont
=
NULL
;
pMsg
->
pCont
=
NULL
;
pMsg
->
contLen
=
0
;
pMsg
->
contLen
=
0
;
pMsg
->
code
=
-
1
;
pMsg
->
code
=
-
1
;
...
@@ -281,30 +283,57 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
...
@@ -281,30 +283,57 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
return
0
;
return
0
;
}
}
int32_t
consumerEpoch
=
atomic_load_32
(
&
pConsumer
->
epoch
);
while
(
consumerEpoch
<
reqEpoch
)
{
consumerEpoch
=
atomic_val_compare_exchange_32
(
&
pConsumer
->
epoch
,
consumerEpoch
,
reqEpoch
);
}
STqTopic
*
pTopic
=
NULL
;
int
sz
=
taosArrayGetSize
(
pConsumer
->
topics
);
int
sz
=
taosArrayGetSize
(
pConsumer
->
topics
);
ASSERT
(
sz
==
1
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
STqTopic
*
pTopic
=
taosArrayGet
(
pConsumer
->
topics
,
0
);
STqTopic
*
topic
=
taosArrayGet
(
pConsumer
->
topics
,
i
);
ASSERT
(
strcmp
(
pTopic
->
topicName
,
pReq
->
topic
)
==
0
);
//TODO race condition
ASSERT
(
pConsumer
->
consumerId
==
consumerId
);
ASSERT
(
pConsumer
->
consumerId
==
consumerId
);
if
(
strcmp
(
topic
->
topicName
,
pReq
->
topic
)
==
0
)
{
pTopic
=
topic
;
break
;
}
}
if
(
pTopic
==
NULL
)
{
vWarn
(
"tmq poll: consumer %ld (epoch %d) topic %s not found in vg %d"
,
consumerId
,
pReq
->
epoch
,
pReq
->
topic
,
pTq
->
pVnode
->
vgId
);
pMsg
->
pCont
=
NULL
;
pMsg
->
contLen
=
0
;
pMsg
->
code
=
-
1
;
tmsgSendRsp
(
pMsg
);
return
0
;
}
vDebug
(
"poll topic %s from consumer %ld (epoch %d)"
,
pTopic
->
topicName
,
consumerId
,
pReq
->
epoch
);
rsp
.
reqOffset
=
pReq
->
currentOffset
;
rsp
.
reqOffset
=
pReq
->
currentOffset
;
rsp
.
skipLogNum
=
0
;
rsp
.
skipLogNum
=
0
;
while
(
1
)
{
while
(
1
)
{
/*if (fetchOffset > walGetLastVer(pTq->pWal) || walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {*/
/*if (fetchOffset > walGetLastVer(pTq->pWal) || walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {*/
//TODO
consumerEpoch
=
atomic_load_32
(
&
pConsumer
->
epoch
);
if
(
consumerEpoch
>
pReq
->
epoch
)
{
//TODO: return
break
;
}
SWalReadHead
*
pHead
;
SWalReadHead
*
pHead
;
if
(
walReadWithHandle_s
(
pTopic
->
pReadhandle
,
fetchOffset
,
&
pHead
)
<
0
)
{
if
(
walReadWithHandle_s
(
pTopic
->
pReadhandle
,
fetchOffset
,
&
pHead
)
<
0
)
{
// TODO: no more log, set timer to wait blocking time
// TODO: no more log, set timer to wait blocking time
// if data inserted during waiting, launch query and
// if data inserted during waiting, launch query and
// response to user
// response to user
vDebug
(
"tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return"
,
consumerId
,
pReq
->
epoch
,
pTq
->
pVnode
->
vgId
,
fetchOffset
);
break
;
break
;
}
}
/*printf("vg %d offset %ld msgType %d from epoch %d\n", pTq->pVnode->vgId, fetchOffset, pHead->msgType, pReq->epoch);*/
vDebug
(
"tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d"
,
consumerId
,
pReq
->
epoch
,
pTq
->
pVnode
->
vgId
,
fetchOffset
,
pHead
->
msgType
);
/*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/
/*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/
/*pHead = pTopic->pReadhandle->pHead;*/
/*pHead = pTopic->pReadhandle->pHead;*/
if
(
pHead
->
msgType
==
TDMT_VND_SUBMIT
)
{
if
(
pHead
->
msgType
==
TDMT_VND_SUBMIT
)
{
SSubmitReq
*
pCont
=
(
SSubmitReq
*
)
&
pHead
->
body
;
SSubmitReq
*
pCont
=
(
SSubmitReq
*
)
&
pHead
->
body
;
/*printf("from topic %s from consumer\n", pTopic->topicName, consumerId);*/
qTaskInfo_t
task
=
pTopic
->
buffer
.
output
[
workerId
].
task
;
qTaskInfo_t
task
=
pTopic
->
buffer
.
output
[
workerId
].
task
;
ASSERT
(
task
);
ASSERT
(
task
);
qSetStreamInput
(
task
,
pCont
,
STREAM_DATA_TYPE_SUBMIT_BLOCK
);
qSetStreamInput
(
task
,
pCont
,
STREAM_DATA_TYPE_SUBMIT_BLOCK
);
...
@@ -324,6 +353,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
...
@@ -324,6 +353,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
}
}
if
(
taosArrayGetSize
(
pRes
)
==
0
)
{
if
(
taosArrayGetSize
(
pRes
)
==
0
)
{
vDebug
(
"tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted"
,
consumerId
,
pReq
->
epoch
,
pTq
->
pVnode
->
vgId
,
fetchOffset
);
fetchOffset
++
;
fetchOffset
++
;
rsp
.
skipLogNum
++
;
rsp
.
skipLogNum
++
;
taosArrayDestroy
(
pRes
);
taosArrayDestroy
(
pRes
);
...
@@ -352,7 +382,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
...
@@ -352,7 +382,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
pMsg
->
pCont
=
buf
;
pMsg
->
pCont
=
buf
;
pMsg
->
contLen
=
tlen
;
pMsg
->
contLen
=
tlen
;
pMsg
->
code
=
0
;
pMsg
->
code
=
0
;
/*printf("vg %d offset %ld msgType %d from epoch %d actual rsp\n", pTq->pVnode->vgId, fetchOffset, pHead->msgType, pReq->epoch);*/
vDebug
(
"vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp"
,
pTq
->
pVnode
->
vgId
,
fetchOffset
,
pHead
->
msgType
,
consumerId
,
pReq
->
epoch
);
tmsgSendRsp
(
pMsg
);
tmsgSendRsp
(
pMsg
);
taosMemoryFree
(
pHead
);
taosMemoryFree
(
pHead
);
return
0
;
return
0
;
...
@@ -383,7 +413,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
...
@@ -383,7 +413,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
pMsg
->
contLen
=
tlen
;
pMsg
->
contLen
=
tlen
;
pMsg
->
code
=
0
;
pMsg
->
code
=
0
;
tmsgSendRsp
(
pMsg
);
tmsgSendRsp
(
pMsg
);
/*printf("vg %d offset %ld from epoch %d not rsp\n", pTq->pVnode->vgId, fetchOffset, pReq->epoch);*/
vDebug
(
"vg %d offset %ld from consumer %ld (epoch %d) not rsp"
,
pTq
->
pVnode
->
vgId
,
fetchOffset
,
consumerId
,
pReq
->
epoch
);
/*}*/
/*}*/
return
0
;
return
0
;
...
@@ -393,6 +423,7 @@ int32_t tqProcessRebReq(STQ* pTq, char* msg) {
...
@@ -393,6 +423,7 @@ int32_t tqProcessRebReq(STQ* pTq, char* msg) {
SMqMVRebReq
req
=
{
0
};
SMqMVRebReq
req
=
{
0
};
tDecodeSMqMVRebReq
(
msg
,
&
req
);
tDecodeSMqMVRebReq
(
msg
,
&
req
);
vDebug
(
"vg %d set from consumer %ld to consumer %ld"
,
req
.
vgId
,
req
.
oldConsumerId
,
req
.
newConsumerId
);
STqConsumer
*
pConsumer
=
tqHandleGet
(
pTq
->
tqMeta
,
req
.
oldConsumerId
);
STqConsumer
*
pConsumer
=
tqHandleGet
(
pTq
->
tqMeta
,
req
.
oldConsumerId
);
ASSERT
(
pConsumer
);
ASSERT
(
pConsumer
);
pConsumer
->
consumerId
=
req
.
newConsumerId
;
pConsumer
->
consumerId
=
req
.
newConsumerId
;
...
@@ -407,17 +438,19 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
...
@@ -407,17 +438,19 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
SMqSetCVgReq
req
=
{
0
};
SMqSetCVgReq
req
=
{
0
};
tDecodeSMqSetCVgReq
(
msg
,
&
req
);
tDecodeSMqSetCVgReq
(
msg
,
&
req
);
/*printf("vg %d set to consumer from %ld to %ld\n", req.vgId, req.oldConsumerId, req.newConsumerId);*/
vDebug
(
"vg %d set to consumer %ld"
,
req
.
vgId
,
req
.
consumerId
);
STqConsumer
*
pConsumer
=
taosMemoryCalloc
(
1
,
sizeof
(
STqConsumer
));
STqConsumer
*
pConsumer
=
tqHandleGet
(
pTq
->
tqMeta
,
req
.
consumerId
);
if
(
pConsumer
==
NULL
)
{
pConsumer
=
taosMemoryCalloc
(
1
,
sizeof
(
STqConsumer
));
if
(
pConsumer
==
NULL
)
{
if
(
pConsumer
==
NULL
)
{
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
return
-
1
;
return
-
1
;
}
}
strcpy
(
pConsumer
->
cgroup
,
req
.
cgroup
);
strcpy
(
pConsumer
->
cgroup
,
req
.
cgroup
);
pConsumer
->
topics
=
taosArrayInit
(
0
,
sizeof
(
STqTopic
));
pConsumer
->
topics
=
taosArrayInit
(
0
,
sizeof
(
STqTopic
));
pConsumer
->
consumerId
=
req
.
consumerId
;
pConsumer
->
consumerId
=
req
.
consumerId
;
pConsumer
->
epoch
=
0
;
pConsumer
->
epoch
=
0
;
}
STqTopic
*
pTopic
=
taosMemoryCalloc
(
1
,
sizeof
(
STqTopic
));
STqTopic
*
pTopic
=
taosMemoryCalloc
(
1
,
sizeof
(
STqTopic
));
if
(
pTopic
==
NULL
)
{
if
(
pTopic
==
NULL
)
{
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
61e71159
...
@@ -625,24 +625,6 @@ typedef struct SSortOperatorInfo {
...
@@ -625,24 +625,6 @@ typedef struct SSortOperatorInfo {
uint64_t
totalElapsed
;
// total elapsed time
uint64_t
totalElapsed
;
// total elapsed time
}
SSortOperatorInfo
;
}
SSortOperatorInfo
;
typedef
struct
SDistinctDataInfo
{
int32_t
index
;
int32_t
type
;
int32_t
bytes
;
}
SDistinctDataInfo
;
typedef
struct
SDistinctOperatorInfo
{
SHashObj
*
pSet
;
SSDataBlock
*
pRes
;
bool
recordNullVal
;
// has already record the null value, no need to try again
// int64_t threshold; // todo remove it
// int64_t outputCapacity;// todo remove it
// int32_t totalBytes; // todo remove it
SResultInfo
resInfo
;
char
*
buf
;
SArray
*
pDistinctDataInfo
;
}
SDistinctOperatorInfo
;
int32_t
operatorDummyOpenFn
(
SOperatorInfo
*
pOperator
);
int32_t
operatorDummyOpenFn
(
SOperatorInfo
*
pOperator
);
void
operatorDummyCloseFn
(
void
*
param
,
int32_t
numOfCols
);
void
operatorDummyCloseFn
(
void
*
param
,
int32_t
numOfCols
);
int32_t
appendDownstream
(
SOperatorInfo
*
p
,
SOperatorInfo
**
pDownstream
,
int32_t
num
);
int32_t
appendDownstream
(
SOperatorInfo
*
p
,
SOperatorInfo
**
pDownstream
,
int32_t
num
);
...
@@ -659,6 +641,8 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
...
@@ -659,6 +641,8 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
uint64_t
*
total
,
SArray
*
pColList
);
uint64_t
*
total
,
SArray
*
pColList
);
void
doSetOperatorCompleted
(
SOperatorInfo
*
pOperator
);
void
doSetOperatorCompleted
(
SOperatorInfo
*
pOperator
);
void
doFilter
(
const
SNode
*
pFilterNode
,
SSDataBlock
*
pBlock
);
void
doFilter
(
const
SNode
*
pFilterNode
,
SSDataBlock
*
pBlock
);
SSDataBlock
*
getSortedBlockData
(
SSortHandle
*
pHandle
,
SSDataBlock
*
pDataBlock
,
int32_t
capacity
);
SSDataBlock
*
loadNextDataBlock
(
void
*
param
);
SOperatorInfo
*
createExchangeOperatorInfo
(
const
SNodeList
*
pSources
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createExchangeOperatorInfo
(
const
SNodeList
*
pSources
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createTableScanOperatorInfo
(
void
*
pTsdbReadHandle
,
int32_t
order
,
int32_t
numOfCols
,
int32_t
repeatTime
,
SOperatorInfo
*
createTableScanOperatorInfo
(
void
*
pTsdbReadHandle
,
int32_t
order
,
int32_t
numOfCols
,
int32_t
repeatTime
,
...
@@ -682,8 +666,10 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
...
@@ -682,8 +666,10 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
SOperatorInfo
*
createFillOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExpr
,
int32_t
numOfCols
,
SInterval
*
pInterval
,
SSDataBlock
*
pResBlock
,
SOperatorInfo
*
createFillOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExpr
,
int32_t
numOfCols
,
SInterval
*
pInterval
,
SSDataBlock
*
pResBlock
,
int32_t
fillType
,
char
*
fillVal
,
bool
multigroupResult
,
SExecTaskInfo
*
pTaskInfo
);
int32_t
fillType
,
char
*
fillVal
,
bool
multigroupResult
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStatewindowOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExpr
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStatewindowOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExpr
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createDistinctOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExpr
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createPartitionOperatorInfo
(
SOperatorInfo
*
downstream
,
SSDataBlock
*
pResultBlock
,
SArray
*
pSortInfo
,
SExecTaskInfo
*
pTaskInfo
,
const
STableGroupInfo
*
pTableGroupInfo
);
#if 0
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
SExprInfo* pExpr, int32_t numOfOutput);
SExprInfo* pExpr, int32_t numOfOutput);
...
@@ -705,6 +691,7 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
...
@@ -705,6 +691,7 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema,
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema,
int32_t numOfOutput);
int32_t numOfOutput);
#endif
void
setInputDataBlock
(
SOperatorInfo
*
pOperator
,
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
,
int32_t
order
);
void
setInputDataBlock
(
SOperatorInfo
*
pOperator
,
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
,
int32_t
order
);
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
61e71159
...
@@ -1629,7 +1629,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
...
@@ -1629,7 +1629,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
// window start(end) key interpolation
// window start(end) key interpolation
doWindowBorderInterpolation
(
pOperatorInfo
,
pSDataBlock
,
pInfo
->
binfo
.
pCtx
,
pResult
,
&
nextWin
,
startPos
,
forwardStep
,
pInfo
->
order
,
false
);
doWindowBorderInterpolation
(
pOperatorInfo
,
pSDataBlock
,
pInfo
->
binfo
.
pCtx
,
pResult
,
&
nextWin
,
startPos
,
forwardStep
,
pInfo
->
order
,
false
);
updateTimeWindowInfo
(
&
pInfo
->
timeWindowData
,
&
w
in
,
true
);
updateTimeWindowInfo
(
&
pInfo
->
timeWindowData
,
&
nextW
in
,
true
);
doApplyFunctions
(
pInfo
->
binfo
.
pCtx
,
&
nextWin
,
&
pInfo
->
timeWindowData
,
startPos
,
forwardStep
,
tsCols
,
pSDataBlock
->
info
.
rows
,
numOfOutput
,
TSDB_ORDER_ASC
);
doApplyFunctions
(
pInfo
->
binfo
.
pCtx
,
&
nextWin
,
&
pInfo
->
timeWindowData
,
startPos
,
forwardStep
,
tsCols
,
pSDataBlock
->
info
.
rows
,
numOfOutput
,
TSDB_ORDER_ASC
);
}
}
...
@@ -3295,6 +3295,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
...
@@ -3295,6 +3295,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
SFilterInfo
*
filter
=
NULL
;
SFilterInfo
*
filter
=
NULL
;
// todo move to the initialization function
int32_t
code
=
filterInitFromNode
((
SNode
*
)
pFilterNode
,
&
filter
,
0
);
int32_t
code
=
filterInitFromNode
((
SNode
*
)
pFilterNode
,
&
filter
,
0
);
SFilterColumnParam
param1
=
{.
numOfCols
=
pBlock
->
info
.
numOfCols
,
.
pDataBlock
=
pBlock
->
pDataBlock
};
SFilterColumnParam
param1
=
{.
numOfCols
=
pBlock
->
info
.
numOfCols
,
.
pDataBlock
=
pBlock
->
pDataBlock
};
...
@@ -4755,8 +4756,7 @@ static void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHan
...
@@ -4755,8 +4756,7 @@ static void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHan
pBlock
->
info
.
rows
+=
1
;
pBlock
->
info
.
rows
+=
1
;
}
}
static
SSDataBlock
*
getSortedBlockData
(
SSortHandle
*
pHandle
,
SSDataBlock
*
pDataBlock
,
bool
hasVarCol
,
SSDataBlock
*
getSortedBlockData
(
SSortHandle
*
pHandle
,
SSDataBlock
*
pDataBlock
,
int32_t
capacity
)
{
int32_t
capacity
)
{
blockDataCleanup
(
pDataBlock
);
blockDataCleanup
(
pDataBlock
);
while
(
1
)
{
while
(
1
)
{
...
@@ -4777,7 +4777,6 @@ static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataB
...
@@ -4777,7 +4777,6 @@ static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataB
SSDataBlock
*
loadNextDataBlock
(
void
*
param
)
{
SSDataBlock
*
loadNextDataBlock
(
void
*
param
)
{
SOperatorInfo
*
pOperator
=
(
SOperatorInfo
*
)
param
;
SOperatorInfo
*
pOperator
=
(
SOperatorInfo
*
)
param
;
bool
newgroup
=
false
;
bool
newgroup
=
false
;
return
pOperator
->
getNextFn
(
pOperator
,
&
newgroup
);
return
pOperator
->
getNextFn
(
pOperator
,
&
newgroup
);
}
}
...
@@ -4957,7 +4956,7 @@ static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator, bool* newgroup) {
...
@@ -4957,7 +4956,7 @@ static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator, bool* newgroup) {
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SSortedMergeOperatorInfo
*
pInfo
=
pOperator
->
info
;
SSortedMergeOperatorInfo
*
pInfo
=
pOperator
->
info
;
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
return
getSortedBlockData
(
pInfo
->
pSortHandle
,
pInfo
->
binfo
.
pRes
,
pInfo
->
hasVarCol
,
pInfo
->
binfo
.
capacity
);
return
getSortedBlockData
(
pInfo
->
pSortHandle
,
pInfo
->
binfo
.
pRes
,
pInfo
->
binfo
.
capacity
);
}
}
int32_t
numOfBufPage
=
pInfo
->
sortBufSize
/
pInfo
->
bufPageSize
;
int32_t
numOfBufPage
=
pInfo
->
sortBufSize
/
pInfo
->
bufPageSize
;
...
@@ -5102,15 +5101,14 @@ static SSDataBlock* doSort(SOperatorInfo* pOperator, bool* newgroup) {
...
@@ -5102,15 +5101,14 @@ static SSDataBlock* doSort(SOperatorInfo* pOperator, bool* newgroup) {
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SSortOperatorInfo
*
pInfo
=
pOperator
->
info
;
SSortOperatorInfo
*
pInfo
=
pOperator
->
info
;
bool
hasVarCol
=
pInfo
->
pDataBlock
->
info
.
hasVarCol
;
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
return
getSortedBlockData
(
pInfo
->
pSortHandle
,
pInfo
->
pDataBlock
,
hasVarCol
,
pInfo
->
numOfRowsInRes
);
return
getSortedBlockData
(
pInfo
->
pSortHandle
,
pInfo
->
pDataBlock
,
pInfo
->
numOfRowsInRes
);
}
}
int32_t
numOfBufPage
=
pInfo
->
sortBufSize
/
pInfo
->
bufPageSize
;
int32_t
numOfBufPage
=
pInfo
->
sortBufSize
/
pInfo
->
bufPageSize
;
pInfo
->
pSortHandle
=
tsortCreateSortHandle
(
pInfo
->
pSortInfo
,
SORT_SINGLESOURCE_SORT
,
pInfo
->
bufPageSize
,
numOfBufPage
,
pInfo
->
pSortHandle
=
tsortCreateSortHandle
(
pInfo
->
pSortInfo
,
SORT_SINGLESOURCE_SORT
,
pInfo
->
bufPageSize
,
numOfBufPage
,
pInfo
->
pDataBlock
,
"GET_TASKID(pTaskInfo)"
);
pInfo
->
pDataBlock
,
pTaskInfo
->
id
.
str
);
tsortSetFetchRawDataFp
(
pInfo
->
pSortHandle
,
loadNextDataBlock
);
tsortSetFetchRawDataFp
(
pInfo
->
pSortHandle
,
loadNextDataBlock
);
...
@@ -5124,27 +5122,23 @@ static SSDataBlock* doSort(SOperatorInfo* pOperator, bool* newgroup) {
...
@@ -5124,27 +5122,23 @@ static SSDataBlock* doSort(SOperatorInfo* pOperator, bool* newgroup) {
}
}
pOperator
->
status
=
OP_RES_TO_RETURN
;
pOperator
->
status
=
OP_RES_TO_RETURN
;
return
getSortedBlockData
(
pInfo
->
pSortHandle
,
pInfo
->
pDataBlock
,
hasVarCol
,
pInfo
->
numOfRowsInRes
);
return
getSortedBlockData
(
pInfo
->
pSortHandle
,
pInfo
->
pDataBlock
,
pInfo
->
numOfRowsInRes
);
}
}
SOperatorInfo
*
createSortOperatorInfo
(
SOperatorInfo
*
downstream
,
SSDataBlock
*
pResBlock
,
SArray
*
pSortInfo
,
SOperatorInfo
*
createSortOperatorInfo
(
SOperatorInfo
*
downstream
,
SSDataBlock
*
pResBlock
,
SArray
*
pSortInfo
,
SExecTaskInfo
*
pTaskInfo
)
{
SExecTaskInfo
*
pTaskInfo
)
{
SSortOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SSortOperatorInfo
));
SSortOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SSortOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
taosMemoryFreeClear
(
pInfo
);
goto
_error
;
taosMemoryFreeClear
(
pOperator
);
terrno
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
return
NULL
;
}
}
pInfo
->
sortBufSize
=
1024
*
16
;
// 1MB,
TODO dynamic set the available sort buffer
pInfo
->
sortBufSize
=
1024
*
16
;
//
TODO dynamic set the available sort buffer
pInfo
->
bufPageSize
=
1024
;
pInfo
->
bufPageSize
=
1024
;
pInfo
->
numOfRowsInRes
=
1024
;
pInfo
->
numOfRowsInRes
=
1024
;
pInfo
->
pDataBlock
=
pResBlock
;
pInfo
->
pDataBlock
=
pResBlock
;
pInfo
->
pSortInfo
=
pSortInfo
;
pInfo
->
pSortInfo
=
pSortInfo
;
pOperator
->
name
=
"Sort
"
;
pOperator
->
name
=
"SortOperator
"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_SORT
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_SORT
;
pOperator
->
blockingOptr
=
true
;
pOperator
->
blockingOptr
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
status
=
OP_NOT_OPENED
;
...
@@ -5156,6 +5150,12 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR
...
@@ -5156,6 +5150,12 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR
int32_t
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
int32_t
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
return
pOperator
;
return
pOperator
;
_error:
pTaskInfo
->
code
=
TSDB_CODE_OUT_OF_MEMORY
;
taosMemoryFree
(
pInfo
);
taosMemoryFree
(
pOperator
);
return
NULL
;
}
}
static
int32_t
getTableScanOrder
(
STableScanInfo
*
pTableScanInfo
)
{
return
pTableScanInfo
->
order
;
}
static
int32_t
getTableScanOrder
(
STableScanInfo
*
pTableScanInfo
)
{
return
pTableScanInfo
->
order
;
}
...
...
source/libs/executor/src/groupoperator.c
浏览文件 @
61e71159
...
@@ -131,7 +131,7 @@ static void recordNewGroupKeys(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock,
...
@@ -131,7 +131,7 @@ static void recordNewGroupKeys(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock,
}
}
}
}
static
int32_t
buildGroup
ValKey
(
void
*
pKey
,
int32_t
*
length
,
SArray
*
pGroupColVals
)
{
static
int32_t
buildGroup
Keys
(
void
*
pKey
,
const
SArray
*
pGroupColVals
)
{
ASSERT
(
pKey
!=
NULL
);
ASSERT
(
pKey
!=
NULL
);
size_t
numOfGroupCols
=
taosArrayGetSize
(
pGroupColVals
);
size_t
numOfGroupCols
=
taosArrayGetSize
(
pGroupColVals
);
...
@@ -155,8 +155,7 @@ static int32_t buildGroupValKey(void* pKey, int32_t* length, SArray* pGroupColVa
...
@@ -155,8 +155,7 @@ static int32_t buildGroupValKey(void* pKey, int32_t* length, SArray* pGroupColVa
}
}
}
}
*
length
=
(
pStart
-
(
char
*
)
pKey
);
return
(
int32_t
)
(
pStart
-
(
char
*
)
pKey
);
return
0
;
}
}
// assign the group keys or user input constant values if required
// assign the group keys or user input constant values if required
...
@@ -217,7 +216,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
...
@@ -217,7 +216,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
continue
;
continue
;
}
}
/*int32_t ret = */
buildGroupValKey
(
pInfo
->
keyBuf
,
&
len
,
pInfo
->
pGroupColVals
);
len
=
buildGroupKeys
(
pInfo
->
keyBuf
,
pInfo
->
pGroupColVals
);
int32_t
ret
=
setGroupResultOutputBuf_rv
(
&
(
pInfo
->
binfo
),
pOperator
->
numOfOutput
,
pInfo
->
keyBuf
,
TSDB_DATA_TYPE_VARCHAR
,
len
,
0
,
pInfo
->
aggSup
.
pResultBuf
,
pTaskInfo
,
&
pInfo
->
aggSup
);
int32_t
ret
=
setGroupResultOutputBuf_rv
(
&
(
pInfo
->
binfo
),
pOperator
->
numOfOutput
,
pInfo
->
keyBuf
,
TSDB_DATA_TYPE_VARCHAR
,
len
,
0
,
pInfo
->
aggSup
.
pResultBuf
,
pTaskInfo
,
&
pInfo
->
aggSup
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
// null data, too many state code
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
// null data, too many state code
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_APP_ERROR
);
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_APP_ERROR
);
...
@@ -233,7 +232,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
...
@@ -233,7 +232,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
}
}
if
(
num
>
0
)
{
if
(
num
>
0
)
{
/*int32_t ret = */
buildGroupValKey
(
pInfo
->
keyBuf
,
&
len
,
pInfo
->
pGroupColVals
);
len
=
buildGroupKeys
(
pInfo
->
keyBuf
,
pInfo
->
pGroupColVals
);
int32_t
ret
=
int32_t
ret
=
setGroupResultOutputBuf_rv
(
&
(
pInfo
->
binfo
),
pOperator
->
numOfOutput
,
pInfo
->
keyBuf
,
TSDB_DATA_TYPE_VARCHAR
,
len
,
setGroupResultOutputBuf_rv
(
&
(
pInfo
->
binfo
),
pOperator
->
numOfOutput
,
pInfo
->
keyBuf
,
TSDB_DATA_TYPE_VARCHAR
,
len
,
0
,
pInfo
->
aggSup
.
pResultBuf
,
pTaskInfo
,
&
pInfo
->
aggSup
);
0
,
pInfo
->
aggSup
.
pResultBuf
,
pTaskInfo
,
&
pInfo
->
aggSup
);
...
@@ -346,162 +345,66 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
...
@@ -346,162 +345,66 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
return
pOperator
;
return
pOperator
;
_error:
_error:
pTaskInfo
->
code
=
TSDB_CODE_OUT_OF_MEMORY
;
taosMemoryFreeClear
(
pInfo
);
taosMemoryFreeClear
(
pInfo
);
taosMemoryFreeClear
(
pOperator
);
taosMemoryFreeClear
(
pOperator
);
return
NULL
;
return
NULL
;
}
}
#define MULTI_KEY_DELIM "-"
static
SSDataBlock
*
doPartitionData
(
SOperatorInfo
*
pOperator
,
bool
*
newgroup
)
{
static
void
destroyDistinctOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
SDistinctOperatorInfo
*
pInfo
=
(
SDistinctOperatorInfo
*
)
param
;
taosHashCleanup
(
pInfo
->
pSet
);
taosMemoryFreeClear
(
pInfo
->
buf
);
taosArrayDestroy
(
pInfo
->
pDistinctDataInfo
);
pInfo
->
pRes
=
blockDataDestroy
(
pInfo
->
pRes
);
}
static
void
buildMultiDistinctKey
(
SDistinctOperatorInfo
*
pInfo
,
SSDataBlock
*
pBlock
,
int32_t
rowId
)
{
char
*
p
=
pInfo
->
buf
;
// memset(p, 0, pInfo->totalBytes);
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pInfo
->
pDistinctDataInfo
);
i
++
)
{
SDistinctDataInfo
*
pDistDataInfo
=
(
SDistinctDataInfo
*
)
taosArrayGet
(
pInfo
->
pDistinctDataInfo
,
i
);
SColumnInfoData
*
pColDataInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
pDistDataInfo
->
index
);
char
*
val
=
((
char
*
)
pColDataInfo
->
pData
)
+
pColDataInfo
->
info
.
bytes
*
rowId
;
if
(
isNull
(
val
,
pDistDataInfo
->
type
))
{
p
+=
pDistDataInfo
->
bytes
;
continue
;
}
if
(
IS_VAR_DATA_TYPE
(
pDistDataInfo
->
type
))
{
memcpy
(
p
,
varDataVal
(
val
),
varDataLen
(
val
));
p
+=
varDataLen
(
val
);
}
else
{
memcpy
(
p
,
val
,
pDistDataInfo
->
bytes
);
p
+=
pDistDataInfo
->
bytes
;
}
memcpy
(
p
,
MULTI_KEY_DELIM
,
strlen
(
MULTI_KEY_DELIM
));
p
+=
strlen
(
MULTI_KEY_DELIM
);
}
}
static
bool
initMultiDistinctInfo
(
SDistinctOperatorInfo
*
pInfo
,
SOperatorInfo
*
pOperator
)
{
for
(
int
i
=
0
;
i
<
pOperator
->
numOfOutput
;
i
++
)
{
// pInfo->totalBytes += pOperator->pExpr[i].base.colBytes;
}
#if 0
for (int i = 0; i < pOperator->numOfOutput; i++) {
int numOfCols = (int)(taosArrayGetSize(pBlock->pDataBlock));
assert(i < numOfCols);
for (int j = 0; j < numOfCols; j++) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, j);
if (pColDataInfo->info.colId == pOperator->pExpr[i].base.resSchema.colId) {
SDistinctDataInfo item = {.index = j, .type = pColDataInfo->info.type, .bytes = pColDataInfo->info.bytes};
taosArrayInsert(pInfo->pDistinctDataInfo, i, &item);
}
}
}
#endif
// pInfo->totalBytes += (int32_t)strlen(MULTI_KEY_DELIM) * (pOperator->numOfOutput);
// pInfo->buf = taosMemoryCalloc(1, pInfo->totalBytes);
return
taosArrayGetSize
(
pInfo
->
pDistinctDataInfo
)
==
pOperator
->
numOfOutput
?
true
:
false
;
}
static
SSDataBlock
*
hashDistinct
(
SOperatorInfo
*
pOperator
,
bool
*
newgroup
)
{
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
return
NULL
;
}
}
SDistinctOperatorInfo
*
pInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SSDataBlock
*
pRes
=
pInfo
->
pRes
;
SSortOperatorInfo
*
pInfo
=
pOperator
->
info
;
bool
hasVarCol
=
pInfo
->
pDataBlock
->
info
.
hasVarCol
;
pRes
->
info
.
rows
=
0
;
SSDataBlock
*
pBlock
=
NULL
;
SOperatorInfo
*
pDownstream
=
pOperator
->
pDownstream
[
0
];
while
(
1
)
{
publishOperatorProfEvent
(
pDownstream
,
QUERY_PROF_BEFORE_OPERATOR_EXEC
);
pBlock
=
pDownstream
->
getNextFn
(
pDownstream
,
newgroup
);
publishOperatorProfEvent
(
pDownstream
,
QUERY_PROF_AFTER_OPERATOR_EXEC
);
if
(
pBlock
==
NULL
)
{
doSetOperatorCompleted
(
pOperator
);
break
;
}
// ensure result output buf
if
(
pRes
->
info
.
rows
+
pBlock
->
info
.
rows
>
pInfo
->
resInfo
.
capacity
)
{
int32_t
newSize
=
pRes
->
info
.
rows
+
pBlock
->
info
.
rows
;
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pRes
->
pDataBlock
);
i
++
)
{
SColumnInfoData
*
pResultColInfoData
=
taosArrayGet
(
pRes
->
pDataBlock
,
i
);
SDistinctDataInfo
*
pDistDataInfo
=
taosArrayGet
(
pInfo
->
pDistinctDataInfo
,
i
);
// char* tmp = taosMemoryRealloc(pResultColInfoData->pData, newSize * pDistDataInfo->bytes);
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
// if (tmp == NULL) {
return
getSortedBlockData
(
pInfo
->
pSortHandle
,
pInfo
->
pDataBlock
,
pInfo
->
numOfRowsInRes
);
// return NULL;
// } else {
// pResultColInfoData->pData = tmp;
// }
}
pInfo
->
resInfo
.
capacity
=
newSize
;
}
}
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
i
++
)
{
int32_t
numOfBufPage
=
pInfo
->
sortBufSize
/
pInfo
->
bufPageSize
;
buildMultiDistinctKey
(
pInfo
,
pBlock
,
i
);
pInfo
->
pSortHandle
=
tsortCreateSortHandle
(
pInfo
->
pSortInfo
,
SORT_SINGLESOURCE_SORT
,
pInfo
->
bufPageSize
,
numOfBufPage
,
if
(
taosHashGet
(
pInfo
->
pSet
,
pInfo
->
buf
,
0
)
==
NULL
)
{
pInfo
->
pDataBlock
,
pTaskInfo
->
id
.
str
);
taosHashPut
(
pInfo
->
pSet
,
pInfo
->
buf
,
0
,
NULL
,
0
);
for
(
int
j
=
0
;
j
<
taosArrayGetSize
(
pRes
->
pDataBlock
);
j
++
)
{
tsortSetFetchRawDataFp
(
pInfo
->
pSortHandle
,
loadNextDataBlock
);
SDistinctDataInfo
*
pDistDataInfo
=
taosArrayGet
(
pInfo
->
pDistinctDataInfo
,
j
);
// distinct meta info
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
pDistDataInfo
->
index
);
// src
SColumnInfoData
*
pResultColInfoData
=
taosArrayGet
(
pRes
->
pDataBlock
,
j
);
// dist
char
*
val
=
((
char
*
)
pColInfoData
->
pData
)
+
pDistDataInfo
->
bytes
*
i
;
char
*
start
=
pResultColInfoData
->
pData
+
pDistDataInfo
->
bytes
*
pInfo
->
pRes
->
info
.
rows
;
memcpy
(
start
,
val
,
pDistDataInfo
->
bytes
);
}
pRes
->
info
.
rows
+=
1
;
SGenericSource
*
ps
=
taosMemoryCalloc
(
1
,
sizeof
(
SGenericSource
))
;
}
ps
->
param
=
pOperator
->
pDownstream
[
0
];
}
tsortAddSource
(
pInfo
->
pSortHandle
,
ps
);
if
(
pRes
->
info
.
rows
>=
pInfo
->
resInfo
.
threshold
)
{
int32_t
code
=
tsortOpen
(
pInfo
->
pSortHandle
);
break
;
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
}
longjmp
(
pTaskInfo
->
env
,
terrno
);
}
}
return
(
pInfo
->
pRes
->
info
.
rows
>
0
)
?
pInfo
->
pRes
:
NULL
;
pOperator
->
status
=
OP_RES_TO_RETURN
;
return
getSortedBlockData
(
pInfo
->
pSortHandle
,
pInfo
->
pDataBlock
,
pInfo
->
numOfRowsInRes
);
}
}
SOperatorInfo
*
create
DistinctOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExpr
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SExecTaskInfo
*
pTask
Info
)
{
SOperatorInfo
*
create
PartitionOperatorInfo
(
SOperatorInfo
*
downstream
,
SSDataBlock
*
pResultBlock
,
SArray
*
pSortInfo
,
SExecTaskInfo
*
pTaskInfo
,
const
STableGroupInfo
*
pTableGroup
Info
)
{
S
DistinctOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SDistinc
tOperatorInfo
));
S
SortOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SSor
tOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
goto
_error
;
goto
_error
;
}
}
pOperator
->
resultInfo
.
capacity
=
4096
;
// todo extract function.
pInfo
->
sortBufSize
=
1024
*
16
;
// TODO dynamic set the available sort buffer
pInfo
->
bufPageSize
=
1024
;
pInfo
->
numOfRowsInRes
=
1024
;
pInfo
->
pDataBlock
=
pResultBlock
;
pInfo
->
pSortInfo
=
pSortInfo
;
// pInfo->totalBytes = 0;
pOperator
->
name
=
"PartitionOperator"
;
pInfo
->
buf
=
NULL
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_PARTITION
;
pInfo
->
pDistinctDataInfo
=
taosArrayInit
(
numOfCols
,
sizeof
(
SDistinctDataInfo
));
initMultiDistinctInfo
(
pInfo
,
pOperator
);
pInfo
->
pSet
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_NO_LOCK
);
pOperator
->
name
=
"DistinctOperator"
;
pOperator
->
blockingOptr
=
true
;
pOperator
->
blockingOptr
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
status
=
OP_NOT_OPENED
;
// pOperator->operatorType = DISTINCT;
pOperator
->
pExpr
=
pExpr
;
pOperator
->
numOfOutput
=
numOfCols
;
pOperator
->
info
=
pInfo
;
pOperator
->
info
=
pInfo
;
pOperator
->
getNextFn
=
hashDistinct
;
pOperator
->
closeFn
=
destroyDistinctOperatorInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
getNextFn
=
doPartitionData
;
// pOperator->closeFn = destroyOrderOperatorInfo;
int32_t
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
int32_t
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
return
pOperator
;
return
pOperator
;
...
...
source/os/src/osShm.c
浏览文件 @
61e71159
...
@@ -17,14 +17,29 @@
...
@@ -17,14 +17,29 @@
#define _DEFAULT_SOURCE
#define _DEFAULT_SOURCE
#include "os.h"
#include "os.h"
#define MAX_SHMIDS 6
static
int32_t
shmids
[
MAX_SHMIDS
]
=
{
0
};
static
void
taosDeleteCreatedShms
()
{
for
(
int32_t
i
=
0
;
i
<
MAX_SHMIDS
;
++
i
)
{
int32_t
shmid
=
shmids
[
i
]
-
1
;
if
(
shmid
>=
0
)
{
shmctl
(
shmid
,
IPC_RMID
,
NULL
);
}
}
}
int32_t
taosCreateShm
(
SShm
*
pShm
,
int32_t
key
,
int32_t
shmsize
)
{
int32_t
taosCreateShm
(
SShm
*
pShm
,
int32_t
key
,
int32_t
shmsize
)
{
pShm
->
id
=
-
1
;
pShm
->
id
=
-
1
;
// key_t shkey = IPC_PRIVATE;
#if 1
// int32_t __shmflag = IPC_CREAT | IPC_EXCL | 0600;
key_t
__shkey
=
IPC_PRIVATE
;
int32_t
__shmflag
=
IPC_CREAT
|
IPC_EXCL
|
0600
;
#else
key_t
__shkey
=
0X95270000
+
key
;
key_t
__shkey
=
0X95270000
+
key
;
int32_t
__shmflag
=
IPC_CREAT
|
0600
;
int32_t
__shmflag
=
IPC_CREAT
|
0600
;
#endif
int32_t
shmid
=
shmget
(
__shkey
,
shmsize
,
__shmflag
);
int32_t
shmid
=
shmget
(
__shkey
,
shmsize
,
__shmflag
);
if
(
shmid
<
0
)
{
if
(
shmid
<
0
)
{
...
@@ -39,6 +54,16 @@ int32_t taosCreateShm(SShm* pShm, int32_t key, int32_t shmsize) {
...
@@ -39,6 +54,16 @@ int32_t taosCreateShm(SShm* pShm, int32_t key, int32_t shmsize) {
pShm
->
id
=
shmid
;
pShm
->
id
=
shmid
;
pShm
->
size
=
shmsize
;
pShm
->
size
=
shmsize
;
pShm
->
ptr
=
shmptr
;
pShm
->
ptr
=
shmptr
;
#if 0
if (key >= 0 && key < MAX_SHMIDS) {
shmids[key] = pShm->id + 1;
}
atexit(taosDeleteCreatedShms);
#else
shmctl
(
pShm
->
id
,
IPC_RMID
,
NULL
);
#endif
return
0
;
return
0
;
}
}
...
...
tests/script/jenkins/basic.txt
浏览文件 @
61e71159
...
@@ -55,8 +55,8 @@
...
@@ -55,8 +55,8 @@
# --- for multi process mode
# --- for multi process mode
#
./test.sh -f tsim/user/basic1.sim -m
./test.sh -f tsim/user/basic1.sim -m
#
./test.sh -f tsim/stable/vnode3.sim -m
./test.sh -f tsim/stable/vnode3.sim -m
#
./test.sh -f tsim/tmq/basic.sim -m
./test.sh -f tsim/tmq/basic.sim -m
#======================b1-end===============
#======================b1-end===============
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录