Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
8323cad2
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
8323cad2
编写于
3月 24, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into feature/shm
上级
8b8aeabd
8204984d
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
128 addition
and
25 deletion
+128
-25
example/src/tstream.c
example/src/tstream.c
+1
-1
include/common/tmsg.h
include/common/tmsg.h
+7
-0
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+1
-1
source/dnode/mnode/impl/src/mndDef.c
source/dnode/mnode/impl/src/mndDef.c
+21
-0
source/dnode/mnode/impl/src/mndStream.c
source/dnode/mnode/impl/src/mndStream.c
+35
-0
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+0
-8
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+29
-2
tests/script/tsim/insert/null.sim
tests/script/tsim/insert/null.sim
+0
-2
tests/script/tsim/tmq/basic.sim
tests/script/tsim/tmq/basic.sim
+7
-3
tests/test/c/tmqDemo.c
tests/test/c/tmqDemo.c
+27
-8
未找到文件。
example/src/tstream.c
浏览文件 @
8323cad2
...
@@ -78,7 +78,7 @@ int32_t create_stream() {
...
@@ -78,7 +78,7 @@ int32_t create_stream() {
taos_free_result
(
pRes
);
taos_free_result
(
pRes
);
/*const char* sql = "select min(k), max(k), sum(k) from tu1";*/
/*const char* sql = "select min(k), max(k), sum(k) from tu1";*/
const
char
*
sql
=
"select min(k), max(k), sum(k) from st1"
;
const
char
*
sql
=
"select min(k), max(k), sum(k)
as sum_of_k
from st1"
;
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
pRes
=
tmq_create_stream
(
pConn
,
"stream1"
,
"out1"
,
sql
);
pRes
=
tmq_create_stream
(
pConn
,
"stream1"
,
"out1"
,
sql
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
if
(
taos_errno
(
pRes
)
!=
0
)
{
...
...
include/common/tmsg.h
浏览文件 @
8323cad2
...
@@ -2414,6 +2414,13 @@ typedef struct {
...
@@ -2414,6 +2414,13 @@ typedef struct {
int32_t
reserved
;
int32_t
reserved
;
}
SStreamTaskExecRsp
;
}
SStreamTaskExecRsp
;
typedef
struct
{
SMsgHead
head
;
int64_t
streamId
;
int64_t
version
;
SArray
*
res
;
// SArray<SSDataBlock>
}
SStreamSmaSinkReq
;
#pragma pack(pop)
#pragma pack(pop)
#ifdef __cplusplus
#ifdef __cplusplus
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
8323cad2
...
@@ -728,12 +728,12 @@ typedef struct {
...
@@ -728,12 +728,12 @@ typedef struct {
char
*
logicalPlan
;
char
*
logicalPlan
;
char
*
physicalPlan
;
char
*
physicalPlan
;
SArray
*
tasks
;
// SArray<SArray<SStreamTask>>
SArray
*
tasks
;
// SArray<SArray<SStreamTask>>
SArray
*
outputName
;
}
SStreamObj
;
}
SStreamObj
;
int32_t
tEncodeSStreamObj
(
SCoder
*
pEncoder
,
const
SStreamObj
*
pObj
);
int32_t
tEncodeSStreamObj
(
SCoder
*
pEncoder
,
const
SStreamObj
*
pObj
);
int32_t
tDecodeSStreamObj
(
SCoder
*
pDecoder
,
SStreamObj
*
pObj
);
int32_t
tDecodeSStreamObj
(
SCoder
*
pDecoder
,
SStreamObj
*
pObj
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
#endif
#endif
...
...
source/dnode/mnode/impl/src/mndDef.c
浏览文件 @
8323cad2
...
@@ -16,6 +16,7 @@
...
@@ -16,6 +16,7 @@
#include "mndDef.h"
#include "mndDef.h"
int32_t
tEncodeSStreamObj
(
SCoder
*
pEncoder
,
const
SStreamObj
*
pObj
)
{
int32_t
tEncodeSStreamObj
(
SCoder
*
pEncoder
,
const
SStreamObj
*
pObj
)
{
int32_t
outputNameSz
=
0
;
if
(
tEncodeCStr
(
pEncoder
,
pObj
->
name
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pObj
->
name
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pObj
->
db
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pObj
->
db
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pObj
->
createTime
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pObj
->
createTime
)
<
0
)
return
-
1
;
...
@@ -43,6 +44,15 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
...
@@ -43,6 +44,15 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
}
else
{
}
else
{
tEncodeI32
(
pEncoder
,
0
);
tEncodeI32
(
pEncoder
,
0
);
}
}
if
(
pObj
->
outputName
!=
NULL
)
{
outputNameSz
=
taosArrayGetSize
(
pObj
->
outputName
);
}
if
(
tEncodeI32
(
pEncoder
,
outputNameSz
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
outputNameSz
;
i
++
)
{
char
*
name
=
taosArrayGetP
(
pObj
->
outputName
,
i
);
if
(
tEncodeCStr
(
pEncoder
,
name
)
<
0
)
return
-
1
;
}
return
pEncoder
->
pos
;
return
pEncoder
->
pos
;
}
}
...
@@ -76,5 +86,16 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
...
@@ -76,5 +86,16 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
}
else
{
}
else
{
pObj
->
tasks
=
NULL
;
pObj
->
tasks
=
NULL
;
}
}
int32_t
outputNameSz
;
if
(
tDecodeI32
(
pDecoder
,
&
outputNameSz
)
<
0
)
return
-
1
;
pObj
->
outputName
=
taosArrayInit
(
outputNameSz
,
sizeof
(
void
*
));
if
(
pObj
->
outputName
==
NULL
)
{
return
-
1
;
}
for
(
int32_t
i
=
0
;
i
<
outputNameSz
;
i
++
)
{
char
*
name
;
if
(
tDecodeCStrAlloc
(
pDecoder
,
&
name
)
<
0
)
return
-
1
;
taosArrayPush
(
pObj
->
outputName
,
&
name
);
}
return
0
;
return
0
;
}
}
source/dnode/mnode/impl/src/mndStream.c
浏览文件 @
8323cad2
...
@@ -218,6 +218,28 @@ static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
...
@@ -218,6 +218,28 @@ static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
return
0
;
return
0
;
}
}
static
SArray
*
mndExtractNamesFromAst
(
const
SNode
*
pAst
)
{
if
(
pAst
->
type
!=
QUERY_NODE_SELECT_STMT
)
return
NULL
;
SArray
*
names
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
if
(
names
==
NULL
)
{
return
NULL
;
}
SSelectStmt
*
pSelect
=
(
SSelectStmt
*
)
pAst
;
SNodeList
*
pNodes
=
pSelect
->
pProjectionList
;
SListCell
*
pCell
=
pNodes
->
pHead
;
while
(
pCell
!=
NULL
)
{
if
(
pCell
->
pNode
->
type
!=
QUERY_NODE_FUNCTION
)
{
continue
;
}
SFunctionNode
*
pFunction
=
(
SFunctionNode
*
)
pCell
->
pNode
;
char
*
name
=
strdup
(
pFunction
->
node
.
aliasName
);
taosArrayPush
(
names
,
&
name
);
pCell
=
pCell
->
pNext
;
}
return
names
;
}
static
int32_t
mndStreamGetPlanString
(
const
char
*
ast
,
char
**
pStr
)
{
static
int32_t
mndStreamGetPlanString
(
const
char
*
ast
,
char
**
pStr
)
{
if
(
NULL
==
ast
)
{
if
(
NULL
==
ast
)
{
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
...
@@ -246,6 +268,19 @@ static int32_t mndStreamGetPlanString(const char *ast, char **pStr) {
...
@@ -246,6 +268,19 @@ static int32_t mndStreamGetPlanString(const char *ast, char **pStr) {
}
}
int32_t
mndAddStreamToTrans
(
SMnode
*
pMnode
,
SStreamObj
*
pStream
,
const
char
*
ast
,
STrans
*
pTrans
)
{
int32_t
mndAddStreamToTrans
(
SMnode
*
pMnode
,
SStreamObj
*
pStream
,
const
char
*
ast
,
STrans
*
pTrans
)
{
SNode
*
pAst
=
NULL
;
if
(
nodesStringToNode
(
ast
,
&
pAst
)
<
0
)
{
return
-
1
;
}
SArray
*
names
=
mndExtractNamesFromAst
(
pAst
);
printf
(
"|"
);
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
names
);
i
++
)
{
printf
(
" %15s |"
,
(
char
*
)
taosArrayGetP
(
names
,
i
));
}
printf
(
"
\n
=======================================================
\n
"
);
pStream
->
outputName
=
names
;
if
(
TSDB_CODE_SUCCESS
!=
mndStreamGetPlanString
(
ast
,
&
pStream
->
physicalPlan
))
{
if
(
TSDB_CODE_SUCCESS
!=
mndStreamGetPlanString
(
ast
,
&
pStream
->
physicalPlan
))
{
mError
(
"topic:%s, failed to get plan since %s"
,
pStream
->
name
,
terrstr
());
mError
(
"topic:%s, failed to get plan since %s"
,
pStream
->
name
,
terrstr
());
return
-
1
;
return
-
1
;
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
8323cad2
...
@@ -1403,14 +1403,6 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t
...
@@ -1403,14 +1403,6 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t
continue
;
continue
;
}
}
int32_t
bytes
=
pColInfo
->
info
.
bytes
;
if
(
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
))
{
pData
=
(
char
*
)
pColInfo
->
pData
+
numOfRows
*
pColInfo
->
info
.
bytes
;
}
else
{
pData
=
(
char
*
)
pColInfo
->
pData
+
(
capacity
-
numOfRows
-
num
)
*
pColInfo
->
info
.
bytes
;
}
if
(
!
isAllRowsNull
(
src
)
&&
pColInfo
->
info
.
colId
==
src
->
colId
)
{
if
(
!
isAllRowsNull
(
src
)
&&
pColInfo
->
info
.
colId
==
src
->
colId
)
{
if
(
!
IS_VAR_DATA_TYPE
(
pColInfo
->
info
.
type
))
{
// todo opt performance
if
(
!
IS_VAR_DATA_TYPE
(
pColInfo
->
info
.
type
))
{
// todo opt performance
// memmove(pData, (char*)src->pData + bytes * start, bytes * num);
// memmove(pData, (char*)src->pData + bytes * start, bytes * num);
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
8323cad2
...
@@ -1715,10 +1715,31 @@ static int32_t generatedHashKey(void* pKey, int32_t* length, SArray* pGroupColVa
...
@@ -1715,10 +1715,31 @@ static int32_t generatedHashKey(void* pKey, int32_t* length, SArray* pGroupColVa
return
0
;
return
0
;
}
}
// assign the group keys or user input constant values if required
static
void
doAssignGroupKeys
(
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
int32_t
totalRows
,
int32_t
rowIndex
)
{
for
(
int32_t
i
=
0
;
i
<
numOfOutput
;
++
i
)
{
if
(
pCtx
[
i
].
functionId
==
-
1
)
{
SResultRowEntryInfo
*
pEntryInfo
=
GET_RES_INFO
(
&
pCtx
[
i
]);
SColumnInfoData
*
pColInfoData
=
pCtx
[
i
].
input
.
pData
[
0
];
if
(
!
colDataIsNull
(
pColInfoData
,
totalRows
,
rowIndex
,
NULL
))
{
char
*
dest
=
GET_ROWCELL_INTERBUF
(
pEntryInfo
);
char
*
data
=
colDataGetData
(
pColInfoData
,
rowIndex
);
// set result exists, todo refactor
memcpy
(
dest
,
data
,
pColInfoData
->
info
.
bytes
);
pEntryInfo
->
hasResult
=
DATA_SET_FLAG
;
pEntryInfo
->
numOfRes
=
1
;
}
}
}
}
static
void
doHashGroupbyAgg
(
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pBlock
)
{
static
void
doHashGroupbyAgg
(
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pBlock
)
{
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SGroupbyOperatorInfo
*
pInfo
=
pOperator
->
info
;
SGroupbyOperatorInfo
*
pInfo
=
pOperator
->
info
;
SqlFunctionCtx
*
pCtx
=
pInfo
->
binfo
.
pCtx
;
int32_t
numOfGroupCols
=
taosArrayGetSize
(
pInfo
->
pGroupCols
);
int32_t
numOfGroupCols
=
taosArrayGetSize
(
pInfo
->
pGroupCols
);
// if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
// if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
//qError("QInfo:0x%"PRIx64" group by not supported on double/float columns, abort", GET_TASKID(pRuntimeEnv));
//qError("QInfo:0x%"PRIx64" group by not supported on double/float columns, abort", GET_TASKID(pRuntimeEnv));
...
@@ -1751,7 +1772,11 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock *pBlock) {
...
@@ -1751,7 +1772,11 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock *pBlock) {
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_APP_ERROR
);
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_APP_ERROR
);
}
}
doApplyFunctions
(
pInfo
->
binfo
.
pCtx
,
&
w
,
j
-
num
,
num
,
NULL
,
pBlock
->
info
.
rows
,
pOperator
->
numOfOutput
,
TSDB_ORDER_ASC
);
int32_t
rowIndex
=
j
-
num
;
doApplyFunctions
(
pCtx
,
&
w
,
rowIndex
,
num
,
NULL
,
pBlock
->
info
.
rows
,
pOperator
->
numOfOutput
,
TSDB_ORDER_ASC
);
// assign the group keys or user input constant values if required
doAssignGroupKeys
(
pCtx
,
pOperator
->
numOfOutput
,
pBlock
->
info
.
rows
,
rowIndex
);
keepGroupKeys
(
pInfo
,
pBlock
,
j
,
numOfGroupCols
);
keepGroupKeys
(
pInfo
,
pBlock
,
j
,
numOfGroupCols
);
num
=
1
;
num
=
1
;
}
}
...
@@ -1764,7 +1789,9 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock *pBlock) {
...
@@ -1764,7 +1789,9 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock *pBlock) {
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_APP_ERROR
);
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_APP_ERROR
);
}
}
doApplyFunctions
(
pInfo
->
binfo
.
pCtx
,
&
w
,
pBlock
->
info
.
rows
-
num
,
num
,
NULL
,
pBlock
->
info
.
rows
,
pOperator
->
numOfOutput
,
TSDB_ORDER_ASC
);
int32_t
rowIndex
=
pBlock
->
info
.
rows
-
num
;
doApplyFunctions
(
pCtx
,
&
w
,
rowIndex
,
num
,
NULL
,
pBlock
->
info
.
rows
,
pOperator
->
numOfOutput
,
TSDB_ORDER_ASC
);
doAssignGroupKeys
(
pCtx
,
pOperator
->
numOfOutput
,
pBlock
->
info
.
rows
,
rowIndex
);
}
}
}
}
...
...
tests/script/tsim/insert/null.sim
浏览文件 @
8323cad2
...
@@ -244,8 +244,6 @@ endi
...
@@ -244,8 +244,6 @@ endi
# return -1
# return -1
#endi
#endi
return
#===================================================================
#===================================================================
#===================================================================
#===================================================================
...
...
tests/script/tsim/tmq/basic.sim
浏览文件 @
8323cad2
...
@@ -47,9 +47,12 @@ sql drop database useless_db
...
@@ -47,9 +47,12 @@ sql drop database useless_db
# -m startTimestamp, default is 1640966400000 [2022-01-01 00:00:00]
# -m startTimestamp, default is 1640966400000 [2022-01-01 00:00:00]
# -g showMsgFlag, default is 0
# -g showMsgFlag, default is 0
#
#
#system_content ../../debug/tests/test/c/tmq_demo -c ../../sim/tsim/cfg
print cmd===> system_content ../../debug/tests/test/c/tmq_demo -sim 1 -b 100 -c ../../sim/tsim/cfg -w ../../sim/dnode1/data/vnode/vnode4/wal
system ../../debug/tests/test/c/tmq_demo -c ../../sim/tsim/cfg
system_content ../../debug/tests/test/c/tmq_demo -sim 1 -b 100 -c ../../sim/tsim/cfg -w ../../sim/dnode1/data/vnode/vnode4/wal
print result-> $system_content
print cmd result----> $system_content
if $system_content != @{consume success: 100}@ then
print not match in pos000
endi
sql show databases
sql show databases
print ===> $rows $data00 $data01 $data02 $data03
print ===> $rows $data00 $data01 $data02 $data03
...
@@ -78,4 +81,5 @@ endi
...
@@ -78,4 +81,5 @@ endi
if $data00 != 10000 then
if $data00 != 10000 then
return -1
return -1
endi
endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
tests/test/c/tmqDemo.c
浏览文件 @
8323cad2
...
@@ -58,6 +58,7 @@ typedef struct {
...
@@ -58,6 +58,7 @@ typedef struct {
int32_t
totalRowsOfPerTbl
;
int32_t
totalRowsOfPerTbl
;
int64_t
startTimestamp
;
int64_t
startTimestamp
;
int32_t
showMsgFlag
;
int32_t
showMsgFlag
;
int32_t
simCase
;
int32_t
totalRowsOfT2
;
int32_t
totalRowsOfT2
;
}
SConfInfo
;
}
SConfInfo
;
...
@@ -66,7 +67,7 @@ static SConfInfo g_stConfInfo = {
...
@@ -66,7 +67,7 @@ static SConfInfo g_stConfInfo = {
"tmqdb"
,
"tmqdb"
,
"stb"
,
"stb"
,
"./tmqResult.txt"
,
// output_file
"./tmqResult.txt"
,
// output_file
"/data2/dnode/data/vnode
s
/vnode2/wal"
,
"/data2/dnode/data/vnode/vnode2/wal"
,
1
,
// threads
1
,
// threads
1
,
// tables
1
,
// tables
1
,
// vgroups
1
,
// vgroups
...
@@ -77,6 +78,7 @@ static SConfInfo g_stConfInfo = {
...
@@ -77,6 +78,7 @@ static SConfInfo g_stConfInfo = {
10000
,
// total rows for per table
10000
,
// total rows for per table
0
,
// 2020-01-01 00:00:00.000
0
,
// 2020-01-01 00:00:00.000
0
,
// show consume msg switch
0
,
// show consume msg switch
0
,
// if run in sim case
10000
,
10000
,
};
};
...
@@ -117,6 +119,8 @@ static void printHelp() {
...
@@ -117,6 +119,8 @@ static void printHelp() {
printf
(
"%s%s%s%"
PRId64
"
\n
"
,
indent
,
indent
,
"startTimestamp, default is "
,
g_stConfInfo
.
startTimestamp
);
printf
(
"%s%s%s%"
PRId64
"
\n
"
,
indent
,
indent
,
"startTimestamp, default is "
,
g_stConfInfo
.
startTimestamp
);
printf
(
"%s%s
\n
"
,
indent
,
"-g"
);
printf
(
"%s%s
\n
"
,
indent
,
"-g"
);
printf
(
"%s%s%s%d
\n
"
,
indent
,
indent
,
"showMsgFlag, default is "
,
g_stConfInfo
.
showMsgFlag
);
printf
(
"%s%s%s%d
\n
"
,
indent
,
indent
,
"showMsgFlag, default is "
,
g_stConfInfo
.
showMsgFlag
);
printf
(
"%s%s
\n
"
,
indent
,
"-sim"
);
printf
(
"%s%s%s%d
\n
"
,
indent
,
indent
,
"simCase, default is "
,
g_stConfInfo
.
simCase
);
exit
(
EXIT_SUCCESS
);
exit
(
EXIT_SUCCESS
);
}
}
...
@@ -160,14 +164,17 @@ void parseArgument(int32_t argc, char *argv[]) {
...
@@ -160,14 +164,17 @@ void parseArgument(int32_t argc, char *argv[]) {
g_stConfInfo
.
startTimestamp
=
atol
(
argv
[
++
i
]);
g_stConfInfo
.
startTimestamp
=
atol
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-g"
)
==
0
)
{
}
else
if
(
strcmp
(
argv
[
i
],
"-g"
)
==
0
)
{
g_stConfInfo
.
showMsgFlag
=
atol
(
argv
[
++
i
]);
g_stConfInfo
.
showMsgFlag
=
atol
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-sim"
)
==
0
)
{
g_stConfInfo
.
simCase
=
atol
(
argv
[
++
i
]);
}
else
{
}
else
{
p
Print
(
"%s unknow para: %s %s"
,
GREEN
,
argv
[
++
i
],
NC
);
p
rintf
(
"%s unknow para: %s %s"
,
GREEN
,
argv
[
++
i
],
NC
);
exit
(
-
1
);
exit
(
-
1
);
}
}
}
}
g_stConfInfo
.
totalRowsOfT2
=
g_stConfInfo
.
totalRowsOfPerTbl
*
g_stConfInfo
.
ratio
;
g_stConfInfo
.
totalRowsOfT2
=
g_stConfInfo
.
totalRowsOfPerTbl
*
g_stConfInfo
.
ratio
;
#if 0
pPrint("%s configDir:%s %s", GREEN, configDir, NC);
pPrint("%s configDir:%s %s", GREEN, configDir, NC);
pPrint("%s dbName:%s %s", GREEN, g_stConfInfo.dbName, NC);
pPrint("%s dbName:%s %s", GREEN, g_stConfInfo.dbName, NC);
pPrint("%s stbName:%s %s", GREEN, g_stConfInfo.stbName, NC);
pPrint("%s stbName:%s %s", GREEN, g_stConfInfo.stbName, NC);
...
@@ -184,6 +191,7 @@ void parseArgument(int32_t argc, char *argv[]) {
...
@@ -184,6 +191,7 @@ void parseArgument(int32_t argc, char *argv[]) {
pPrint("%s totalRowsOfT2:%d %s", GREEN, g_stConfInfo.totalRowsOfT2, NC);
pPrint("%s totalRowsOfT2:%d %s", GREEN, g_stConfInfo.totalRowsOfT2, NC);
pPrint("%s startTimestamp:%" PRId64" %s", GREEN, g_stConfInfo.startTimestamp, NC);
pPrint("%s startTimestamp:%" PRId64" %s", GREEN, g_stConfInfo.startTimestamp, NC);
pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC);
pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC);
#endif
}
}
static
int
running
=
1
;
static
int
running
=
1
;
...
@@ -429,15 +437,21 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLog
...
@@ -429,15 +437,21 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLog
double
consumeTime
=
(
double
)(
endTime
-
startTime
)
/
1000000
;
double
consumeTime
=
(
double
)(
endTime
-
startTime
)
/
1000000
;
if
(
batchCnt
!=
totalMsgs
)
{
if
(
batchCnt
!=
totalMsgs
)
{
pPrint
(
"%s inserted msgs: %d and consume msgs: %d mismatch %s"
,
GREEN
,
totalMsgs
,
batchCnt
,
NC
);
printf
(
"%s inserted msgs: %d and consume msgs: %d mismatch %s"
,
GREEN
,
totalMsgs
,
batchCnt
,
NC
);
exit
(
-
1
);
}
}
pPrint
(
"consume result: msgs: %d, skip log cnt: %d, time used:%.3f second
\n
"
,
batchCnt
,
skipLogNum
,
consumeTime
);
if
(
0
==
g_stConfInfo
.
simCase
)
{
printf
(
"consume result: msgs: %d, skip log cnt: %d, time used:%.3f second
\n
"
,
batchCnt
,
skipLogNum
,
consumeTime
);
}
else
{
printf
(
"{consume success: %d}"
,
totalMsgs
);
}
taosFprintfFile
(
g_fp
,
"|%10d | %10.3f | %8.2f | %10.2f| %10.2f |
\n
"
,
batchCnt
,
consumeTime
,
(
double
)
batchCnt
/
consumeTime
,
(
double
)
walLogSize
/
(
1024
*
1024
.
0
)
/
consumeTime
,
(
double
)
walLogSize
/
1024
.
0
/
batchCnt
);
taosFprintfFile
(
g_fp
,
"|%10d | %10.3f | %8.2f | %10.2f| %10.2f |
\n
"
,
batchCnt
,
consumeTime
,
(
double
)
batchCnt
/
consumeTime
,
(
double
)
walLogSize
/
(
1024
*
1024
.
0
)
/
consumeTime
,
(
double
)
walLogSize
/
1024
.
0
/
batchCnt
);
err
=
tmq_consumer_close
(
tmq
);
err
=
tmq_consumer_close
(
tmq
);
if
(
err
)
{
if
(
err
)
{
fprintf
(
stderr
,
"%% Failed to close consumer: %s
\n
"
,
tmq_err2str
(
err
));
fprintf
(
stderr
,
"%% Failed to close consumer: %s
\n
"
,
tmq_err2str
(
err
));
exit
(
-
1
);
}
}
}
}
...
@@ -679,12 +693,17 @@ int main(int32_t argc, char *argv[]) {
...
@@ -679,12 +693,17 @@ int main(int32_t argc, char *argv[]) {
walLogSize
=
getDirectorySize
(
g_stConfInfo
.
vnodeWalPath
);
walLogSize
=
getDirectorySize
(
g_stConfInfo
.
vnodeWalPath
);
if
(
walLogSize
<=
0
)
{
if
(
walLogSize
<=
0
)
{
pError
(
"vnode2/wal size incorrect!"
);
printf
(
"vnode2/wal size incorrect!"
);
exit
(
-
1
);
}
else
{
}
else
{
pPrint
(
".log file size in vnode2/wal: %.3f MBytes
\n
"
,
(
double
)
walLogSize
/
(
1024
*
1024
.
0
));
if
(
0
==
g_stConfInfo
.
simCase
)
{
pPrint
(
".log file size in vnode2/wal: %.3f MBytes
\n
"
,
(
double
)
walLogSize
/
(
1024
*
1024
.
0
));
}
}
}
pPrint
(
"insert result: %d rows, %d msgs, time:%.3f sec, speed:%.1f rows/second, %.1f msgs/second
\n
"
,
totalRows
,
totalMsgs
,
seconds
,
rowsSpeed
,
msgsSpeed
);
if
(
0
==
g_stConfInfo
.
simCase
)
{
pPrint
(
"insert result: %d rows, %d msgs, time:%.3f sec, speed:%.1f rows/second, %.1f msgs/second
\n
"
,
totalRows
,
totalMsgs
,
seconds
,
rowsSpeed
,
msgsSpeed
);
}
taosFprintfFile
(
g_fp
,
"|%10d | %10.3f | %8.2f | %10.3f "
,
totalMsgs
,
seconds
,
msgsSpeed
,
(
double
)
walLogSize
/
(
1024
*
1024
.
0
));
taosFprintfFile
(
g_fp
,
"|%10d | %10.3f | %8.2f | %10.3f "
,
totalMsgs
,
seconds
,
msgsSpeed
,
(
double
)
walLogSize
/
(
1024
*
1024
.
0
));
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录