Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
514711e5
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
514711e5
编写于
8月 23, 2022
作者:
wmmhello
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feat:get data from snapshot for taosx
上级
11343195
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
734 addition
and
48 deletion
+734
-48
examples/c/tmq_taosx.c
examples/c/tmq_taosx.c
+5
-5
source/client/src/taosx.c
source/client/src/taosx.c
+51
-36
source/dnode/vnode/src/meta/metaQuery.c
source/dnode/vnode/src/meta/metaQuery.c
+2
-0
source/dnode/vnode/src/meta/metaTable.c
source/dnode/vnode/src/meta/metaTable.c
+4
-0
source/dnode/vnode/src/tq/tqExec.c
source/dnode/vnode/src/tq/tqExec.c
+5
-5
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+4
-2
tests/system-test/7-tmq/tmq_taosx.py
tests/system-test/7-tmq/tmq_taosx.py
+143
-0
tests/test/c/CMakeLists.txt
tests/test/c/CMakeLists.txt
+8
-0
tests/test/c/tmq_taosx_snapshot_ci.c
tests/test/c/tmq_taosx_snapshot_ci.c
+512
-0
未找到文件。
examples/c/tmq_taosx.c
浏览文件 @
514711e5
...
...
@@ -23,7 +23,7 @@
static
int
running
=
1
;
static
TAOS
*
use_db
(){
TAOS
*
pConn
=
taos_connect
(
"
192.168.1.86
"
,
"root"
,
"taosdata"
,
NULL
,
0
);
TAOS
*
pConn
=
taos_connect
(
"
localhost
"
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pConn
==
NULL
)
{
return
NULL
;
}
...
...
@@ -84,7 +84,7 @@ int32_t init_env() {
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create database if not exists db_taosx vgroups
1
"
);
pRes
=
taos_query
(
pConn
,
"create database if not exists db_taosx vgroups
4
"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in create db_taosx, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
...
...
@@ -98,7 +98,7 @@ int32_t init_env() {
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create database if not exists abc1 vgroups
1
"
);
pRes
=
taos_query
(
pConn
,
"create database if not exists abc1 vgroups
3
"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in create db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
...
...
@@ -386,7 +386,7 @@ tmq_t* build_consumer() {
tmq_conf_set
(
conf
,
"td.connect.pass"
,
"taosdata"
);
tmq_conf_set
(
conf
,
"msg.with.table.name"
,
"true"
);
tmq_conf_set
(
conf
,
"enable.auto.commit"
,
"true"
);
tmq_conf_set
(
conf
,
"experimental.snapshot.enable"
,
"
tru
e"
);
tmq_conf_set
(
conf
,
"experimental.snapshot.enable"
,
"
fals
e"
);
/*tmq_conf_set(conf, "experimental.snapshot.enable", "true");*/
...
...
@@ -415,7 +415,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
}
int32_t
cnt
=
0
;
while
(
running
)
{
TAOS_RES
*
tmqmessage
=
tmq_consumer_poll
(
tmq
,
-
1
);
TAOS_RES
*
tmqmessage
=
tmq_consumer_poll
(
tmq
,
1000
);
if
(
tmqmessage
)
{
cnt
++
;
msg_process
(
tmqmessage
);
...
...
source/client/src/taosx.c
浏览文件 @
514711e5
...
...
@@ -780,7 +780,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
char
*
tName
=
taosArrayGet
(
pCreateReq
->
ctb
.
tagName
,
i
);
for
(
int32_t
j
=
pTableMeta
->
tableInfo
.
numOfColumns
;
j
<
pTableMeta
->
tableInfo
.
numOfColumns
+
pTableMeta
->
tableInfo
.
numOfTags
;
j
++
){
SSchema
*
tag
=
&
pTableMeta
->
schema
[
j
];
if
(
strcmp
(
tag
->
name
,
tName
)
==
0
){
if
(
strcmp
(
tag
->
name
,
tName
)
==
0
&&
tag
->
type
!=
TSDB_DATA_TYPE_JSON
){
tTagSetCid
((
STag
*
)
pCreateReq
->
ctb
.
pTag
,
i
,
tag
->
colId
);
}
}
...
...
@@ -1328,6 +1328,7 @@ static int32_t tmqWriteRaw(TAOS* taos, void* data, int32_t dataLen) {
SQuery
*
pQuery
=
NULL
;
SMqRspObj
rspObj
=
{
0
};
SDecoder
decoder
=
{
0
};
STableMeta
*
pTableMeta
=
NULL
;
terrno
=
TSDB_CODE_SUCCESS
;
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
createRequest
(
*
(
int64_t
*
)
taos
,
TSDB_SQL_INSERT
);
...
...
@@ -1384,24 +1385,6 @@ static int32_t tmqWriteRaw(TAOS* taos, void* data, int32_t dataLen) {
goto
end
;
}
uint16_t
fLen
=
0
;
int32_t
rowSize
=
0
;
int16_t
nVar
=
0
;
for
(
int
i
=
0
;
i
<
pSW
->
nCols
;
i
++
)
{
SSchema
*
schema
=
pSW
->
pSchema
+
i
;
fLen
+=
TYPE_BYTES
[
schema
->
type
];
rowSize
+=
schema
->
bytes
;
if
(
IS_VAR_DATA_TYPE
(
schema
->
type
))
{
nVar
++
;
}
}
int32_t
rows
=
rspObj
.
resInfo
.
numOfRows
;
int32_t
extendedRowSize
=
rowSize
+
TD_ROW_HEAD_LEN
-
sizeof
(
TSKEY
)
+
nVar
*
sizeof
(
VarDataOffsetT
)
+
(
int32_t
)
TD_BITMAP_BYTES
(
pSW
->
nCols
-
1
);
int32_t
schemaLen
=
0
;
int32_t
submitLen
=
sizeof
(
SSubmitBlk
)
+
schemaLen
+
rows
*
extendedRowSize
;
const
char
*
tbName
=
(
const
char
*
)
taosArrayGetP
(
rspObj
.
rsp
.
blockTbName
,
rspObj
.
resIter
);
if
(
!
tbName
)
{
uError
(
"WriteRaw: tbname is null"
);
...
...
@@ -1421,6 +1404,30 @@ static int32_t tmqWriteRaw(TAOS* taos, void* data, int32_t dataLen) {
goto
end
;
}
code
=
catalogGetTableMeta
(
pCatalog
,
&
conn
,
&
pName
,
&
pTableMeta
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
uError
(
"WriteRaw:catalogGetTableMeta failed. table name: %s"
,
tbName
);
goto
end
;
}
uint16_t
fLen
=
0
;
int32_t
rowSize
=
0
;
int16_t
nVar
=
0
;
for
(
int
i
=
0
;
i
<
pTableMeta
->
tableInfo
.
numOfColumns
;
i
++
)
{
SSchema
*
schema
=
&
pTableMeta
->
schema
[
i
];
fLen
+=
TYPE_BYTES
[
schema
->
type
];
rowSize
+=
schema
->
bytes
;
if
(
IS_VAR_DATA_TYPE
(
schema
->
type
))
{
nVar
++
;
}
}
int32_t
rows
=
rspObj
.
resInfo
.
numOfRows
;
int32_t
extendedRowSize
=
rowSize
+
TD_ROW_HEAD_LEN
-
sizeof
(
TSKEY
)
+
nVar
*
sizeof
(
VarDataOffsetT
)
+
(
int32_t
)
TD_BITMAP_BYTES
(
pTableMeta
->
tableInfo
.
numOfColumns
-
1
);
int32_t
schemaLen
=
0
;
int32_t
submitLen
=
sizeof
(
SSubmitBlk
)
+
schemaLen
+
rows
*
extendedRowSize
;
SSubmitReq
*
subReq
=
NULL
;
SSubmitBlk
*
blk
=
NULL
;
void
*
hData
=
taosHashGet
(
pVgHash
,
&
vgData
.
vg
.
vgId
,
sizeof
(
vgData
.
vg
.
vgId
));
...
...
@@ -1453,27 +1460,26 @@ static int32_t tmqWriteRaw(TAOS* taos, void* data, int32_t dataLen) {
blk
=
POINTER_SHIFT
(
vgData
.
data
,
sizeof
(
SSubmitReq
));
}
STableMeta
*
pTableMeta
=
NULL
;
code
=
catalogGetTableMeta
(
pCatalog
,
&
conn
,
&
pName
,
&
pTableMeta
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
uError
(
"WriteRaw:catalogGetTableMeta failed. table name: %s"
,
tbName
);
goto
end
;
}
// pSW->pSchema should be same as pTableMeta->schema
ASSERT
(
pSW
->
nCols
==
pTableMeta
->
tableInfo
.
numOfColumns
);
//
ASSERT(pSW->nCols == pTableMeta->tableInfo.numOfColumns);
uint64_t
suid
=
(
TSDB_NORMAL_TABLE
==
pTableMeta
->
tableType
?
0
:
pTableMeta
->
suid
);
uint64_t
uid
=
pTableMeta
->
uid
;
int16_t
sver
=
pTableMeta
->
sversion
;
taosMemoryFreeClear
(
pTableMeta
);
void
*
blkSchema
=
POINTER_SHIFT
(
blk
,
sizeof
(
SSubmitBlk
));
STSRow
*
rowData
=
POINTER_SHIFT
(
blkSchema
,
schemaLen
);
SRowBuilder
rb
=
{
0
};
tdSRowInit
(
&
rb
,
sver
);
tdSRowSetTpInfo
(
&
rb
,
p
SW
->
nCol
s
,
fLen
);
tdSRowSetTpInfo
(
&
rb
,
p
TableMeta
->
tableInfo
.
numOfColumn
s
,
fLen
);
int32_t
dataLen
=
0
;
SHashObj
*
schemaHash
=
taosHashInit
(
16
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_NO_LOCK
);
for
(
int
i
=
0
;
i
<
pSW
->
nCols
;
i
++
)
{
SSchema
*
schema
=
&
pSW
->
pSchema
[
i
];
taosHashPut
(
schemaHash
,
schema
->
name
,
strlen
(
schema
->
name
),
&
i
,
sizeof
(
int32_t
));
}
for
(
int32_t
j
=
0
;
j
<
rows
;
j
++
)
{
tdSRowResetBuf
(
&
rb
,
rowData
);
...
...
@@ -1481,17 +1487,23 @@ static int32_t tmqWriteRaw(TAOS* taos, void* data, int32_t dataLen) {
rspObj
.
resInfo
.
current
+=
1
;
int32_t
offset
=
0
;
for
(
int32_t
k
=
0
;
k
<
p
SW
->
nCol
s
;
k
++
)
{
const
SSchema
*
pColumn
=
&
p
SW
->
pS
chema
[
k
];
char
*
data
=
rspObj
.
resInfo
.
row
[
k
]
;
if
(
!
data
)
{
for
(
int32_t
k
=
0
;
k
<
p
TableMeta
->
tableInfo
.
numOfColumn
s
;
k
++
)
{
const
SSchema
*
pColumn
=
&
p
TableMeta
->
s
chema
[
k
];
int32_t
*
index
=
taosHashGet
(
schemaHash
,
pColumn
->
name
,
strlen
(
pColumn
->
name
))
;
if
(
!
index
)
{
tdAppendColValToRow
(
&
rb
,
pColumn
->
colId
,
pColumn
->
type
,
TD_VTYPE_NULL
,
NULL
,
false
,
offset
,
k
);
}
else
{
if
(
IS_VAR_DATA_TYPE
(
pColumn
->
type
))
{
data
-=
VARSTR_HEADER_SIZE
;
}
else
{
char
*
colData
=
rspObj
.
resInfo
.
row
[
*
index
];
if
(
!
colData
)
{
tdAppendColValToRow
(
&
rb
,
pColumn
->
colId
,
pColumn
->
type
,
TD_VTYPE_NULL
,
NULL
,
false
,
offset
,
k
);
}
else
{
if
(
IS_VAR_DATA_TYPE
(
pColumn
->
type
))
{
colData
-=
VARSTR_HEADER_SIZE
;
}
tdAppendColValToRow
(
&
rb
,
pColumn
->
colId
,
pColumn
->
type
,
TD_VTYPE_NORM
,
colData
,
true
,
offset
,
k
);
}
tdAppendColValToRow
(
&
rb
,
pColumn
->
colId
,
pColumn
->
type
,
TD_VTYPE_NORM
,
data
,
true
,
offset
,
k
);
}
offset
+=
TYPE_BYTES
[
pColumn
->
type
];
}
tdSRowEnd
(
&
rb
);
...
...
@@ -1500,6 +1512,7 @@ static int32_t tmqWriteRaw(TAOS* taos, void* data, int32_t dataLen) {
dataLen
+=
rowLen
;
}
taosHashCleanup
(
schemaHash
);
blk
->
uid
=
htobe64
(
uid
);
blk
->
suid
=
htobe64
(
suid
);
blk
->
sversion
=
htonl
(
sver
);
...
...
@@ -1508,6 +1521,7 @@ static int32_t tmqWriteRaw(TAOS* taos, void* data, int32_t dataLen) {
blk
->
dataLen
=
htonl
(
dataLen
);
subReq
->
length
+=
sizeof
(
SSubmitBlk
)
+
schemaLen
+
dataLen
;
subReq
->
numOfBlocks
++
;
taosMemoryFreeClear
(
pTableMeta
);
}
pQuery
=
(
SQuery
*
)
nodesMakeNode
(
QUERY_NODE_QUERY
);
...
...
@@ -1561,6 +1575,7 @@ end:
qDestroyQuery
(
pQuery
);
destroyRequest
(
pRequest
);
taosHashCleanup
(
pVgHash
);
taosMemoryFreeClear
(
pTableMeta
);
return
code
;
}
...
...
source/dnode/vnode/src/meta/metaQuery.c
浏览文件 @
514711e5
...
...
@@ -888,6 +888,7 @@ const void *metaGetTableTagVal(void *pTag, int16_t type, STagVal *val) {
return
NULL
;
}
#ifdef TAG_FILTER_DEBUG
if
(
IS_VAR_DATA_TYPE
(
val
->
type
))
{
char
*
buf
=
taosMemoryCalloc
(
val
->
nData
+
1
,
1
);
memcpy
(
buf
,
val
->
pData
,
val
->
nData
);
...
...
@@ -915,6 +916,7 @@ const void *metaGetTableTagVal(void *pTag, int16_t type, STagVal *val) {
metaDebug
(
"metaTag table number index:%d cid:%d type:%d value:%f"
,
i
,
pTagVal
->
cid
,
pTagVal
->
type
,
dval
);
}
}
#endif
return
val
;
}
...
...
source/dnode/vnode/src/meta/metaTable.c
浏览文件 @
514711e5
...
...
@@ -99,6 +99,7 @@ static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const
memcpy
(
val
,
(
uint16_t
*
)
&
len
,
VARSTR_HEADER_SIZE
);
type
=
TSDB_DATA_TYPE_VARCHAR
;
term
=
indexTermCreate
(
suid
,
ADD_VALUE
,
type
,
key
,
nKey
,
val
,
len
);
taosMemoryFree
(
val
);
}
else
if
(
pTagVal
->
nData
==
0
)
{
term
=
indexTermCreate
(
suid
,
ADD_VALUE
,
TSDB_DATA_TYPE_VARCHAR
,
key
,
nKey
,
pTagVal
->
pData
,
0
);
}
...
...
@@ -115,6 +116,7 @@ static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const
indexMultiTermAdd
(
terms
,
term
);
}
}
taosArrayDestroy
(
pTagVals
);
indexJsonPut
(
pMeta
->
pTagIvtIdx
,
terms
,
tuid
);
indexMultiTermDestroy
(
terms
);
#endif
...
...
@@ -413,6 +415,7 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) {
me
.
ctbEntry
.
suid
=
pReq
->
ctb
.
suid
;
me
.
ctbEntry
.
pTags
=
pReq
->
ctb
.
pTag
;
#ifdef TAG_FILTER_DEBUG
SArray
*
pTagVals
=
NULL
;
int32_t
code
=
tTagToValArray
((
STag
*
)
pReq
->
ctb
.
pTag
,
&
pTagVals
);
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pTagVals
);
i
++
)
{
...
...
@@ -429,6 +432,7 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) {
metaDebug
(
"metaTag table:%s number index:%d cid:%d type:%d value:%f"
,
pReq
->
name
,
i
,
pTagVal
->
cid
,
pTagVal
->
type
,
val
);
}
}
#endif
++
pMeta
->
pVnode
->
config
.
vndStats
.
numOfCTables
;
}
else
{
...
...
source/dnode/vnode/src/tq/tqExec.c
浏览文件 @
514711e5
...
...
@@ -120,15 +120,14 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp*
}
}
if
(
p
Rsp
->
blockNum
==
0
&&
pOffset
->
type
==
TMQ_OFFSET__SNAPSHOT_DATA
){
if
(
p
DataBlock
==
NULL
&&
pOffset
->
type
==
TMQ_OFFSET__SNAPSHOT_DATA
){
if
(
pHandle
->
execHandle
.
subType
!=
TOPIC_SUB_TYPE__COLUMN
&&
qStreamExtractPrepareUid
(
task
)
!=
0
){
continue
;
}
tqDebug
(
"tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %"
PRId64
,
TD_VID
(
pTq
->
pVnode
),
pHandle
->
snapshotVer
+
1
);
tqOffsetResetToLog
(
pOffset
,
pHandle
->
snapshotVer
);
qStreamPrepareScan
(
task
,
pOffset
,
pHandle
->
execHandle
.
subType
);
continue
;
// tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
// qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
}
if
(
pRsp
->
blockNum
>
0
){
...
...
@@ -139,7 +138,8 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp*
SMqMetaRsp
*
tmp
=
qStreamExtractMetaMsg
(
task
);
if
(
tmp
->
rspOffset
.
type
==
TMQ_OFFSET__SNAPSHOT_DATA
){
qStreamPrepareScan
(
task
,
&
tmp
->
rspOffset
,
pHandle
->
execHandle
.
subType
);
tqOffsetResetToData
(
pOffset
,
tmp
->
rspOffset
.
uid
,
tmp
->
rspOffset
.
ts
);
qStreamPrepareScan
(
task
,
pOffset
,
pHandle
->
execHandle
.
subType
);
tmp
->
rspOffset
.
type
=
TMQ_OFFSET__SNAPSHOT_META
;
tqDebug
(
"tmqsnap task exec change to get data"
);
continue
;
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
514711e5
...
...
@@ -1517,7 +1517,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
pTaskInfo
->
streamInfo
.
metaRsp
.
metaRspLen
=
0
;
// use metaRspLen !=0 to judge if data is meta
pTaskInfo
->
streamInfo
.
metaRsp
.
metaRsp
=
NULL
;
qDebug
(
"doRawScan called"
);
qDebug
(
"
tmqsnap
doRawScan called"
);
if
(
pTaskInfo
->
streamInfo
.
prepareStatus
.
type
==
TMQ_OFFSET__SNAPSHOT_DATA
){
SSDataBlock
*
pBlock
=
&
pInfo
->
pRes
;
...
...
@@ -1548,6 +1548,8 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
if
(
mtInfo
.
uid
==
0
){
//read snapshot done, change to get data from wal
qDebug
(
"tmqsnap read snapshot done, change to get data from wal"
);
pTaskInfo
->
streamInfo
.
prepareStatus
.
uid
=
mtInfo
.
uid
;
pTaskInfo
->
streamInfo
.
lastStatus
.
type
=
TMQ_OFFSET__LOG
;
pTaskInfo
->
streamInfo
.
lastStatus
.
version
=
pInfo
->
sContext
->
snapVersion
;
tDeleteSSchemaWrapper
(
pTaskInfo
->
streamInfo
.
schema
);
}
else
{
pTaskInfo
->
streamInfo
.
prepareStatus
.
uid
=
mtInfo
.
uid
;
...
...
@@ -1595,7 +1597,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
if
(
pInfo
->
needFetchLog
){
fetchVer
++
;
if
(
tqFetchLog
(
pInfo
->
tqReader
->
pWalReader
,
pInfo
->
sContext
->
withMeta
,
&
fetchVer
,
&
pInfo
->
pCkHead
)
<
0
)
{
qDebug
(
"tmq poll: consumer log end. offset %"
PRId64
,
fetchVer
);
qDebug
(
"tmq
snap tmq
poll: consumer log end. offset %"
PRId64
,
fetchVer
);
pTaskInfo
->
streamInfo
.
metaRsp
.
rspOffset
.
version
=
fetchVer
;
pTaskInfo
->
streamInfo
.
metaRsp
.
rspOffset
.
type
=
TMQ_OFFSET__LOG
;
return
NULL
;
...
...
tests/system-test/7-tmq/tmq_taosx.py
浏览文件 @
514711e5
...
...
@@ -52,6 +52,148 @@ class TDTestCase:
tdSql
.
checkData
(
1
,
1
,
23
)
tdSql
.
checkData
(
1
,
4
,
None
)
tdSql
.
query
(
"select * from st1 order by ts"
)
tdSql
.
checkRows
(
8
)
tdSql
.
checkData
(
0
,
1
,
1
)
tdSql
.
checkData
(
1
,
1
,
3
)
tdSql
.
checkData
(
4
,
1
,
4
)
tdSql
.
checkData
(
6
,
1
,
23
)
tdSql
.
checkData
(
0
,
2
,
2
)
tdSql
.
checkData
(
1
,
2
,
4
)
tdSql
.
checkData
(
4
,
2
,
3
)
tdSql
.
checkData
(
6
,
2
,
32
)
tdSql
.
checkData
(
0
,
3
,
'a'
)
tdSql
.
checkData
(
1
,
3
,
'b'
)
tdSql
.
checkData
(
4
,
3
,
'hwj'
)
tdSql
.
checkData
(
6
,
3
,
's21ds'
)
tdSql
.
checkData
(
0
,
4
,
None
)
tdSql
.
checkData
(
1
,
4
,
None
)
tdSql
.
checkData
(
5
,
4
,
940
)
tdSql
.
checkData
(
6
,
4
,
None
)
tdSql
.
checkData
(
0
,
5
,
1000
)
tdSql
.
checkData
(
1
,
5
,
2000
)
tdSql
.
checkData
(
4
,
5
,
1000
)
tdSql
.
checkData
(
6
,
5
,
5000
)
tdSql
.
checkData
(
0
,
6
,
'ttt'
)
tdSql
.
checkData
(
1
,
6
,
None
)
tdSql
.
checkData
(
4
,
6
,
'ttt'
)
tdSql
.
checkData
(
6
,
6
,
None
)
tdSql
.
checkData
(
0
,
7
,
True
)
tdSql
.
checkData
(
1
,
7
,
None
)
tdSql
.
checkData
(
4
,
7
,
True
)
tdSql
.
checkData
(
6
,
7
,
None
)
tdSql
.
checkData
(
0
,
8
,
None
)
tdSql
.
checkData
(
1
,
8
,
None
)
tdSql
.
checkData
(
4
,
8
,
None
)
tdSql
.
checkData
(
6
,
8
,
None
)
tdSql
.
query
(
"select * from ct1"
)
tdSql
.
checkRows
(
4
)
tdSql
.
query
(
"select * from ct2"
)
tdSql
.
checkRows
(
0
)
tdSql
.
query
(
"select * from ct0 order by c1"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkData
(
0
,
3
,
"a"
)
tdSql
.
checkData
(
1
,
4
,
None
)
tdSql
.
query
(
"select * from n1 order by cc3 desc"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkData
(
0
,
1
,
"eeee"
)
tdSql
.
checkData
(
1
,
2
,
940
)
tdSql
.
query
(
"select * from jt order by i desc"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkData
(
0
,
1
,
11
)
tdSql
.
checkData
(
0
,
2
,
None
)
tdSql
.
checkData
(
1
,
1
,
1
)
tdSql
.
checkData
(
1
,
2
,
'{"k1":1,"k2":"hello"}'
)
tdSql
.
execute
(
'drop topic if exists topic_ctb_column'
)
return
def
checkFileContentSnapshot
(
self
):
buildPath
=
tdCom
.
getBuildPath
()
cfgPath
=
tdCom
.
getClientCfgPath
()
cmdStr
=
'%s/build/bin/tmq_taosx_snapshot_ci -c %s'
%
(
buildPath
,
cfgPath
)
tdLog
.
info
(
cmdStr
)
os
.
system
(
cmdStr
)
srcFile
=
'%s/../log/tmq_taosx_tmp_snapshot.source'
%
(
cfgPath
)
dstFile
=
'%s/../log/tmq_taosx_tmp_snapshot.result'
%
(
cfgPath
)
tdLog
.
info
(
"compare file: %s, %s"
%
(
srcFile
,
dstFile
))
consumeFile
=
open
(
srcFile
,
mode
=
'r'
)
queryFile
=
open
(
dstFile
,
mode
=
'r'
)
while
True
:
dst
=
queryFile
.
readline
()
src
=
consumeFile
.
readline
()
if
dst
:
if
dst
!=
src
:
tdLog
.
exit
(
"compare error: %s != %s"
%
src
,
dst
)
else
:
break
tdSql
.
execute
(
'use db_taosx'
)
tdSql
.
query
(
"select * from ct3 order by c1 desc"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkData
(
0
,
1
,
51
)
tdSql
.
checkData
(
0
,
4
,
940
)
tdSql
.
checkData
(
1
,
1
,
23
)
tdSql
.
checkData
(
1
,
4
,
None
)
tdSql
.
query
(
"select * from st1 order by ts"
)
tdSql
.
checkRows
(
8
)
tdSql
.
checkData
(
0
,
1
,
1
)
tdSql
.
checkData
(
1
,
1
,
3
)
tdSql
.
checkData
(
4
,
1
,
4
)
tdSql
.
checkData
(
6
,
1
,
23
)
tdSql
.
checkData
(
0
,
2
,
2
)
tdSql
.
checkData
(
1
,
2
,
4
)
tdSql
.
checkData
(
4
,
2
,
3
)
tdSql
.
checkData
(
6
,
2
,
32
)
tdSql
.
checkData
(
0
,
3
,
'a'
)
tdSql
.
checkData
(
1
,
3
,
'b'
)
tdSql
.
checkData
(
4
,
3
,
'hwj'
)
tdSql
.
checkData
(
6
,
3
,
's21ds'
)
tdSql
.
checkData
(
0
,
4
,
None
)
tdSql
.
checkData
(
1
,
4
,
None
)
tdSql
.
checkData
(
5
,
4
,
940
)
tdSql
.
checkData
(
6
,
4
,
None
)
tdSql
.
checkData
(
0
,
5
,
1000
)
tdSql
.
checkData
(
1
,
5
,
2000
)
tdSql
.
checkData
(
4
,
5
,
1000
)
tdSql
.
checkData
(
6
,
5
,
5000
)
tdSql
.
checkData
(
0
,
6
,
'ttt'
)
tdSql
.
checkData
(
1
,
6
,
None
)
tdSql
.
checkData
(
4
,
6
,
'ttt'
)
tdSql
.
checkData
(
6
,
6
,
None
)
tdSql
.
checkData
(
0
,
7
,
True
)
tdSql
.
checkData
(
1
,
7
,
None
)
tdSql
.
checkData
(
4
,
7
,
True
)
tdSql
.
checkData
(
6
,
7
,
None
)
tdSql
.
checkData
(
0
,
8
,
None
)
tdSql
.
checkData
(
1
,
8
,
None
)
tdSql
.
checkData
(
4
,
8
,
None
)
tdSql
.
checkData
(
6
,
8
,
None
)
tdSql
.
query
(
"select * from ct1"
)
tdSql
.
checkRows
(
4
)
...
...
@@ -80,6 +222,7 @@ class TDTestCase:
def
run
(
self
):
tdSql
.
prepare
()
self
.
checkFileContent
()
self
.
checkFileContentSnapshot
()
def
stop
(
self
):
tdSql
.
close
()
...
...
tests/test/c/CMakeLists.txt
浏览文件 @
514711e5
...
...
@@ -2,6 +2,7 @@ add_executable(tmq_demo tmqDemo.c)
add_executable
(
tmq_sim tmqSim.c
)
add_executable
(
create_table createTable.c
)
add_executable
(
tmq_taosx_ci tmq_taosx_ci.c
)
add_executable
(
tmq_taosx_snapshot_ci tmq_taosx_snapshot_ci.c
)
add_executable
(
sml_test sml_test.c
)
target_link_libraries
(
create_table
...
...
@@ -31,6 +32,13 @@ target_link_libraries(
PUBLIC common
PUBLIC os
)
target_link_libraries
(
tmq_taosx_snapshot_ci
PUBLIC taos_static
PUBLIC util
PUBLIC common
PUBLIC os
)
target_link_libraries
(
sml_test
...
...
tests/test/c/tmq_taosx_snapshot_ci.c
0 → 100644
浏览文件 @
514711e5
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "taos.h"
#include "types.h"
static
int
running
=
1
;
TdFilePtr
g_fp
=
NULL
;
char
dir
[
64
]
=
{
0
};
static
TAOS
*
use_db
(){
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pConn
==
NULL
)
{
return
NULL
;
}
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"use db_taosx"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in use db_taosx, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
NULL
;
}
taos_free_result
(
pRes
);
return
pConn
;
}
static
void
msg_process
(
TAOS_RES
*
msg
)
{
/*memset(buf, 0, 1024);*/
printf
(
"-----------topic-------------: %s
\n
"
,
tmq_get_topic_name
(
msg
));
printf
(
"db: %s
\n
"
,
tmq_get_db_name
(
msg
));
printf
(
"vg: %d
\n
"
,
tmq_get_vgroup_id
(
msg
));
TAOS
*
pConn
=
use_db
();
if
(
tmq_get_res_type
(
msg
)
==
TMQ_RES_TABLE_META
)
{
char
*
result
=
tmq_get_json_meta
(
msg
);
if
(
result
)
{
printf
(
"meta result: %s
\n
"
,
result
);
}
taosFprintfFile
(
g_fp
,
result
);
taosFprintfFile
(
g_fp
,
"
\n
"
);
tmq_free_json_meta
(
result
);
}
tmq_raw_data
raw
=
{
0
};
tmq_get_raw
(
msg
,
&
raw
);
int32_t
ret
=
tmq_write_raw
(
pConn
,
raw
);
printf
(
"write raw data: %s
\n
"
,
tmq_err2str
(
ret
));
// else{
// while(1){
// int numOfRows = 0;
// void *pData = NULL;
// taos_fetch_raw_block(msg, &numOfRows, &pData);
// if(numOfRows == 0) break;
// printf("write data: tbname:%s, numOfRows:%d\n", tmq_get_table_name(msg), numOfRows);
// int ret = taos_write_raw_block(pConn, numOfRows, pData, tmq_get_table_name(msg));
// printf("write raw data: %s\n", tmq_err2str(ret));
// }
// }
taos_close
(
pConn
);
}
int32_t
init_env
()
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pConn
==
NULL
)
{
return
-
1
;
}
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"drop database if exists db_taosx"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in drop db_taosx, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create database if not exists db_taosx vgroups 1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in create db_taosx, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"drop database if exists abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in drop db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create database if not exists abc1 vgroups 1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in create db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"use abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in use db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
"nchar(8), t4 bool)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table if not exists ct0 using st1 tags(1000,
\"
ttt
\"
, true)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create child table tu1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into ct0 values(1626006833600, 1, 2, 'a')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct0, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table if not exists ct1 using st1(t1) tags(2000)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create child table ct1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table if not exists ct2 using st1(t1) tags(NULL)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create child table ct2, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into ct1 values(1626006833600, 3, 4, 'b')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table if not exists ct3 using st1(t1) tags(3000)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create child table ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into ct3 values(1626006833600, 5, 6, 'c') ct1 values(1626006833601, 2, 3, 'sds') (1626006833602, 4, 5, 'ddd') ct0 values(1626006833602, 4, 3, 'hwj') ct1 values(now+5s, 23, 32, 's21ds')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table st1 add column c4 bigint"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table st1 modify column c3 binary(64)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into ct3 values(1626006833605, 53, 63, 'cffffffffffffffffffffffffffff', 8989898899999) (1626006833609, 51, 62, 'c333', 940)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into ct3 select * from ct1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table st1 add tag t2 binary(64)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table ct3 set tag t1=5000"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to slter child table ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"delete from abc1 .ct3 where ts < 1626006833606"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table if not exists n1(ts timestamp, c1 int, c2 nchar(4))"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create normal table n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table n1 add column c3 bigint"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter normal table n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table n1 modify column c2 nchar(8)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter normal table n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table n1 rename column c3 cc3"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter normal table n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table n1 comment 'hello'"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter normal table n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table n1 drop column c1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter normal table n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into n1 values(now, 'eeee', 8989898899999) (now+9s, 'c333', 940)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table jt(ts timestamp, i int) tags(t json)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table jt, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table jt1 using jt tags('{
\"
k1
\"
:1,
\"
k2
\"
:
\"
hello
\"
}')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table jt, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table jt2 using jt tags('')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table jt2, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into jt1 values(now, 1)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table jt1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into jt2 values(now, 11)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table jt2, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
taos_close
(
pConn
);
return
0
;
}
int32_t
create_topic
()
{
printf
(
"create topic
\n
"
);
TAOS_RES
*
pRes
;
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pConn
==
NULL
)
{
return
-
1
;
}
pRes
=
taos_query
(
pConn
,
"use abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in use db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create topic topic_ctb_column with meta as database abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create topic topic_ctb_column, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
taos_close
(
pConn
);
return
0
;
}
void
tmq_commit_cb_print
(
tmq_t
*
tmq
,
int32_t
code
,
void
*
param
)
{
printf
(
"commit %d tmq %p param %p
\n
"
,
code
,
tmq
,
param
);
}
tmq_t
*
build_consumer
()
{
#if 0
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
#endif
tmq_conf_t
*
conf
=
tmq_conf_new
();
tmq_conf_set
(
conf
,
"group.id"
,
"tg2"
);
tmq_conf_set
(
conf
,
"client.id"
,
"my app 1"
);
tmq_conf_set
(
conf
,
"td.connect.user"
,
"root"
);
tmq_conf_set
(
conf
,
"td.connect.pass"
,
"taosdata"
);
tmq_conf_set
(
conf
,
"msg.with.table.name"
,
"true"
);
tmq_conf_set
(
conf
,
"enable.auto.commit"
,
"true"
);
tmq_conf_set
(
conf
,
"enable.heartbeat.background"
,
"true"
);
tmq_conf_set
(
conf
,
"experimental.snapshot.enable"
,
"true"
);
/*tmq_conf_set(conf, "experimental.snapshot.enable", "true");*/
tmq_conf_set_auto_commit_cb
(
conf
,
tmq_commit_cb_print
,
NULL
);
tmq_t
*
tmq
=
tmq_consumer_new
(
conf
,
NULL
,
0
);
assert
(
tmq
);
tmq_conf_destroy
(
conf
);
return
tmq
;
}
tmq_list_t
*
build_topic_list
()
{
tmq_list_t
*
topic_list
=
tmq_list_new
();
tmq_list_append
(
topic_list
,
"topic_ctb_column"
);
/*tmq_list_append(topic_list, "tmq_test_db_multi_insert_topic");*/
return
topic_list
;
}
void
basic_consume_loop
(
tmq_t
*
tmq
,
tmq_list_t
*
topics
)
{
int32_t
code
;
if
((
code
=
tmq_subscribe
(
tmq
,
topics
)))
{
fprintf
(
stderr
,
"%% Failed to start consuming topics: %s
\n
"
,
tmq_err2str
(
code
));
printf
(
"subscribe err
\n
"
);
return
;
}
int32_t
cnt
=
0
;
while
(
running
)
{
TAOS_RES
*
tmqmessage
=
tmq_consumer_poll
(
tmq
,
1000
);
if
(
tmqmessage
)
{
cnt
++
;
msg_process
(
tmqmessage
);
/*if (cnt >= 2) break;*/
/*printf("get data\n");*/
taos_free_result
(
tmqmessage
);
/*} else {*/
/*break;*/
/*tmq_commit_sync(tmq, NULL);*/
}
else
{
break
;
}
}
code
=
tmq_consumer_close
(
tmq
);
if
(
code
)
fprintf
(
stderr
,
"%% Failed to close consumer: %s
\n
"
,
tmq_err2str
(
code
));
else
fprintf
(
stderr
,
"%% Consumer closed
\n
"
);
}
void
sync_consume_loop
(
tmq_t
*
tmq
,
tmq_list_t
*
topics
)
{
static
const
int
MIN_COMMIT_COUNT
=
1
;
int
msg_count
=
0
;
int32_t
code
;
if
((
code
=
tmq_subscribe
(
tmq
,
topics
)))
{
fprintf
(
stderr
,
"%% Failed to start consuming topics: %s
\n
"
,
tmq_err2str
(
code
));
return
;
}
tmq_list_t
*
subList
=
NULL
;
tmq_subscription
(
tmq
,
&
subList
);
char
**
subTopics
=
tmq_list_to_c_array
(
subList
);
int32_t
sz
=
tmq_list_get_size
(
subList
);
printf
(
"subscribed topics: "
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
printf
(
"%s, "
,
subTopics
[
i
]);
}
printf
(
"
\n
"
);
tmq_list_destroy
(
subList
);
while
(
running
)
{
TAOS_RES
*
tmqmessage
=
tmq_consumer_poll
(
tmq
,
1000
);
if
(
tmqmessage
)
{
msg_process
(
tmqmessage
);
taos_free_result
(
tmqmessage
);
/*tmq_commit_sync(tmq, NULL);*/
/*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/
}
}
code
=
tmq_consumer_close
(
tmq
);
if
(
code
)
fprintf
(
stderr
,
"%% Failed to close consumer: %s
\n
"
,
tmq_err2str
(
code
));
else
fprintf
(
stderr
,
"%% Consumer closed
\n
"
);
}
void
initLogFile
()
{
char
f1
[
256
]
=
{
0
};
char
f2
[
256
]
=
{
0
};
sprintf
(
f1
,
"%s/../log/tmq_taosx_tmp_snapshot.source"
,
dir
);
sprintf
(
f2
,
"%s/../log/tmq_taosx_tmp_snapshot.result"
,
dir
);
TdFilePtr
pFile
=
taosOpenFile
(
f1
,
TD_FILE_TEXT
|
TD_FILE_TRUNC
|
TD_FILE_STREAM
);
if
(
NULL
==
pFile
)
{
fprintf
(
stderr
,
"Failed to open %s for save result
\n
"
,
f1
);
exit
(
-
1
);
}
g_fp
=
pFile
;
TdFilePtr
pFile2
=
taosOpenFile
(
f2
,
TD_FILE_TEXT
|
TD_FILE_TRUNC
|
TD_FILE_STREAM
);
if
(
NULL
==
pFile2
)
{
fprintf
(
stderr
,
"Failed to open %s for save result
\n
"
,
f2
);
exit
(
-
1
);
}
char
*
result
[]
=
{
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
st1
\"
,
\"
tableType
\"
:
\"
super
\"
,
\"
columns
\"
:[{
\"
name
\"
:
\"
ts
\"
,
\"
type
\"
:9},{
\"
name
\"
:
\"
c1
\"
,
\"
type
\"
:4},{
\"
name
\"
:
\"
c2
\"
,
\"
type
\"
:6},{
\"
name
\"
:
\"
c3
\"
,
\"
type
\"
:8,
\"
length
\"
:64},{
\"
name
\"
:
\"
c4
\"
,
\"
type
\"
:5}],
\"
tags
\"
:[{
\"
name
\"
:
\"
t1
\"
,
\"
type
\"
:4},{
\"
name
\"
:
\"
t3
\"
,
\"
type
\"
:10,
\"
length
\"
:8},{
\"
name
\"
:
\"
t4
\"
,
\"
type
\"
:1},{
\"
name
\"
:
\"
t2
\"
,
\"
type
\"
:8,
\"
length
\"
:64}]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
ct0
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
using
\"
:
\"
st1
\"
,
\"
tagNum
\"
:4,
\"
tags
\"
:[{
\"
name
\"
:
\"
t1
\"
,
\"
type
\"
:4,
\"
value
\"
:1000},{
\"
name
\"
:
\"
t3
\"
,
\"
type
\"
:10,
\"
value
\"
:
\"\\\"
ttt
\\\"\"
},{
\"
name
\"
:
\"
t4
\"
,
\"
type
\"
:1,
\"
value
\"
:1}]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
ct1
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
using
\"
:
\"
st1
\"
,
\"
tagNum
\"
:4,
\"
tags
\"
:[{
\"
name
\"
:
\"
t1
\"
,
\"
type
\"
:4,
\"
value
\"
:2000}]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
ct2
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
using
\"
:
\"
st1
\"
,
\"
tagNum
\"
:4,
\"
tags
\"
:[]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
ct3
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
using
\"
:
\"
st1
\"
,
\"
tagNum
\"
:4,
\"
tags
\"
:[{
\"
name
\"
:
\"
t1
\"
,
\"
type
\"
:4,
\"
value
\"
:5000}]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
n1
\"
,
\"
tableType
\"
:
\"
normal
\"
,
\"
columns
\"
:[{
\"
name
\"
:
\"
ts
\"
,
\"
type
\"
:9},{
\"
name
\"
:
\"
c2
\"
,
\"
type
\"
:10,
\"
length
\"
:8},{
\"
name
\"
:
\"
cc3
\"
,
\"
type
\"
:5}],
\"
tags
\"
:[]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
jt
\"
,
\"
tableType
\"
:
\"
super
\"
,
\"
columns
\"
:[{
\"
name
\"
:
\"
ts
\"
,
\"
type
\"
:9},{
\"
name
\"
:
\"
i
\"
,
\"
type
\"
:4}],
\"
tags
\"
:[{
\"
name
\"
:
\"
t
\"
,
\"
type
\"
:15}]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
jt1
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
using
\"
:
\"
jt
\"
,
\"
tagNum
\"
:1,
\"
tags
\"
:[{
\"
name
\"
:
\"
t
\"
,
\"
type
\"
:15,
\"
value
\"
:
\"
{
\\\"
k1
\\\"
:1,
\\\"
k2
\\\"
:
\\\"
hello
\\\"
}
\"
}]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
jt2
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
using
\"
:
\"
jt
\"
,
\"
tagNum
\"
:1,
\"
tags
\"
:[]}"
,
};
for
(
int
i
=
0
;
i
<
sizeof
(
result
)
/
sizeof
(
result
[
0
]);
i
++
){
taosFprintfFile
(
pFile2
,
result
[
i
]);
taosFprintfFile
(
pFile2
,
"
\n
"
);
}
taosCloseFile
(
&
pFile2
);
}
int
main
(
int
argc
,
char
*
argv
[])
{
if
(
argc
==
3
&&
strcmp
(
argv
[
1
],
"-c"
)
==
0
)
{
strcpy
(
dir
,
argv
[
2
]);
}
else
{
// strcpy(dir, "../../../sim/psim/cfg");
strcpy
(
dir
,
"/var/log"
);
}
printf
(
"env init
\n
"
);
initLogFile
();
if
(
init_env
()
<
0
)
{
return
-
1
;
}
create_topic
();
tmq_t
*
tmq
=
build_consumer
();
tmq_list_t
*
topic_list
=
build_topic_list
();
basic_consume_loop
(
tmq
,
topic_list
);
/*sync_consume_loop(tmq, topic_list);*/
taosCloseFile
(
&
g_fp
);
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录