Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
132b22d4
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
132b22d4
编写于
6月 23, 2022
作者:
L
Liu Jicong
提交者:
GitHub
6月 23, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #14150 from taosdata/feature/stream
feat(tmq): scan tsdb as a snapshot
上级
94fd3091
2af4a4d2
变更
17
隐藏空白更改
内联
并排
Showing
17 changed file
with
503 addition
and
89 deletion
+503
-89
examples/c/tmq.c
examples/c/tmq.c
+27
-2
include/common/tcommon.h
include/common/tcommon.h
+2
-2
include/common/tmsg.h
include/common/tmsg.h
+6
-5
include/libs/executor/executor.h
include/libs/executor/executor.h
+9
-0
source/client/src/tmq.c
source/client/src/tmq.c
+17
-0
source/dnode/mnode/impl/src/mndScheduler.c
source/dnode/mnode/impl/src/mndScheduler.c
+1
-1
source/dnode/vnode/src/inc/tq.h
source/dnode/vnode/src/inc/tq.h
+1
-0
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+33
-7
source/dnode/vnode/src/tq/tqExec.c
source/dnode/vnode/src/tq/tqExec.c
+24
-0
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+11
-8
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+4
-3
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+10
-1
source/libs/stream/src/streamExec.c
source/libs/stream/src/streamExec.c
+0
-1
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+1
-0
tests/script/tsim/tmq/consume.sh
tests/script/tsim/tmq/consume.sh
+7
-3
tests/script/tsim/tmq/snapshot.sim
tests/script/tsim/tmq/snapshot.sim
+289
-0
tests/test/c/tmqSim.c
tests/test/c/tmqSim.c
+61
-56
未找到文件。
examples/c/tmq.c
浏览文件 @
132b22d4
...
...
@@ -76,19 +76,41 @@ int32_t init_env() {
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into ct0 values(now, 1, 2, 'a')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct0, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table if not exists ct1 using st1 tags(2000)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create child table tu2, reason:%s
\n
"
,
taos_errstr
(
pRes
));
printf
(
"failed to create child table ct1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into ct1 values(now, 3, 4, 'b')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table if not exists ct3 using st1 tags(3000)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create child table
tu
3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
printf
(
"failed to create child table
ct
3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into ct3 values(now, 5, 6, 'c')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
return
0
;
}
...
...
@@ -168,6 +190,9 @@ tmq_t* build_consumer() {
tmq_conf_set
(
conf
,
"td.connect.pass"
,
"taosdata"
);
tmq_conf_set
(
conf
,
"msg.with.table.name"
,
"true"
);
tmq_conf_set
(
conf
,
"enable.auto.commit"
,
"true"
);
tmq_conf_set
(
conf
,
"experiment.use.snapshot"
,
"true"
);
tmq_conf_set_auto_commit_cb
(
conf
,
tmq_commit_cb_print
,
NULL
);
tmq_t
*
tmq
=
tmq_consumer_new
(
conf
,
NULL
,
0
);
assert
(
tmq
);
...
...
include/common/tcommon.h
浏览文件 @
132b22d4
...
...
@@ -115,8 +115,8 @@ typedef struct SQueryTableDataCond {
int32_t
type
;
// data block load type:
int32_t
numOfTWindows
;
STimeWindow
*
twindows
;
int
32
_t
startVersion
;
int
32
_t
endVersion
;
int
64
_t
startVersion
;
int
64
_t
endVersion
;
}
SQueryTableDataCond
;
void
*
blockDataDestroy
(
SSDataBlock
*
pBlock
);
...
...
include/common/tmsg.h
浏览文件 @
132b22d4
...
...
@@ -696,12 +696,12 @@ typedef struct {
typedef
STableCfg
STableCfgRsp
;
int32_t
tSerializeSTableCfgReq
(
void
*
buf
,
int32_t
bufLen
,
STableCfgReq
*
pReq
);
int32_t
tDeserializeSTableCfgReq
(
void
*
buf
,
int32_t
bufLen
,
STableCfgReq
*
pReq
);
int32_t
tSerializeSTableCfgReq
(
void
*
buf
,
int32_t
bufLen
,
STableCfgReq
*
pReq
);
int32_t
tDeserializeSTableCfgReq
(
void
*
buf
,
int32_t
bufLen
,
STableCfgReq
*
pReq
);
int32_t
tSerializeSTableCfgRsp
(
void
*
buf
,
int32_t
bufLen
,
STableCfgRsp
*
pRsp
);
int32_t
tDeserializeSTableCfgRsp
(
void
*
buf
,
int32_t
bufLen
,
STableCfgRsp
*
pRsp
);
void
tFreeSTableCfgRsp
(
STableCfgRsp
*
pRsp
);
int32_t
tSerializeSTableCfgRsp
(
void
*
buf
,
int32_t
bufLen
,
STableCfgRsp
*
pRsp
);
int32_t
tDeserializeSTableCfgRsp
(
void
*
buf
,
int32_t
bufLen
,
STableCfgRsp
*
pRsp
);
void
tFreeSTableCfgRsp
(
STableCfgRsp
*
pRsp
);
typedef
struct
{
char
db
[
TSDB_DB_FNAME_LEN
];
...
...
@@ -2669,6 +2669,7 @@ typedef struct {
SMsgHead
head
;
char
subKey
[
TSDB_SUBSCRIBE_KEY_LEN
];
int8_t
withTbName
;
int8_t
useSnapshot
;
int32_t
epoch
;
uint64_t
reqId
;
int64_t
consumerId
;
...
...
include/libs/executor/executor.h
浏览文件 @
132b22d4
...
...
@@ -36,11 +36,13 @@ typedef struct SReadHandle {
void
*
vnode
;
void
*
mnd
;
SMsgCb
*
pMsgCb
;
int8_t
initTsdbReader
;
}
SReadHandle
;
enum
{
STREAM_DATA_TYPE_SUBMIT_BLOCK
=
1
,
STREAM_DATA_TYPE_SSDATA_BLOCK
=
2
,
STREAM_DATA_TYPE_FROM_SNAPSHOT
=
3
,
};
typedef
enum
{
...
...
@@ -56,6 +58,13 @@ typedef enum {
*/
qTaskInfo_t
qCreateStreamExecTaskInfo
(
void
*
msg
,
void
*
streamReadHandle
);
/**
* Switch the stream scan to snapshot mode
* @param tinfo
* @return
*/
int32_t
qStreamScanSnapshot
(
qTaskInfo_t
tinfo
);
/**
* Set the input data block for the stream scan.
* @param tinfo
...
...
source/client/src/tmq.c
浏览文件 @
132b22d4
...
...
@@ -54,6 +54,7 @@ struct tmq_conf_t {
int8_t
autoCommit
;
int8_t
resetOffset
;
int8_t
withTbName
;
int8_t
useSnapshot
;
uint16_t
port
;
int32_t
autoCommitInterval
;
char
*
ip
;
...
...
@@ -69,6 +70,7 @@ struct tmq_t {
char
groupId
[
TSDB_CGROUP_LEN
];
char
clientId
[
256
];
int8_t
withTbName
;
int8_t
useSnapshot
;
int8_t
autoCommit
;
int32_t
autoCommitInterval
;
int32_t
resetOffsetCfg
;
...
...
@@ -282,6 +284,18 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
}
}
if
(
strcmp
(
key
,
"experiment.use.snapshot"
)
==
0
)
{
if
(
strcmp
(
value
,
"true"
)
==
0
)
{
conf
->
useSnapshot
=
true
;
return
TMQ_CONF_OK
;
}
else
if
(
strcmp
(
value
,
"false"
)
==
0
)
{
conf
->
useSnapshot
=
false
;
return
TMQ_CONF_OK
;
}
else
{
return
TMQ_CONF_INVALID
;
}
}
if
(
strcmp
(
key
,
"td.connect.ip"
)
==
0
)
{
conf
->
ip
=
strdup
(
value
);
return
TMQ_CONF_OK
;
...
...
@@ -953,6 +967,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
strcpy
(
pTmq
->
clientId
,
conf
->
clientId
);
strcpy
(
pTmq
->
groupId
,
conf
->
groupId
);
pTmq
->
withTbName
=
conf
->
withTbName
;
pTmq
->
useSnapshot
=
conf
->
useSnapshot
;
pTmq
->
autoCommit
=
conf
->
autoCommit
;
pTmq
->
autoCommitInterval
=
conf
->
autoCommitInterval
;
pTmq
->
commitCb
=
conf
->
commitCb
;
...
...
@@ -1534,6 +1549,8 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic*
pReq
->
currentOffset
=
reqOffset
;
pReq
->
reqId
=
generateRequestId
();
pReq
->
useSnapshot
=
tmq
->
useSnapshot
;
pReq
->
head
.
vgId
=
htonl
(
pVg
->
vgId
);
pReq
->
head
.
contLen
=
htonl
(
sizeof
(
SMqPollReq
));
return
pReq
;
...
...
source/dnode/mnode/impl/src/mndScheduler.c
浏览文件 @
132b22d4
...
...
@@ -509,7 +509,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
SStreamTask
*
pTask
=
tNewSStreamTask
(
pStream
->
uid
);
mndAddTaskToTaskSet
(
taskOneLevel
,
pTask
);
//
input
//
source
pTask
->
isDataScan
=
1
;
// trigger
...
...
source/dnode/vnode/src/inc/tq.h
浏览文件 @
132b22d4
...
...
@@ -150,6 +150,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead*
// tqExec
int32_t
tqDataExec
(
STQ
*
pTq
,
STqExecHandle
*
pExec
,
SSubmitReq
*
pReq
,
SMqDataBlkRsp
*
pRsp
,
int32_t
workerId
);
int32_t
tqScanSnapshot
(
STQ
*
pTq
,
const
STqExecHandle
*
pExec
,
SMqDataBlkRsp
*
pRsp
,
int32_t
workerId
);
int32_t
tqSendPollRsp
(
STQ
*
pTq
,
const
SRpcMsg
*
pMsg
,
const
SMqPollReq
*
pReq
,
const
SMqDataBlkRsp
*
pRsp
);
// tqMeta
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
132b22d4
...
...
@@ -227,19 +227,16 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
consumerEpoch
=
atomic_val_compare_exchange_32
(
&
pHandle
->
epoch
,
consumerEpoch
,
reqEpoch
);
}
SWalHead
*
pHeadWithCkSum
=
taosMemoryMalloc
(
sizeof
(
SWalHead
)
+
2048
);
if
(
pHeadWithCkSum
==
NULL
)
{
return
-
1
;
}
walSetReaderCapacity
(
pHandle
->
pWalReader
,
2048
);
SMqDataBlkRsp
rsp
=
{
0
};
rsp
.
reqOffset
=
pReq
->
currentOffset
;
rsp
.
blockData
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
rsp
.
blockDataLen
=
taosArrayInit
(
0
,
sizeof
(
int32_t
));
if
(
rsp
.
blockData
==
NULL
||
rsp
.
blockDataLen
==
NULL
)
{
return
-
1
;
}
rsp
.
withTbName
=
pReq
->
withTbName
;
if
(
rsp
.
withTbName
)
{
rsp
.
blockTbName
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
...
...
@@ -253,6 +250,30 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
rsp
.
blockSchema
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
}
#if 1
if
(
pReq
->
useSnapshot
)
{
tqInfo
(
"retrieve using snapshot"
);
int64_t
lastVer
=
walGetCommittedVer
(
pTq
->
pWal
);
if
(
rsp
.
reqOffset
<
lastVer
)
{
tqScanSnapshot
(
pTq
,
&
pHandle
->
execHandle
,
&
rsp
,
workerId
);
if
(
rsp
.
blockNum
!=
0
)
{
rsp
.
withTbName
=
false
;
rsp
.
rspOffset
=
lastVer
;
tqInfo
(
"direct send by snapshot rsp offset %ld"
,
lastVer
);
goto
SEND_RSP
;
}
}
}
#endif
SWalHead
*
pHeadWithCkSum
=
taosMemoryMalloc
(
sizeof
(
SWalHead
)
+
2048
);
if
(
pHeadWithCkSum
==
NULL
)
{
return
-
1
;
}
walSetReaderCapacity
(
pHandle
->
pWalReader
,
2048
);
while
(
1
)
{
consumerEpoch
=
atomic_load_32
(
&
pHandle
->
epoch
);
if
(
consumerEpoch
>
reqEpoch
)
{
...
...
@@ -292,6 +313,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
metaRsp
.
metaRsp
=
pHead
->
body
;
if
(
tqSendMetaPollRsp
(
pTq
,
pMsg
,
pReq
,
&
metaRsp
)
<
0
)
{
code
=
-
1
;
goto
OVER
;
}
code
=
0
;
goto
OVER
;
...
...
@@ -308,6 +330,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
taosMemoryFree
(
pHeadWithCkSum
);
SEND_RSP:
ASSERT
(
taosArrayGetSize
(
rsp
.
blockData
)
==
rsp
.
blockNum
);
ASSERT
(
taosArrayGetSize
(
rsp
.
blockDataLen
)
==
rsp
.
blockNum
);
if
(
rsp
.
withSchema
)
{
...
...
@@ -376,6 +399,8 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
SReadHandle
handle
=
{
.
reader
=
pHandle
->
execHandle
.
pExecReader
[
i
],
.
meta
=
pTq
->
pVnode
->
pMeta
,
.
vnode
=
pTq
->
pVnode
,
.
initTsdbReader
=
1
,
};
pHandle
->
execHandle
.
execCol
.
task
[
i
]
=
qCreateStreamExecTaskInfo
(
pHandle
->
execHandle
.
execCol
.
qmsg
,
&
handle
);
ASSERT
(
pHandle
->
execHandle
.
execCol
.
task
[
i
]);
...
...
@@ -448,6 +473,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
.
reader
=
pStreamReader
,
.
meta
=
pTq
->
pVnode
->
pMeta
,
.
vnode
=
pTq
->
pVnode
,
.
initTsdbReader
=
1
,
};
/*pTask->exec.inputHandle = pStreamReader;*/
pTask
->
exec
.
executor
=
qCreateStreamExecTaskInfo
(
pTask
->
exec
.
qmsg
,
&
handle
);
...
...
source/dnode/vnode/src/tq/tqExec.c
浏览文件 @
132b22d4
...
...
@@ -60,6 +60,30 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, const STqExecHandle* pExec, SMqD
return
0
;
}
int32_t
tqScanSnapshot
(
STQ
*
pTq
,
const
STqExecHandle
*
pExec
,
SMqDataBlkRsp
*
pRsp
,
int32_t
workerId
)
{
ASSERT
(
pExec
->
subType
==
TOPIC_SUB_TYPE__COLUMN
);
qTaskInfo_t
task
=
pExec
->
execCol
.
task
[
workerId
];
if
(
qStreamScanSnapshot
(
task
)
<
0
)
{
ASSERT
(
0
);
}
while
(
1
)
{
SSDataBlock
*
pDataBlock
=
NULL
;
uint64_t
ts
=
0
;
if
(
qExecTask
(
task
,
&
pDataBlock
,
&
ts
)
<
0
)
{
ASSERT
(
0
);
}
if
(
pDataBlock
==
NULL
)
break
;
ASSERT
(
pDataBlock
->
info
.
rows
!=
0
);
ASSERT
(
pDataBlock
->
info
.
numOfCols
!=
0
);
tqAddBlockDataToRsp
(
pDataBlock
,
pRsp
);
pRsp
->
blockNum
++
;
}
return
0
;
}
int32_t
tqDataExec
(
STQ
*
pTq
,
STqExecHandle
*
pExec
,
SSubmitReq
*
pReq
,
SMqDataBlkRsp
*
pRsp
,
int32_t
workerId
)
{
if
(
pExec
->
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
qTaskInfo_t
task
=
pExec
->
execCol
.
task
[
workerId
];
...
...
source/libs/executor/src/executor.c
浏览文件 @
132b22d4
...
...
@@ -67,6 +67,9 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
taosArrayAddAll
(
p
->
pDataBlock
,
pDataBlock
->
pDataBlock
);
taosArrayPush
(
pInfo
->
pBlockLists
,
&
p
);
}
}
else
if
(
type
==
STREAM_DATA_TYPE_FROM_SNAPSHOT
)
{
// do nothing
ASSERT
(
pInfo
->
blockType
==
STREAM_DATA_TYPE_FROM_SNAPSHOT
);
}
else
{
ASSERT
(
0
);
}
...
...
@@ -75,6 +78,14 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
}
}
int32_t
qStreamScanSnapshot
(
qTaskInfo_t
tinfo
)
{
if
(
tinfo
==
NULL
)
{
return
TSDB_CODE_QRY_APP_ERROR
;
}
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
return
doSetStreamBlock
(
pTaskInfo
->
pRoot
,
NULL
,
0
,
STREAM_DATA_TYPE_FROM_SNAPSHOT
,
0
,
NULL
);
}
int32_t
qSetStreamInput
(
qTaskInfo_t
tinfo
,
const
void
*
input
,
int32_t
type
,
bool
assignUid
)
{
return
qSetMultiStreamInput
(
tinfo
,
input
,
1
,
type
,
assignUid
);
}
...
...
@@ -106,14 +117,6 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
return
NULL
;
}
// print those info into log
#if 0
pMsg->sId = pMsg->sId;
pMsg->queryId = pMsg->queryId;
pMsg->taskId = pMsg->taskId;
pMsg->contentLen = pMsg->contentLen;
#endif
/*qDebugL("stream task string %s", (const char*)msg);*/
struct
SSubplan
*
plan
=
NULL
;
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
132b22d4
...
...
@@ -4041,16 +4041,17 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
tsdbReaderT
pDataReader
=
NULL
;
if
(
pHandle
)
{
if
(
pHandle
->
vnode
)
{
// for stram
if
(
pHandle
->
initTsdbReader
)
{
// for stream
ASSERT
(
pHandle
->
vnode
);
pDataReader
=
doCreateDataReader
(
pTableScanNode
,
pHandle
,
pTableListInfo
,
(
uint64_t
)
queryId
,
taskId
);
}
else
{
// for tq
ASSERT
(
pHandle
->
meta
);
getTableList
(
pHandle
->
meta
,
pScanPhyNode
,
pTableListInfo
);
}
}
if
(
pDataReader
==
NULL
&&
terrno
!=
0
)
{
qDebug
(
"%s pDataReader is NULL"
,
GET_TASKID
(
pTaskInfo
));
// return NULL;
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
132b22d4
...
...
@@ -928,7 +928,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
SSDataBlock
*
pBlock
=
taosArrayGetP
(
pInfo
->
pBlockLists
,
current
);
blockDataUpdateTsWindow
(
pBlock
,
0
);
return
pBlock
;
}
else
{
}
else
if
(
pInfo
->
blockType
==
STREAM_DATA_TYPE_SUBMIT_BLOCK
)
{
if
(
pInfo
->
scanMode
==
STREAM_SCAN_FROM_RES
)
{
blockDataDestroy
(
pInfo
->
pUpdateRes
);
pInfo
->
scanMode
=
STREAM_SCAN_FROM_READERHANDLE
;
...
...
@@ -1062,6 +1062,15 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
}
return
(
pBlockInfo
->
rows
==
0
)
?
NULL
:
pInfo
->
pRes
;
}
else
if
(
pInfo
->
blockType
==
STREAM_DATA_TYPE_FROM_SNAPSHOT
)
{
SSDataBlock
*
pResult
=
doTableScan
(
pInfo
->
pOperatorDumy
);
if
(
pResult
)
{
return
pResult
->
info
.
rows
>
0
?
pResult
:
NULL
;
}
return
NULL
;
}
else
{
ASSERT
(
0
);
return
NULL
;
}
}
...
...
source/libs/stream/src/streamExec.c
浏览文件 @
132b22d4
...
...
@@ -80,7 +80,6 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
}
qRes
->
type
=
STREAM_INPUT__DATA_BLOCK
;
qRes
->
blocks
=
pRes
;
/*qRes->sourceVg = pTask->nodeId;*/
if
(
streamTaskOutput
(
pTask
,
qRes
)
<
0
)
{
streamQueueProcessFail
(
pTask
->
inputQueue
);
taosArrayDestroy
(
pRes
);
...
...
tests/script/jenkins/basic.txt
浏览文件 @
132b22d4
...
...
@@ -109,6 +109,7 @@
./test.sh -f tsim/tmq/basic4Of2Cons.sim
./test.sh -f tsim/tmq/basic2Of2ConsOverlap.sim
./test.sh -f tsim/tmq/topic.sim
./test.sh -f tsim/tmq/snapshot.sim
# --- stable
./test.sh -f tsim/stable/disk.sim
...
...
tests/script/tsim/tmq/consume.sh
浏览文件 @
132b22d4
...
...
@@ -17,8 +17,9 @@ VALGRIND=0
SIGNAL
=
SIGINT
SHOW_MSG
=
0
SHOW_ROW
=
0
EXP_USE_SNAPSHOT
=
0
while
getopts
"d:s:v:y:x:g:r:w:"
arg
while
getopts
"d:s:v:y:x:g:r:w:
e:
"
arg
do
case
$arg
in
d
)
...
...
@@ -45,6 +46,9 @@ do
w
)
CDB_NAME
=
$OPTARG
;;
e
)
EXP_USE_SNAPSHOT
=
$OPTARG
;;
?
)
echo
"unkown argument"
;;
...
...
@@ -91,8 +95,8 @@ if [ "$EXEC_OPTON" = "start" ]; then
echo nohup
valgrind
--tool
=
memcheck
--leak-check
=
full
--show-reachable
=
no
--track-origins
=
yes
--show-leak-kinds
=
all
-v
--workaround-gcc296-bugs
=
yes
--log-file
=
${
LOG_DIR
}
/valgrind-tmq_sim.log
$PROGRAM
-c
$CFG_DIR
-y
$POLL_DELAY
-d
$DB_NAME
-g
$SHOW_MSG
-r
$SHOW_ROW
>
/dev/null 2>&1 &
nohup
valgrind
--tool
=
memcheck
--leak-check
=
full
--show-reachable
=
no
--track-origins
=
yes
--show-leak-kinds
=
all
-v
--workaround-gcc296-bugs
=
yes
--log-file
=
${
LOG_DIR
}
/valgrind-tmq_sim.log
$PROGRAM
-c
$CFG_DIR
-y
$POLL_DELAY
-d
$DB_NAME
-g
$SHOW_MSG
-r
$SHOW_ROW
>
/dev/null 2>&1 &
else
echo
"nohup
$PROGRAM
-c
$CFG_DIR
-y
$POLL_DELAY
-d
$DB_NAME
-g
$SHOW_MSG
-r
$SHOW_ROW
-w
$CDB_NAME
> /dev/null 2>&1 &"
nohup
$PROGRAM
-c
$CFG_DIR
-y
$POLL_DELAY
-d
$DB_NAME
-g
$SHOW_MSG
-r
$SHOW_ROW
-w
$CDB_NAME
>
/dev/null 2>&1 &
echo
"nohup
$PROGRAM
-c
$CFG_DIR
-y
$POLL_DELAY
-d
$DB_NAME
-g
$SHOW_MSG
-r
$SHOW_ROW
-w
$CDB_NAME
-e
$EXP_USE_SNAPSHOT
> /dev/null 2>&1 &"
nohup
$PROGRAM
-c
$CFG_DIR
-y
$POLL_DELAY
-d
$DB_NAME
-g
$SHOW_MSG
-r
$SHOW_ROW
-w
$CDB_NAME
-e
$EXP_USE_SNAPSHOT
>
/dev/null 2>&1 &
fi
else
PID
=
`
ps
-ef
|grep tmq_sim |
grep
-v
grep
|
awk
'{print $2}'
`
...
...
tests/script/tsim/tmq/snapshot.sim
0 → 100644
浏览文件 @
132b22d4
#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406
#basic1.sim: vgroups=1, one topic for one consumer, firstly insert data, then start consume. Include six topics
#basic2.sim: vgroups=1, multi topics for one consumer, firstly insert data, then start consume. Include six topics
#basic3.sim: vgroups=4, one topic for one consumer, firstly insert data, then start consume. Include six topics
#basic4.sim: vgroups=4, multi topics for one consumer, firstly insert data, then start consume. Include six topics
# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN
# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5;
#
# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
#
run tsim/tmq/prepareBasicEnv-1vgrp.sim
#---- global parameters start ----#
$dbName = db
$vgroups = 1
$stbPrefix = stb
$ctbPrefix = ctb
$ntbPrefix = ntb
$stbNum = 1
$ctbNum = 10
$ntbNum = 10
$rowsPerCtb = 10
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
#---- global parameters end ----#
$pullDelay = 3
$ifcheckdata = 1
$ifmanualcommit = 1
$showMsg = 1
$showRow = 0
sql connect
sql use $dbName
print == create topics from super table
sql create topic topic_stb_column as select ts, c3 from stb
sql create topic topic_stb_all as select ts, c1, c2, c3 from stb
sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb
print == create topics from child table
sql create topic topic_ctb_column as select ts, c3 from ctb0
sql create topic topic_ctb_all as select * from ctb0
sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0
print == create topics from normal table
sql create topic topic_ntb_column as select ts, c3 from ntb0
sql create topic topic_ntb_all as select * from ntb0
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
#sql show topics
#if $rows != 9 then
# return -1
#endi
#'group.id:cgrp1,enable.auto.commit:false,auto.commit.interval.ms:6000,auto.offset.reset:earliest'
$keyList = ' . group.id:cgrp1
$keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
#$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList
$cdb_index = 0
#=============================== start consume =============================#
print ================ test consume from stb
$loop_cnt = 0
loop_consume_diff_topic_from_stb:
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdb_index = $cdb_index + 1
$cdbName = cdb . $cdb_index
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
if $loop_cnt == 0 then
print == scenario 1: topic_stb_column
$topicList = ' . topic_stb_column
$topicList = $topicList . '
elif $loop_cnt == 1 then
print == scenario 2: topic_stb_all
$topicList = ' . topic_stb_all
$topicList = $topicList . '
elif $loop_cnt == 2 then
print == scenario 3: topic_stb_function
$topicList = ' . topic_stb_function
$topicList = $topicList . '
else
goto loop_consume_diff_topic_from_stb_end
endi
$consumerId = 0
$totalMsgOfStb = $ctbNum * $rowsPerCtb
$expectmsgcnt = 1
$expectrowcnt = 100
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from stb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -e 1 -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -e 1 -s start
print == check consume result
wait_consumer_end_from_stb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $rows != 1 then
sleep 1000
goto wait_consumer_end_from_stb
endi
if $data[0][1] != $consumerId then
return -1
endi
if $data[0][2] != $expectmsgcnt then
return -1
endi
if $data[0][3] != $expectrowcnt then
return -1
endi
$loop_cnt = $loop_cnt + 1
goto loop_consume_diff_topic_from_stb
loop_consume_diff_topic_from_stb_end:
print ================ test consume from ctb
$loop_cnt = 0
loop_consume_diff_topic_from_ctb:
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdb_index = $cdb_index + 1
$cdbName = cdb . $cdb_index
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
if $loop_cnt == 0 then
print == scenario 1: topic_ctb_column
$topicList = ' . topic_ctb_column
$topicList = $topicList . '
elif $loop_cnt == 1 then
print == scenario 2: topic_ctb_all
$topicList = ' . topic_ctb_all
$topicList = $topicList . '
elif $loop_cnt == 2 then
print == scenario 3: topic_ctb_function
$topicList = ' . topic_ctb_function
$topicList = $topicList . '
else
goto loop_consume_diff_topic_from_ctb_end
endi
$consumerId = 0
$totalMsgOfCtb = $rowsPerCtb
$expectmsgcnt = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ctb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start -e 1
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start -e 1
print == check consume result
wait_consumer_end_from_ctb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $rows != 1 then
sleep 1000
goto wait_consumer_end_from_ctb
endi
if $data[0][1] != $consumerId then
return -1
endi
if $data[0][2] != 1 then
return -1
endi
if $data[0][3] != 10 then
return -1
endi
$loop_cnt = $loop_cnt + 1
goto loop_consume_diff_topic_from_ctb
loop_consume_diff_topic_from_ctb_end:
print ================ test consume from ntb
$loop_cnt = 0
loop_consume_diff_topic_from_ntb:
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdb_index = $cdb_index + 1
$cdbName = cdb . $cdb_index
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
if $loop_cnt == 0 then
print == scenario 1: topic_ntb_column
$topicList = ' . topic_ntb_column
$topicList = $topicList . '
elif $loop_cnt == 1 then
print == scenario 2: topic_ntb_all
$topicList = ' . topic_ntb_all
$topicList = $topicList . '
elif $loop_cnt == 2 then
print == scenario 3: topic_ntb_function
$topicList = ' . topic_ntb_function
$topicList = $topicList . '
else
goto loop_consume_diff_topic_from_ntb_end
endi
$consumerId = 0
$totalMsgOfNtb = $rowsPerCtb
$expectmsgcnt = $totalMsgOfNtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ntb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start -e 1
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start -e 1
print == check consume result from ntb
wait_consumer_end_from_ntb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $rows != 1 then
sleep 1000
goto wait_consumer_end_from_ntb
endi
if $data[0][1] != $consumerId then
return -1
endi
if $data[0][2] != 1 then
return -1
endi
if $data[0][3] != $totalMsgOfNtb then
return -1
endi
$loop_cnt = $loop_cnt + 1
goto loop_consume_diff_topic_from_ntb
loop_consume_diff_topic_from_ntb_end:
#------ not need stop consumer, because it exit after pull msg overthan expect msg
#system tsim/tmq/consume.sh -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
tests/test/c/tmqSim.c
浏览文件 @
132b22d4
...
...
@@ -22,9 +22,9 @@
#include <time.h>
#include "taos.h"
#include "taosdef.h"
#include "taoserror.h"
#include "tlog.h"
#include "taosdef.h"
#include "types.h"
#define GREEN "\033[1;32m"
...
...
@@ -36,11 +36,7 @@
#define MAX_CONSUMER_THREAD_CNT (16)
#define MAX_VGROUP_CNT (32)
typedef
enum
{
NOTIFY_CMD_START_CONSUM
,
NOTIFY_CMD_START_COMMIT
,
NOTIFY_CMD_ID_BUTT
}
NOTIFY_CMD_ID
;
typedef
enum
{
NOTIFY_CMD_START_CONSUM
,
NOTIFY_CMD_START_COMMIT
,
NOTIFY_CMD_ID_BUTT
}
NOTIFY_CMD_ID
;
typedef
struct
{
TdThread
thread
;
...
...
@@ -52,8 +48,8 @@ typedef struct {
// char autoOffsetRest[16]; // none, earliest, latest
TdFilePtr
pConsumeRowsFile
;
int32_t
ifCheckData
;
int64_t
expectMsgCnt
;
int32_t
ifCheckData
;
int64_t
expectMsgCnt
;
int64_t
consumeMsgCnt
;
int64_t
consumeRowCnt
;
...
...
@@ -89,6 +85,7 @@ typedef struct {
int32_t
saveRowFlag
;
int32_t
consumeDelay
;
// unit s
int32_t
numOfThread
;
int32_t
useSnapshot
;
SThreadInfo
stThreads
[
MAX_CONSUMER_THREAD_CNT
];
}
SConfInfo
;
...
...
@@ -96,6 +93,8 @@ static SConfInfo g_stConfInfo;
TdFilePtr
g_fp
=
NULL
;
static
int
running
=
1
;
int8_t
useSnapshot
=
0
;
// char* g_pRowValue = NULL;
// TdFilePtr g_fp = NULL;
...
...
@@ -205,6 +204,8 @@ void parseArgument(int32_t argc, char* argv[]) {
g_stConfInfo
.
saveRowFlag
=
atol
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-y"
)
==
0
)
{
g_stConfInfo
.
consumeDelay
=
atol
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-e"
)
==
0
)
{
useSnapshot
=
(
int8_t
)
atol
(
argv
[
++
i
]);
}
else
{
pError
(
"%s unknow para: %s %s"
,
GREEN
,
argv
[
++
i
],
NC
);
exit
(
-
1
);
...
...
@@ -298,11 +299,11 @@ int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) {
return
0
;
}
static
char
*
shellFormatTimestamp
(
char
*
buf
,
int64_t
val
,
int32_t
precision
)
{
//if (shell.args.is_raw_time) {
// sprintf(buf, "%" PRId64, val);
// return buf;
//}
static
char
*
shellFormatTimestamp
(
char
*
buf
,
int64_t
val
,
int32_t
precision
)
{
//
if (shell.args.is_raw_time) {
//
sprintf(buf, "%" PRId64, val);
//
return buf;
//
}
time_t
tt
;
int32_t
ms
=
0
;
...
...
@@ -340,7 +341,7 @@ static char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision) {
}
}
struct
tm
*
ptm
=
taosLocalTime
(
&
tt
,
NULL
);
struct
tm
*
ptm
=
taosLocalTime
(
&
tt
,
NULL
);
size_t
pos
=
strftime
(
buf
,
35
,
"%Y-%m-%d %H:%M:%S"
,
ptm
);
if
(
precision
==
TSDB_TIME_PRECISION_NANO
)
{
...
...
@@ -354,7 +355,8 @@ static char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision) {
return
buf
;
}
static
void
shellDumpFieldToFile
(
TdFilePtr
pFile
,
const
char
*
val
,
TAOS_FIELD
*
field
,
int32_t
length
,
int32_t
precision
)
{
static
void
shellDumpFieldToFile
(
TdFilePtr
pFile
,
const
char
*
val
,
TAOS_FIELD
*
field
,
int32_t
length
,
int32_t
precision
)
{
if
(
val
==
NULL
)
{
taosFprintfFile
(
pFile
,
"%s"
,
TSDB_DATA_NULL_STR
);
return
;
...
...
@@ -364,31 +366,31 @@ static void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *f
char
buf
[
TSDB_MAX_BYTES_PER_ROW
];
switch
(
field
->
type
)
{
case
TSDB_DATA_TYPE_BOOL
:
taosFprintfFile
(
pFile
,
"%d"
,
((((
int32_t
)(
*
((
char
*
)
val
)))
==
1
)
?
1
:
0
));
taosFprintfFile
(
pFile
,
"%d"
,
((((
int32_t
)(
*
((
char
*
)
val
)))
==
1
)
?
1
:
0
));
break
;
case
TSDB_DATA_TYPE_TINYINT
:
taosFprintfFile
(
pFile
,
"%d"
,
*
((
int8_t
*
)
val
));
taosFprintfFile
(
pFile
,
"%d"
,
*
((
int8_t
*
)
val
));
break
;
case
TSDB_DATA_TYPE_UTINYINT
:
taosFprintfFile
(
pFile
,
"%u"
,
*
((
uint8_t
*
)
val
));
taosFprintfFile
(
pFile
,
"%u"
,
*
((
uint8_t
*
)
val
));
break
;
case
TSDB_DATA_TYPE_SMALLINT
:
taosFprintfFile
(
pFile
,
"%d"
,
*
((
int16_t
*
)
val
));
taosFprintfFile
(
pFile
,
"%d"
,
*
((
int16_t
*
)
val
));
break
;
case
TSDB_DATA_TYPE_USMALLINT
:
taosFprintfFile
(
pFile
,
"%u"
,
*
((
uint16_t
*
)
val
));
taosFprintfFile
(
pFile
,
"%u"
,
*
((
uint16_t
*
)
val
));
break
;
case
TSDB_DATA_TYPE_INT
:
taosFprintfFile
(
pFile
,
"%d"
,
*
((
int32_t
*
)
val
));
taosFprintfFile
(
pFile
,
"%d"
,
*
((
int32_t
*
)
val
));
break
;
case
TSDB_DATA_TYPE_UINT
:
taosFprintfFile
(
pFile
,
"%u"
,
*
((
uint32_t
*
)
val
));
taosFprintfFile
(
pFile
,
"%u"
,
*
((
uint32_t
*
)
val
));
break
;
case
TSDB_DATA_TYPE_BIGINT
:
taosFprintfFile
(
pFile
,
"%"
PRId64
,
*
((
int64_t
*
)
val
));
taosFprintfFile
(
pFile
,
"%"
PRId64
,
*
((
int64_t
*
)
val
));
break
;
case
TSDB_DATA_TYPE_UBIGINT
:
taosFprintfFile
(
pFile
,
"%"
PRIu64
,
*
((
uint64_t
*
)
val
));
taosFprintfFile
(
pFile
,
"%"
PRIu64
,
*
((
uint64_t
*
)
val
));
break
;
case
TSDB_DATA_TYPE_FLOAT
:
taosFprintfFile
(
pFile
,
"%.5f"
,
GET_FLOAT_VAL
(
val
));
...
...
@@ -409,7 +411,7 @@ static void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *f
taosFprintfFile
(
pFile
,
"
\'
%s
\'
"
,
buf
);
break
;
case
TSDB_DATA_TYPE_TIMESTAMP
:
shellFormatTimestamp
(
buf
,
*
(
int64_t
*
)
val
,
precision
);
shellFormatTimestamp
(
buf
,
*
(
int64_t
*
)
val
,
precision
);
taosFprintfFile
(
pFile
,
"'%s'"
,
buf
);
break
;
default:
...
...
@@ -417,12 +419,13 @@ static void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *f
}
}
static
void
dumpToFileForCheck
(
TdFilePtr
pFile
,
TAOS_ROW
row
,
TAOS_FIELD
*
fields
,
int32_t
*
length
,
int32_t
num_fields
,
int32_t
precision
)
{
static
void
dumpToFileForCheck
(
TdFilePtr
pFile
,
TAOS_ROW
row
,
TAOS_FIELD
*
fields
,
int32_t
*
length
,
int32_t
num_fields
,
int32_t
precision
)
{
for
(
int32_t
i
=
0
;
i
<
num_fields
;
i
++
)
{
if
(
i
>
0
)
{
taosFprintfFile
(
pFile
,
"
\n
"
);
}
shellDumpFieldToFile
(
pFile
,
(
const
char
*
)
row
[
i
],
fields
+
i
,
length
[
i
],
precision
);
shellDumpFieldToFile
(
pFile
,
(
const
char
*
)
row
[
i
],
fields
+
i
,
length
[
i
],
precision
);
}
taosFprintfFile
(
pFile
,
"
\n
"
);
}
...
...
@@ -432,31 +435,32 @@ static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex)
int32_t
totalRows
=
0
;
// printf("topic: %s\n", tmq_get_topic_name(msg));
int32_t
vgroupId
=
tmq_get_vgroup_id
(
msg
);
const
char
*
dbName
=
tmq_get_db_name
(
msg
);
int32_t
vgroupId
=
tmq_get_vgroup_id
(
msg
);
const
char
*
dbName
=
tmq_get_db_name
(
msg
);
taosFprintfFile
(
g_fp
,
"consumerId: %d, msg index:%"
PRId64
"
\n
"
,
pInfo
->
consumerId
,
msgIndex
);
taosFprintfFile
(
g_fp
,
"dbName: %s, topic: %s, vgroupId: %d
\n
"
,
dbName
!=
NULL
?
dbName
:
"invalid table"
,
tmq_get_topic_name
(
msg
),
vgroupId
);
taosFprintfFile
(
g_fp
,
"dbName: %s, topic: %s, vgroupId: %d
\n
"
,
dbName
!=
NULL
?
dbName
:
"invalid table"
,
tmq_get_topic_name
(
msg
),
vgroupId
);
while
(
1
)
{
TAOS_ROW
row
=
taos_fetch_row
(
msg
);
if
(
row
==
NULL
)
break
;
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
msg
);
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
msg
);
int32_t
numOfFields
=
taos_field_count
(
msg
);
int32_t
*
length
=
taos_fetch_lengths
(
msg
);
int32_t
precision
=
taos_result_precision
(
msg
);
const
char
*
tbName
=
tmq_get_table_name
(
msg
);
int32_t
*
length
=
taos_fetch_lengths
(
msg
);
int32_t
precision
=
taos_result_precision
(
msg
);
const
char
*
tbName
=
tmq_get_table_name
(
msg
);
dumpToFileForCheck
(
pInfo
->
pConsumeRowsFile
,
row
,
fields
,
length
,
numOfFields
,
precision
);
dumpToFileForCheck
(
pInfo
->
pConsumeRowsFile
,
row
,
fields
,
length
,
numOfFields
,
precision
);
taos_print_row
(
buf
,
row
,
fields
,
numOfFields
);
if
(
0
!=
g_stConfInfo
.
showRowFlag
)
{
taosFprintfFile
(
g_fp
,
"tbname:%s, rows[%d]: %s
\n
"
,
(
tbName
!=
NULL
?
tbName
:
"null table"
),
totalRows
,
buf
);
//if (0 != g_stConfInfo.saveRowFlag) {
// saveConsumeContentToTbl(pInfo, buf);
//}
//
if (0 != g_stConfInfo.saveRowFlag) {
//
saveConsumeContentToTbl(pInfo, buf);
//
}
}
totalRows
++
;
...
...
@@ -479,8 +483,7 @@ int queryDB(TAOS* taos, char* command) {
return
0
;
}
static
void
appNothing
(
void
*
param
,
TAOS_RES
*
res
,
int32_t
numOfRows
)
{
}
static
void
appNothing
(
void
*
param
,
TAOS_RES
*
res
,
int32_t
numOfRows
)
{}
int32_t
notifyMainScript
(
SThreadInfo
*
pInfo
,
int32_t
cmdId
)
{
char
sqlStr
[
1024
]
=
{
0
};
...
...
@@ -488,11 +491,8 @@ int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) {
int64_t
now
=
taosGetTimestampMs
();
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
sprintf
(
sqlStr
,
"insert into %s.notifyinfo values (%"
PRId64
", %d, %d)"
,
g_stConfInfo
.
cdbName
,
now
,
cmdId
,
pInfo
->
consumerId
);
sprintf
(
sqlStr
,
"insert into %s.notifyinfo values (%"
PRId64
", %d, %d)"
,
g_stConfInfo
.
cdbName
,
now
,
cmdId
,
pInfo
->
consumerId
);
taos_query_a
(
pInfo
->
taos
,
sqlStr
,
appNothing
,
NULL
);
...
...
@@ -502,12 +502,12 @@ int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) {
}
static
int32_t
g_once_commit_flag
=
0
;
static
void
tmq_commit_cb_print
(
tmq_t
*
tmq
,
int32_t
code
,
void
*
param
)
{
pError
(
"tmq_commit_cb_print() commit %d
\n
"
,
code
);
static
void
tmq_commit_cb_print
(
tmq_t
*
tmq
,
int32_t
code
,
void
*
param
)
{
pError
(
"tmq_commit_cb_print() commit %d
\n
"
,
code
);
if
(
0
==
g_once_commit_flag
)
{
g_once_commit_flag
=
1
;
notifyMainScript
((
SThreadInfo
*
)
param
,
(
int32_t
)
NOTIFY_CMD_START_COMMIT
);
notifyMainScript
((
SThreadInfo
*
)
param
,
(
int32_t
)
NOTIFY_CMD_START_COMMIT
);
}
taosFprintfFile
(
g_fp
,
"tmq_commit_cb_print() be called
\n
"
);
}
...
...
@@ -541,6 +541,10 @@ void build_consumer(SThreadInfo* pInfo) {
// tmq_conf_set(conf, "auto.offset.reset", "none");
// tmq_conf_set(conf, "auto.offset.reset", "earliest");
// tmq_conf_set(conf, "auto.offset.reset", "latest");
//
if
(
useSnapshot
)
{
tmq_conf_set
(
conf
,
"experiment.use.snapshot"
,
"true"
);
}
pInfo
->
tmq
=
tmq_consumer_new
(
conf
,
NULL
,
0
);
...
...
@@ -595,12 +599,13 @@ void loop_consume(SThreadInfo* pInfo) {
pInfo
->
consumerId
);
pInfo
->
ts
=
taosGetTimestampMs
();
if
(
pInfo
->
ifCheckData
)
{
char
filename
[
256
]
=
{
0
};
char
filename
[
256
]
=
{
0
};
char
tmpString
[
128
];
//sprintf(filename, "%s/../log/consumerid_%d_%s.txt", configDir, pInfo->consumerId, getCurrentTimeString(tmpString));
sprintf
(
filename
,
"%s/../log/consumerid_%d.txt"
,
configDir
,
pInfo
->
consumerId
);
// sprintf(filename, "%s/../log/consumerid_%d_%s.txt", configDir, pInfo->consumerId,
// getCurrentTimeString(tmpString));
sprintf
(
filename
,
"%s/../log/consumerid_%d.txt"
,
configDir
,
pInfo
->
consumerId
);
pInfo
->
pConsumeRowsFile
=
taosOpenFile
(
filename
,
TD_FILE_CREATE
|
TD_FILE_WRITE
|
TD_FILE_TRUNC
|
TD_FILE_STREAM
);
if
(
pInfo
->
pConsumeRowsFile
==
NULL
)
{
taosFprintfFile
(
g_fp
,
"%s create file fail for save rows content
\n
"
,
getCurrentTimeString
(
tmpString
));
...
...
@@ -619,10 +624,10 @@ void loop_consume(SThreadInfo* pInfo) {
totalMsgs
++
;
if
(
0
==
once_flag
)
{
if
(
0
==
once_flag
)
{
once_flag
=
1
;
notifyMainScript
(
pInfo
,
NOTIFY_CMD_START_CONSUM
);
}
notifyMainScript
(
pInfo
,
NOTIFY_CMD_START_CONSUM
);
}
if
(
totalRows
>=
pInfo
->
expectMsgCnt
)
{
char
tmpString
[
128
];
...
...
@@ -651,7 +656,7 @@ void* consumeThreadFunc(void* param) {
pInfo
->
taos
=
taos_connect
(
NULL
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pInfo
->
taos
==
NULL
)
{
taosFprintfFile
(
g_fp
,
"taos_connect() fail, can not notify and save consume result to main scripte
\n
"
);
exit
(
-
1
);
exit
(
-
1
);
}
build_consumer
(
pInfo
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录