Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
b4169cdf
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
b4169cdf
编写于
4月 21, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of
https://github.com/taosdata/TDengine
into feature/vnode_refact1
上级
d1530753
9f8a447d
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
550 addition
and
321 deletion
+550
-321
source/client/inc/clientInt.h
source/client/inc/clientInt.h
+1
-0
source/client/src/clientEnv.c
source/client/src/clientEnv.c
+21
-30
source/client/src/clientMain.c
source/client/src/clientMain.c
+10
-0
source/os/src/osFile.c
source/os/src/osFile.c
+1
-1
source/util/src/tlog.c
source/util/src/tlog.c
+1
-1
tests/script/tsim/tmq/basic1.sim
tests/script/tsim/tmq/basic1.sim
+206
-157
tests/script/tsim/tmq/clearConsume.sim
tests/script/tsim/tmq/clearConsume.sim
+28
-0
tests/script/tsim/tmq/consume.sh
tests/script/tsim/tmq/consume.sh
+17
-5
tests/script/tsim/tmq/prepareBasicEnv.sim
tests/script/tsim/tmq/prepareBasicEnv.sim
+88
-0
tests/test/c/tmqSim.c
tests/test/c/tmqSim.c
+177
-127
未找到文件。
source/client/inc/clientInt.h
浏览文件 @
b4169cdf
...
@@ -219,6 +219,7 @@ void doSetOneRowPtr(SReqResultInfo* pResultInfo);
...
@@ -219,6 +219,7 @@ void doSetOneRowPtr(SReqResultInfo* pResultInfo);
void
setResPrecision
(
SReqResultInfo
*
pResInfo
,
int32_t
precision
);
void
setResPrecision
(
SReqResultInfo
*
pResInfo
,
int32_t
precision
);
int32_t
setQueryResultFromRsp
(
SReqResultInfo
*
pResultInfo
,
const
SRetrieveTableRsp
*
pRsp
,
bool
convertUcs4
);
int32_t
setQueryResultFromRsp
(
SReqResultInfo
*
pResultInfo
,
const
SRetrieveTableRsp
*
pRsp
,
bool
convertUcs4
);
void
setResSchemaInfo
(
SReqResultInfo
*
pResInfo
,
const
SSchema
*
pSchema
,
int32_t
numOfCols
);
void
setResSchemaInfo
(
SReqResultInfo
*
pResInfo
,
const
SSchema
*
pSchema
,
int32_t
numOfCols
);
void
doFreeReqResultInfo
(
SReqResultInfo
*
pResInfo
);
static
FORCE_INLINE
SReqResultInfo
*
tmqGetCurResInfo
(
TAOS_RES
*
res
)
{
static
FORCE_INLINE
SReqResultInfo
*
tmqGetCurResInfo
(
TAOS_RES
*
res
)
{
SMqRspObj
*
msg
=
(
SMqRspObj
*
)
res
;
SMqRspObj
*
msg
=
(
SMqRspObj
*
)
res
;
...
...
source/client/src/clientEnv.c
浏览文件 @
b4169cdf
...
@@ -30,15 +30,15 @@
...
@@ -30,15 +30,15 @@
#define TSC_VAR_RELEASED 0
#define TSC_VAR_RELEASED 0
SAppInfo
appInfo
;
SAppInfo
appInfo
;
int32_t
clientReqRefPool
=
-
1
;
int32_t
clientReqRefPool
=
-
1
;
int32_t
clientConnRefPool
=
-
1
;
int32_t
clientConnRefPool
=
-
1
;
static
TdThreadOnce
tscinit
=
PTHREAD_ONCE_INIT
;
static
TdThreadOnce
tscinit
=
PTHREAD_ONCE_INIT
;
volatile
int32_t
tscInitRes
=
0
;
volatile
int32_t
tscInitRes
=
0
;
static
void
registerRequest
(
SRequestObj
*
pRequest
)
{
static
void
registerRequest
(
SRequestObj
*
pRequest
)
{
STscObj
*
pTscObj
=
acquireTscObj
(
pRequest
->
pTscObj
->
id
);
STscObj
*
pTscObj
=
acquireTscObj
(
pRequest
->
pTscObj
->
id
);
assert
(
pTscObj
!=
NULL
);
assert
(
pTscObj
!=
NULL
);
// connection has been released already, abort creating request.
// connection has been released already, abort creating request.
...
@@ -49,8 +49,8 @@ static void registerRequest(SRequestObj *pRequest) {
...
@@ -49,8 +49,8 @@ static void registerRequest(SRequestObj *pRequest) {
if
(
pTscObj
->
pAppInfo
)
{
if
(
pTscObj
->
pAppInfo
)
{
SInstanceSummary
*
pSummary
=
&
pTscObj
->
pAppInfo
->
summary
;
SInstanceSummary
*
pSummary
=
&
pTscObj
->
pAppInfo
->
summary
;
int32_t
total
=
atomic_add_fetch_64
((
int64_t
*
)
&
pSummary
->
totalRequests
,
1
);
int32_t
total
=
atomic_add_fetch_64
((
int64_t
*
)
&
pSummary
->
totalRequests
,
1
);
int32_t
currentInst
=
atomic_add_fetch_64
((
int64_t
*
)
&
pSummary
->
currentRequests
,
1
);
int32_t
currentInst
=
atomic_add_fetch_64
((
int64_t
*
)
&
pSummary
->
currentRequests
,
1
);
tscDebug
(
"0x%"
PRIx64
" new Request from connObj:0x%"
PRIx64
tscDebug
(
"0x%"
PRIx64
" new Request from connObj:0x%"
PRIx64
", current:%d, app current:%d, total:%d, reqId:0x%"
PRIx64
,
", current:%d, app current:%d, total:%d, reqId:0x%"
PRIx64
,
pRequest
->
self
,
pRequest
->
pTscObj
->
id
,
num
,
currentInst
,
total
,
pRequest
->
requestId
);
pRequest
->
self
,
pRequest
->
pTscObj
->
id
,
num
,
currentInst
,
total
,
pRequest
->
requestId
);
...
@@ -60,16 +60,16 @@ static void registerRequest(SRequestObj *pRequest) {
...
@@ -60,16 +60,16 @@ static void registerRequest(SRequestObj *pRequest) {
static
void
deregisterRequest
(
SRequestObj
*
pRequest
)
{
static
void
deregisterRequest
(
SRequestObj
*
pRequest
)
{
assert
(
pRequest
!=
NULL
);
assert
(
pRequest
!=
NULL
);
STscObj
*
pTscObj
=
pRequest
->
pTscObj
;
STscObj
*
pTscObj
=
pRequest
->
pTscObj
;
SInstanceSummary
*
pActivity
=
&
pTscObj
->
pAppInfo
->
summary
;
SInstanceSummary
*
pActivity
=
&
pTscObj
->
pAppInfo
->
summary
;
int32_t
currentInst
=
atomic_sub_fetch_64
((
int64_t
*
)
&
pActivity
->
currentRequests
,
1
);
int32_t
currentInst
=
atomic_sub_fetch_64
((
int64_t
*
)
&
pActivity
->
currentRequests
,
1
);
int32_t
num
=
atomic_sub_fetch_32
(
&
pTscObj
->
numOfReqs
,
1
);
int32_t
num
=
atomic_sub_fetch_32
(
&
pTscObj
->
numOfReqs
,
1
);
int64_t
duration
=
taosGetTimestampUs
()
-
pRequest
->
metric
.
start
;
int64_t
duration
=
taosGetTimestampUs
()
-
pRequest
->
metric
.
start
;
tscDebug
(
"0x%"
PRIx64
" free Request from connObj: 0x%"
PRIx64
", reqId:0x%"
PRIx64
" elapsed:%"
PRIu64
tscDebug
(
"0x%"
PRIx64
" free Request from connObj: 0x%"
PRIx64
", reqId:0x%"
PRIx64
" elapsed:%"
PRIu64
" ms, current:%d, app current:%d"
,
" ms, current:%d, app current:%d"
,
pRequest
->
self
,
pTscObj
->
id
,
pRequest
->
requestId
,
duration
/
1000
,
num
,
currentInst
);
pRequest
->
self
,
pTscObj
->
id
,
pRequest
->
requestId
,
duration
/
1000
,
num
,
currentInst
);
releaseTscObj
(
pTscObj
->
id
);
releaseTscObj
(
pTscObj
->
id
);
}
}
...
@@ -109,12 +109,12 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
...
@@ -109,12 +109,12 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
}
}
void
closeAllRequests
(
SHashObj
*
pRequests
)
{
void
closeAllRequests
(
SHashObj
*
pRequests
)
{
void
*
pIter
=
taosHashIterate
(
pRequests
,
NULL
);
void
*
pIter
=
taosHashIterate
(
pRequests
,
NULL
);
while
(
pIter
!=
NULL
)
{
while
(
pIter
!=
NULL
)
{
int64_t
*
rid
=
pIter
;
int64_t
*
rid
=
pIter
;
releaseRequest
(
*
rid
);
releaseRequest
(
*
rid
);
pIter
=
taosHashIterate
(
pRequests
,
pIter
);
pIter
=
taosHashIterate
(
pRequests
,
pIter
);
}
}
}
}
...
@@ -144,7 +144,7 @@ void *createTscObj(const char *user, const char *auth, const char *db, SAppInstI
...
@@ -144,7 +144,7 @@ void *createTscObj(const char *user, const char *auth, const char *db, SAppInstI
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
NULL
;
return
NULL
;
}
}
pObj
->
pAppInfo
=
pAppInfo
;
pObj
->
pAppInfo
=
pAppInfo
;
tstrncpy
(
pObj
->
user
,
user
,
sizeof
(
pObj
->
user
));
tstrncpy
(
pObj
->
user
,
user
,
sizeof
(
pObj
->
user
));
memcpy
(
pObj
->
pass
,
auth
,
TSDB_PASSWORD_LEN
);
memcpy
(
pObj
->
pass
,
auth
,
TSDB_PASSWORD_LEN
);
...
@@ -160,13 +160,9 @@ void *createTscObj(const char *user, const char *auth, const char *db, SAppInstI
...
@@ -160,13 +160,9 @@ void *createTscObj(const char *user, const char *auth, const char *db, SAppInstI
return
pObj
;
return
pObj
;
}
}
STscObj
*
acquireTscObj
(
int64_t
rid
)
{
STscObj
*
acquireTscObj
(
int64_t
rid
)
{
return
(
STscObj
*
)
taosAcquireRef
(
clientConnRefPool
,
rid
);
}
return
(
STscObj
*
)
taosAcquireRef
(
clientConnRefPool
,
rid
);
}
int32_t
releaseTscObj
(
int64_t
rid
)
{
int32_t
releaseTscObj
(
int64_t
rid
)
{
return
taosReleaseRef
(
clientConnRefPool
,
rid
);
}
return
taosReleaseRef
(
clientConnRefPool
,
rid
);
}
void
*
createRequest
(
STscObj
*
pObj
,
__taos_async_fn_t
fp
,
void
*
param
,
int32_t
type
)
{
void
*
createRequest
(
STscObj
*
pObj
,
__taos_async_fn_t
fp
,
void
*
param
,
int32_t
type
)
{
assert
(
pObj
!=
NULL
);
assert
(
pObj
!=
NULL
);
...
@@ -190,11 +186,11 @@ void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t ty
...
@@ -190,11 +186,11 @@ void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t ty
tsem_init
(
&
pRequest
->
body
.
rspSem
,
0
,
0
);
tsem_init
(
&
pRequest
->
body
.
rspSem
,
0
,
0
);
registerRequest
(
pRequest
);
registerRequest
(
pRequest
);
return
pRequest
;
return
pRequest
;
}
}
static
void
doFreeReqResultInfo
(
SReqResultInfo
*
pResInfo
)
{
void
doFreeReqResultInfo
(
SReqResultInfo
*
pResInfo
)
{
taosMemoryFreeClear
(
pResInfo
->
pRspMsg
);
taosMemoryFreeClear
(
pResInfo
->
pRspMsg
);
taosMemoryFreeClear
(
pResInfo
->
length
);
taosMemoryFreeClear
(
pResInfo
->
length
);
taosMemoryFreeClear
(
pResInfo
->
row
);
taosMemoryFreeClear
(
pResInfo
->
row
);
...
@@ -217,7 +213,7 @@ static void doDestroyRequest(void *p) {
...
@@ -217,7 +213,7 @@ static void doDestroyRequest(void *p) {
assert
(
RID_VALID
(
pRequest
->
self
));
assert
(
RID_VALID
(
pRequest
->
self
));
taosHashRemove
(
pRequest
->
pTscObj
->
pRequests
,
&
pRequest
->
self
,
sizeof
(
pRequest
->
self
));
taosHashRemove
(
pRequest
->
pTscObj
->
pRequests
,
&
pRequest
->
self
,
sizeof
(
pRequest
->
self
));
taosMemoryFreeClear
(
pRequest
->
msgBuf
);
taosMemoryFreeClear
(
pRequest
->
msgBuf
);
taosMemoryFreeClear
(
pRequest
->
sqlstr
);
taosMemoryFreeClear
(
pRequest
->
sqlstr
);
taosMemoryFreeClear
(
pRequest
->
pDb
);
taosMemoryFreeClear
(
pRequest
->
pDb
);
...
@@ -244,14 +240,9 @@ void destroyRequest(SRequestObj *pRequest) {
...
@@ -244,14 +240,9 @@ void destroyRequest(SRequestObj *pRequest) {
taosRemoveRef
(
clientReqRefPool
,
pRequest
->
self
);
taosRemoveRef
(
clientReqRefPool
,
pRequest
->
self
);
}
}
SRequestObj
*
acquireRequest
(
int64_t
rid
)
{
SRequestObj
*
acquireRequest
(
int64_t
rid
)
{
return
(
SRequestObj
*
)
taosAcquireRef
(
clientReqRefPool
,
rid
);
}
return
(
SRequestObj
*
)
taosAcquireRef
(
clientReqRefPool
,
rid
);
}
int32_t
releaseRequest
(
int64_t
rid
)
{
return
taosReleaseRef
(
clientReqRefPool
,
rid
);
}
int32_t
releaseRequest
(
int64_t
rid
)
{
return
taosReleaseRef
(
clientReqRefPool
,
rid
);
}
void
taos_init_imp
(
void
)
{
void
taos_init_imp
(
void
)
{
// In the APIs of other program language, taos_cleanup is not available yet.
// In the APIs of other program language, taos_cleanup is not available yet.
...
@@ -380,7 +371,7 @@ uint64_t generateRequestId() {
...
@@ -380,7 +371,7 @@ uint64_t generateRequestId() {
}
}
uint64_t
id
=
0
;
uint64_t
id
=
0
;
while
(
true
)
{
while
(
true
)
{
int64_t
ts
=
taosGetTimestampMs
();
int64_t
ts
=
taosGetTimestampMs
();
uint64_t
pid
=
taosGetPId
();
uint64_t
pid
=
taosGetPId
();
...
...
source/client/src/clientMain.c
浏览文件 @
b4169cdf
...
@@ -135,6 +135,16 @@ void taos_free_result(TAOS_RES *res) {
...
@@ -135,6 +135,16 @@ void taos_free_result(TAOS_RES *res) {
if
(
TD_RES_QUERY
(
res
))
{
if
(
TD_RES_QUERY
(
res
))
{
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
destroyRequest
(
pRequest
);
destroyRequest
(
pRequest
);
}
else
if
(
TD_RES_TMQ
(
res
))
{
SMqRspObj
*
pRsp
=
(
SMqRspObj
*
)
res
;
if
(
pRsp
->
rsp
.
blockData
)
taosArrayDestroyP
(
pRsp
->
rsp
.
blockData
,
taosMemoryFree
);
if
(
pRsp
->
rsp
.
blockDataLen
)
taosArrayDestroy
(
pRsp
->
rsp
.
blockDataLen
);
if
(
pRsp
->
rsp
.
blockSchema
)
taosArrayDestroy
(
pRsp
->
rsp
.
blockSchema
);
if
(
pRsp
->
rsp
.
blockTbName
)
taosArrayDestroy
(
pRsp
->
rsp
.
blockTbName
);
if
(
pRsp
->
rsp
.
blockTags
)
taosArrayDestroy
(
pRsp
->
rsp
.
blockTags
);
if
(
pRsp
->
rsp
.
blockTagSchema
)
taosArrayDestroy
(
pRsp
->
rsp
.
blockTagSchema
);
pRsp
->
resInfo
.
pRspMsg
=
NULL
;
doFreeReqResultInfo
(
&
pRsp
->
resInfo
);
}
}
}
}
...
...
source/os/src/osFile.c
浏览文件 @
b4169cdf
...
@@ -380,7 +380,7 @@ int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) {
...
@@ -380,7 +380,7 @@ int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) {
#if FILE_WITH_LOCK
#if FILE_WITH_LOCK
taosThreadRwlockWrlock
(
&
(
pFile
->
rwlock
));
taosThreadRwlockWrlock
(
&
(
pFile
->
rwlock
));
#endif
#endif
/*assert(pFile->fd >= 0); // Please check if you have closed the file.*/
assert
(
pFile
->
fd
>=
0
);
// Please check if you have closed the file.
int64_t
nleft
=
count
;
int64_t
nleft
=
count
;
int64_t
nwritten
=
0
;
int64_t
nwritten
=
0
;
...
...
source/util/src/tlog.c
浏览文件 @
b4169cdf
...
@@ -223,7 +223,7 @@ static void *taosThreadToOpenNewFile(void *param) {
...
@@ -223,7 +223,7 @@ static void *taosThreadToOpenNewFile(void *param) {
tsLogObj
.
logHandle
->
pFile
=
pFile
;
tsLogObj
.
logHandle
->
pFile
=
pFile
;
tsLogObj
.
lines
=
0
;
tsLogObj
.
lines
=
0
;
tsLogObj
.
openInProgress
=
0
;
tsLogObj
.
openInProgress
=
0
;
taosSsleep
(
3
);
taosSsleep
(
10
);
taosCloseLogByFd
(
pOldFile
);
taosCloseLogByFd
(
pOldFile
);
uInfo
(
" new log file:%d is opened"
,
tsLogObj
.
flag
);
uInfo
(
" new log file:%d is opened"
,
tsLogObj
.
flag
);
...
...
tests/script/tsim/tmq/basic1.sim
浏览文件 @
b4169cdf
...
@@ -8,196 +8,245 @@
...
@@ -8,196 +8,245 @@
#
#
# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
#
#
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
run tsim/tmq/prepareBasicEnv.sim
system sh/exec.sh -n dnode1 -s start
#---- global parameters start ----#
$
loop_cnt = 0
$
dbName = db
check_dnode_ready:
$vgroups = 1
$loop_cnt = $loop_cnt + 1
$stbPrefix = stb
sleep 200
$ctbPrefix = ctb
if $loop_cnt == 10 then
$ntbPrefix = ntb
print ====> dnode not ready!
$stbNum = 1
return -1
$ctbNum = 10
endi
$ntbNum = 10
sql show dnodes
$rowsPerCtb = 100
print ===> $rows $data00 $data01 $data02 $data03 $data04 $data05
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
if $data00 != 1 then
#---- global parameters end ----#
return -1
endi
$pullDelay = 3
if $data04 != ready then
$ifcheckdata = 1
goto check_dnode_ready
$showMsg = 1
endi
$showRow = 0
sql connect
sql connect
sql use $dbName
$dbNamme = d0
print == create topics from super table
print =============== create database , vgroup 4
sql create topic topic_stb_column as select ts, c3 from stb
sql create database $dbNamme vgroups 4
sql create topic topic_stb_all as select ts, c1, c2, c3 from stb
sql use $dbNamme
sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb
print =============== create super table
sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 binary(10)) tags (t1 int)
sql show stables
if $rows != 1 then
return -1
endi
print =============== create child table
sql create table ct0 using stb tags(1000)
sql create table ct1 using stb tags(2000)
#sql create table ct3 using stb tags(3000)
print =============== create normal table
print == create topics from child table
sql create table ntb (ts timestamp, c1 int, c2 float, c3 binary(10))
sql create topic topic_ctb_column as select ts, c3 from ctb0
sql create topic topic_ctb_all as select * from ctb0
sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0
print =============== create multi topics. notes: now only support:
print == create topics from normal table
print =============== 1. columns from stb; 2. * from ctb; 3. columns from ctb
sql create topic topic_ntb_column as select ts, c3 from ntb0
print =============== will support: * from stb; function from stb/ctb
sql create topic topic_ntb_all as select * from ntb0
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
sql create topic topic_stb_column as select ts, c1, c3 from stb
#sql show topics
#sql create topic topic_stb_all as select * from stb
#if $rows != 9 then
sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb
# return -1
#endi
sql create topic topic_ctb_column as select ts, c1, c3 from ct0
$keyList = ' . group.id:cgrp1
sql create topic topic_ctb_all as select * from ct0
$keyList = $keyList . '
sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ct0
sql create topic topic_ntb_column as select ts, c1, c3 from ntb
print ================ test consume from stb
sql create topic topic_ntb_all as select * from ntb
$loop_cnt = 0
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb
loop_consume_diff_topic_from_stb:
if $loop_cnt == 0 then
print == scenario 1: topic_stb_column
$topicList = ' . topic_stb_column
$topicList = $topicList . '
elif $loop_cnt == 1 then
print == scenario 2: topic_stb_all
$topicList = ' . topic_stb_all
$topicList = $topicList . '
elif $loop_cnt == 2 then
print == scenario 3: topic_stb_function
$topicList = ' . topic_stb_function
$topicList = $topicList . '
else
goto loop_consume_diff_topic_from_stb_end
endi
sql show tables
$consumerId = 0
if $rows != 3 then
$totalMsgOfStb = $ctbNum * $rowsPerCtb
#$expectmsgcnt = $totalMsgOfStb + 1
$expectmsgcnt = 110
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
print == start consumer to pull msgs from stb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
print == check consume result
wait_consumer_end_from_stb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $rows != 1 then
sleep 1000
goto wait_consumer_end_from_stb
endi
if $data[0][1] != $consumerId then
return -1
endi
if $data[0][2] != $expectmsgcnt then
return -1
return -1
endi
endi
if $data[0][3] != $expectmsgcnt then
return -1
endi
$loop_cnt = $loop_cnt + 1
goto loop_consume_diff_topic_from_stb
loop_consume_diff_topic_from_stb_end:
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdbName = cdb1
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
print =============== insert data
sql show tables
if $rows != 2 then
$tbPrefix = ct
$tbNum = 2
$rowNum = 10
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
$i = 0
while $i < $tbNum
$tb = $tbPrefix . $i
$x = 0
while $x < $rowNum
$c = $x / 10
$c = $c * 10
$c = $x - $c
$binary = ' . binary
$binary = $binary . $c
$binary = $binary . '
sql insert into $tb values ($tstart , $c , $x , $binary )
sql insert into ntb values ($tstart , $c , $x , $binary )
$tstart = $tstart + 1
$x = $x + 1
endw
$i = $i + 1
$tstart = 1640966400000
endw
#root@trd02 /home $ tmq_sim --help
# -c Configuration directory, default is
# -d The name of the database for cosumer, no default
# -t The topic string for cosumer, no default
# -k The key-value string for cosumer, no default
# -g showMsgFlag, default is 0
#
$totalMsgCnt = $rowNum * $tbNum
print inserted totalMsgCnt: $totalMsgCnt
# supported key:
# group.id:<xxx>
# enable.auto.commit:<true | false>
# auto.offset.reset:<earliest | latest | none>
# td.connect.ip:<fqdn | ipaddress>
# td.connect.user:root
# td.connect.pass:taosdata
# td.connect.port:6030
# td.connect.db:db
system_content echo -n \$BUILD_DIR
$tmq_sim = $system_content . /build/bin/tmq_sim
system_content echo -n \$SIM_DIR
$tsim_cfg = $system_content . /tsim/cfg
print cmd===> system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_stb_column" -k "group.id:tg2"
system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_stb_column" -k "group.id:tg2"
print cmd result----> $system_content
if $system_content != @{consume success: 20, 0}@ then
return -1
return -1
endi
endi
#######################################################################################
#print cmd===> system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_stb_all" -k "group.id:tg2"
#system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_stb_all" -k "group.id:tg2"
#print cmd result----> $system_content
#if $system_content != @{consume success: 20, 0}@ then
# return -1
#endi
print cmd===> system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_stb_function" -k "group.id:tg2"
print ================ test consume from ctb
system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_stb_function" -k "group.id:tg2"
$loop_cnt = 0
print cmd result----> $system_content
loop_consume_diff_topic_from_ctb:
if $system_content != @{consume success: 20, 0}@ then
if $loop_cnt == 0 then
print expect @{consume success: 20, 0}@, actual: $system_content
print == scenario 1: topic_ctb_column
return -1
$topicList = ' . topic_ctb_column
$topicList = $topicList . '
elif $loop_cnt == 1 then
print == scenario 2: topic_ctb_all
$topicList = ' . topic_ctb_all
$topicList = $topicList . '
elif $loop_cnt == 2 then
print == scenario 3: topic_ctb_function
$topicList = ' . topic_ctb_function
$topicList = $topicList . '
else
goto loop_consume_diff_topic_from_ctb_end
endi
endi
print cmd===> system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_ctb_column" -k "group.id:tg2"
$consumerId = 0
system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_ctb_column" -k "group.id:tg2"
$totalMsgOfCtb = $rowsPerCtb
print cmd result----> $system_content
$expectmsgcnt = $totalMsgOfCtb + 1
if $system_content != @{consume success: 10, 0}@ then
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
print == start consumer to pull msgs from stb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
print == check consume result
wait_consumer_end_from_ctb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $rows != 1 then
sleep 1000
goto wait_consumer_end_from_ctb
endi
if $data[0][1] != $consumerId then
return -1
return -1
endi
endi
if $data[0][2] != $totalMsgOfCtb then
print cmd===> system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2"
system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2"
print cmd result----> $system_content
if $system_content != @{consume success: 10, 0}@ then
return -1
return -1
endi
endi
if $data[0][3] != $totalMsgOfCtb then
print cmd===> system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_ctb_function" -k "group.id:tg2"
system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_ctb_function" -k "group.id:tg2"
print cmd result----> $system_content
if $system_content != @{consume success: 10, 0}@ then
return -1
return -1
endi
endi
$loop_cnt = $loop_cnt + 1
goto loop_consume_diff_topic_from_ctb
loop_consume_diff_topic_from_ctb_end:
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdbName = cdb2
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
print cmd===> system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2"
sql show tables
system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2"
if $rows != 2 then
print cmd result----> $system_content
if $system_content != @{consume success: 20, 0}@ then
return -1
return -1
endi
endi
#######################################################################################
print cmd===> system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2"
system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2"
print ================ test consume from ntb
print cmd result----> $system_content
$loop_cnt = 0
if $system_content != @{consume success: 20, 0}@ then
loop_consume_diff_topic_from_ntb:
return -1
if $loop_cnt == 0 then
print == scenario 1: topic_ntb_column
$topicList = ' . topic_ntb_column
$topicList = $topicList . '
elif $loop_cnt == 1 then
print == scenario 2: topic_ntb_all
$topicList = ' . topic_ntb_all
$topicList = $topicList . '
elif $loop_cnt == 2 then
print == scenario 3: topic_ntb_function
$topicList = ' . topic_ntb_function
$topicList = $topicList . '
else
goto loop_consume_diff_topic_from_ntb_end
endi
endi
print cmd===> system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_ntb_function" -k "group.id:tg2"
$consumerId = 0
system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_ntb_function" -k "group.id:tg2"
$totalMsgOfNtb = $rowsPerCtb
print cmd result----> $system_content
$expectmsgcnt = $totalMsgOfNtb + 1
if $system_content != @{consume success: 20, 0}@ then
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
print == start consumer to pull msgs from stb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
print == check consume result from ntb
wait_consumer_end_from_ntb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $rows != 1 then
sleep 1000
goto wait_consumer_end_from_ntb
endi
if $data[0][1] != $consumerId then
return -1
return -1
endi
endi
if $data[0][2] != $totalMsgOfNtb then
return -1
endi
if $data[0][3] != $totalMsgOfNtb then
return -1
endi
$loop_cnt = $loop_cnt + 1
goto loop_consume_diff_topic_from_ntb
loop_consume_diff_topic_from_ntb_end:
print =============== create database , vgroup 4
#------ not need stop consumer, because it exit after pull msg overthan expect msg
$dbNamme = d1
#system tsim/tmq/consume.sh -s stop -x SIGINT
sql create database $dbNamme vgroups 4
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
tests/script/tsim/tmq/clearConsume.sim
0 → 100644
浏览文件 @
b4169cdf
sql connect
#---- global parameters start ----#
$dbName = db
$vgroups = 1
$stbPrefix = stb
$ctbPrefix = ctb
$ntbPrefix = ntb
$stbNum = 1
$ctbNum = 10
$ntbNum = 10
$rowsPerCtb = 100
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
#---- global parameters end ----#
sql use $dbName
print == create consume info table and consume result table
sql drop table consumeinfo
sql drop table consumeresult
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
tests/script/tsim/tmq/consume.sh
浏览文件 @
b4169cdf
...
@@ -11,16 +11,25 @@ set +e
...
@@ -11,16 +11,25 @@ set +e
# set default value for parameters
# set default value for parameters
EXEC_OPTON
=
start
EXEC_OPTON
=
start
DB_NAME
=
db
DB_NAME
=
db
CDB_NAME
=
db
POLL_DELAY
=
5
POLL_DELAY
=
5
VALGRIND
=
0
VALGRIND
=
0
SIGNAL
=
SIGINT
SIGNAL
=
SIGINT
SHOW_MSG
=
0
SHOW_ROW
=
0
while
getopts
"d:s:v:y:x:"
arg
while
getopts
"d:s:v:y:x:
g:r:w:
"
arg
do
do
case
$arg
in
case
$arg
in
d
)
d
)
DB_NAME
=
$OPTARG
DB_NAME
=
$OPTARG
;;
;;
g
)
SHOW_MSG
=
$OPTARG
;;
r
)
SHOW_ROW
=
$OPTARG
;;
s
)
s
)
EXEC_OPTON
=
$OPTARG
EXEC_OPTON
=
$OPTARG
;;
;;
...
@@ -33,6 +42,9 @@ do
...
@@ -33,6 +42,9 @@ do
x
)
x
)
SIGNAL
=
$OPTARG
SIGNAL
=
$OPTARG
;;
;;
w
)
CDB_NAME
=
$OPTARG
;;
?
)
?
)
echo
"unkown argument"
echo
"unkown argument"
;;
;;
...
@@ -80,11 +92,11 @@ echo "DB_NAME: $DB_NAME
...
@@ -80,11 +92,11 @@ echo "DB_NAME: $DB_NAME
echo
"------------------------------------------------------------------------"
echo
"------------------------------------------------------------------------"
if
[
"
$EXEC_OPTON
"
=
"start"
]
;
then
if
[
"
$EXEC_OPTON
"
=
"start"
]
;
then
if
[
$VALGRIND
-eq
1
]
;
then
if
[
$VALGRIND
-eq
1
]
;
then
echo nohup
valgrind
--tool
=
memcheck
--leak-check
=
full
--show-reachable
=
no
--track-origins
=
yes
--show-leak-kinds
=
all
-v
--workaround-gcc296-bugs
=
yes
--log-file
=
${
LOG_DIR
}
/valgrind-tmq_sim.log
$PROGRAM
-c
$CFG_DIR
-
d
$DB_NAME
-y
$POLL_DELAY
>
/dev/null 2>&1 &
echo nohup
valgrind
--tool
=
memcheck
--leak-check
=
full
--show-reachable
=
no
--track-origins
=
yes
--show-leak-kinds
=
all
-v
--workaround-gcc296-bugs
=
yes
--log-file
=
${
LOG_DIR
}
/valgrind-tmq_sim.log
$PROGRAM
-c
$CFG_DIR
-
y
$POLL_DELAY
-d
$DB_NAME
-g
$SHOW_MSG
-r
$SHOW_ROW
>
/dev/null 2>&1 &
nohup
valgrind
--tool
=
memcheck
--leak-check
=
full
--show-reachable
=
no
--track-origins
=
yes
--show-leak-kinds
=
all
-v
--workaround-gcc296-bugs
=
yes
--log-file
=
${
LOG_DIR
}
/valgrind-tmq_sim.log
$PROGRAM
-c
$CFG_DIR
-
d
$DB_NAME
-y
$POLL_DELAY
>
/dev/null 2>&1 &
nohup
valgrind
--tool
=
memcheck
--leak-check
=
full
--show-reachable
=
no
--track-origins
=
yes
--show-leak-kinds
=
all
-v
--workaround-gcc296-bugs
=
yes
--log-file
=
${
LOG_DIR
}
/valgrind-tmq_sim.log
$PROGRAM
-c
$CFG_DIR
-
y
$POLL_DELAY
-d
$DB_NAME
-g
$SHOW_MSG
-r
$SHOW_ROW
>
/dev/null 2>&1 &
else
else
echo
"nohup
$PROGRAM
-c
$CFG_DIR
-
d
$DB_NAME
-y
$POLL_DELAY
> /dev/null 2>&1 &"
echo
"nohup
$PROGRAM
-c
$CFG_DIR
-
y
$POLL_DELAY
-d
$DB_NAME
-g
$SHOW_MSG
-r
$SHOW_ROW
-w
$CDB_NAME
> /dev/null 2>&1 &"
nohup
$PROGRAM
-c
$CFG_DIR
-y
$POLL_DELAY
-d
$DB_NAME
>
/dev/null 2>&1 &
nohup
$PROGRAM
-c
$CFG_DIR
-y
$POLL_DELAY
-d
$DB_NAME
-g
$SHOW_MSG
-r
$SHOW_ROW
-w
$CDB_NAME
>
/dev/null 2>&1 &
fi
fi
else
else
PID
=
`
ps
-ef
|grep tmq_sim |
grep
-v
grep
|
awk
'{print $2}'
`
PID
=
`
ps
-ef
|grep tmq_sim |
grep
-v
grep
|
awk
'{print $2}'
`
...
...
tests/script/tsim/tmq/prepareBasicEnv.sim
0 → 100644
浏览文件 @
b4169cdf
# stop all dnodes before start this case
system sh/stop_dnodes.sh
# deploy dnode 1
system sh/deploy.sh -n dnode1 -i 1
# add some config items for this case
#system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
# start dnode 1
system sh/exec.sh -n dnode1 -s start
sql connect
#---- global parameters start ----#
$dbName = db
$vgroups = 1
$stbPrefix = stb
$ctbPrefix = ctb
$ntbPrefix = ntb
$stbNum = 1
$ctbNum = 10
$ntbNum = 10
$rowsPerCtb = 100
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
#---- global parameters end ----#
print == create database $dbName vgroups $vgroups
sql create database $dbName vgroups $vgroups
#wait database ready
$loop_cnt = 0
check_db_ready:
if $loop_cnt == 10 then
print ====> database not ready!
return -1
endi
sql show databases
print ==> rows: $rows
print ==> $data(db)[0] $data(db)[1] $data(db)[2] $data(db)[3] $data(db)[4] $data(db)[5] $data(db)[6] $data(db)[7] $data(db)[8] $data(db)[9] $data(db)[10] $data(db)[11] $data(db)[12]
print $data(db)[13] $data(db)[14] $data(db)[15] $data(db)[16] $data(db)[17] $data(db)[18] $data(db)[19] $data(db)[20]
if $data(db)[20] != nostrict then
sleep 100
$loop_cnt = $loop_cnt + 1
goto check_db_ready
endi
sql use $dbName
print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
print == create super table
sql create table $stbPrefix (ts timestamp, c1 int, c2 float, c3 binary(16)) tags (t1 int)
sql show stables
if $rows != 1 then
return -1
endi
print == create child table, normal table and insert data
$i = 0
while $i < $ctbNum
$ctb = $ctbPrefix . $i
$ntb = $ntbPrefix . $i
sql create table $ctb using $stbPrefix tags( $i )
sql create table $ntb (ts timestamp, c1 int, c2 float, c3 binary(16))
$x = 0
while $x < $rowsPerCtb
$binary = ' . binary-
$binary = $binary . $i
$binary = $binary . '
sql insert into $ctb values ($tstart , $i , $x , $binary )
sql insert into $ntb values ($tstart , $i , $x , $binary )
$tstart = $tstart + 1
$x = $x + 1
endw
$i = $i + 1
$tstart = 1640966400000
endw
tests/test/c/tmqSim.c
浏览文件 @
b4169cdf
...
@@ -31,46 +31,49 @@
...
@@ -31,46 +31,49 @@
#define NC "\033[0m"
#define NC "\033[0m"
#define min(a, b) (((a) < (b)) ? (a) : (b))
#define min(a, b) (((a) < (b)) ? (a) : (b))
#define MAX_SQL_STR_LEN (1024 * 1024)
#define MAX_SQL_STR_LEN
(1024 * 1024)
#define MAX_ROW_STR_LEN (16 * 1024)
#define MAX_ROW_STR_LEN
(16 * 1024)
#define MAX_CONSUMER_THREAD_CNT (16)
#define MAX_CONSUMER_THREAD_CNT
(16)
typedef
struct
{
typedef
struct
{
TdThread
thread
;
TdThread
thread
;
int32_t
consumerId
;
int32_t
consumerId
;
int32_t
ifCheckData
;
int32_t
ifCheckData
;
int64_t
expectMsgCnt
;
int64_t
expectMsgCnt
;
int64_t
consumeMsgCnt
;
int64_t
consumeRowCnt
;
int32_t
checkresult
;
int64_t
consumeMsgCnt
;
char
topicString
[
1024
]
;
int32_t
checkresult
;
char
keyString
[
1024
]
;
char
topicString
[
1024
]
;
int32_t
numOfTopic
;
char
keyString
[
102
4
];
char
topics
[
32
][
6
4
];
int32_t
numOfTopic
;
int32_t
numOfKey
;
char
topics
[
32
][
64
];
char
key
[
32
][
64
];
char
value
[
32
][
64
];
int32_t
numOfKey
;
char
key
[
32
][
64
];
char
value
[
32
][
64
];
tmq_t
*
tmq
;
tmq_t
*
tmq
;
tmq_list_t
*
topicList
;
tmq_list_t
*
topicList
;
}
SThreadInfo
;
}
SThreadInfo
;
typedef
struct
{
typedef
struct
{
// input from argvs
// input from argvs
char
dbName
[
32
];
char
cdbName
[
32
];
int32_t
showMsgFlag
;
char
dbName
[
32
];
int32_t
consumeDelay
;
// unit s
int32_t
showMsgFlag
;
int32_t
numOfThread
;
int32_t
showRowFlag
;
SThreadInfo
stThreads
[
MAX_CONSUMER_THREAD_CNT
];
int32_t
consumeDelay
;
// unit s
int32_t
numOfThread
;
SThreadInfo
stThreads
[
MAX_CONSUMER_THREAD_CNT
];
}
SConfInfo
;
}
SConfInfo
;
static
SConfInfo
g_stConfInfo
;
static
SConfInfo
g_stConfInfo
;
TdFilePtr
g_fp
=
NULL
;
TdFilePtr
g_fp
=
NULL
;
// char* g_pRowValue = NULL;
// char* g_pRowValue = NULL;
// TdFilePtr g_fp = NULL;
// TdFilePtr g_fp = NULL;
...
@@ -85,51 +88,62 @@ static void printHelp() {
...
@@ -85,51 +88,62 @@ static void printHelp() {
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
"The name of the database for cosumer, no default "
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
"The name of the database for cosumer, no default "
);
printf
(
"%s%s
\n
"
,
indent
,
"-g"
);
printf
(
"%s%s
\n
"
,
indent
,
"-g"
);
printf
(
"%s%s%s%d
\n
"
,
indent
,
indent
,
"showMsgFlag, default is "
,
g_stConfInfo
.
showMsgFlag
);
printf
(
"%s%s%s%d
\n
"
,
indent
,
indent
,
"showMsgFlag, default is "
,
g_stConfInfo
.
showMsgFlag
);
printf
(
"%s%s
\n
"
,
indent
,
"-r"
);
printf
(
"%s%s%s%d
\n
"
,
indent
,
indent
,
"showRowFlag, default is "
,
g_stConfInfo
.
showRowFlag
);
printf
(
"%s%s
\n
"
,
indent
,
"-y"
);
printf
(
"%s%s
\n
"
,
indent
,
"-y"
);
printf
(
"%s%s%s%d
\n
"
,
indent
,
indent
,
"consume delay, default is s"
,
g_stConfInfo
.
consumeDelay
);
printf
(
"%s%s%s%d
\n
"
,
indent
,
indent
,
"consume delay, default is s"
,
g_stConfInfo
.
consumeDelay
);
exit
(
EXIT_SUCCESS
);
exit
(
EXIT_SUCCESS
);
}
}
void
initLogFile
()
{
void
initLogFile
()
{
// FILE *fp = fopen(g_stConfInfo.resultFileName, "a");
// FILE *fp = fopen(g_stConfInfo.resultFileName, "a");
TdFilePtr
pFile
=
taosOpenFile
(
"./tmqlog.txt"
,
TD_FILE_CREATE
|
TD_FILE_WRITE
|
TD_FILE_APPEND
|
TD_FILE_STREAM
);
char
file
[
256
];
sprintf
(
file
,
"%s/../log/tmqlog.txt"
,
configDir
);
TdFilePtr
pFile
=
taosOpenFile
(
file
,
TD_FILE_CREATE
|
TD_FILE_WRITE
|
TD_FILE_APPEND
|
TD_FILE_STREAM
);
if
(
NULL
==
pFile
)
{
if
(
NULL
==
pFile
)
{
fprintf
(
stderr
,
"Failed to open %s for save result
\n
"
,
"./tmqlog.txt"
);
fprintf
(
stderr
,
"Failed to open %s for save result
\n
"
,
"./tmqlog.txt"
);
exit
-
1
;
exit
-
1
;
};
};
g_fp
=
pFile
;
g_fp
=
pFile
;
}
void
saveConfigToLogFile
()
{
time_t
tTime
=
taosGetTimestampSec
();
time_t
tTime
=
taosGetTimestampSec
();
struct
tm
tm
=
*
taosLocalTime
(
&
tTime
,
NULL
);
struct
tm
tm
=
*
taosLocalTime
(
&
tTime
,
NULL
);
taosFprintfFile
(
pFile
,
"###################################################################
\n
"
);
taosFprintfFile
(
g_fp
,
"###################################################################
\n
"
);
taosFprintfFile
(
pFile
,
"# configDir: %s
\n
"
,
configDir
);
taosFprintfFile
(
g_fp
,
"# configDir: %s
\n
"
,
configDir
);
taosFprintfFile
(
pFile
,
"# dbName: %s
\n
"
,
g_stConfInfo
.
dbName
);
taosFprintfFile
(
g_fp
,
"# dbName: %s
\n
"
,
g_stConfInfo
.
dbName
);
taosFprintfFile
(
pFile
,
"# showMsgFlag: %d
\n
"
,
g_stConfInfo
.
showMsgFlag
);
taosFprintfFile
(
g_fp
,
"# cdbName: %s
\n
"
,
g_stConfInfo
.
cdbName
);
taosFprintfFile
(
pFile
,
"# consumeDelay: %d
\n
"
,
g_stConfInfo
.
consumeDelay
);
taosFprintfFile
(
g_fp
,
"# showMsgFlag: %d
\n
"
,
g_stConfInfo
.
showMsgFlag
);
taosFprintfFile
(
g_fp
,
"# showRowFlag: %d
\n
"
,
g_stConfInfo
.
showRowFlag
);
for
(
int32_t
i
=
0
;
i
<
g_stConfInfo
.
numOfThread
;
i
++
)
{
taosFprintfFile
(
g_fp
,
"# consumeDelay: %d
\n
"
,
g_stConfInfo
.
consumeDelay
);
taosFprintfFile
(
pFile
,
"# consumer %d info:
\n
"
,
g_stConfInfo
.
stThreads
[
i
].
consumerId
);
taosFprintfFile
(
pFile
,
" Topics: "
);
for
(
int32_t
i
=
0
;
i
<
g_stConfInfo
.
numOfThread
;
i
++
)
{
for
(
int
i
=
0
;
i
<
g_stConfInfo
.
stThreads
[
i
].
numOfTopic
;
i
++
)
{
taosFprintfFile
(
g_fp
,
"# consumer %d info:
\n
"
,
g_stConfInfo
.
stThreads
[
i
].
consumerId
);
taosFprintfFile
(
pFile
,
"%s, "
,
g_stConfInfo
.
stThreads
[
i
].
topics
[
i
]);
taosFprintfFile
(
g_fp
,
" Topics: "
);
for
(
int
i
=
0
;
i
<
g_stConfInfo
.
stThreads
[
i
].
numOfTopic
;
i
++
)
{
taosFprintfFile
(
g_fp
,
"%s, "
,
g_stConfInfo
.
stThreads
[
i
].
topics
[
i
]);
}
}
taosFprintfFile
(
pFile
,
"
\n
"
);
taosFprintfFile
(
g_fp
,
"
\n
"
);
taosFprintfFile
(
pFile
,
" Key: "
);
taosFprintfFile
(
g_fp
,
" Key: "
);
for
(
int
i
=
0
;
i
<
g_stConfInfo
.
stThreads
[
i
].
numOfKey
;
i
++
)
{
for
(
int
i
=
0
;
i
<
g_stConfInfo
.
stThreads
[
i
].
numOfKey
;
i
++
)
{
taosFprintfFile
(
pFile
,
"%s:%s, "
,
g_stConfInfo
.
stThreads
[
i
].
key
[
i
],
g_stConfInfo
.
stThreads
[
i
].
value
[
i
]);
taosFprintfFile
(
g_fp
,
"%s:%s, "
,
g_stConfInfo
.
stThreads
[
i
].
key
[
i
],
g_stConfInfo
.
stThreads
[
i
].
value
[
i
]);
}
}
taosFprintfFile
(
pFile
,
"
\n
"
);
taosFprintfFile
(
g_fp
,
"
\n
"
);
}
}
taosFprintfFile
(
pFile
,
"# Test time: %d-%02d-%02d %02d:%02d:%02d
\n
"
,
tm
.
tm_year
+
1900
,
tm
.
tm_mon
+
1
,
taosFprintfFile
(
g_fp
,
"# Test time: %d-%02d-%02d %02d:%02d:%02d
\n
"
,
tm
.
tm_year
+
1900
,
tm
.
tm_mon
+
1
,
tm
.
tm_mday
,
tm
.
tm_hour
,
tm
.
tm_min
,
tm
.
tm_sec
);
tm
.
tm_mday
,
tm
.
tm_hour
,
tm
.
tm_min
,
tm
.
tm_sec
);
taosFprintfFile
(
pFile
,
"###################################################################
\n
"
);
taosFprintfFile
(
g_fp
,
"###################################################################
\n
"
);
}
}
void
parseArgument
(
int32_t
argc
,
char
*
argv
[])
{
void
parseArgument
(
int32_t
argc
,
char
*
argv
[])
{
memset
(
&
g_stConfInfo
,
0
,
sizeof
(
SConfInfo
));
memset
(
&
g_stConfInfo
,
0
,
sizeof
(
SConfInfo
));
g_stConfInfo
.
showMsgFlag
=
0
;
g_stConfInfo
.
showMsgFlag
=
0
;
g_stConfInfo
.
showRowFlag
=
0
;
g_stConfInfo
.
consumeDelay
=
5
;
g_stConfInfo
.
consumeDelay
=
5
;
for
(
int32_t
i
=
1
;
i
<
argc
;
i
++
)
{
for
(
int32_t
i
=
1
;
i
<
argc
;
i
++
)
{
...
@@ -138,10 +152,14 @@ void parseArgument(int32_t argc, char* argv[]) {
...
@@ -138,10 +152,14 @@ void parseArgument(int32_t argc, char* argv[]) {
exit
(
0
);
exit
(
0
);
}
else
if
(
strcmp
(
argv
[
i
],
"-d"
)
==
0
)
{
}
else
if
(
strcmp
(
argv
[
i
],
"-d"
)
==
0
)
{
strcpy
(
g_stConfInfo
.
dbName
,
argv
[
++
i
]);
strcpy
(
g_stConfInfo
.
dbName
,
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-w"
)
==
0
)
{
strcpy
(
g_stConfInfo
.
cdbName
,
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-c"
)
==
0
)
{
}
else
if
(
strcmp
(
argv
[
i
],
"-c"
)
==
0
)
{
strcpy
(
configDir
,
argv
[
++
i
]);
strcpy
(
configDir
,
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-g"
)
==
0
)
{
}
else
if
(
strcmp
(
argv
[
i
],
"-g"
)
==
0
)
{
g_stConfInfo
.
showMsgFlag
=
atol
(
argv
[
++
i
]);
g_stConfInfo
.
showMsgFlag
=
atol
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-r"
)
==
0
)
{
g_stConfInfo
.
showRowFlag
=
atol
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-y"
)
==
0
)
{
}
else
if
(
strcmp
(
argv
[
i
],
"-y"
)
==
0
)
{
g_stConfInfo
.
consumeDelay
=
atol
(
argv
[
++
i
]);
g_stConfInfo
.
consumeDelay
=
atol
(
argv
[
++
i
]);
}
else
{
}
else
{
...
@@ -150,11 +168,17 @@ void parseArgument(int32_t argc, char* argv[]) {
...
@@ -150,11 +168,17 @@ void parseArgument(int32_t argc, char* argv[]) {
}
}
}
}
initLogFile
();
taosFprintfFile
(
g_fp
,
"====parseArgument() success
\n
"
);
#if 1
#if 1
pPrint
(
"%s configDir:%s %s"
,
GREEN
,
configDir
,
NC
);
pPrint
(
"%s configDir:%s %s"
,
GREEN
,
configDir
,
NC
);
pPrint
(
"%s dbName:%s %s"
,
GREEN
,
g_stConfInfo
.
dbName
,
NC
);
pPrint
(
"%s dbName:%s %s"
,
GREEN
,
g_stConfInfo
.
dbName
,
NC
);
pPrint
(
"%s cdbName:%s %s"
,
GREEN
,
g_stConfInfo
.
cdbName
,
NC
);
pPrint
(
"%s consumeDelay:%d %s"
,
GREEN
,
g_stConfInfo
.
consumeDelay
,
NC
);
pPrint
(
"%s consumeDelay:%d %s"
,
GREEN
,
g_stConfInfo
.
consumeDelay
,
NC
);
pPrint
(
"%s showMsgFlag:%d %s"
,
GREEN
,
g_stConfInfo
.
showMsgFlag
,
NC
);
pPrint
(
"%s showMsgFlag:%d %s"
,
GREEN
,
g_stConfInfo
.
showMsgFlag
,
NC
);
pPrint
(
"%s showRowFlag:%d %s"
,
GREEN
,
g_stConfInfo
.
showRowFlag
,
NC
);
#endif
#endif
}
}
...
@@ -180,24 +204,29 @@ void ltrim(char* str) {
...
@@ -180,24 +204,29 @@ void ltrim(char* str) {
// return str;
// return str;
}
}
static
int
running
=
1
;
static
int
running
=
1
;
static
void
msg_process
(
TAOS_RES
*
msg
,
int32
_t
msgIndex
,
int32_t
threadLable
)
{
static
int32_t
msg_process
(
TAOS_RES
*
msg
,
int64
_t
msgIndex
,
int32_t
threadLable
)
{
char
buf
[
1024
];
char
buf
[
1024
];
int32_t
totalRows
=
0
;
//
printf("topic: %s\n", tmq_get_topic_name(msg));
//printf("topic: %s\n", tmq_get_topic_name(msg));
//
printf("vg:%d\n", tmq_get_vgroup_id(msg));
//printf("vg:%d\n", tmq_get_vgroup_id(msg));
taosFprintfFile
(
g_fp
,
"msg index:%
d, threadLable: %d
\n
"
,
msgIndex
,
threadLable
);
taosFprintfFile
(
g_fp
,
"msg index:%
"
PRId64
", threadLable: %d
\n
"
,
msgIndex
,
threadLable
);
taosFprintfFile
(
g_fp
,
"topic: %s, vgroupId: %d
\n
"
,
tmq_get_topic_name
(
msg
),
tmq_get_vgroup_id
(
msg
));
taosFprintfFile
(
g_fp
,
"topic: %s, vgroupId: %d
\n
"
,
tmq_get_topic_name
(
msg
),
tmq_get_vgroup_id
(
msg
));
while
(
1
)
{
while
(
1
)
{
TAOS_ROW
row
=
taos_fetch_row
(
msg
);
TAOS_ROW
row
=
taos_fetch_row
(
msg
);
if
(
row
==
NULL
)
break
;
if
(
row
==
NULL
)
break
;
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
msg
);
if
(
0
!=
g_stConfInfo
.
showRowFlag
)
{
int32_t
numOfFields
=
taos_field_count
(
msg
);
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
msg
);
// taos_print_row(buf, row, fields, numOfFields);
int32_t
numOfFields
=
taos_field_count
(
msg
);
// printf("%s\n", buf);
taos_print_row
(
buf
,
row
,
fields
,
numOfFields
);
// taosFprintfFile(g_fp, "%s\n", buf);
taosFprintfFile
(
g_fp
,
"rows[%d]: %s
\n
"
,
totalRows
,
buf
);
}
totalRows
++
;
}
}
return
totalRows
;
}
}
int
queryDB
(
TAOS
*
taos
,
char
*
command
)
{
int
queryDB
(
TAOS
*
taos
,
char
*
command
)
{
...
@@ -213,31 +242,43 @@ int queryDB(TAOS* taos, char* command) {
...
@@ -213,31 +242,43 @@ int queryDB(TAOS* taos, char* command) {
return
0
;
return
0
;
}
}
void
build_consumer
(
SThreadInfo
*
pInfo
)
{
static
void
tmq_commit_cb_print
(
tmq_t
*
tmq
,
tmq_resp_err_t
resp
,
tmq_topic_vgroup_list_t
*
offsets
)
{
char
sqlStr
[
1024
]
=
{
0
};
printf
(
"tmq_commit_cb_print() commit %d
\n
"
,
resp
);
}
TAOS
*
pConn
=
taos_connect
(
NULL
,
"root"
,
"taosdata"
,
NULL
,
0
);
void
build_consumer
(
SThreadInfo
*
pInfo
)
{
assert
(
pConn
!=
NULL
);
tmq_conf_t
*
conf
=
tmq_conf_new
(
);
sprintf
(
sqlStr
,
"use %s"
,
g_stConfInfo
.
dbName
);
//tmq_conf_set(conf, "td.connect.ip", "localhost");
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
sqlStr
);
//tmq_conf_set(conf, "td.connect.port", "6030");
if
(
taos_errno
(
pRes
)
!=
0
)
{
tmq_conf_set
(
conf
,
"td.connect.user"
,
"root"
);
printf
(
"error in use db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
tmq_conf_set
(
conf
,
"td.connect.pass"
,
"taosdata"
);
taos_free_result
(
pRes
);
exit
(
-
1
);
}
taos_free_result
(
pRes
);
tmq_conf_t
*
conf
=
tmq_conf_new
();
tmq_conf_set
(
conf
,
"td.connect.db"
,
g_stConfInfo
.
dbName
);
// tmq_conf_set(conf, "group.id", "tg2");
tmq_conf_set_offset_commit_cb
(
conf
,
tmq_commit_cb_print
);
// tmq_conf_set(conf, "group.id", "cgrp1");
for
(
int32_t
i
=
0
;
i
<
pInfo
->
numOfKey
;
i
++
)
{
for
(
int32_t
i
=
0
;
i
<
pInfo
->
numOfKey
;
i
++
)
{
tmq_conf_set
(
conf
,
pInfo
->
key
[
i
],
pInfo
->
value
[
i
]);
tmq_conf_set
(
conf
,
pInfo
->
key
[
i
],
pInfo
->
value
[
i
]);
}
}
//tmq_conf_set(conf, "client.id", "c-001");
//tmq_conf_set(conf, "enable.auto.commit", "true");
//tmq_conf_set(conf, "enable.auto.commit", "false");
//tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
//tmq_conf_set(conf, "auto.offset.reset", "none");
//tmq_conf_set(conf, "auto.offset.reset", "earliest");
//tmq_conf_set(conf, "auto.offset.reset", "latest");
pInfo
->
tmq
=
tmq_consumer_new
(
conf
,
NULL
,
0
);
pInfo
->
tmq
=
tmq_consumer_new
(
conf
,
NULL
,
0
);
return
;
return
;
}
}
void
build_topic_list
(
SThreadInfo
*
pInfo
)
{
void
build_topic_list
(
SThreadInfo
*
pInfo
)
{
pInfo
->
topicList
=
tmq_list_new
();
pInfo
->
topicList
=
tmq_list_new
();
// tmq_list_append(topic_list, "test_stb_topic_1");
// tmq_list_append(topic_list, "test_stb_topic_1");
for
(
int32_t
i
=
0
;
i
<
pInfo
->
numOfTopic
;
i
++
)
{
for
(
int32_t
i
=
0
;
i
<
pInfo
->
numOfTopic
;
i
++
)
{
...
@@ -246,45 +287,49 @@ void build_topic_list(SThreadInfo* pInfo) {
...
@@ -246,45 +287,49 @@ void build_topic_list(SThreadInfo* pInfo) {
return
;
return
;
}
}
int32_t
saveConsumeResult
(
SThreadInfo
*
pInfo
)
{
int32_t
saveConsumeResult
(
SThreadInfo
*
pInfo
)
{
char
sqlStr
[
1024
]
=
{
0
};
char
sqlStr
[
1024
]
=
{
0
};
TAOS
*
pConn
=
taos_connect
(
NULL
,
"root"
,
"taosdata"
,
NULL
,
0
);
TAOS
*
pConn
=
taos_connect
(
NULL
,
"root"
,
"taosdata"
,
NULL
,
0
);
assert
(
pConn
!=
NULL
);
assert
(
pConn
!=
NULL
);
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
sprintf
(
sqlStr
,
"insert into %s.consumeresult values (now, %d, %"
PRId64
", %d)"
,
g_stConfInfo
.
dbName
,
sprintf
(
sqlStr
,
"insert into %s.consumeresult values (now, %d, %"
PRId64
", %"
PRId64
", %d)"
,
pInfo
->
consumerId
,
pInfo
->
consumeMsgCnt
,
pInfo
->
checkresult
);
g_stConfInfo
.
cdbName
,
pInfo
->
consumerId
,
pInfo
->
consumeMsgCnt
,
pInfo
->
consumeRowCnt
,
pInfo
->
checkresult
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
sqlStr
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
sqlStr
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in save consumeinfo, reason:%s
\n
"
,
taos_errstr
(
pRes
));
printf
(
"error in save consumeinfo, reason:%s
\n
"
,
taos_errstr
(
pRes
));
taos_free_result
(
pRes
);
taos_free_result
(
pRes
);
exit
(
-
1
);
exit
(
-
1
);
}
}
taos_free_result
(
pRes
);
taos_free_result
(
pRes
);
return
0
;
return
0
;
}
}
void
loop_consume
(
SThreadInfo
*
pInfo
)
{
void
loop_consume
(
SThreadInfo
*
pInfo
)
{
tmq_resp_err_t
err
;
tmq_resp_err_t
err
;
int64_t
totalMsgs
=
0
;
int64_t
totalMsgs
=
0
;
//
int64_t totalRows = 0;
int64_t
totalRows
=
0
;
while
(
running
)
{
while
(
running
)
{
TAOS_RES
*
tmqMsg
=
tmq_consumer_poll
(
pInfo
->
tmq
,
g_stConfInfo
.
consumeDelay
*
1000
);
TAOS_RES
*
tmqMsg
=
tmq_consumer_poll
(
pInfo
->
tmq
,
g_stConfInfo
.
consumeDelay
*
1000
);
if
(
tmqMsg
)
{
if
(
tmqMsg
)
{
if
(
0
!=
g_stConfInfo
.
showMsgFlag
)
{
if
(
0
!=
g_stConfInfo
.
showMsgFlag
)
{
msg_process
(
tmqMsg
,
totalMsgs
,
0
);
totalRows
+=
msg_process
(
tmqMsg
,
totalMsgs
,
pInfo
->
consumerId
);
}
}
taos_free_result
(
tmqMsg
);
taos_free_result
(
tmqMsg
);
totalMsgs
++
;
totalMsgs
++
;
if
(
totalMsgs
>=
pInfo
->
expectMsgCnt
)
{
if
(
totalMsgs
>=
pInfo
->
expectMsgCnt
)
{
break
;
break
;
}
}
...
@@ -292,7 +337,7 @@ void loop_consume(SThreadInfo* pInfo) {
...
@@ -292,7 +337,7 @@ void loop_consume(SThreadInfo* pInfo) {
break
;
break
;
}
}
}
}
err
=
tmq_consumer_close
(
pInfo
->
tmq
);
err
=
tmq_consumer_close
(
pInfo
->
tmq
);
if
(
err
)
{
if
(
err
)
{
printf
(
"tmq_consumer_close() fail, reason: %s
\n
"
,
tmq_err2str
(
err
));
printf
(
"tmq_consumer_close() fail, reason: %s
\n
"
,
tmq_err2str
(
err
));
...
@@ -300,34 +345,38 @@ void loop_consume(SThreadInfo* pInfo) {
...
@@ -300,34 +345,38 @@ void loop_consume(SThreadInfo* pInfo) {
}
}
pInfo
->
consumeMsgCnt
=
totalMsgs
;
pInfo
->
consumeMsgCnt
=
totalMsgs
;
pInfo
->
consumeRowCnt
=
totalRows
;
taosFprintfFile
(
g_fp
,
"==== consumerId: %d, consumeMsgCnt: %"
PRId64
", consumeRowCnt: %"
PRId64
"
\n
"
,
pInfo
->
consumerId
,
pInfo
->
consumeMsgCnt
,
pInfo
->
consumeRowCnt
);
}
}
void
*
consumeThreadFunc
(
void
*
param
)
{
void
*
consumeThreadFunc
(
void
*
param
)
{
int32_t
totalMsgs
=
0
;
int32_t
totalMsgs
=
0
;
SThreadInfo
*
pInfo
=
(
SThreadInfo
*
)
param
;
SThreadInfo
*
pInfo
=
(
SThreadInfo
*
)
param
;
build_consumer
(
pInfo
);
build_consumer
(
pInfo
);
build_topic_list
(
pInfo
);
build_topic_list
(
pInfo
);
if
((
NULL
==
pInfo
->
tmq
)
||
(
NULL
==
pInfo
->
topicList
))
{
if
((
NULL
==
pInfo
->
tmq
)
||
(
NULL
==
pInfo
->
topicList
)){
return
NULL
;
return
NULL
;
}
}
tmq_resp_err_t
err
=
tmq_subscribe
(
pInfo
->
tmq
,
pInfo
->
topicList
);
tmq_resp_err_t
err
=
tmq_subscribe
(
pInfo
->
tmq
,
pInfo
->
topicList
);
if
(
err
)
{
if
(
err
)
{
printf
(
"tmq_subscribe() fail, reason: %s
\n
"
,
tmq_err2str
(
err
));
printf
(
"tmq_subscribe() fail, reason: %s
\n
"
,
tmq_err2str
(
err
));
exit
(
-
1
);
exit
(
-
1
);
}
}
loop_consume
(
pInfo
);
loop_consume
(
pInfo
);
err
=
tmq_unsubscribe
(
pInfo
->
tmq
);
err
=
tmq_unsubscribe
(
pInfo
->
tmq
);
if
(
err
)
{
if
(
err
)
{
printf
(
"tmq_unsubscribe() fail, reason: %s
\n
"
,
tmq_err2str
(
err
));
printf
(
"tmq_unsubscribe() fail, reason: %s
\n
"
,
tmq_err2str
(
err
));
pInfo
->
consumeMsgCnt
=
-
1
;
pInfo
->
consumeMsgCnt
=
-
1
;
return
NULL
;
return
NULL
;
}
}
// save consume result into consumeresult table
// save consume result into consumeresult table
saveConsumeResult
(
pInfo
);
saveConsumeResult
(
pInfo
);
...
@@ -339,7 +388,7 @@ void parseConsumeInfo() {
...
@@ -339,7 +388,7 @@ void parseConsumeInfo() {
const
char
delim
[
2
]
=
","
;
const
char
delim
[
2
]
=
","
;
const
char
ch
=
':'
;
const
char
ch
=
':'
;
for
(
int32_t
i
=
0
;
i
<
g_stConfInfo
.
numOfThread
;
i
++
)
{
for
(
int32_t
i
=
0
;
i
<
g_stConfInfo
.
numOfThread
;
i
++
)
{
token
=
strtok
(
g_stConfInfo
.
stThreads
[
i
].
topicString
,
delim
);
token
=
strtok
(
g_stConfInfo
.
stThreads
[
i
].
topicString
,
delim
);
while
(
token
!=
NULL
)
{
while
(
token
!=
NULL
)
{
// printf("%s\n", token );
// printf("%s\n", token );
...
@@ -347,10 +396,10 @@ void parseConsumeInfo() {
...
@@ -347,10 +396,10 @@ void parseConsumeInfo() {
ltrim
(
g_stConfInfo
.
stThreads
[
i
].
topics
[
g_stConfInfo
.
stThreads
[
i
].
numOfTopic
]);
ltrim
(
g_stConfInfo
.
stThreads
[
i
].
topics
[
g_stConfInfo
.
stThreads
[
i
].
numOfTopic
]);
// printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
// printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
g_stConfInfo
.
stThreads
[
i
].
numOfTopic
++
;
g_stConfInfo
.
stThreads
[
i
].
numOfTopic
++
;
token
=
strtok
(
NULL
,
delim
);
token
=
strtok
(
NULL
,
delim
);
}
}
token
=
strtok
(
g_stConfInfo
.
stThreads
[
i
].
keyString
,
delim
);
token
=
strtok
(
g_stConfInfo
.
stThreads
[
i
].
keyString
,
delim
);
while
(
token
!=
NULL
)
{
while
(
token
!=
NULL
)
{
// printf("%s\n", token );
// printf("%s\n", token );
...
@@ -364,7 +413,7 @@ void parseConsumeInfo() {
...
@@ -364,7 +413,7 @@ void parseConsumeInfo() {
// g_stConfInfo.value[g_stConfInfo.numOfKey]);
// g_stConfInfo.value[g_stConfInfo.numOfKey]);
g_stConfInfo
.
stThreads
[
i
].
numOfKey
++
;
g_stConfInfo
.
stThreads
[
i
].
numOfKey
++
;
}
}
token
=
strtok
(
NULL
,
delim
);
token
=
strtok
(
NULL
,
delim
);
}
}
}
}
...
@@ -372,47 +421,48 @@ void parseConsumeInfo() {
...
@@ -372,47 +421,48 @@ void parseConsumeInfo() {
int32_t
getConsumeInfo
()
{
int32_t
getConsumeInfo
()
{
char
sqlStr
[
1024
]
=
{
0
};
char
sqlStr
[
1024
]
=
{
0
};
TAOS
*
pConn
=
taos_connect
(
NULL
,
"root"
,
"taosdata"
,
NULL
,
0
);
TAOS
*
pConn
=
taos_connect
(
NULL
,
"root"
,
"taosdata"
,
NULL
,
0
);
assert
(
pConn
!=
NULL
);
assert
(
pConn
!=
NULL
);
sprintf
(
sqlStr
,
"select * from %s.consumeinfo"
,
g_stConfInfo
.
dbName
);
sprintf
(
sqlStr
,
"select * from %s.consumeinfo"
,
g_stConfInfo
.
c
dbName
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
sqlStr
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
sqlStr
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in get consumeinfo, reason:%s
\n
"
,
taos_errstr
(
pRes
));
printf
(
"error in get consumeinfo, reason:%s
\n
"
,
taos_errstr
(
pRes
));
taosFprintfFile
(
g_fp
,
"error in get consumeinfo, reason:%s
\n
"
,
taos_errstr
(
pRes
));
taosCloseFile
(
&
g_fp
);
taos_free_result
(
pRes
);
taos_free_result
(
pRes
);
exit
(
-
1
);
exit
(
-
1
);
}
}
TAOS_ROW
row
=
NULL
;
TAOS_ROW
row
=
NULL
;
int
num_fields
=
taos_num_fields
(
pRes
);
int
num_fields
=
taos_num_fields
(
pRes
);
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
pRes
);
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
pRes
);
// schema: ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint,
// schema: ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int
// ifcheckdata int
int32_t
numOfThread
=
0
;
int32_t
numOfThread
=
0
;
while
((
row
=
taos_fetch_row
(
pRes
)))
{
while
((
row
=
taos_fetch_row
(
pRes
)))
{
int32_t
*
lengths
=
taos_fetch_lengths
(
pRes
);
int32_t
*
lengths
=
taos_fetch_lengths
(
pRes
);
for
(
int
i
=
0
;
i
<
num_fields
;
++
i
)
{
for
(
int
i
=
0
;
i
<
num_fields
;
++
i
)
{
if
(
row
[
i
]
==
NULL
||
0
==
i
)
{
if
(
row
[
i
]
==
NULL
||
0
==
i
)
{
continue
;
continue
;
}
}
if
((
1
==
i
)
&&
(
fields
[
i
].
type
==
TSDB_DATA_TYPE_INT
))
{
if
((
1
==
i
)
&&
(
fields
[
i
].
type
==
TSDB_DATA_TYPE_INT
))
{
g_stConfInfo
.
stThreads
[
numOfThread
].
consumerId
=
*
((
int32_t
*
)
row
[
i
]);
g_stConfInfo
.
stThreads
[
numOfThread
].
consumerId
=
*
((
int32_t
*
)
row
[
i
]);
}
else
if
((
2
==
i
)
&&
(
fields
[
i
].
type
==
TSDB_DATA_TYPE_BINARY
))
{
}
else
if
((
2
==
i
)
&&
(
fields
[
i
].
type
==
TSDB_DATA_TYPE_BINARY
))
{
memcpy
(
g_stConfInfo
.
stThreads
[
numOfThread
].
topicString
,
row
[
i
],
lengths
[
i
]);
memcpy
(
g_stConfInfo
.
stThreads
[
numOfThread
].
topicString
,
row
[
i
],
lengths
[
i
]);
}
else
if
((
3
==
i
)
&&
(
fields
[
i
].
type
==
TSDB_DATA_TYPE_BINARY
))
{
}
else
if
((
3
==
i
)
&&
(
fields
[
i
].
type
==
TSDB_DATA_TYPE_BINARY
))
{
memcpy
(
g_stConfInfo
.
stThreads
[
numOfThread
].
keyString
,
row
[
i
],
lengths
[
i
]);
memcpy
(
g_stConfInfo
.
stThreads
[
numOfThread
].
keyString
,
row
[
i
],
lengths
[
i
]);
}
else
if
((
4
==
i
)
&&
(
fields
[
i
].
type
==
TSDB_DATA_TYPE_BIGINT
))
{
}
else
if
((
4
==
i
)
&&
(
fields
[
i
].
type
==
TSDB_DATA_TYPE_BIGINT
))
{
g_stConfInfo
.
stThreads
[
numOfThread
].
expectMsgCnt
=
*
((
int64_t
*
)
row
[
i
]);
g_stConfInfo
.
stThreads
[
numOfThread
].
expectMsgCnt
=
*
((
int64_t
*
)
row
[
i
]);
}
else
if
((
5
==
i
)
&&
(
fields
[
i
].
type
==
TSDB_DATA_TYPE_INT
))
{
}
else
if
((
5
==
i
)
&&
(
fields
[
i
].
type
==
TSDB_DATA_TYPE_INT
))
{
g_stConfInfo
.
stThreads
[
numOfThread
].
ifCheckData
=
*
((
int32_t
*
)
row
[
i
]);
g_stConfInfo
.
stThreads
[
numOfThread
].
ifCheckData
=
*
((
int32_t
*
)
row
[
i
]);
}
}
}
}
numOfThread
++
;
numOfThread
++
;
}
}
g_stConfInfo
.
numOfThread
=
numOfThread
;
g_stConfInfo
.
numOfThread
=
numOfThread
;
...
@@ -423,10 +473,11 @@ int32_t getConsumeInfo() {
...
@@ -423,10 +473,11 @@ int32_t getConsumeInfo() {
return
0
;
return
0
;
}
}
int
main
(
int32_t
argc
,
char
*
argv
[])
{
int
main
(
int32_t
argc
,
char
*
argv
[])
{
parseArgument
(
argc
,
argv
);
parseArgument
(
argc
,
argv
);
getConsumeInfo
();
getConsumeInfo
();
init
LogFile
();
saveConfigTo
LogFile
();
TdThreadAttr
thattr
;
TdThreadAttr
thattr
;
taosThreadAttrInit
(
&
thattr
);
taosThreadAttrInit
(
&
thattr
);
...
@@ -434,19 +485,18 @@ int main(int32_t argc, char* argv[]) {
...
@@ -434,19 +485,18 @@ int main(int32_t argc, char* argv[]) {
// pthread_create one thread to consume
// pthread_create one thread to consume
for
(
int32_t
i
=
0
;
i
<
g_stConfInfo
.
numOfThread
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
g_stConfInfo
.
numOfThread
;
++
i
)
{
taosThreadCreate
(
&
(
g_stConfInfo
.
stThreads
[
i
].
thread
),
&
thattr
,
consumeThreadFunc
,
taosThreadCreate
(
&
(
g_stConfInfo
.
stThreads
[
i
].
thread
),
&
thattr
,
consumeThreadFunc
,
(
void
*
)(
&
(
g_stConfInfo
.
stThreads
[
i
])));
(
void
*
)(
&
(
g_stConfInfo
.
stThreads
[
i
])));
}
}
for
(
int32_t
i
=
0
;
i
<
g_stConfInfo
.
numOfThread
;
i
++
)
{
for
(
int32_t
i
=
0
;
i
<
g_stConfInfo
.
numOfThread
;
i
++
)
{
taosThreadJoin
(
g_stConfInfo
.
stThreads
[
i
].
thread
,
NULL
);
taosThreadJoin
(
g_stConfInfo
.
stThreads
[
i
].
thread
,
NULL
);
}
}
//
printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt);
//
printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt);
taosFprintfFile
(
g_fp
,
"
\n
"
);
taosFprintfFile
(
g_fp
,
"
\n
"
);
taosCloseFile
(
&
g_fp
);
taosCloseFile
(
&
g_fp
);
return
0
;
return
0
;
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录