Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d8da0c0e
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
d8da0c0e
编写于
6月 30, 2022
作者:
L
Liu Jicong
提交者:
GitHub
6月 30, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #14386 from taosdata/feature/stream
feat(tmq): add snapshot test
上级
2ca6c5fa
94f07836
变更
15
隐藏空白更改
内联
并排
Showing
15 changed file
with
520 addition
and
67 deletion
+520
-67
examples/c/tmq.c
examples/c/tmq.c
+1
-1
include/libs/executor/executor.h
include/libs/executor/executor.h
+3
-1
source/client/src/tmq.c
source/client/src/tmq.c
+11
-5
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+1
-0
source/dnode/vnode/src/inc/tq.h
source/dnode/vnode/src/inc/tq.h
+12
-1
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+2
-13
source/dnode/vnode/src/tq/tqExec.c
source/dnode/vnode/src/tq/tqExec.c
+26
-6
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+38
-13
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+11
-2
source/libs/executor/src/executorMain.c
source/libs/executor/src/executorMain.c
+13
-4
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+44
-4
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+34
-2
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+1
-0
tests/script/tsim/tmq/snapshot1.sim
tests/script/tsim/tmq/snapshot1.sim
+310
-0
tests/test/c/tmqSim.c
tests/test/c/tmqSim.c
+13
-15
未找到文件。
examples/c/tmq.c
浏览文件 @
d8da0c0e
...
...
@@ -199,7 +199,7 @@ tmq_t* build_consumer() {
tmq_conf_set
(
conf
,
"msg.with.table.name"
,
"true"
);
tmq_conf_set
(
conf
,
"enable.auto.commit"
,
"true"
);
tmq_conf_set
(
conf
,
"experiment
.use.snapshot"
,
"fals
e"
);
tmq_conf_set
(
conf
,
"experiment
al.snapshot.enable"
,
"tru
e"
);
tmq_conf_set_auto_commit_cb
(
conf
,
tmq_commit_cb_print
,
NULL
);
tmq_t
*
tmq
=
tmq_consumer_new
(
conf
,
NULL
,
0
);
...
...
include/libs/executor/executor.h
浏览文件 @
d8da0c0e
...
...
@@ -36,6 +36,7 @@ typedef struct SReadHandle {
void
*
vnode
;
void
*
mnd
;
SMsgCb
*
pMsgCb
;
bool
tqReader
;
}
SReadHandle
;
typedef
enum
{
...
...
@@ -133,7 +134,6 @@ int32_t qKillTask(qTaskInfo_t tinfo);
*/
int32_t
qAsyncKillTask
(
qTaskInfo_t
tinfo
);
/**
* destroy query info structure
* @param qHandle
...
...
@@ -172,6 +172,8 @@ int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t le
*/
int32_t
qGetStreamScanStatus
(
qTaskInfo_t
tinfo
,
uint64_t
*
uid
,
int64_t
*
ts
);
int32_t
qStreamPrepareScan
(
qTaskInfo_t
tinfo
,
uint64_t
uid
,
int64_t
ts
);
#ifdef __cplusplus
}
#endif
...
...
source/client/src/tmq.c
浏览文件 @
d8da0c0e
...
...
@@ -54,7 +54,8 @@ struct tmq_conf_t {
int8_t
autoCommit
;
int8_t
resetOffset
;
int8_t
withTbName
;
int8_t
useSnapshot
;
int8_t
spEnable
;
int32_t
spBatchSize
;
uint16_t
port
;
int32_t
autoCommitInterval
;
char
*
ip
;
...
...
@@ -288,18 +289,23 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
}
}
if
(
strcmp
(
key
,
"experiment
.use.snapshot
"
)
==
0
)
{
if
(
strcmp
(
key
,
"experiment
al.snapshot.enable
"
)
==
0
)
{
if
(
strcmp
(
value
,
"true"
)
==
0
)
{
conf
->
useSnapshot
=
true
;
conf
->
spEnable
=
true
;
return
TMQ_CONF_OK
;
}
else
if
(
strcmp
(
value
,
"false"
)
==
0
)
{
conf
->
useSnapshot
=
false
;
conf
->
spEnable
=
false
;
return
TMQ_CONF_OK
;
}
else
{
return
TMQ_CONF_INVALID
;
}
}
if
(
strcmp
(
key
,
"experimental.snapshot.batch.size"
)
==
0
)
{
conf
->
spBatchSize
=
atoi
(
value
);
return
TMQ_CONF_OK
;
}
if
(
strcmp
(
key
,
"td.connect.ip"
)
==
0
)
{
conf
->
ip
=
strdup
(
value
);
return
TMQ_CONF_OK
;
...
...
@@ -918,7 +924,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
strcpy
(
pTmq
->
clientId
,
conf
->
clientId
);
strcpy
(
pTmq
->
groupId
,
conf
->
groupId
);
pTmq
->
withTbName
=
conf
->
withTbName
;
pTmq
->
useSnapshot
=
conf
->
useSnapshot
;
pTmq
->
useSnapshot
=
conf
->
spEnable
;
pTmq
->
autoCommit
=
conf
->
autoCommit
;
pTmq
->
autoCommitInterval
=
conf
->
autoCommitInterval
;
pTmq
->
commitCb
=
conf
->
commitCb
;
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
d8da0c0e
...
...
@@ -116,6 +116,7 @@ typedef void *tsdbReaderT;
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
#define BLOCK_LOAD_TABLE_RR_ORDER 3
int32_t
tsdbSetTableId
(
tsdbReaderT
reader
,
int64_t
uid
);
int32_t
tsdbSetTableList
(
tsdbReaderT
reader
,
SArray
*
tableList
);
tsdbReaderT
tsdbReaderOpen
(
SVnode
*
pVnode
,
SQueryTableDataCond
*
pCond
,
SArray
*
tableList
,
uint64_t
qId
,
uint64_t
taskId
);
...
...
source/dnode/vnode/src/inc/tq.h
浏览文件 @
d8da0c0e
...
...
@@ -161,7 +161,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead*
// tqExec
int32_t
tqLogScanExec
(
STQ
*
pTq
,
STqExecHandle
*
pExec
,
SSubmitReq
*
pReq
,
SMqDataRsp
*
pRsp
,
int32_t
workerId
);
int32_t
tqScanSnapshot
(
STQ
*
pTq
,
const
STqExecHandle
*
pExec
,
SMqDataRsp
*
pRsp
,
int32_t
workerId
);
int32_t
tqScanSnapshot
(
STQ
*
pTq
,
const
STqExecHandle
*
pExec
,
SMqDataRsp
*
pRsp
,
STqOffsetVal
offset
,
int32_t
workerId
);
int32_t
tqSendDataRsp
(
STQ
*
pTq
,
const
SRpcMsg
*
pMsg
,
const
SMqPollReq
*
pReq
,
const
SMqDataRsp
*
pRsp
);
// tqMeta
...
...
@@ -183,6 +183,17 @@ int32_t tqOffsetSnapshot(STqOffsetStore* pStore);
// tqSink
void
tqTableSink
(
SStreamTask
*
pTask
,
void
*
vnode
,
int64_t
ver
,
void
*
data
);
static
FORCE_INLINE
void
tqOffsetResetToData
(
STqOffsetVal
*
pOffsetVal
,
int64_t
uid
,
int64_t
ts
)
{
pOffsetVal
->
type
=
TMQ_OFFSET__SNAPSHOT_DATA
;
pOffsetVal
->
uid
=
uid
;
pOffsetVal
->
ts
=
ts
;
}
static
FORCE_INLINE
void
tqOffsetResetToLog
(
STqOffsetVal
*
pOffsetVal
,
int64_t
ver
)
{
pOffsetVal
->
type
=
TMQ_OFFSET__LOG
;
pOffsetVal
->
version
=
ver
;
}
#ifdef __cplusplus
}
#endif
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
d8da0c0e
...
...
@@ -228,17 +228,6 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su
static
int32_t
tqInitMetaRsp
(
SMqMetaRsp
*
pRsp
,
const
SMqPollReq
*
pReq
)
{
return
0
;
}
static
FORCE_INLINE
void
tqOffsetResetToData
(
STqOffsetVal
*
pOffsetVal
,
int64_t
uid
,
int64_t
ts
)
{
pOffsetVal
->
type
=
TMQ_OFFSET__SNAPSHOT_DATA
;
pOffsetVal
->
uid
=
uid
;
pOffsetVal
->
ts
=
ts
;
}
static
FORCE_INLINE
void
tqOffsetResetToLog
(
STqOffsetVal
*
pOffsetVal
,
int64_t
ver
)
{
pOffsetVal
->
type
=
TMQ_OFFSET__LOG
;
pOffsetVal
->
version
=
ver
;
}
int32_t
tqProcessPollReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
,
int32_t
workerId
)
{
SMqPollReq
*
pReq
=
pMsg
->
pCont
;
int64_t
consumerId
=
pReq
->
consumerId
;
...
...
@@ -394,13 +383,12 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
char
formatBuf
[
50
];
tFormatOffset
(
formatBuf
,
50
,
&
dataRsp
.
reqOffset
);
tqInfo
(
"retrieve using snapshot req offset %s"
,
formatBuf
);
if
(
tqScanSnapshot
(
pTq
,
&
pHandle
->
execHandle
,
&
dataRsp
,
workerId
)
<
0
)
{
if
(
tqScanSnapshot
(
pTq
,
&
pHandle
->
execHandle
,
&
dataRsp
,
fetchOffsetNew
,
workerId
)
<
0
)
{
ASSERT
(
0
);
}
// 4. send rsp
if
(
dataRsp
.
blockNum
!=
0
)
{
tqOffsetResetToData
(
&
dataRsp
.
rspOffset
,
0
,
0
);
if
(
tqSendDataRsp
(
pTq
,
pMsg
,
pReq
,
&
dataRsp
)
<
0
)
{
code
=
-
1
;
}
...
...
@@ -655,6 +643,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
.
reader
=
pHandle
->
execHandle
.
pExecReader
[
i
],
.
meta
=
pTq
->
pVnode
->
pMeta
,
.
vnode
=
pTq
->
pVnode
,
.
tqReader
=
true
,
};
pHandle
->
execHandle
.
execCol
.
task
[
i
]
=
qCreateStreamExecTaskInfo
(
pHandle
->
execHandle
.
execCol
.
qmsg
,
&
handle
);
ASSERT
(
pHandle
->
execHandle
.
execCol
.
task
[
i
]);
...
...
source/dnode/vnode/src/tq/tqExec.c
浏览文件 @
d8da0c0e
...
...
@@ -59,13 +59,19 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, i
return
0
;
}
int32_t
tqScanSnapshot
(
STQ
*
pTq
,
const
STqExecHandle
*
pExec
,
SMqDataRsp
*
pRsp
,
int32_t
workerId
)
{
int32_t
tqScanSnapshot
(
STQ
*
pTq
,
const
STqExecHandle
*
pExec
,
SMqDataRsp
*
pRsp
,
STqOffsetVal
offset
,
int32_t
workerId
)
{
ASSERT
(
pExec
->
subType
==
TOPIC_SUB_TYPE__COLUMN
);
qTaskInfo_t
task
=
pExec
->
execCol
.
task
[
workerId
];
// TODO set uid and ts
if
(
qStreamScanSnapshot
(
task
)
<
0
)
{
/*if (qStreamScanSnapshot(task) < 0) {*/
/*ASSERT(0);*/
/*}*/
if
(
qStreamPrepareScan
(
task
,
offset
.
uid
,
offset
.
ts
)
<
0
)
{
ASSERT
(
0
);
}
int32_t
rowCnt
=
0
;
while
(
1
)
{
SSDataBlock
*
pDataBlock
=
NULL
;
uint64_t
ts
=
0
;
...
...
@@ -80,13 +86,27 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, i
tqAddBlockDataToRsp
(
pDataBlock
,
pRsp
);
if
(
pRsp
->
withTbName
)
{
// TODO
pRsp
->
withTbName
=
0
;
/*int64_t uid = 0;*/
/*tqAddTbNameToRsp(pTq, uid, pRsp, workerId);*/
#if 0
int64_t uid;
int64_t ts;
if (qGetStreamScanStatus(task, &uid, &ts) < 0) {
ASSERT(0);
}
tqAddTbNameToRsp(pTq, uid, pRsp, workerId);
#endif
}
pRsp
->
blockNum
++
;
rowCnt
+=
pDataBlock
->
info
.
rows
;
if
(
rowCnt
>=
4096
)
break
;
}
int64_t
uid
;
int64_t
ts
;
if
(
qGetStreamScanStatus
(
task
,
&
uid
,
&
ts
)
<
0
)
{
ASSERT
(
0
);
}
tqOffsetResetToData
(
&
pRsp
->
rspOffset
,
uid
,
ts
);
return
0
;
}
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
d8da0c0e
...
...
@@ -112,9 +112,9 @@ typedef struct STsdbReadHandle {
STimeWindow
window
;
// the primary query time window that applies to all queries
// SColumnDataAgg* statis; // query level statistics, only one table block statistics info exists at any time
// SColumnDataAgg** pstatis;// the ptr array list to return to caller
int32_t
numOfBlocks
;
int32_t
numOfBlocks
;
SSDataBlock
*
pResBlock
;
// SArray* pColumns; // column list, SColumnInfoData array list
// SArray* pColumns; // column list, SColumnInfoData array list
bool
locateStart
;
int32_t
outputCapacity
;
int32_t
realNumOfRows
;
...
...
@@ -223,6 +223,22 @@ int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle) {
return
rows
;
}
static
SArray
*
createCheckInfoFromUid
(
STsdbReadHandle
*
pTsdbReadHandle
,
int64_t
uid
)
{
SArray
*
pTableCheckInfo
=
taosArrayInit
(
1
,
sizeof
(
STableCheckInfo
));
if
(
pTableCheckInfo
==
NULL
)
{
return
NULL
;
}
STableCheckInfo
info
=
{
.
tableId
=
uid
,
};
info
.
suid
=
pTsdbReadHandle
->
suid
;
taosArrayPush
(
pTableCheckInfo
,
&
info
);
tsdbDebug
(
"%p check table uid:%"
PRId64
" from lastKey:%"
PRId64
" %s"
,
pTsdbReadHandle
,
info
.
tableId
,
info
.
lastKey
,
pTsdbReadHandle
->
idStr
);
return
pTableCheckInfo
;
}
static
SArray
*
createCheckInfoFromTableGroup
(
STsdbReadHandle
*
pTsdbReadHandle
,
SArray
*
pTableList
)
{
size_t
tableSize
=
taosArrayGetSize
(
pTableList
);
...
...
@@ -428,8 +444,8 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond*
for
(
int32_t
i
=
0
;
i
<
pCond
->
numOfCols
;
++
i
)
{
SColumnInfoData
colInfo
=
{.
info
=
pCond
->
colList
[
i
],
0
};
int32_t
code
=
blockDataAppendColInfo
(
pReadHandle
->
pResBlock
,
&
colInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
){
int32_t
code
=
blockDataAppendColInfo
(
pReadHandle
->
pResBlock
,
&
colInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_end
;
}
}
...
...
@@ -494,9 +510,19 @@ static int32_t setCurrentSchema(SVnode* pVnode, STsdbReadHandle* pTsdbReadHandle
return
TSDB_CODE_SUCCESS
;
}
int32_t
tsdbSetTable
List
(
tsdbReaderT
reader
,
SArray
*
tableList
)
{
int32_t
tsdbSetTable
Id
(
tsdbReaderT
reader
,
int64_t
uid
)
{
STsdbReadHandle
*
pTsdbReadHandle
=
reader
;
if
(
pTsdbReadHandle
->
pTableCheckInfo
)
taosArrayDestroy
(
pTsdbReadHandle
->
pTableCheckInfo
);
if
(
pTsdbReadHandle
->
pTableCheckInfo
)
taosArrayDestroy
(
pTsdbReadHandle
->
pTableCheckInfo
);
pTsdbReadHandle
->
pTableCheckInfo
=
createCheckInfoFromUid
(
pTsdbReadHandle
,
uid
);
if
(
pTsdbReadHandle
->
pTableCheckInfo
==
NULL
)
{
return
TSDB_CODE_TDB_OUT_OF_MEMORY
;
}
return
TDB_CODE_SUCCESS
;
}
int32_t
tsdbSetTableList
(
tsdbReaderT
reader
,
SArray
*
tableList
)
{
STsdbReadHandle
*
pTsdbReadHandle
=
reader
;
if
(
pTsdbReadHandle
->
pTableCheckInfo
)
taosArrayDestroy
(
pTsdbReadHandle
->
pTableCheckInfo
);
pTsdbReadHandle
->
pTableCheckInfo
=
createCheckInfoFromTableGroup
(
pTsdbReadHandle
,
tableList
);
if
(
pTsdbReadHandle
->
pTableCheckInfo
==
NULL
)
{
return
TSDB_CODE_TDB_OUT_OF_MEMORY
;
...
...
@@ -505,8 +531,8 @@ int32_t tsdbSetTableList(tsdbReaderT reader, SArray* tableList){
}
tsdbReaderT
tsdbReaderOpen
(
SVnode
*
pVnode
,
SQueryTableDataCond
*
pCond
,
SArray
*
tableList
,
uint64_t
qId
,
uint64_t
taskId
)
{
if
(
taosArrayGetSize
(
tableList
)
==
0
)
{
uint64_t
taskId
)
{
if
(
taosArrayGetSize
(
tableList
)
==
0
)
{
return
NULL
;
}
STsdbReadHandle
*
pTsdbReadHandle
=
tsdbQueryTablesImpl
(
pVnode
,
pCond
,
qId
,
taskId
);
...
...
@@ -553,8 +579,7 @@ tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* t
}
tsdbDebug
(
"%p total numOfTable:%"
PRIzu
" in this query, table %"
PRIzu
" %s"
,
pTsdbReadHandle
,
taosArrayGetSize
(
pTsdbReadHandle
->
pTableCheckInfo
),
taosArrayGetSize
(
tableList
),
pTsdbReadHandle
->
idStr
);
taosArrayGetSize
(
pTsdbReadHandle
->
pTableCheckInfo
),
taosArrayGetSize
(
tableList
),
pTsdbReadHandle
->
idStr
);
return
(
tsdbReaderT
)
pTsdbReadHandle
;
}
...
...
@@ -1073,7 +1098,7 @@ static int32_t getFileIdFromKey(TSKEY key, int32_t daysPerFile, int32_t precisio
}
int64_t
fid
=
(
int64_t
)(
key
/
(
daysPerFile
*
tsTickPerMin
[
precision
]));
// set the starting fileId
if
(
fid
<
0LL
&&
llabs
(
fid
)
>
INT32_MAX
)
{
// data value overflow for INT32
if
(
fid
<
0LL
&&
llabs
(
fid
)
>
INT32_MAX
)
{
// data value overflow for INT32
fid
=
INT32_MIN
;
}
...
...
@@ -2612,7 +2637,7 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo*
tsdbGetFidKeyRange
(
pCfg
->
days
,
pCfg
->
precision
,
pTsdbReadHandle
->
pFileGroup
->
fid
,
&
win
.
skey
,
&
win
.
ekey
);
// current file are not overlapped with query time window, ignore remain files
if
((
win
.
skey
>
pTsdbReadHandle
->
window
.
ekey
)
/* || (!ascTraverse && win.ekey < pTsdbReadHandle->window.ekey)*/
)
{
if
((
win
.
skey
>
pTsdbReadHandle
->
window
.
ekey
)
/* || (!ascTraverse && win.ekey < pTsdbReadHandle->window.ekey)*/
)
{
tsdbUnLockFS
(
REPO_FS
(
pTsdbReadHandle
->
pTsdb
));
tsdbDebug
(
"%p remain files are not qualified for qrange:%"
PRId64
"-%"
PRId64
", ignore, %s"
,
pTsdbReadHandle
,
pTsdbReadHandle
->
window
.
skey
,
pTsdbReadHandle
->
window
.
ekey
,
pTsdbReadHandle
->
idStr
);
...
...
@@ -2886,7 +2911,7 @@ int32_t tsdbGetCtbIdList(SMeta* pMeta, int64_t suid, SArray* list) {
*/
int32_t
tsdbGetStbIdList
(
SMeta
*
pMeta
,
int64_t
suid
,
SArray
*
list
)
{
SMStbCursor
*
pCur
=
metaOpenStbCursor
(
pMeta
,
suid
);
if
(
!
pCur
)
{
if
(
!
pCur
)
{
return
TSDB_CODE_FAILED
;
}
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
d8da0c0e
...
...
@@ -248,6 +248,11 @@ typedef struct SSampleExecInfo {
uint32_t
seed
;
// random seed value
}
SSampleExecInfo
;
enum
{
TABLE_SCAN__TABLE_ORDER
=
1
,
TABLE_SCAN__BLOCK_ORDER
=
2
,
};
typedef
struct
STableScanInfo
{
void
*
dataReader
;
SReadHandle
readHandle
;
...
...
@@ -272,13 +277,16 @@ typedef struct STableScanInfo {
int32_t
curTWinIdx
;
int32_t
currentGroupId
;
int32_t
currentTable
;
uint64_t
queryId
;
// todo remove it
uint64_t
taskId
;
// todo remove it
struct
{
uint64_t
uid
;
int64_t
t
;
}
scanStatus
;
int64_t
ts
;
}
lastStatus
;
int8_t
scanMode
;
}
STableScanInfo
;
typedef
struct
STagScanInfo
{
...
...
@@ -713,6 +721,7 @@ void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
void
appendOneRowToDataBlock
(
SSDataBlock
*
pBlock
,
STupleHandle
*
pTupleHandle
);
void
setTbNameColData
(
void
*
pMeta
,
const
SSDataBlock
*
pBlock
,
SColumnInfoData
*
pColInfoData
,
int32_t
functionId
);
int32_t
doPrepareScan
(
SOperatorInfo
*
pOperator
,
uint64_t
uid
,
int64_t
ts
);
int32_t
doGetScanStatus
(
SOperatorInfo
*
pOperator
,
uint64_t
*
uid
,
int64_t
*
ts
);
SSDataBlock
*
loadNextDataBlock
(
void
*
param
);
...
...
source/libs/executor/src/executorMain.c
浏览文件 @
d8da0c0e
...
...
@@ -222,7 +222,7 @@ int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) {
}
int32_t
qDeserializeTaskStatus
(
qTaskInfo_t
tinfo
,
const
char
*
pInput
,
int32_t
len
)
{
SExecTaskInfo
*
pTaskInfo
=
(
struct
SExecTaskInfo
*
)
tinfo
;
SExecTaskInfo
*
pTaskInfo
=
(
struct
SExecTaskInfo
*
)
tinfo
;
if
(
pTaskInfo
==
NULL
||
pInput
==
NULL
||
len
==
0
)
{
return
TSDB_CODE_INVALID_PARA
;
...
...
@@ -231,11 +231,20 @@ int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t le
return
decodeOperator
(
pTaskInfo
->
pRoot
,
pInput
,
len
);
}
int32_t
qStreamPrepareScan
(
qTaskInfo_t
tinfo
,
uint64_t
uid
,
int64_t
ts
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
int32_t
qGetStreamScanStatus
(
qTaskInfo_t
tinfo
,
uint64_t
*
uid
,
int64_t
*
ts
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
if
(
uid
==
0
)
{
STableKeyInfo
*
pTableInfo
=
taosArrayGet
(
pTaskInfo
->
tableqinfoList
.
pTableList
,
0
);
uid
=
pTableInfo
->
uid
;
ts
=
INT64_MIN
;
}
return
TSDB_CODE_SUCCESS
;
return
doPrepareScan
(
pTaskInfo
->
pRoot
,
uid
,
ts
)
;
}
int32_t
qGetStreamScanStatus
(
qTaskInfo_t
tinfo
,
uint64_t
*
uid
,
int64_t
*
ts
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
return
doGetScanStatus
(
pTaskInfo
->
pRoot
,
uid
,
ts
);
}
source/libs/executor/src/executorimpl.c
浏览文件 @
d8da0c0e
...
...
@@ -1033,7 +1033,7 @@ static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSData
SqlFunctionCtx
*
pCtx
=
pTableScanInfo
->
pCtx
;
uint32_t
status
=
BLK_DATA_NOT_LOAD
;
int32_t
numOfOutput
=
0
;
//
pTableScanInfo->numOfOutput;
int32_t
numOfOutput
=
0
;
//
pTableScanInfo->numOfOutput;
for
(
int32_t
i
=
0
;
i
<
numOfOutput
;
++
i
)
{
int32_t
functionId
=
pCtx
[
i
].
functionId
;
int32_t
colId
=
pTableScanInfo
->
pExpr
[
i
].
base
.
pParam
[
0
].
pCol
->
colId
;
...
...
@@ -2821,13 +2821,53 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
}
}
int32_t
doPrepareScan
(
SOperatorInfo
*
pOperator
,
uint64_t
uid
,
int64_t
ts
)
{
int32_t
type
=
pOperator
->
operatorType
;
if
(
type
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
SStreamBlockScanInfo
*
pScanInfo
=
pOperator
->
info
;
pScanInfo
->
blockType
=
STREAM_INPUT__DATA_SCAN
;
STableScanInfo
*
pInfo
=
pScanInfo
->
pSnapshotReadOp
->
info
;
/*if (pSnapShotScanInfo->dataReader == NULL) {*/
/*pSnapShotScanInfo->dataReader = tsdbReaderOpen(pHandle->vnode, &pSTInfo->cond, tableList, 0, 0);*/
/*pSnapShotScanInfo->scanMode = TABLE_SCAN__TABLE_ORDER;*/
/*}*/
if
(
pInfo
->
lastStatus
.
uid
!=
uid
||
pInfo
->
lastStatus
.
ts
!=
ts
)
{
tsdbSetTableId
(
pInfo
->
dataReader
,
uid
);
SQueryTableDataCond
tmpCond
=
pInfo
->
cond
;
tmpCond
.
twindows
[
0
]
=
(
STimeWindow
){
.
skey
=
ts
,
.
ekey
=
INT64_MAX
,
};
tsdbResetReadHandle
(
pInfo
->
dataReader
,
&
tmpCond
,
0
);
pInfo
->
scanTimes
=
0
;
pInfo
->
curTWinIdx
=
0
;
}
}
else
{
if
(
pOperator
->
numOfDownstream
==
1
)
{
return
doPrepareScan
(
pOperator
->
pDownstream
[
0
],
uid
,
ts
);
}
else
if
(
pOperator
->
numOfDownstream
==
0
)
{
qError
(
"failed to find stream scan operator to set the input data block"
);
return
TSDB_CODE_QRY_APP_ERROR
;
}
else
{
qError
(
"join not supported for stream block scan"
);
return
TSDB_CODE_QRY_APP_ERROR
;
}
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
doGetScanStatus
(
SOperatorInfo
*
pOperator
,
uint64_t
*
uid
,
int64_t
*
ts
)
{
int32_t
type
=
pOperator
->
operatorType
;
if
(
type
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
SStreamBlockScanInfo
*
pScanInfo
=
pOperator
->
info
;
STableScanInfo
*
pSnapShotScanInfo
=
pScanInfo
->
pSnapshotReadOp
->
info
;
*
uid
=
pSnapShotScanInfo
->
scan
Status
.
uid
;
*
ts
=
pSnapShotScanInfo
->
scanStatus
.
t
;
STableScanInfo
*
pSnapShotScanInfo
=
pScanInfo
->
pSnapshotReadOp
->
info
;
*
uid
=
pSnapShotScanInfo
->
last
Status
.
uid
;
*
ts
=
pSnapShotScanInfo
->
lastStatus
.
ts
;
}
else
{
if
(
pOperator
->
pDownstream
[
0
]
==
NULL
)
{
return
TSDB_CODE_INVALID_PARA
;
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
d8da0c0e
...
...
@@ -392,6 +392,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
binfo
.
capacity
=
binfo
.
rows
;
blockDataEnsureCapacity
(
pBlock
,
binfo
.
rows
);
pBlock
->
info
=
binfo
;
ASSERT
(
binfo
.
uid
!=
0
);
uint32_t
status
=
0
;
int32_t
code
=
loadDataBlock
(
pOperator
,
pTableScanInfo
,
pBlock
,
&
status
);
...
...
@@ -416,9 +417,10 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
pOperator
->
cost
.
totalCost
=
pTableScanInfo
->
readRecorder
.
elapsedTime
;
// todo refactor
pTableScanInfo
->
scan
Status
.
uid
=
pBlock
->
info
.
uid
;
pTableScanInfo
->
scanStatus
.
t
=
pBlock
->
info
.
window
.
ekey
;
pTableScanInfo
->
last
Status
.
uid
=
pBlock
->
info
.
uid
;
pTableScanInfo
->
lastStatus
.
ts
=
pBlock
->
info
.
window
.
ekey
;
ASSERT
(
pBlock
->
info
.
uid
!=
0
);
return
pBlock
;
}
return
NULL
;
...
...
@@ -438,6 +440,7 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) {
while
(
pTableScanInfo
->
curTWinIdx
<
pTableScanInfo
->
cond
.
numOfTWindows
)
{
SSDataBlock
*
p
=
doTableScanImpl
(
pOperator
);
if
(
p
!=
NULL
)
{
ASSERT
(
p
->
info
.
uid
!=
0
);
return
p
;
}
pTableScanInfo
->
curTWinIdx
+=
1
;
...
...
@@ -513,6 +516,28 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
STableScanInfo
*
pInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
// if scan table by table
if
(
pInfo
->
scanMode
==
TABLE_SCAN__TABLE_ORDER
)
{
// check status
while
(
1
)
{
SSDataBlock
*
result
=
doTableScanGroup
(
pOperator
);
if
(
result
)
{
return
result
;
}
// if no data, switch to next table and continue scan
pInfo
->
currentTable
++
;
if
(
pInfo
->
currentTable
>=
taosArrayGetSize
(
pTaskInfo
->
tableqinfoList
.
pTableList
))
{
return
NULL
;
}
STableKeyInfo
*
pTableInfo
=
taosArrayGet
(
pTaskInfo
->
tableqinfoList
.
pTableList
,
pInfo
->
currentTable
);
/*pTableInfo->uid */
tsdbSetTableId
(
pInfo
->
dataReader
,
pTableInfo
->
uid
);
tsdbResetReadHandle
(
pInfo
->
dataReader
,
&
pInfo
->
cond
,
0
);
pInfo
->
scanTimes
=
0
;
pInfo
->
curTWinIdx
=
0
;
}
}
if
(
pInfo
->
currentGroupId
==
-
1
)
{
pInfo
->
currentGroupId
++
;
if
(
pInfo
->
currentGroupId
>=
taosArrayGetSize
(
pTaskInfo
->
tableqinfoList
.
pGroupList
))
{
...
...
@@ -1207,6 +1232,13 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
if
(
pHandle
)
{
SOperatorInfo
*
pTableScanDummy
=
createTableScanOperatorInfo
(
pTableScanNode
,
pHandle
,
pTaskInfo
,
queryId
,
taskId
);
STableScanInfo
*
pSTInfo
=
(
STableScanInfo
*
)
pTableScanDummy
->
info
;
SArray
*
tableList
=
taosArrayGetP
(
pTaskInfo
->
tableqinfoList
.
pGroupList
,
0
);
if
(
pHandle
->
tqReader
)
{
pSTInfo
->
scanMode
=
TABLE_SCAN__TABLE_ORDER
;
pSTInfo
->
dataReader
=
tsdbReaderOpen
(
pHandle
->
vnode
,
&
pSTInfo
->
cond
,
tableList
,
0
,
0
);
}
if
(
pSTInfo
->
interval
.
interval
>
0
)
{
pInfo
->
pUpdateInfo
=
updateInfoInitP
(
&
pSTInfo
->
interval
,
pTwSup
->
waterMark
);
}
else
{
...
...
tests/script/jenkins/basic.txt
浏览文件 @
d8da0c0e
...
...
@@ -129,6 +129,7 @@
./test.sh -f tsim/tmq/basic2Of2ConsOverlap.sim
./test.sh -f tsim/tmq/topic.sim
./test.sh -f tsim/tmq/snapshot.sim
./test.sh -f tsim/tmq/snapshot1.sim
# --- stable
./test.sh -f tsim/stable/disk.sim
...
...
tests/script/tsim/tmq/snapshot1.sim
0 → 100644
浏览文件 @
d8da0c0e
#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406
#basic1Of2Cons.sim: vgroups=1, one topic for 2 consumers, firstly insert data, then start consume. Include six topics
#basic2Of2ConsOverlap.sim: vgroups=1, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics
#basic3Of2Cons.sim: vgroups=4, one topic for 2 consumers, firstly insert data, then start consume. Include six topics
#basic4Of2Cons.sim: vgroups=4, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics
# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN
# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5;
#
# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
#
run tsim/tmq/prepareBasicEnv-1vgrp.sim
#---- global parameters start ----#
$dbName = db
$vgroups = 1
$stbPrefix = stb
$ctbPrefix = ctb
$ntbPrefix = ntb
$stbNum = 1
$ctbNum = 10
$ntbNum = 10
$rowsPerCtb = 10
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
#---- global parameters end ----#
$pullDelay = 5
$ifcheckdata = 1
$ifmanualcommit = 1
$showMsg = 1
$showRow = 0
sql connect
sql use $dbName
print == create topics from super table
sql create topic topic_stb_column as select ts, c3 from stb
sql create topic topic_stb_all as select ts, c1, c2, c3 from stb
sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb
print == create topics from child table
sql create topic topic_ctb_column as select ts, c3 from ctb0
sql create topic topic_ctb_all as select * from ctb0
sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0
print == create topics from normal table
sql create topic topic_ntb_column as select ts, c3 from ntb0
sql create topic topic_ntb_all as select * from ntb0
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
#sql show topics
#if $rows != 9 then
# return -1
#endi
#'group.id:cgrp1,enable.auto.commit:false,auto.commit.interval.ms:6000,auto.offset.reset:earliest'
$keyList = ' . group.id:cgrp1
$keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
#$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList
$topicNum = 2
#=============================== start consume =============================#
print ================ test consume from stb
print == overlap toipcs: topic_stb_column + topic_stb_all, topic_stb_function + topic_stb_all
$topicList = ' . topic_stb_column
$topicList = $topicList . ,
$topicList = $topicList . topic_stb_all
$topicList = $topicList . '
$consumerId = 0
$totalMsgOfOneTopic = $ctbNum * $rowsPerCtb
$totalMsgOfStb = $totalMsgOfOneTopic * $topicNum
$expectmsgcnt = $totalMsgOfStb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
$topicList = ' . topic_stb_all
$topicList = $topicList . ,
$topicList = $topicList . topic_stb_function
$topicList = $topicList . '
$consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from stb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start -e 1
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start -e 1
print == check consume result
wait_consumer_end_from_stb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
if $rows != 2 then
sleep 1000
goto wait_consumer_end_from_stb
endi
if $data[0][1] == 0 then
if $data[1][1] != 1 then
return -1
endi
endi
if $data[0][1] == 1 then
if $data[1][1] != 0 then
return -1
endi
endi
# $data[0][3]/$data[1][3] should be between $totalMsgOfOneTopic and $totalMsgOfStb.
if $data[0][3] < $totalMsgOfOneTopic then
return -1
endi
if $data[0][3] > $totalMsgOfStb then
return -1
endi
if $data[1][3] < $totalMsgOfOneTopic then
return -1
endi
if $data[1][3] > $totalMsgOfStb then
return -1
endi
$totalMsgCons = $totalMsgOfOneTopic + $totalMsgOfStb
$sumOfRows = $data[0][3] + $data[1][3]
if $sumOfRows != $totalMsgCons then
print actual: $sumOfRows
print expect: $totalMsgCons
return -1
endi
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdbName = cdb1
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table for ctb
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
print ================ test consume from ctb
print == overlap toipcs: topic_ctb_column + topic_ctb_all, topic_ctb_function + topic_ctb_all
$topicList = ' . topic_ctb_column
$topicList = $topicList . ,
$topicList = $topicList . topic_ctb_all
$topicList = $topicList . '
$consumerId = 0
$totalMsgOfOneTopic = $rowsPerCtb
$totalMsgOfCtb = $totalMsgOfOneTopic * $topicNum
$expectmsgcnt = $totalMsgOfCtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
$topicList = ' . topic_ctb_function
$topicList = $topicList . ,
$topicList = $topicList . topic_ctb_all
$topicList = $topicList . '
$consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ctb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start -e 1
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start -e 1
print == check consume result
wait_consumer_end_from_ctb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
if $rows != 2 then
sleep 1000
goto wait_consumer_end_from_ctb
endi
if $data[0][1] == 0 then
if $data[1][1] != 1 then
return -1
endi
endi
if $data[0][1] == 1 then
if $data[1][1] != 0 then
return -1
endi
endi
if $data[0][3] == $totalMsgOfOneTopic then
if $data[1][3] == $totalMsgOfCtb then
goto check_ok_1
endi
elif $data[1][3] == $totalMsgOfOneTopic then
if $data[0][3] == $totalMsgOfCtb then
goto check_ok_1
endi
endi
return -1
check_ok_1:
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdbName = cdb2
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table for ntb
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
print ================ test consume from ntb
print == overlap toipcs: topic_ntb_column + topic_ntb_all, topic_ntb_function + topic_ntb_all
$topicList = ' . topic_ntb_column
$topicList = $topicList . ,
$topicList = $topicList . topic_ntb_all
$topicList = $topicList . '
$consumerId = 0
$totalMsgOfOneTopic = $rowsPerCtb
$totalMsgOfNtb = $totalMsgOfOneTopic * $topicNum
$expectmsgcnt = $totalMsgOfNtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
$topicList = ' . topic_ntb_function
$topicList = $topicList . ,
$topicList = $topicList . topic_ntb_all
$topicList = $topicList . '
$consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ntb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start -e 1
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start -e 1
print == check consume result from ntb
wait_consumer_end_from_ntb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
if $rows != 2 then
sleep 1000
goto wait_consumer_end_from_ntb
endi
if $data[0][1] == 0 then
if $data[1][1] != 1 then
return -1
endi
endi
if $data[0][1] == 1 then
if $data[1][1] != 0 then
return -1
endi
endi
if $data[0][3] == $totalMsgOfOneTopic then
if $data[1][3] == $totalMsgOfNtb then
goto check_ok_3
endi
elif $data[1][3] == $totalMsgOfOneTopic then
if $data[0][3] == $totalMsgOfNtb then
goto check_ok_3
endi
endi
return -1
check_ok_3:
sql select * from performance_schema.`consumers`
if $rows != 0 then
return -1
endi
#sql select * from performance_schema.`subscriptions`
#if $rows != 0 then
# return -1
#endi
#------ not need stop consumer, because it exit after pull msg overthan expect msg
#system tsim/tmq/consume.sh -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
tests/test/c/tmqSim.c
浏览文件 @
d8da0c0e
...
...
@@ -124,15 +124,13 @@ char* getCurrentTimeString(char* timeString) {
return
timeString
;
}
static
void
tmqStop
(
int
signum
,
void
*
info
,
void
*
ctx
)
{
static
void
tmqStop
(
int
signum
,
void
*
info
,
void
*
ctx
)
{
running
=
0
;
char
tmpString
[
128
];
taosFprintfFile
(
g_fp
,
"%s tmqStop() receive stop signal[%d]
\n
"
,
getCurrentTimeString
(
tmpString
),
signum
);
taosFprintfFile
(
g_fp
,
"%s tmqStop() receive stop signal[%d]
\n
"
,
getCurrentTimeString
(
tmpString
),
signum
);
}
static
void
tmqSetSignalHandle
()
{
taosSetSignal
(
SIGINT
,
tmqStop
);
}
static
void
tmqSetSignalHandle
()
{
taosSetSignal
(
SIGINT
,
tmqStop
);
}
void
initLogFile
()
{
char
filename
[
256
];
...
...
@@ -463,16 +461,16 @@ static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex)
int32_t
precision
=
taos_result_precision
(
msg
);
const
char
*
tbName
=
tmq_get_table_name
(
msg
);
#if 0
#if 0
// get schema
//============================== stub =================================================//
for (int32_t i = 0; i < numOfFields; i++) {
taosFprintfFile(g_fp, "%02d: name: %s, type: %d, len: %d\n", i, fields[i].name, fields[i].type, fields[i].bytes);
}
//============================== stub =================================================//
#endif
#endif
dumpToFileForCheck
(
pInfo
->
pConsumeRowsFile
,
row
,
fields
,
length
,
numOfFields
,
precision
);
dumpToFileForCheck
(
pInfo
->
pConsumeRowsFile
,
row
,
fields
,
length
,
numOfFields
,
precision
);
taos_print_row
(
buf
,
row
,
fields
,
numOfFields
);
...
...
@@ -529,7 +527,7 @@ static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
g_once_commit_flag
=
1
;
notifyMainScript
((
SThreadInfo
*
)
param
,
(
int32_t
)
NOTIFY_CMD_START_COMMIT
);
}
char
tmpString
[
128
];
taosFprintfFile
(
g_fp
,
"%s tmq_commit_cb_print() be called
\n
"
,
getCurrentTimeString
(
tmpString
));
}
...
...
@@ -565,7 +563,7 @@ void build_consumer(SThreadInfo* pInfo) {
// tmq_conf_set(conf, "auto.offset.reset", "latest");
//
if
(
g_stConfInfo
.
useSnapshot
)
{
tmq_conf_set
(
conf
,
"experiment
.use.snapshot
"
,
"true"
);
tmq_conf_set
(
conf
,
"experiment
al.snapshot.enable
"
,
"true"
);
}
pInfo
->
tmq
=
tmq_consumer_new
(
conf
,
NULL
,
0
);
...
...
@@ -692,13 +690,13 @@ void* consumeThreadFunc(void* param) {
pInfo
->
taos
=
taos_connect
(
NULL
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pInfo
->
taos
==
NULL
)
{
taosFprintfFile
(
g_fp
,
"taos_connect() fail, can not notify and save consume result to main scripte
\n
"
);
return
NULL
;
return
NULL
;
}
build_consumer
(
pInfo
);
build_topic_list
(
pInfo
);
if
((
NULL
==
pInfo
->
tmq
)
||
(
NULL
==
pInfo
->
topicList
))
{
taosFprintfFile
(
g_fp
,
"create consumer fail! tmq is null or topicList is null
\n
"
);
taosFprintfFile
(
g_fp
,
"create consumer fail! tmq is null or topicList is null
\n
"
);
assert
(
0
);
return
NULL
;
}
...
...
@@ -706,7 +704,7 @@ void* consumeThreadFunc(void* param) {
int32_t
err
=
tmq_subscribe
(
pInfo
->
tmq
,
pInfo
->
topicList
);
if
(
err
!=
0
)
{
pError
(
"tmq_subscribe() fail, reason: %s
\n
"
,
tmq_err2str
(
err
));
taosFprintfFile
(
g_fp
,
"tmq_subscribe() fail! reason: %s
\n
"
,
tmq_err2str
(
err
));
taosFprintfFile
(
g_fp
,
"tmq_subscribe() fail! reason: %s
\n
"
,
tmq_err2str
(
err
));
assert
(
0
);
return
NULL
;
}
...
...
@@ -727,13 +725,13 @@ void* consumeThreadFunc(void* param) {
err
=
tmq_unsubscribe
(
pInfo
->
tmq
);
if
(
err
!=
0
)
{
pError
(
"tmq_unsubscribe() fail, reason: %s
\n
"
,
tmq_err2str
(
err
));
taosFprintfFile
(
g_fp
,
"tmq_unsubscribe()! reason: %s
\n
"
,
tmq_err2str
(
err
));
taosFprintfFile
(
g_fp
,
"tmq_unsubscribe()! reason: %s
\n
"
,
tmq_err2str
(
err
));
}
err
=
tmq_consumer_close
(
pInfo
->
tmq
);
if
(
err
!=
0
)
{
pError
(
"tmq_consumer_close() fail, reason: %s
\n
"
,
tmq_err2str
(
err
));
taosFprintfFile
(
g_fp
,
"tmq_consumer_close()! reason: %s
\n
"
,
tmq_err2str
(
err
));
taosFprintfFile
(
g_fp
,
"tmq_consumer_close()! reason: %s
\n
"
,
tmq_err2str
(
err
));
}
pInfo
->
tmq
=
NULL
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录