Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4a2b2666
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
4a2b2666
编写于
7月 02, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feat(stream): support ignore expired flag
上级
4a988a7c
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
71 addition
and
58 deletion
+71
-58
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+1
-1
source/dnode/mnode/impl/src/mndDef.c
source/dnode/mnode/impl/src/mndDef.c
+2
-2
source/dnode/mnode/impl/src/mndStream.c
source/dnode/mnode/impl/src/mndStream.c
+2
-1
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+9
-8
source/dnode/vnode/src/tq/tqRead.c
source/dnode/vnode/src/tq/tqRead.c
+2
-0
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+1
-0
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+3
-1
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+20
-17
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+1
-0
tests/script/tsim/tmq/snapshot1.sim
tests/script/tsim/tmq/snapshot1.sim
+1
-1
tests/test/c/tmqSim.c
tests/test/c/tmqSim.c
+29
-27
未找到文件。
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
4a2b2666
...
...
@@ -556,7 +556,7 @@ typedef struct {
int64_t
uid
;
int8_t
status
;
// config
int8_t
dropPolicy
;
int8_t
igExpired
;
int8_t
trigger
;
int64_t
triggerParam
;
int64_t
watermark
;
...
...
source/dnode/mnode/impl/src/mndDef.c
浏览文件 @
4a2b2666
...
...
@@ -28,7 +28,7 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
if
(
tEncodeI64
(
pEncoder
,
pObj
->
uid
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pObj
->
status
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pObj
->
dropPolicy
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pObj
->
igExpired
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pObj
->
trigger
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pObj
->
triggerParam
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pObj
->
watermark
)
<
0
)
return
-
1
;
...
...
@@ -73,7 +73,7 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
if
(
tDecodeI64
(
pDecoder
,
&
pObj
->
uid
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pObj
->
status
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pObj
->
dropPolicy
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pObj
->
igExpired
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pObj
->
trigger
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pObj
->
triggerParam
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pObj
->
watermark
)
<
0
)
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndStream.c
浏览文件 @
4a2b2666
...
...
@@ -248,7 +248,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
pObj
->
status
=
0
;
// TODO
pObj
->
dropPolicy
=
0
;
pObj
->
igExpired
=
pCreate
->
igExpired
;
pObj
->
trigger
=
pCreate
->
triggerType
;
pObj
->
triggerParam
=
pCreate
->
maxDelay
;
pObj
->
watermark
=
pCreate
->
watermark
;
...
...
@@ -301,6 +301,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
.
streamQuery
=
true
,
.
triggerType
=
pObj
->
trigger
==
STREAM_TRIGGER_MAX_DELAY
?
STREAM_TRIGGER_WINDOW_CLOSE
:
pObj
->
trigger
,
.
watermark
=
pObj
->
watermark
,
.
igExpired
=
pObj
->
igExpired
,
};
// using ast and param to build physical plan
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
4a2b2666
...
...
@@ -183,13 +183,15 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) {
}
else
{
ASSERT
(
0
);
}
STqOffset
*
pOffset
=
tqOffsetRead
(
pTq
->
pOffsetStore
,
offset
.
subKey
);
if
(
pOffset
==
NULL
||
pOffset
->
val
.
version
<
offset
.
val
.
version
)
{
if
(
tqOffsetWrite
(
pTq
->
pOffsetStore
,
&
offset
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
/*STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);*/
/*if (pOffset != NULL) {*/
/*if (pOffset->val.type == TMQ_OFFSET__LOG && pOffset->val.version < offset.val.version) {*/
if
(
tqOffsetWrite
(
pTq
->
pOffsetStore
,
&
offset
)
<
0
)
{
ASSERT
(
0
)
;
return
-
1
;
}
/*}*/
/*}*/
return
0
;
}
...
...
@@ -375,8 +377,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
taosMemoryFree
(
pCkHead
);
}
else
if
(
fetchOffsetNew
.
type
==
TMQ_OFFSET__SNAPSHOT_DATA
)
{
tqInfo
(
"retrieve using snapshot req offset: uid %ld ts %ld, actual offset: uid %ld ts %ld"
,
dataRsp
.
reqOffset
.
uid
,
dataRsp
.
reqOffset
.
ts
,
fetchOffsetNew
.
uid
,
fetchOffsetNew
.
ts
);
tqInfo
(
"retrieve using snapshot actual offset: uid %ld ts %ld"
,
fetchOffsetNew
.
uid
,
fetchOffsetNew
.
ts
);
if
(
tqScanSnapshot
(
pTq
,
&
pHandle
->
execHandle
,
&
dataRsp
,
fetchOffsetNew
,
workerId
)
<
0
)
{
ASSERT
(
0
);
}
...
...
source/dnode/vnode/src/tq/tqRead.c
浏览文件 @
4a2b2666
...
...
@@ -120,7 +120,9 @@ bool tqNextDataBlock(SStreamReader* pHandle) {
return
true
;
}
void
*
ret
=
taosHashGet
(
pHandle
->
tbIdHash
,
&
pHandle
->
msgIter
.
uid
,
sizeof
(
int64_t
));
/*tqDebug("search uid %ld", pHandle->msgIter.uid);*/
if
(
ret
!=
NULL
)
{
/*tqDebug("find uid %ld", pHandle->msgIter.uid);*/
return
true
;
}
}
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
4a2b2666
...
...
@@ -356,6 +356,7 @@ typedef struct SStreamBlockScanInfo {
SUpdateInfo
*
pUpdateInfo
;
EStreamScanMode
scanMode
;
SOperatorInfo
*
pStreamScanOp
;
SOperatorInfo
*
pSnapshotReadOp
;
SArray
*
childIds
;
SessionWindowSupporter
sessionSup
;
...
...
source/libs/executor/src/executor.c
浏览文件 @
4a2b2666
...
...
@@ -145,10 +145,12 @@ static SArray* filterQualifiedChildTables(const SStreamBlockScanInfo* pScanInfo,
continue
;
}
// TODO handle ntb case
if
(
mr
.
me
.
type
!=
TSDB_CHILD_TABLE
||
mr
.
me
.
ctbEntry
.
suid
!=
pScanInfo
->
tableUid
)
{
continue
;
}
// TODO handle ntb case
/*pScanInfo->pStreamScanOp->pTaskInfo->tableqinfoList.*/
// handle multiple partition
taosArrayPush
(
qa
,
id
);
}
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
4a2b2666
...
...
@@ -2010,8 +2010,9 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
ASSERT
(
pDataInfo
->
status
==
EX_SOURCE_DATA_NOT_READY
);
qDebug
(
"%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%"
PRIx64
", execId:%d, %d/%"
PRIzu
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
addr
.
epSet
.
eps
[
0
].
fqdn
,
pSource
->
taskId
,
pSource
->
execId
,
sourceIndex
,
totalSources
);
qDebug
(
"%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%"
PRIx64
", execId:%d, %d/%"
PRIzu
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
addr
.
epSet
.
eps
[
0
].
fqdn
,
pSource
->
taskId
,
pSource
->
execId
,
sourceIndex
,
totalSources
);
pMsg
->
header
.
vgId
=
htonl
(
pSource
->
addr
.
nodeId
);
pMsg
->
sId
=
htobe64
(
pSource
->
schedId
);
...
...
@@ -2146,8 +2147,8 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
SSDataBlock
*
pRes
=
pExchangeInfo
->
pResult
;
SLoadRemoteDataInfo
*
pLoadInfo
=
&
pExchangeInfo
->
loadInfo
;
if
(
pRsp
->
numOfRows
==
0
)
{
qDebug
(
"%s vgId:%d, taskId:0x%"
PRIx64
" execId:%d index:%d completed, rowsOfSource:%"
PRIu64
", totalRows:%"
PRIu64
", completed:%d try next %d/%"
PRIzu
,
qDebug
(
"%s vgId:%d, taskId:0x%"
PRIx64
" execId:%d index:%d completed, rowsOfSource:%"
PRIu64
",
totalRows:%"
PRIu64
",
completed:%d try next %d/%"
PRIzu
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
taskId
,
pSource
->
execId
,
i
,
pDataInfo
->
totalRows
,
pExchangeInfo
->
loadInfo
.
totalRows
,
completed
+
1
,
i
+
1
,
totalSources
);
pDataInfo
->
status
=
EX_SOURCE_DATA_EXHAUSTED
;
...
...
@@ -2166,18 +2167,19 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
}
if
(
pRsp
->
completed
==
1
)
{
qDebug
(
"%s fetch msg rsp from vgId:%d, taskId:0x%"
PRIx64
" execId:%d"
qDebug
(
"%s fetch msg rsp from vgId:%d, taskId:0x%"
PRIx64
" execId:%d"
" index:%d completed, numOfRows:%d, rowsOfSource:%"
PRIu64
", totalRows:%"
PRIu64
", totalBytes:%"
PRIu64
", completed:%d try next %d/%"
PRIzu
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
taskId
,
pSource
->
execId
,
i
,
pRes
->
info
.
rows
,
pDataInfo
->
totalRows
,
pLoadInfo
->
totalRows
,
pLoadInfo
->
totalSize
,
completed
+
1
,
i
+
1
,
totalSources
);
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
taskId
,
pSource
->
execId
,
i
,
pRes
->
info
.
rows
,
p
DataInfo
->
totalRows
,
p
LoadInfo
->
totalRows
,
pLoadInfo
->
totalSize
,
completed
+
1
,
i
+
1
,
totalSources
);
completed
+=
1
;
pDataInfo
->
status
=
EX_SOURCE_DATA_EXHAUSTED
;
}
else
{
qDebug
(
"%s fetch msg rsp from vgId:%d, taskId:0x%"
PRIx64
" execId:%d numOfRows:%d, totalRows:%"
PRIu64
", totalBytes:%"
PRIu64
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
taskId
,
pSource
->
execId
,
pRes
->
info
.
rows
,
pLoadInfo
->
totalRows
,
pLoadInfo
->
totalSize
);
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
taskId
,
pSource
->
execId
,
pRes
->
info
.
rows
,
pLoadInfo
->
total
Rows
,
pLoadInfo
->
total
Size
);
}
taosMemoryFreeClear
(
pDataInfo
->
pRsp
);
...
...
@@ -2250,8 +2252,8 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
SDownstreamSourceNode
*
pSource
=
taosArrayGet
(
pExchangeInfo
->
pSources
,
pExchangeInfo
->
current
);
if
(
pDataInfo
->
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"%s vgId:%d, taskID:0x%"
PRIx64
" execId:%d error happens, code:%s"
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
taskId
,
pSource
->
execId
,
tstrerror
(
pDataInfo
->
code
));
qError
(
"%s vgId:%d, taskID:0x%"
PRIx64
" execId:%d error happens, code:%s"
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
taskId
,
pSource
->
execId
,
tstrerror
(
pDataInfo
->
code
));
pOperator
->
pTaskInfo
->
code
=
pDataInfo
->
code
;
return
NULL
;
}
...
...
@@ -2259,8 +2261,8 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
SRetrieveTableRsp
*
pRsp
=
pDataInfo
->
pRsp
;
SLoadRemoteDataInfo
*
pLoadInfo
=
&
pExchangeInfo
->
loadInfo
;
if
(
pRsp
->
numOfRows
==
0
)
{
qDebug
(
"%s vgId:%d, taskID:0x%"
PRIx64
" execId:%d %d of total completed, rowsOfSource:%"
PRIu64
", totalRows:%"
PRIu64
" try next"
,
qDebug
(
"%s vgId:%d, taskID:0x%"
PRIx64
" execId:%d %d of total completed, rowsOfSource:%"
PRIu64
"
, totalRows:%"
PRIu64
"
try next"
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
taskId
,
pSource
->
execId
,
pExchangeInfo
->
current
+
1
,
pDataInfo
->
totalRows
,
pLoadInfo
->
totalRows
);
...
...
@@ -2279,16 +2281,17 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
if
(
pRsp
->
completed
==
1
)
{
qDebug
(
"%s fetch msg rsp from vgId:%d, taskId:0x%"
PRIx64
" execId:%d numOfRows:%d, rowsOfSource:%"
PRIu64
", totalRows:%"
PRIu64
", totalBytes:%"
PRIu64
" try next %d/%"
PRIzu
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
taskId
,
pSource
->
execId
,
pRes
->
info
.
rows
,
pDataInfo
->
totalRows
,
pLoadInfo
->
totalRows
,
pLoadInfo
->
totalSize
,
pExchangeInfo
->
current
+
1
,
totalSources
);
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
taskId
,
pSource
->
execId
,
pRes
->
info
.
rows
,
pDataInfo
->
totalRows
,
pLoadInfo
->
totalRows
,
pLoadInfo
->
totalSize
,
pExchangeInfo
->
current
+
1
,
totalSources
);
pDataInfo
->
status
=
EX_SOURCE_DATA_EXHAUSTED
;
pExchangeInfo
->
current
+=
1
;
}
else
{
qDebug
(
"%s fetch msg rsp from vgId:%d, taskId:0x%"
PRIx64
" execId:%d numOfRows:%d, totalRows:%"
PRIu64
", totalBytes:%"
PRIu64
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
taskId
,
pSource
->
execId
,
pRes
->
info
.
rows
,
pLoadInfo
->
totalRows
,
pLoadInfo
->
totalSize
);
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
taskId
,
pSource
->
execId
,
pRes
->
info
.
rows
,
pLoadInfo
->
total
Rows
,
pLoadInfo
->
total
Size
);
}
pOperator
->
resultInfo
.
totalRows
+=
pRes
->
info
.
rows
;
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
4a2b2666
...
...
@@ -1274,6 +1274,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo
->
sessionSup
=
(
SessionWindowSupporter
){.
pStreamAggSup
=
NULL
,
.
gap
=
-
1
};
pInfo
->
groupId
=
0
;
pInfo
->
pPullDataRes
=
createPullDataBlock
();
pInfo
->
pStreamScanOp
=
pOperator
;
pOperator
->
name
=
"StreamBlockScanOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
;
...
...
tests/script/tsim/tmq/snapshot1.sim
浏览文件 @
4a2b2666
...
...
@@ -25,7 +25,7 @@ $rowsPerCtb = 10
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
#---- global parameters end ----#
$pullDelay =
5
$pullDelay =
2
$ifcheckdata = 1
$ifmanualcommit = 1
$showMsg = 1
...
...
tests/test/c/tmqSim.c
浏览文件 @
4a2b2666
...
...
@@ -36,6 +36,7 @@
#define MAX_CONSUMER_THREAD_CNT (16)
#define MAX_VGROUP_CNT (32)
int64_t
now
;
typedef
enum
{
NOTIFY_CMD_START_CONSUM
,
NOTIFY_CMD_START_COMMIT
,
...
...
@@ -525,15 +526,15 @@ int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) {
static
int32_t
g_once_commit_flag
=
0
;
static
void
tmq_commit_cb_print
(
tmq_t
*
tmq
,
int32_t
code
,
void
*
param
)
{
pError
(
"tmq_commit_cb_print() commit %d
\n
"
,
code
);
pError
(
"tmq_commit_cb_print() commit %d
\n
"
,
code
);
if
(
0
==
g_once_commit_flag
)
{
g_once_commit_flag
=
1
;
notifyMainScript
((
SThreadInfo
*
)
param
,
(
int32_t
)
NOTIFY_CMD_START_COMMIT
);
if
(
0
==
g_once_commit_flag
)
{
g_once_commit_flag
=
1
;
notifyMainScript
((
SThreadInfo
*
)
param
,
(
int32_t
)
NOTIFY_CMD_START_COMMIT
);
}
char
tmpString
[
128
];
taosFprintfFile
(
g_fp
,
"%s tmq_commit_cb_print() be called
\n
"
,
getCurrentTimeString
(
tmpString
));
char
tmpString
[
128
];
taosFprintfFile
(
g_fp
,
"%s tmq_commit_cb_print() be called
\n
"
,
getCurrentTimeString
(
tmpString
));
}
void
build_consumer
(
SThreadInfo
*
pInfo
)
{
...
...
@@ -588,12 +589,10 @@ void build_topic_list(SThreadInfo* pInfo) {
int32_t
saveConsumeResult
(
SThreadInfo
*
pInfo
)
{
char
sqlStr
[
1024
]
=
{
0
};
int64_t
now
=
taosGetTimestampMs
();
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
sprintf
(
sqlStr
,
"insert into %s.consumeresult values (%"
PRId64
", %d, %"
PRId64
", %"
PRId64
", %d)"
,
g_stConfInfo
.
cdbName
,
now
,
pInfo
->
consumerId
,
pInfo
->
consumeMsgCnt
,
pInfo
->
consumeRowCnt
,
pInfo
->
checkresult
);
g_stConfInfo
.
cdbName
,
atomic_fetch_add_64
(
&
now
,
1
),
pInfo
->
consumerId
,
pInfo
->
consumeMsgCnt
,
pInfo
->
consumeRowCnt
,
pInfo
->
checkresult
);
char
tmpString
[
128
];
taosFprintfFile
(
g_fp
,
"%s, consume id %d result: %s
\n
"
,
getCurrentTimeString
(
tmpString
),
pInfo
->
consumerId
,
sqlStr
);
...
...
@@ -637,9 +636,9 @@ void loop_consume(SThreadInfo* pInfo) {
}
}
int64_t
lastTotalMsgs
=
0
;
uint64_t
lastPrintTime
=
taosGetTimestampMs
();
uint64_t
startTs
=
taosGetTimestampMs
();
int64_t
lastTotalMsgs
=
0
;
uint64_t
lastPrintTime
=
taosGetTimestampMs
();
uint64_t
startTs
=
taosGetTimestampMs
();
int32_t
consumeDelay
=
g_stConfInfo
.
consumeDelay
==
-
1
?
-
1
:
(
g_stConfInfo
.
consumeDelay
*
1000
);
while
(
running
)
{
...
...
@@ -652,16 +651,16 @@ void loop_consume(SThreadInfo* pInfo) {
taos_free_result
(
tmqMsg
);
totalMsgs
++
;
int64_t
currentPrintTime
=
taosGetTimestampMs
();
if
(
currentPrintTime
-
lastPrintTime
>
10
*
1000
)
{
taosFprintfFile
(
g_fp
,
"consumer id %d has currently poll total msgs: %"
PRId64
", period rate: %.3f msgs/second
\n
"
,
pInfo
->
consumerId
,
totalMsgs
,
(
totalMsgs
-
lastTotalMsgs
)
*
1000
.
0
/
(
currentPrintTime
-
lastPrintTime
));
lastPrintTime
=
currentPrintTime
;
lastTotalMsgs
=
totalMsgs
;
}
int64_t
currentPrintTime
=
taosGetTimestampMs
();
if
(
currentPrintTime
-
lastPrintTime
>
10
*
1000
)
{
taosFprintfFile
(
g_fp
,
"consumer id %d has currently poll total msgs: %"
PRId64
", period rate: %.3f msgs/second
\n
"
,
pInfo
->
consumerId
,
totalMsgs
,
(
totalMsgs
-
lastTotalMsgs
)
*
1000
.
0
/
(
currentPrintTime
-
lastPrintTime
));
lastPrintTime
=
currentPrintTime
;
lastTotalMsgs
=
totalMsgs
;
}
if
(
0
==
once_flag
)
{
once_flag
=
1
;
notifyMainScript
(
pInfo
,
NOTIFY_CMD_START_CONSUM
);
...
...
@@ -678,7 +677,7 @@ void loop_consume(SThreadInfo* pInfo) {
break
;
}
}
if
(
0
==
running
)
{
taosFprintfFile
(
g_fp
,
"receive stop signal and not continue consume
\n
"
);
}
...
...
@@ -696,6 +695,7 @@ void* consumeThreadFunc(void* param) {
pInfo
->
taos
=
taos_connect
(
NULL
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pInfo
->
taos
==
NULL
)
{
taosFprintfFile
(
g_fp
,
"taos_connect() fail, can not notify and save consume result to main scripte
\n
"
);
ASSERT
(
0
);
return
NULL
;
}
...
...
@@ -855,6 +855,8 @@ int32_t getConsumeInfo() {
}
int
main
(
int32_t
argc
,
char
*
argv
[])
{
now
=
taosGetTimestampMs
();
parseArgument
(
argc
,
argv
);
getConsumeInfo
();
saveConfigToLogFile
();
...
...
@@ -888,11 +890,11 @@ int main(int32_t argc, char* argv[]) {
int64_t
t
=
end
-
start
;
if
(
0
==
t
)
t
=
1
;
double
tInMs
=
(
double
)
t
/
1000000
.
0
;
taosFprintfFile
(
g_fp
,
"Spent %.3f seconds to poll msgs: %"
PRIu64
" with %d thread(s), throughput: %.3f msgs/second
\n\n
"
,
tInMs
,
totalMsgs
,
g_stConfInfo
.
numOfThread
,
(
double
)(
totalMsgs
/
tInMs
));
"Spent %.3f seconds to poll msgs: %"
PRIu64
" with %d thread(s), throughput: %.3f msgs/second
\n\n
"
,
tInMs
,
totalMsgs
,
g_stConfInfo
.
numOfThread
,
(
double
)(
totalMsgs
/
tInMs
));
taosFprintfFile
(
g_fp
,
"==== close tmqlog ====
\n
"
);
taosCloseFile
(
&
g_fp
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录