Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
17ecc24e
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
17ecc24e
编写于
4月 14, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of
https://github.com/taosdata/TDengine
into feature/vnode_refact
上级
224e6ccb
1e265d76
变更
25
隐藏空白更改
内联
并排
Showing
25 changed file
with
471 addition
and
385 deletion
+471
-385
example/src/tmq.c
example/src/tmq.c
+9
-3
include/client/taos.h
include/client/taos.h
+2
-2
include/common/tmsg.h
include/common/tmsg.h
+2
-0
include/common/ttime.h
include/common/ttime.h
+19
-0
include/os/osTime.h
include/os/osTime.h
+4
-0
source/client/inc/clientInt.h
source/client/inc/clientInt.h
+7
-2
source/client/src/clientHb.c
source/client/src/clientHb.c
+8
-9
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+9
-8
source/client/src/clientMain.c
source/client/src/clientMain.c
+5
-3
source/client/src/clientMsgHandler.c
source/client/src/clientMsgHandler.c
+10
-8
source/client/src/tmq.c
source/client/src/tmq.c
+10
-2
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+1
-1
source/common/src/tmsg.c
source/common/src/tmsg.c
+4
-0
source/common/src/ttime.c
source/common/src/ttime.c
+5
-5
source/dnode/mnode/impl/src/mndGrant.c
source/dnode/mnode/impl/src/mndGrant.c
+3
-2
source/dnode/mnode/impl/src/mndProfile.c
source/dnode/mnode/impl/src/mndProfile.c
+15
-13
source/dnode/mnode/impl/src/mndUser.c
source/dnode/mnode/impl/src/mndUser.c
+6
-4
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+2
-2
source/libs/parser/src/parInsert.c
source/libs/parser/src/parInsert.c
+4
-2
source/libs/scalar/src/sclfunc.c
source/libs/scalar/src/sclfunc.c
+1
-1
source/os/src/osTime.c
source/os/src/osTime.c
+11
-0
tests/test/c/tmqDemo.c
tests/test/c/tmqDemo.c
+230
-225
tests/test/c/tmqSim.c
tests/test/c/tmqSim.c
+102
-91
tests/tsim/src/simExe.c
tests/tsim/src/simExe.c
+1
-1
tools/shell/src/shellEngine.c
tools/shell/src/shellEngine.c
+1
-1
未找到文件。
example/src/tmq.c
浏览文件 @
17ecc24e
...
...
@@ -54,7 +54,8 @@ int32_t init_env() {
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c4 int) tags(t1 int)"
);
pRes
=
taos_query
(
pConn
,
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
...
...
@@ -101,7 +102,7 @@ int32_t create_topic() {
/*const char* sql = "select * from tu1";*/
/*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));*/
pRes
=
taos_query
(
pConn
,
"create topic topic_ctb_column as select ts, c1, c2, c
4
from ct1"
);
pRes
=
taos_query
(
pConn
,
"create topic topic_ctb_column as select ts, c1, c2, c
3
from ct1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create topic topic_ctb_column, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
...
...
@@ -144,6 +145,7 @@ void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_
}
tmq_t
*
build_consumer
()
{
#if 0
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
...
...
@@ -152,11 +154,15 @@ tmq_t* build_consumer() {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
#endif
tmq_conf_t
*
conf
=
tmq_conf_new
();
tmq_conf_set
(
conf
,
"group.id"
,
"tg2"
);
tmq_conf_set
(
conf
,
"td.connect.user"
,
"root"
);
tmq_conf_set
(
conf
,
"td.connect.pass"
,
"taosdata"
);
tmq_conf_set
(
conf
,
"td.connect.db"
,
"abc1"
);
tmq_conf_set_offset_commit_cb
(
conf
,
tmq_commit_cb_print
);
tmq_t
*
tmq
=
tmq_consumer_new
(
pConn
,
conf
,
NULL
,
0
);
tmq_t
*
tmq
=
tmq_consumer_new
1
(
conf
,
NULL
,
0
);
return
tmq
;
}
...
...
include/client/taos.h
浏览文件 @
17ecc24e
...
...
@@ -247,10 +247,10 @@ DLL_EXPORT tmq_list_t *tmq_list_new();
DLL_EXPORT
int32_t
tmq_list_append
(
tmq_list_t
*
,
const
char
*
);
DLL_EXPORT
void
tmq_list_destroy
(
tmq_list_t
*
);
// will be removed in 3.0
#if 1
DLL_EXPORT
tmq_t
*
tmq_consumer_new
(
void
*
conn
,
tmq_conf_t
*
conf
,
char
*
errstr
,
int32_t
errstrLen
);
#endif
// will replace last one
DLL_EXPORT
tmq_t
*
tmq_consumer_new1
(
tmq_conf_t
*
conf
,
char
*
errstr
,
int32_t
errstrLen
);
DLL_EXPORT
const
char
*
tmq_err2str
(
tmq_resp_err_t
);
...
...
include/common/tmsg.h
浏览文件 @
17ecc24e
...
...
@@ -332,6 +332,7 @@ int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp);
void
*
taosDecodeSEpSet
(
void
*
buf
,
SEpSet
*
pEp
);
typedef
struct
{
int8_t
connType
;
int32_t
pid
;
char
app
[
TSDB_APP_NAME_LEN
];
char
db
[
TSDB_DB_NAME_LEN
];
...
...
@@ -346,6 +347,7 @@ typedef struct {
int64_t
clusterId
;
int32_t
connId
;
int8_t
superUser
;
int8_t
connType
;
SEpSet
epSet
;
char
sVersion
[
128
];
}
SConnectRsp
;
...
...
include/common/ttime.h
浏览文件 @
17ecc24e
...
...
@@ -40,6 +40,7 @@ extern "C" {
* @return timestamp decided by global conf variable, tsTimePrecision
* if precision == TSDB_TIME_PRECISION_MICRO, it returns timestamp in microsecond.
* precision == TSDB_TIME_PRECISION_MILLI, it returns timestamp in millisecond.
* precision == TSDB_TIME_PRECISION_NANO, it returns timestamp in nanosecond.
*/
static
FORCE_INLINE
int64_t
taosGetTimestamp
(
int32_t
precision
)
{
if
(
precision
==
TSDB_TIME_PRECISION_MICRO
)
{
...
...
@@ -51,6 +52,24 @@ static FORCE_INLINE int64_t taosGetTimestamp(int32_t precision) {
}
}
/*
* @return timestamp of today at 00:00:00 in given precision
* if precision == TSDB_TIME_PRECISION_MICRO, it returns timestamp in microsecond.
* precision == TSDB_TIME_PRECISION_MILLI, it returns timestamp in millisecond.
* precision == TSDB_TIME_PRECISION_NANO, it returns timestamp in nanosecond.
*/
static
FORCE_INLINE
int64_t
taosGetTimestampToday
(
int32_t
precision
)
{
int64_t
factor
=
(
precision
==
TSDB_TIME_PRECISION_MILLI
)
?
1000
:
(
precision
==
TSDB_TIME_PRECISION_MICRO
)
?
1000000
:
1000000000
;
time_t
t
=
taosTime
(
NULL
);
struct
tm
*
tm
=
taosLocalTime
(
&
t
,
NULL
);
tm
->
tm_hour
=
0
;
tm
->
tm_min
=
0
;
tm
->
tm_sec
=
0
;
return
(
int64_t
)
taosMktime
(
tm
)
*
factor
;
}
int64_t
taosTimeAdd
(
int64_t
t
,
int64_t
duration
,
char
unit
,
int32_t
precision
);
int64_t
taosTimeTruncate
(
int64_t
t
,
const
SInterval
*
pInterval
,
int32_t
precision
);
int32_t
taosTimeCountInterval
(
int64_t
skey
,
int64_t
ekey
,
int64_t
interval
,
char
unit
,
int32_t
precision
);
...
...
include/os/osTime.h
浏览文件 @
17ecc24e
...
...
@@ -27,9 +27,11 @@ extern "C" {
#ifndef ALLOW_FORBID_FUNC
#define strptime STRPTIME_FUNC_TAOS_FORBID
#define gettimeofday GETTIMEOFDAY_FUNC_TAOS_FORBID
#define localtime LOCALTIME_FUNC_TAOS_FORBID
#define localtime_s LOCALTIMES_FUNC_TAOS_FORBID
#define localtime_r LOCALTIMER_FUNC_TAOS_FORBID
#define time TIME_FUNC_TAOS_FORBID
#define mktime MKTIME_FUNC_TAOS_FORBID
#endif
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
...
...
@@ -82,6 +84,8 @@ static FORCE_INLINE int64_t taosGetTimestampNs() {
char
*
taosStrpTime
(
const
char
*
buf
,
const
char
*
fmt
,
struct
tm
*
tm
);
struct
tm
*
taosLocalTime
(
const
time_t
*
timep
,
struct
tm
*
result
);
time_t
taosTime
(
time_t
*
t
);
time_t
taosMktime
(
struct
tm
*
timep
);
#ifdef __cplusplus
}
...
...
source/client/inc/clientInt.h
浏览文件 @
17ecc24e
...
...
@@ -45,6 +45,11 @@ extern "C" {
#define HEARTBEAT_INTERVAL 1500 // ms
enum
{
CONN_TYPE__QUERY
=
1
,
CONN_TYPE__TMQ
,
};
typedef
struct
SAppInstInfo
SAppInstInfo
;
typedef
struct
{
...
...
@@ -132,9 +137,9 @@ typedef struct STscObj {
char
pass
[
TSDB_PASSWORD_LEN
];
char
db
[
TSDB_DB_FNAME_LEN
];
char
ver
[
128
];
int8_t
connType
;
int32_t
acctId
;
uint32_t
connId
;
int32_t
connType
;
uint64_t
id
;
// ref ID returned by taosAddRef
TdThreadMutex
mutex
;
// used to protect the operation on db
int32_t
numOfReqs
;
// number of sqlObj bound to this connection
...
...
@@ -272,7 +277,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet);
void
initMsgHandleFp
();
TAOS
*
taos_connect_internal
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
pass
,
const
char
*
auth
,
const
char
*
db
,
uint16_t
port
);
uint16_t
port
,
int
connType
);
int32_t
parseSql
(
SRequestObj
*
pRequest
,
bool
topicQuery
,
SQuery
**
pQuery
);
int32_t
getPlan
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
,
SQueryPlan
**
pPlan
,
SArray
*
pNodeList
);
...
...
source/client/src/clientHb.c
浏览文件 @
17ecc24e
...
...
@@ -23,6 +23,8 @@ static SClientHbMgr clientHbMgr = {0};
static
int32_t
hbCreateThread
();
static
void
hbStopThread
();
static
int32_t
hbMqHbReqHandle
(
SClientHbKey
*
connKey
,
void
*
param
,
SClientHbReq
*
req
)
{
return
0
;
}
static
int32_t
hbMqHbRspHandle
(
SAppHbMgr
*
pAppHbMgr
,
SClientHbRsp
*
pRsp
)
{
return
0
;
}
static
int32_t
hbProcessDBInfoRsp
(
void
*
value
,
int32_t
valueLen
,
struct
SCatalog
*
pCatalog
)
{
...
...
@@ -297,11 +299,10 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
return
TSDB_CODE_SUCCESS
;
}
int32_t
hbMqHbReqHandle
(
SClientHbKey
*
connKey
,
void
*
param
,
SClientHbReq
*
req
)
{
return
0
;
}
void
hbMgrInitMqHbHandle
()
{
clientHbMgr
.
reqHandle
[
HEARTBEAT_TYPE_QUERY
]
=
hbQueryHbReqHandle
;
clientHbMgr
.
reqHandle
[
HEARTBEAT_TYPE_MQ
]
=
hbMqHbReqHandle
;
clientHbMgr
.
rspHandle
[
HEARTBEAT_TYPE_QUERY
]
=
hbQueryHbRspHandle
;
clientHbMgr
.
rspHandle
[
HEARTBEAT_TYPE_MQ
]
=
hbMqHbRspHandle
;
}
...
...
@@ -438,7 +439,7 @@ static int32_t hbCreateThread() {
if
(
taosThreadCreate
(
&
clientHbMgr
.
thread
,
&
thAttr
,
hbThreadFunc
,
NULL
)
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
}
taosThreadAttrDestroy
(
&
thAttr
);
return
0
;
}
...
...
@@ -568,7 +569,7 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo *
int
hbRegisterConn
(
SAppHbMgr
*
pAppHbMgr
,
int32_t
connId
,
int64_t
clusterId
,
int32_t
hbType
)
{
SClientHbKey
connKey
=
{
.
connId
=
connId
,
.
hbType
=
HEARTBEAT_TYPE_QUERY
,
.
hbType
=
hbType
,
};
SHbConnInfo
info
=
{
0
};
...
...
@@ -578,16 +579,14 @@ int hbRegisterConn(SAppHbMgr *pAppHbMgr, int32_t connId, int64_t clusterId, int3
*
pClusterId
=
clusterId
;
info
.
param
=
pClusterId
;
break
;
return
hbRegisterConnImpl
(
pAppHbMgr
,
connKey
,
&
info
)
;
}
case
HEARTBEAT_TYPE_MQ
:
{
break
;
return
0
;
}
default:
break
;
return
0
;
}
return
hbRegisterConnImpl
(
pAppHbMgr
,
connKey
,
&
info
);
}
void
hbDeregisterConn
(
SAppHbMgr
*
pAppHbMgr
,
SClientHbKey
connKey
)
{
...
...
source/client/src/clientImpl.c
浏览文件 @
17ecc24e
...
...
@@ -11,7 +11,7 @@
#include "tref.h"
static
int32_t
initEpSetFromCfg
(
const
char
*
firstEp
,
const
char
*
secondEp
,
SCorEpSet
*
pEpSet
);
static
SMsgSendInfo
*
buildConnectMsg
(
SRequestObj
*
pRequest
);
static
SMsgSendInfo
*
buildConnectMsg
(
SRequestObj
*
pRequest
,
int8_t
connType
);
static
void
destroySendMsgInfo
(
SMsgSendInfo
*
pMsgBody
);
static
bool
stringLengthCheck
(
const
char
*
str
,
size_t
maxsize
)
{
...
...
@@ -40,10 +40,10 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i
}
static
STscObj
*
taosConnectImpl
(
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
__taos_async_fn_t
fp
,
void
*
param
,
SAppInstInfo
*
pAppInfo
);
SAppInstInfo
*
pAppInfo
,
int
connType
);
TAOS
*
taos_connect_internal
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
pass
,
const
char
*
auth
,
const
char
*
db
,
uint16_t
port
)
{
uint16_t
port
,
int
connType
)
{
if
(
taos_init
()
!=
TSDB_CODE_SUCCESS
)
{
return
NULL
;
}
...
...
@@ -111,7 +111,7 @@ TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass,
taosThreadMutexUnlock
(
&
appInfo
.
mutex
);
taosMemoryFreeClear
(
key
);
return
taosConnectImpl
(
user
,
&
secretEncrypt
[
0
],
localDb
,
NULL
,
NULL
,
*
pInst
);
return
taosConnectImpl
(
user
,
&
secretEncrypt
[
0
],
localDb
,
NULL
,
NULL
,
*
pInst
,
connType
);
}
int32_t
buildRequest
(
STscObj
*
pTscObj
,
const
char
*
sql
,
int
sqlLen
,
SRequestObj
**
pRequest
)
{
...
...
@@ -418,7 +418,7 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe
}
STscObj
*
taosConnectImpl
(
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
__taos_async_fn_t
fp
,
void
*
param
,
SAppInstInfo
*
pAppInfo
)
{
SAppInstInfo
*
pAppInfo
,
int
connType
)
{
STscObj
*
pTscObj
=
createTscObj
(
user
,
auth
,
db
,
pAppInfo
);
if
(
NULL
==
pTscObj
)
{
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
...
...
@@ -432,7 +432,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
return
NULL
;
}
SMsgSendInfo
*
body
=
buildConnectMsg
(
pRequest
);
SMsgSendInfo
*
body
=
buildConnectMsg
(
pRequest
,
connType
);
int64_t
transporterId
=
0
;
asyncSendMsgToServer
(
pTscObj
->
pAppInfo
->
pTransporter
,
&
pTscObj
->
pAppInfo
->
mgmtEp
.
epSet
,
&
transporterId
,
body
);
...
...
@@ -455,7 +455,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
return
pTscObj
;
}
static
SMsgSendInfo
*
buildConnectMsg
(
SRequestObj
*
pRequest
)
{
static
SMsgSendInfo
*
buildConnectMsg
(
SRequestObj
*
pRequest
,
int8_t
connType
)
{
SMsgSendInfo
*
pMsgSendInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SMsgSendInfo
));
if
(
pMsgSendInfo
==
NULL
)
{
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
...
...
@@ -478,6 +478,7 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
}
taosMemoryFreeClear
(
db
);
connectReq
.
connType
=
connType
;
connectReq
.
pid
=
htonl
(
appInfo
.
pid
);
connectReq
.
startTime
=
htobe64
(
appInfo
.
startTime
);
tstrncpy
(
connectReq
.
app
,
appInfo
.
appName
,
sizeof
(
connectReq
.
app
));
...
...
@@ -563,7 +564,7 @@ TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, cons
return
NULL
;
}
return
taos_connect_internal
(
ip
,
user
,
NULL
,
auth
,
db
,
port
);
return
taos_connect_internal
(
ip
,
user
,
NULL
,
auth
,
db
,
port
,
CONN_TYPE__QUERY
);
}
TAOS
*
taos_connect_l
(
const
char
*
ip
,
int
ipLen
,
const
char
*
user
,
int
userLen
,
const
char
*
pass
,
int
passLen
,
...
...
source/client/src/clientMain.c
浏览文件 @
17ecc24e
...
...
@@ -87,7 +87,7 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha
pass
=
TSDB_DEFAULT_PASS
;
}
return
taos_connect_internal
(
ip
,
user
,
pass
,
NULL
,
db
,
port
);
return
taos_connect_internal
(
ip
,
user
,
pass
,
NULL
,
db
,
port
,
CONN_TYPE__QUERY
);
}
void
taos_close
(
TAOS
*
taos
)
{
...
...
@@ -124,8 +124,10 @@ const char *taos_errstr(TAOS_RES *res) {
}
void
taos_free_result
(
TAOS_RES
*
res
)
{
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
destroyRequest
(
pRequest
);
if
(
TD_RES_QUERY
(
res
))
{
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
destroyRequest
(
pRequest
);
}
}
int
taos_field_count
(
TAOS_RES
*
res
)
{
...
...
source/client/src/clientMsgHandler.c
浏览文件 @
17ecc24e
...
...
@@ -69,9 +69,9 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
pTscObj
->
pAppInfo
->
clusterId
=
connectRsp
.
clusterId
;
atomic_add_fetch_64
(
&
pTscObj
->
pAppInfo
->
numOfConns
,
1
);
pTscObj
->
connType
=
HEARTBEAT_TYPE_QUERY
;
pTscObj
->
connType
=
connectRsp
.
connType
;
hbRegisterConn
(
pTscObj
->
pAppInfo
->
pAppHbMgr
,
connectRsp
.
connId
,
connectRsp
.
clusterId
,
HEARTBEAT_TYPE_QUERY
);
hbRegisterConn
(
pTscObj
->
pAppInfo
->
pAppHbMgr
,
connectRsp
.
connId
,
connectRsp
.
clusterId
,
connectRsp
.
connType
);
// pRequest->body.resInfo.pRspMsg = pMsg->pData;
tscDebug
(
"0x%"
PRIx64
" clusterId:%"
PRId64
", totalConn:%"
PRId64
,
pRequest
->
requestId
,
connectRsp
.
clusterId
,
...
...
@@ -119,13 +119,14 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
if
(
usedbRsp
.
vgVersion
>=
0
)
{
int32_t
code
=
catalogGetHandle
(
pRequest
->
pTscObj
->
pAppInfo
->
clusterId
,
&
pCatalog
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tscWarn
(
"catalogGetHandle failed, clusterId:%"
PRIx64
", error:%s"
,
pRequest
->
pTscObj
->
pAppInfo
->
clusterId
,
tstrerror
(
code
));
tscWarn
(
"catalogGetHandle failed, clusterId:%"
PRIx64
", error:%s"
,
pRequest
->
pTscObj
->
pAppInfo
->
clusterId
,
tstrerror
(
code
));
}
else
{
catalogRemoveDB
(
pCatalog
,
usedbRsp
.
db
,
usedbRsp
.
uid
);
}
}
tFreeSUsedbRsp
(
&
usedbRsp
);
tFreeSUsedbRsp
(
&
usedbRsp
);
}
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -139,7 +140,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
tDeserializeSUseDbRsp
(
pMsg
->
pData
,
pMsg
->
len
,
&
usedbRsp
);
SName
name
=
{
0
};
tNameFromString
(
&
name
,
usedbRsp
.
db
,
T_NAME_ACCT
|
T_NAME_DB
);
tNameFromString
(
&
name
,
usedbRsp
.
db
,
T_NAME_ACCT
|
T_NAME_DB
);
SUseDbOutput
output
=
{
0
};
code
=
queryBuildUseDbOutput
(
&
output
,
&
usedbRsp
);
...
...
@@ -151,11 +152,12 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
tscError
(
"failed to build use db output since %s"
,
terrstr
());
}
else
{
struct
SCatalog
*
pCatalog
=
NULL
;
struct
SCatalog
*
pCatalog
=
NULL
;
int32_t
code
=
catalogGetHandle
(
pRequest
->
pTscObj
->
pAppInfo
->
clusterId
,
&
pCatalog
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tscWarn
(
"catalogGetHandle failed, clusterId:%"
PRIx64
", error:%s"
,
pRequest
->
pTscObj
->
pAppInfo
->
clusterId
,
tstrerror
(
code
));
tscWarn
(
"catalogGetHandle failed, clusterId:%"
PRIx64
", error:%s"
,
pRequest
->
pTscObj
->
pAppInfo
->
clusterId
,
tstrerror
(
code
));
}
else
{
catalogUpdateDBVgInfo
(
pCatalog
,
output
.
db
,
output
.
dbId
,
output
.
dbVgroup
);
}
...
...
source/client/src/tmq.c
浏览文件 @
17ecc24e
...
...
@@ -357,7 +357,15 @@ tmq_t* tmq_consumer_new1(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
if
(
pTmq
==
NULL
)
{
return
NULL
;
}
pTmq
->
pTscObj
=
taos_connect
(
conf
->
ip
,
conf
->
user
,
conf
->
pass
,
conf
->
db
,
conf
->
port
);
const
char
*
user
=
conf
->
user
==
NULL
?
TSDB_DEFAULT_USER
:
conf
->
user
;
const
char
*
pass
=
conf
->
pass
==
NULL
?
TSDB_DEFAULT_PASS
:
conf
->
pass
;
ASSERT
(
user
);
ASSERT
(
pass
);
ASSERT
(
conf
->
db
);
pTmq
->
pTscObj
=
taos_connect_internal
(
conf
->
ip
,
user
,
pass
,
NULL
,
conf
->
db
,
conf
->
port
,
CONN_TYPE__TMQ
);
if
(
pTmq
->
pTscObj
==
NULL
)
return
NULL
;
pTmq
->
inWaiting
=
0
;
pTmq
->
status
=
0
;
...
...
@@ -783,7 +791,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
}
}
struct
tm
*
ptm
=
localtime
(
&
tt
);
struct
tm
*
ptm
=
taosLocalTime
(
&
tt
,
NULL
);
size_t
pos
=
strftime
(
buf
,
35
,
"%Y-%m-%d %H:%M:%S"
,
ptm
);
if
(
precision
==
TSDB_TIME_PRECISION_NANO
)
{
...
...
source/common/src/tdatablock.c
浏览文件 @
17ecc24e
...
...
@@ -1368,7 +1368,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
}
}
struct
tm
*
ptm
=
localtime
(
&
tt
);
struct
tm
*
ptm
=
taosLocalTime
(
&
tt
,
NULL
);
size_t
pos
=
strftime
(
buf
,
35
,
"%Y-%m-%d %H:%M:%S"
,
ptm
);
if
(
precision
==
TSDB_TIME_PRECISION_NANO
)
{
...
...
source/common/src/tmsg.c
浏览文件 @
17ecc24e
...
...
@@ -2532,6 +2532,7 @@ int32_t tSerializeSConnectReq(void *buf, int32_t bufLen, SConnectReq *pReq) {
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_ENCODER
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
connType
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
pid
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
app
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
db
)
<
0
)
return
-
1
;
...
...
@@ -2548,6 +2549,7 @@ int32_t tDeserializeSConnectReq(void *buf, int32_t bufLen, SConnectReq *pReq) {
tCoderInit
(
&
decoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_DECODER
);
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
connType
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
pid
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
app
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
db
)
<
0
)
return
-
1
;
...
...
@@ -2567,6 +2569,7 @@ int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
if
(
tEncodeI64
(
&
encoder
,
pRsp
->
clusterId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pRsp
->
connId
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pRsp
->
superUser
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pRsp
->
connType
)
<
0
)
return
-
1
;
if
(
tEncodeSEpSet
(
&
encoder
,
&
pRsp
->
epSet
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pRsp
->
sVersion
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
...
...
@@ -2585,6 +2588,7 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
if
(
tDecodeI64
(
&
decoder
,
&
pRsp
->
clusterId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pRsp
->
connId
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pRsp
->
superUser
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pRsp
->
connType
)
<
0
)
return
-
1
;
if
(
tDecodeSEpSet
(
&
decoder
,
&
pRsp
->
epSet
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pRsp
->
sVersion
)
<
0
)
return
-
1
;
tEndDecode
(
&
decoder
);
...
...
source/common/src/ttime.c
浏览文件 @
17ecc24e
...
...
@@ -70,7 +70,7 @@ void deltaToUtcInitOnce() {
struct
tm
tm
=
{
0
};
(
void
)
taosStrpTime
(
"1970-01-01 00:00:00"
,
(
const
char
*
)(
"%Y-%m-%d %H:%M:%S"
),
&
tm
);
m_deltaUtc
=
(
int64_t
)
m
ktime
(
&
tm
);
m_deltaUtc
=
(
int64_t
)
taosM
ktime
(
&
tm
);
// printf("====delta:%lld\n\n", seconds);
}
...
...
@@ -344,7 +344,7 @@ int32_t parseLocaltimeDst(char* timestr, int64_t* time, int32_t timePrec) {
}
/* mktime will be affected by TZ, set by using taos_options */
int64_t
seconds
=
m
ktime
(
&
tm
);
int64_t
seconds
=
taosM
ktime
(
&
tm
);
int64_t
fraction
=
0
;
...
...
@@ -539,7 +539,7 @@ int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision) {
tm
.
tm_year
=
mon
/
12
;
tm
.
tm_mon
=
mon
%
12
;
return
(
int64_t
)(
m
ktime
(
&
tm
)
*
TSDB_TICK_PER_SECOND
(
precision
));
return
(
int64_t
)(
taosM
ktime
(
&
tm
)
*
TSDB_TICK_PER_SECOND
(
precision
));
}
int32_t
taosTimeCountInterval
(
int64_t
skey
,
int64_t
ekey
,
int64_t
interval
,
char
unit
,
int32_t
precision
)
{
...
...
@@ -598,7 +598,7 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio
tm
.
tm_mon
=
mon
%
12
;
}
start
=
(
int64_t
)(
m
ktime
(
&
tm
)
*
TSDB_TICK_PER_SECOND
(
precision
));
start
=
(
int64_t
)(
taosM
ktime
(
&
tm
)
*
TSDB_TICK_PER_SECOND
(
precision
));
}
else
{
int64_t
delta
=
t
-
pInterval
->
interval
;
int32_t
factor
=
(
delta
>=
0
)
?
1
:
-
1
;
...
...
@@ -745,7 +745,7 @@ void taosFormatUtcTime(char* buf, int32_t bufLen, int64_t t, int32_t precision)
assert
(
false
);
}
ptm
=
localtime
(
&
quot
);
ptm
=
taosLocalTime
(
&
quot
,
NULL
);
int32_t
length
=
(
int32_t
)
strftime
(
ts
,
40
,
"%Y-%m-%dT%H:%M:%S"
,
ptm
);
length
+=
snprintf
(
ts
+
length
,
fractionLen
,
format
,
mod
);
length
+=
(
int32_t
)
strftime
(
ts
+
length
,
40
-
length
,
"%z"
,
ptm
);
...
...
source/dnode/mnode/impl/src/mndGrant.c
浏览文件 @
17ecc24e
...
...
@@ -14,12 +14,13 @@
*/
#define _DEFAULT_SOURCE
#ifndef _GRANT
#include "os.h"
#include "taoserror.h"
#include "mndGrant.h"
#include "mndInt.h"
#ifndef _GRANT
int32_t
mndInitGrant
(
SMnode
*
pMnode
)
{
return
TSDB_CODE_SUCCESS
;
}
void
mndCleanupGrant
()
{}
void
grantParseParameter
()
{
mError
(
"can't parsed parameter k"
);
}
...
...
@@ -30,4 +31,4 @@ void grantRestore(EGrantType grant, uint64_t value) {}
#endif
void
parseGrantParameter
()
{
parseGrantParameter
();
}
\ No newline at end of file
void
parseGrantParameter
()
{
grantParseParameter
();
}
\ No newline at end of file
source/dnode/mnode/impl/src/mndProfile.c
浏览文件 @
17ecc24e
...
...
@@ -23,13 +23,14 @@
#include "tglobal.h"
#include "version.h"
#define QUERY_ID_SIZE 20
#define QUERY_OBJ_ID_SIZE 18
#define QUERY_ID_SIZE
20
#define QUERY_OBJ_ID_SIZE
18
#define SUBQUERY_INFO_SIZE 6
#define QUERY_SAVE_SIZE 20
#define QUERY_SAVE_SIZE
20
typedef
struct
{
int32_t
id
;
int8_t
connType
;
char
user
[
TSDB_USER_LEN
];
char
app
[
TSDB_APP_NAME_LEN
];
// app name that invokes taosc
int64_t
appStartTimeMs
;
// app start time
...
...
@@ -44,8 +45,8 @@ typedef struct {
SQueryDesc
*
pQueries
;
}
SConnObj
;
static
SConnObj
*
mndCreateConn
(
SMnode
*
pMnode
,
const
char
*
user
,
uint32_t
ip
,
uint16_t
port
,
int32_t
pid
,
const
char
*
app
,
int64_t
startTime
);
static
SConnObj
*
mndCreateConn
(
SMnode
*
pMnode
,
const
char
*
user
,
int8_t
connType
,
uint32_t
ip
,
uint16_t
port
,
int32_t
pid
,
const
char
*
app
,
int64_t
startTime
);
static
void
mndFreeConn
(
SConnObj
*
pConn
);
static
SConnObj
*
mndAcquireConn
(
SMnode
*
pMnode
,
int32_t
connId
);
static
void
mndReleaseConn
(
SMnode
*
pMnode
,
SConnObj
*
pConn
);
...
...
@@ -93,8 +94,8 @@ void mndCleanupProfile(SMnode *pMnode) {
}
}
static
SConnObj
*
mndCreateConn
(
SMnode
*
pMnode
,
const
char
*
user
,
uint32_t
ip
,
uint16_t
port
,
int32_t
pid
,
const
char
*
app
,
int64_t
startTime
)
{
static
SConnObj
*
mndCreateConn
(
SMnode
*
pMnode
,
const
char
*
user
,
int8_t
connType
,
uint32_t
ip
,
uint16_t
port
,
int32_t
pid
,
const
char
*
app
,
int64_t
startTime
)
{
SProfileMgmt
*
pMgmt
=
&
pMnode
->
profileMgmt
;
int32_t
connId
=
atomic_add_fetch_32
(
&
pMgmt
->
connId
,
1
);
...
...
@@ -102,6 +103,7 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, uint32_t ip, ui
if
(
startTime
==
0
)
startTime
=
taosGetTimestampMs
();
SConnObj
connObj
=
{.
id
=
connId
,
.
connType
=
connType
,
.
appStartTimeMs
=
startTime
,
.
pid
=
pid
,
.
ip
=
ip
,
...
...
@@ -159,8 +161,8 @@ static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) {
}
void
*
mndGetNextConn
(
SMnode
*
pMnode
,
SCacheIter
*
pIter
)
{
SConnObj
*
pConn
=
NULL
;
bool
hasNext
=
taosCacheIterNext
(
pIter
);
SConnObj
*
pConn
=
NULL
;
bool
hasNext
=
taosCacheIterNext
(
pIter
);
if
(
hasNext
)
{
size_t
dataLen
=
0
;
pConn
=
taosCacheIterGetData
(
pIter
,
&
dataLen
);
...
...
@@ -210,8 +212,8 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) {
}
}
pConn
=
mndCreateConn
(
pMnode
,
pReq
->
user
,
pReq
->
clientIp
,
pReq
->
clientPort
,
connReq
.
pid
,
connReq
.
app
,
connReq
.
startTime
);
pConn
=
mndCreateConn
(
pMnode
,
pReq
->
user
,
connReq
.
connType
,
pReq
->
clientIp
,
pReq
->
clientPort
,
connReq
.
pid
,
connReq
.
app
,
connReq
.
startTime
);
if
(
pConn
==
NULL
)
{
mError
(
"user:%s, failed to login from %s while create connection since %s"
,
pReq
->
user
,
ip
,
terrstr
());
goto
CONN_OVER
;
...
...
@@ -222,6 +224,7 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) {
connectRsp
.
superUser
=
pUser
->
superUser
;
connectRsp
.
clusterId
=
pMnode
->
clusterId
;
connectRsp
.
connId
=
pConn
->
id
;
connectRsp
.
connType
=
connReq
.
connType
;
snprintf
(
connectRsp
.
sVersion
,
sizeof
(
connectRsp
.
sVersion
),
"ver:%s
\n
build:%s
\n
gitinfo:%s"
,
version
,
buildinfo
,
gitinfo
);
...
...
@@ -343,7 +346,6 @@ static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) {
return
-
1
;
}
SClientHbBatchRsp
batchRsp
=
{
0
};
batchRsp
.
rsps
=
taosArrayInit
(
0
,
sizeof
(
SClientHbRsp
));
...
...
@@ -916,4 +918,4 @@ static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) {
int32_t
mndGetNumOfConnections
(
SMnode
*
pMnode
)
{
SProfileMgmt
*
pMgmt
=
&
pMnode
->
profileMgmt
;
return
taosCacheGetNumOfObj
(
pMgmt
->
cache
);
}
\ No newline at end of file
}
source/dnode/mnode/impl/src/mndUser.c
浏览文件 @
17ecc24e
...
...
@@ -652,24 +652,26 @@ static int32_t mndRetrieveUsers(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pB
if
(
pShow
->
pIter
==
NULL
)
break
;
cols
=
0
;
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
);
char
name
[
TSDB_USER_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
STR_WITH_MAXSIZE_TO_VARSTR
(
name
,
pUser
->
user
,
pShow
->
bytes
[
cols
]);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
name
,
false
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
cols
++
;
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
);
const
char
*
src
=
pUser
->
superUser
?
"super"
:
"normal"
;
char
b
[
10
+
VARSTR_HEADER_SIZE
]
=
{
0
};
STR_WITH_SIZE_TO_VARSTR
(
b
,
src
,
strlen
(
src
));
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
b
,
false
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
cols
++
;
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pUser
->
createdTime
,
false
);
cols
++
;
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
);
STR_WITH_MAXSIZE_TO_VARSTR
(
name
,
pUser
->
acct
,
pShow
->
bytes
[
cols
]);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
name
,
false
);
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
17ecc24e
...
...
@@ -173,12 +173,12 @@ static void getNextTimeWindow(SInterval* pInterval, int32_t precision, int32_t o
int
mon
=
(
int
)(
tm
.
tm_year
*
12
+
tm
.
tm_mon
+
interval
*
factor
);
tm
.
tm_year
=
mon
/
12
;
tm
.
tm_mon
=
mon
%
12
;
tw
->
skey
=
convertTimePrecision
((
int64_t
)
m
ktime
(
&
tm
)
*
1000L
,
TSDB_TIME_PRECISION_MILLI
,
precision
);
tw
->
skey
=
convertTimePrecision
((
int64_t
)
taosM
ktime
(
&
tm
)
*
1000L
,
TSDB_TIME_PRECISION_MILLI
,
precision
);
mon
=
(
int
)(
mon
+
interval
);
tm
.
tm_year
=
mon
/
12
;
tm
.
tm_mon
=
mon
%
12
;
tw
->
ekey
=
convertTimePrecision
((
int64_t
)
m
ktime
(
&
tm
)
*
1000L
,
TSDB_TIME_PRECISION_MILLI
,
precision
);
tw
->
ekey
=
convertTimePrecision
((
int64_t
)
taosM
ktime
(
&
tm
)
*
1000L
,
TSDB_TIME_PRECISION_MILLI
,
precision
);
tw
->
ekey
-=
1
;
}
...
...
source/libs/parser/src/parInsert.c
浏览文件 @
17ecc24e
...
...
@@ -312,6 +312,8 @@ static int parseTime(char **end, SToken *pToken, int16_t timePrec, int64_t *time
if
(
pToken
->
type
==
TK_NOW
)
{
ts
=
taosGetTimestamp
(
timePrec
);
}
else
if
(
pToken
->
type
==
TK_TODAY
)
{
ts
=
taosGetTimestampToday
(
timePrec
);
}
else
if
(
pToken
->
type
==
TK_NK_INTEGER
)
{
bool
isSigned
=
false
;
toInteger
(
pToken
->
z
,
pToken
->
n
,
10
,
&
ts
,
&
isSigned
);
...
...
@@ -376,8 +378,8 @@ static int parseTime(char **end, SToken *pToken, int16_t timePrec, int64_t *time
}
static
FORCE_INLINE
int32_t
checkAndTrimValue
(
SToken
*
pToken
,
uint32_t
type
,
char
*
tmpTokenBuf
,
SMsgBuf
*
pMsgBuf
)
{
if
((
pToken
->
type
!=
TK_NOW
&&
pToken
->
type
!=
TK_
NK_INTEGER
&&
pToken
->
type
!=
TK_NK_STRING
&&
pToken
->
type
!=
TK_NK_FLOAT
&&
pToken
->
type
!=
TK_NK_BOOL
&&
pToken
->
type
!=
TK_NULL
&&
pToken
->
type
!=
TK_NK_HEX
&&
pToken
->
type
!=
TK_NK_OCT
&&
pToken
->
type
!=
TK_NK_BIN
)
||
if
((
pToken
->
type
!=
TK_NOW
&&
pToken
->
type
!=
TK_
TODAY
&&
pToken
->
type
!=
TK_NK_INTEGER
&&
pToken
->
type
!=
TK_NK_STRING
&&
pToken
->
type
!=
TK_NK_FLOAT
&&
pToken
->
type
!=
TK_N
K_BOOL
&&
pToken
->
type
!=
TK_N
ULL
&&
pToken
->
type
!=
TK_NK_HEX
&&
pToken
->
type
!=
TK_NK_OCT
&&
pToken
->
type
!=
TK_NK_BIN
)
||
(
pToken
->
n
==
0
)
||
(
pToken
->
type
==
TK_NK_RP
))
{
return
buildSyntaxErrMsg
(
pMsgBuf
,
"invalid data or symbol"
,
pToken
->
z
);
}
...
...
source/libs/scalar/src/sclfunc.c
浏览文件 @
17ecc24e
...
...
@@ -841,7 +841,7 @@ int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam *
memmove
(
fraction
,
fraction
+
TSDB_TIME_PRECISION_SEC_DIGITS
,
TSDB_TIME_PRECISION_SEC_DIGITS
);
}
struct
tm
*
tmInfo
=
localtime
((
const
time_t
*
)
&
timeVal
);
struct
tm
*
tmInfo
=
taosLocalTime
((
const
time_t
*
)
&
timeVal
,
NULL
);
strftime
(
buf
,
sizeof
(
buf
),
"%Y-%m-%dT%H:%M:%S%z"
,
tmInfo
);
int32_t
len
=
(
int32_t
)
strlen
(
buf
);
...
...
source/os/src/osTime.c
浏览文件 @
17ecc24e
...
...
@@ -406,7 +406,18 @@ FORCE_INLINE int32_t taosGetTimeOfDay(struct timeval *tv) {
#endif
}
time_t
taosTime
(
time_t
*
t
)
{
return
time
(
t
);
}
time_t
taosMktime
(
struct
tm
*
timep
)
{
return
mktime
(
timep
);
}
struct
tm
*
taosLocalTime
(
const
time_t
*
timep
,
struct
tm
*
result
)
{
if
(
result
==
NULL
)
{
return
localtime
(
timep
);
}
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
localtime_s
(
result
,
timep
);
#else
...
...
tests/test/c/tmqDemo.c
浏览文件 @
17ecc24e
...
...
@@ -13,76 +13,71 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// clang-format off
#define ALLOW_FORBID_FUNC
#include <assert.h>
#include <dirent.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#include <stdlib.h>
#include <dirent.h>
#include "taos.h"
#include "taoserror.h"
#include "tlog.h"
#define GREEN "\033[1;32m"
#define NC "\033[0m"
#define GREEN
"\033[1;32m"
#define NC
"\033[0m"
#define min(a, b) (((a) < (b)) ? (a) : (b))
#define
MAX_SQL_STR_LEN
(1024 * 1024)
#define
MAX_ROW_STR_LEN
(16 * 1024)
#define
MAX_SQL_STR_LEN
(1024 * 1024)
#define
MAX_ROW_STR_LEN
(16 * 1024)
enum
_RUN_MODE
{
TMQ_RUN_INSERT_AND_CONSUME
,
TMQ_RUN_ONLY_INSERT
,
TMQ_RUN_ONLY_CONSUME
,
TMQ_RUN_MODE_BUTT
};
enum
_RUN_MODE
{
TMQ_RUN_INSERT_AND_CONSUME
,
TMQ_RUN_ONLY_INSERT
,
TMQ_RUN_ONLY_CONSUME
,
TMQ_RUN_MODE_BUTT
};
typedef
struct
{
char
dbName
[
32
];
char
stbName
[
64
];
char
resultFileName
[
256
];
char
vnodeWalPath
[
256
];
int32_t
numOfThreads
;
int32_t
numOfTables
;
int32_t
numOfVgroups
;
int32_t
runMode
;
int32_t
numOfColumn
;
double
ratio
;
int32_t
batchNumOfRow
;
int32_t
totalRowsOfPerTbl
;
int64_t
startTimestamp
;
int32_t
showMsgFlag
;
int32_t
simCase
;
int32_t
totalRowsOfT2
;
char
dbName
[
32
];
char
stbName
[
64
];
char
resultFileName
[
256
];
char
vnodeWalPath
[
256
];
int32_t
numOfThreads
;
int32_t
numOfTables
;
int32_t
numOfVgroups
;
int32_t
runMode
;
int32_t
numOfColumn
;
double
ratio
;
int32_t
batchNumOfRow
;
int32_t
totalRowsOfPerTbl
;
int64_t
startTimestamp
;
int32_t
showMsgFlag
;
int32_t
simCase
;
int32_t
totalRowsOfT2
;
}
SConfInfo
;
static
SConfInfo
g_stConfInfo
=
{
"tmqdb"
,
"stb"
,
"./tmqResult.txt"
,
// output_file
"./tmqResult.txt"
,
// output_file
""
,
// /data2/dnode/data/vnode/vnode2/wal",
1
,
// threads
1
,
// tables
1
,
// vgroups
0
,
// run mode
1
,
// columns
1
,
// ratio
1
,
// batch size
10000
,
// total rows for per table
0
,
// 2020-01-01 00:00:00.000
0
,
// show consume msg switch
0
,
// if run in sim case
1
,
// threads
1
,
// tables
1
,
// vgroups
0
,
// run mode
1
,
// columns
1
,
// ratio
1
,
// batch size
10000
,
// total rows for per table
0
,
// 2020-01-01 00:00:00.000
0
,
// show consume msg switch
0
,
// if run in sim case
10000
,
};
char
*
g_pRowValue
=
NULL
;
char
*
g_pRowValue
=
NULL
;
TdFilePtr
g_fp
=
NULL
;
static
void
printHelp
()
{
...
...
@@ -125,10 +120,8 @@ static void printHelp() {
exit
(
EXIT_SUCCESS
);
}
void
parseArgument
(
int32_t
argc
,
char
*
argv
[])
{
void
parseArgument
(
int32_t
argc
,
char
*
argv
[])
{
g_stConfInfo
.
startTimestamp
=
1640966400000
;
// 2020-01-01 00:00:00.000
for
(
int32_t
i
=
1
;
i
<
argc
;
i
++
)
{
if
(
strcmp
(
argv
[
i
],
"-h"
)
==
0
||
strcmp
(
argv
[
i
],
"--help"
)
==
0
)
{
...
...
@@ -156,7 +149,7 @@ void parseArgument(int32_t argc, char *argv[]) {
g_stConfInfo
.
batchNumOfRow
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-r"
)
==
0
)
{
g_stConfInfo
.
totalRowsOfPerTbl
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-l"
)
==
0
)
{
}
else
if
(
strcmp
(
argv
[
i
],
"-l"
)
==
0
)
{
g_stConfInfo
.
numOfColumn
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-q"
)
==
0
)
{
g_stConfInfo
.
ratio
=
atof
(
argv
[
++
i
]);
...
...
@@ -168,7 +161,7 @@ void parseArgument(int32_t argc, char *argv[]) {
g_stConfInfo
.
simCase
=
atol
(
argv
[
++
i
]);
}
else
{
printf
(
"%s unknow para: %s %s"
,
GREEN
,
argv
[
++
i
],
NC
);
exit
(
-
1
);
exit
(
-
1
);
}
}
...
...
@@ -191,73 +184,71 @@ void parseArgument(int32_t argc, char *argv[]) {
pPrint("%s totalRowsOfT2:%d %s", GREEN, g_stConfInfo.totalRowsOfT2, NC);
pPrint("%s startTimestamp:%" PRId64" %s", GREEN, g_stConfInfo.startTimestamp, NC);
pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC);
#endif
#endif
}
static
int
running
=
1
;
static
int
running
=
1
;
/*static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }*/
// calc dir size (not include itself 4096Byte)
int64_t
getDirectorySize
(
char
*
dir
)
{
TdDirPtr
pDir
;
TdDirEntryPtr
pDirEntry
;
int64_t
totalSize
=
0
;
if
((
pDir
=
taosOpenDir
(
dir
))
==
NULL
)
{
fprintf
(
stderr
,
"Cannot open dir: %s
\n
"
,
dir
);
return
-
1
;
}
int64_t
getDirectorySize
(
char
*
dir
)
{
TdDirPtr
pDir
;
TdDirEntryPtr
pDirEntry
;
int64_t
totalSize
=
0
;
//lstat(dir, &statbuf);
//totalSize+=statbuf.st_size;
while
((
pDirEntry
=
taosReadDir
(
pDir
))
!=
NULL
)
{
char
subdir
[
1024
];
char
*
fileName
=
taosGetDirEntryName
(
pDirEntry
);
sprintf
(
subdir
,
"%s/%s"
,
dir
,
fileName
);
//printf("===d_name: %s\n", entry->d_name);
if
(
taosIsDir
(
subdir
))
{
if
(
strcmp
(
"."
,
fileName
)
==
0
||
strcmp
(
".."
,
fileName
)
==
0
)
{
continue
;
}
int64_t
subDirSize
=
getDirectorySize
(
subdir
);
totalSize
+=
subDirSize
;
}
else
if
(
0
==
strcmp
(
strchr
(
fileName
,
'.'
),
".log"
))
{
// only calc .log file size, and not include .idx file
int64_t
file_size
=
0
;
taosStatFile
(
subdir
,
&
file_size
,
NULL
);
totalSize
+=
file_size
;
}
if
((
pDir
=
taosOpenDir
(
dir
))
==
NULL
)
{
fprintf
(
stderr
,
"Cannot open dir: %s
\n
"
,
dir
);
return
-
1
;
}
// lstat(dir, &statbuf);
// totalSize+=statbuf.st_size;
while
((
pDirEntry
=
taosReadDir
(
pDir
))
!=
NULL
)
{
char
subdir
[
1024
];
char
*
fileName
=
taosGetDirEntryName
(
pDirEntry
);
sprintf
(
subdir
,
"%s/%s"
,
dir
,
fileName
);
// printf("===d_name: %s\n", entry->d_name);
if
(
taosIsDir
(
subdir
))
{
if
(
strcmp
(
"."
,
fileName
)
==
0
||
strcmp
(
".."
,
fileName
)
==
0
)
{
continue
;
}
int64_t
subDirSize
=
getDirectorySize
(
subdir
);
totalSize
+=
subDirSize
;
}
else
if
(
0
==
strcmp
(
strchr
(
fileName
,
'.'
),
".log"
))
{
// only calc .log file size, and not include .idx file
int64_t
file_size
=
0
;
taosStatFile
(
subdir
,
&
file_size
,
NULL
);
totalSize
+=
file_size
;
}
}
taosCloseDir
(
pDir
);
return
totalSize
;
taosCloseDir
(
pDir
);
return
totalSize
;
}
int
queryDB
(
TAOS
*
taos
,
char
*
command
)
{
TAOS_RES
*
pRes
=
taos_query
(
taos
,
command
);
int
code
=
taos_errno
(
pRes
);
//if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) {
if
(
code
!=
0
)
{
pError
(
"failed to reason:%s, sql: %s"
,
tstrerror
(
code
),
command
);
taos_free_result
(
pRes
);
return
-
1
;
}
taos_free_result
(
pRes
);
return
0
;
int
queryDB
(
TAOS
*
taos
,
char
*
command
)
{
TAOS_RES
*
pRes
=
taos_query
(
taos
,
command
);
int
code
=
taos_errno
(
pRes
);
// if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) {
if
(
code
!=
0
)
{
pError
(
"failed to reason:%s, sql: %s"
,
tstrerror
(
code
),
command
);
taos_free_result
(
pRes
);
return
-
1
;
}
taos_free_result
(
pRes
);
return
0
;
}
int32_t
init_env
()
{
char
sqlStr
[
1024
]
=
{
0
};
TAOS
*
pConn
=
taos_connect
(
NULL
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pConn
==
NULL
)
{
return
-
1
;
}
sprintf
(
sqlStr
,
"create database if not exists %s vgroups %d"
,
g_stConfInfo
.
dbName
,
g_stConfInfo
.
numOfVgroups
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
sqlStr
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
...
...
@@ -282,19 +273,19 @@ int32_t init_env() {
int32_t
dataLen
=
0
;
int32_t
sqlLen
=
0
;
sqlLen
+=
sprintf
(
sqlStr
+
sqlLen
,
"create stable if not exists %s (ts timestamp, "
,
g_stConfInfo
.
stbName
);
sqlLen
+=
sprintf
(
sqlStr
+
sqlLen
,
"create stable if not exists %s (ts timestamp, "
,
g_stConfInfo
.
stbName
);
for
(
int32_t
i
=
0
;
i
<
g_stConfInfo
.
numOfColumn
;
i
++
)
{
if
(
i
==
g_stConfInfo
.
numOfColumn
-
1
)
{
sqlLen
+=
sprintf
(
sqlStr
+
sqlLen
,
"c%d int) "
,
i
);
memcpy
(
g_pRowValue
+
dataLen
,
"66778899"
,
strlen
(
"66778899"
));
dataLen
+=
strlen
(
"66778899"
);
}
else
{
sqlLen
+=
sprintf
(
sqlStr
+
sqlLen
,
"c%d int, "
,
i
);
memcpy
(
g_pRowValue
+
dataLen
,
"66778899, "
,
strlen
(
"66778899, "
));
dataLen
+=
strlen
(
"66778899, "
);
}
}
sqlLen
+=
sprintf
(
sqlStr
+
sqlLen
,
"tags (t0 int)"
);
if
(
i
==
g_stConfInfo
.
numOfColumn
-
1
)
{
sqlLen
+=
sprintf
(
sqlStr
+
sqlLen
,
"c%d int) "
,
i
);
memcpy
(
g_pRowValue
+
dataLen
,
"66778899"
,
strlen
(
"66778899"
));
dataLen
+=
strlen
(
"66778899"
);
}
else
{
sqlLen
+=
sprintf
(
sqlStr
+
sqlLen
,
"c%d int, "
,
i
);
memcpy
(
g_pRowValue
+
dataLen
,
"66778899, "
,
strlen
(
"66778899, "
));
dataLen
+=
strlen
(
"66778899, "
);
}
}
sqlLen
+=
sprintf
(
sqlStr
+
sqlLen
,
"tags (t0 int)"
);
pRes
=
taos_query
(
pConn
,
sqlStr
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
...
...
@@ -313,7 +304,7 @@ int32_t init_env() {
taos_free_result
(
pRes
);
}
//const char* sql = "select * from tu1";
//
const char* sql = "select * from tu1";
sprintf
(
sqlStr
,
"create topic test_stb_topic_1 as select ts,c0 from %s"
,
g_stConfInfo
.
stbName
);
/*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sqlStr, strlen(sqlStr));*/
pRes
=
taos_query
(
pConn
,
sqlStr
);
...
...
@@ -327,6 +318,7 @@ int32_t init_env() {
}
tmq_t
*
build_consumer
()
{
#if 0
char sqlStr[1024] = {0};
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
...
...
@@ -338,10 +330,15 @@ tmq_t* build_consumer() {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
#endif
tmq_conf_t
*
conf
=
tmq_conf_new
();
tmq_conf_set
(
conf
,
"group.id"
,
"tg2"
);
tmq_t
*
tmq
=
tmq_consumer_new
(
pConn
,
conf
,
NULL
,
0
);
tmq_conf_set
(
conf
,
"td.connect.user"
,
"root"
);
tmq_conf_set
(
conf
,
"td.connect.pass"
,
"taosdata"
);
tmq_conf_set
(
conf
,
"td.connect.db"
,
g_stConfInfo
.
dbName
);
tmq_t
*
tmq
=
tmq_consumer_new1
(
conf
,
NULL
,
0
);
tmq_conf_destroy
(
conf
);
return
tmq
;
}
...
...
@@ -396,20 +393,20 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLog
if
(
tmqmessage
)
{
batchCnt
++
;
/*skipLogNum += tmqGetSkipLogNum(tmqmessage);*/
if
(
0
!=
g_stConfInfo
.
showMsgFlag
)
{
if
(
0
!=
g_stConfInfo
.
showMsgFlag
)
{
/*msg_process(tmqmessage);*/
}
}
tmq_message_destroy
(
tmqmessage
);
}
else
{
break
;
}
}
int64_t
endTime
=
taosGetTimestampUs
();
double
consumeTime
=
(
double
)(
endTime
-
startTime
)
/
1000000
;
double
consumeTime
=
(
double
)(
endTime
-
startTime
)
/
1000000
;
if
(
batchCnt
!=
totalMsgs
)
{
printf
(
"%s inserted msgs: %d and consume msgs: %d mismatch %s"
,
GREEN
,
totalMsgs
,
batchCnt
,
NC
);
/*exit(-1);*/
printf
(
"%s inserted msgs: %d and consume msgs: %d mismatch %s"
,
GREEN
,
totalMsgs
,
batchCnt
,
NC
);
/*exit(-1);*/
}
if
(
0
==
g_stConfInfo
.
simCase
)
{
...
...
@@ -417,12 +414,14 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLog
}
else
{
printf
(
"{consume success: %d}"
,
totalMsgs
);
}
taosFprintfFile
(
g_fp
,
"|%10d | %10.3f | %8.2f | %10.2f| %10.2f |
\n
"
,
batchCnt
,
consumeTime
,
(
double
)
batchCnt
/
consumeTime
,
(
double
)
walLogSize
/
(
1024
*
1024
.
0
)
/
consumeTime
,
(
double
)
walLogSize
/
1024
.
0
/
batchCnt
);
taosFprintfFile
(
g_fp
,
"|%10d | %10.3f | %8.2f | %10.2f| %10.2f |
\n
"
,
batchCnt
,
consumeTime
,
(
double
)
batchCnt
/
consumeTime
,
(
double
)
walLogSize
/
(
1024
*
1024
.
0
)
/
consumeTime
,
(
double
)
walLogSize
/
1024
.
0
/
batchCnt
);
err
=
tmq_consumer_close
(
tmq
);
if
(
err
)
{
fprintf
(
stderr
,
"%% Failed to close consumer: %s
\n
"
,
tmq_err2str
(
err
));
exit
(
-
1
);
exit
(
-
1
);
}
}
...
...
@@ -430,7 +429,7 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLog
int32_t
syncWriteData
()
{
TAOS
*
pConn
=
taos_connect
(
NULL
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pConn
==
NULL
)
{
return
-
1
;
return
-
1
;
}
char
sqlStr
[
1024
]
=
{
0
};
...
...
@@ -449,11 +448,11 @@ int32_t syncWriteData() {
}
int32_t
totalMsgs
=
0
;
int64_t
time_counter
=
g_stConfInfo
.
startTimestamp
;
for
(
int
i
=
0
;
i
<
g_stConfInfo
.
totalRowsOfPerTbl
;)
{
for
(
int
tID
=
0
;
tID
<=
g_stConfInfo
.
numOfTables
-
1
;
tID
++
)
{
int
inserted
=
i
;
int
inserted
=
i
;
int64_t
tmp_time
=
time_counter
;
int32_t
data_len
=
0
;
...
...
@@ -465,22 +464,22 @@ int32_t syncWriteData() {
k
++
;
if
(
inserted
>=
g_stConfInfo
.
totalRowsOfPerTbl
)
{
break
;
break
;
}
if
(
data_len
>
MAX_SQL_STR_LEN
-
MAX_ROW_STR_LEN
)
{
if
(
data_len
>
MAX_SQL_STR_LEN
-
MAX_ROW_STR_LEN
)
{
break
;
}
}
}
int
code
=
queryDB
(
pConn
,
buffer
);
if
(
0
!=
code
)
{
if
(
0
!=
code
)
{
fprintf
(
stderr
,
"insert data error!
\n
"
);
taosMemoryFreeClear
(
buffer
);
return
-
1
;
}
taosMemoryFreeClear
(
buffer
);
return
-
1
;
}
totalMsgs
++
;
totalMsgs
++
;
if
(
tID
==
g_stConfInfo
.
numOfTables
-
1
)
{
i
=
inserted
;
...
...
@@ -492,12 +491,11 @@ int32_t syncWriteData() {
return
totalMsgs
;
}
// sync insertion
int32_t
syncWriteDataByRatio
()
{
TAOS
*
pConn
=
taos_connect
(
NULL
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pConn
==
NULL
)
{
return
-
1
;
return
-
1
;
}
char
sqlStr
[
1024
]
=
{
0
};
...
...
@@ -518,27 +516,27 @@ int32_t syncWriteDataByRatio() {
int32_t
totalMsgs
=
0
;
int32_t
insertedOfT1
=
0
;
int32_t
insertedOfT2
=
0
;
int32_t
insertedOfT2
=
0
;
int64_t
tsOfT1
=
g_stConfInfo
.
startTimestamp
;
int64_t
tsOfT2
=
g_stConfInfo
.
startTimestamp
;
int64_t
tmp_time
;
for
(;;)
{
if
((
insertedOfT1
>=
g_stConfInfo
.
totalRowsOfPerTbl
)
&&
(
insertedOfT2
>=
g_stConfInfo
.
totalRowsOfT2
))
{
if
((
insertedOfT1
>=
g_stConfInfo
.
totalRowsOfPerTbl
)
&&
(
insertedOfT2
>=
g_stConfInfo
.
totalRowsOfT2
))
{
break
;
}
}
for
(
int
tID
=
0
;
tID
<=
g_stConfInfo
.
numOfTables
-
1
;
tID
++
)
{
if
(
0
==
tID
)
{
tmp_time
=
tsOfT1
;
tmp_time
=
tsOfT1
;
if
(
insertedOfT1
>=
g_stConfInfo
.
totalRowsOfPerTbl
)
{
continue
;
continue
;
}
}
else
if
(
1
==
tID
){
tmp_time
=
tsOfT2
;
}
else
if
(
1
==
tID
)
{
tmp_time
=
tsOfT2
;
if
(
insertedOfT2
>=
g_stConfInfo
.
totalRowsOfT2
)
{
continue
;
continue
;
}
}
...
...
@@ -548,79 +546,86 @@ int32_t syncWriteDataByRatio() {
for
(
k
=
0
;
k
<
g_stConfInfo
.
batchNumOfRow
;)
{
data_len
+=
sprintf
(
buffer
+
data_len
,
"(%"
PRId64
", %s) "
,
tmp_time
++
,
g_pRowValue
);
k
++
;
if
(
0
==
tID
)
{
if
(
0
==
tID
)
{
insertedOfT1
++
;
if
(
insertedOfT1
>=
g_stConfInfo
.
totalRowsOfPerTbl
)
{
break
;
}
}
else
if
(
1
==
tID
)
{
if
(
insertedOfT1
>=
g_stConfInfo
.
totalRowsOfPerTbl
)
{
break
;
}
}
else
if
(
1
==
tID
)
{
insertedOfT2
++
;
if
(
insertedOfT2
>=
g_stConfInfo
.
totalRowsOfT2
)
{
break
;
}
}
if
(
insertedOfT2
>=
g_stConfInfo
.
totalRowsOfT2
)
{
break
;
}
}
if
(
data_len
>
MAX_SQL_STR_LEN
-
MAX_ROW_STR_LEN
)
{
if
(
data_len
>
MAX_SQL_STR_LEN
-
MAX_ROW_STR_LEN
)
{
break
;
}
}
}
int
code
=
queryDB
(
pConn
,
buffer
);
if
(
0
!=
code
)
{
if
(
0
!=
code
)
{
fprintf
(
stderr
,
"insert data error!
\n
"
);
taosMemoryFreeClear
(
buffer
);
return
-
1
;
}
taosMemoryFreeClear
(
buffer
);
return
-
1
;
}
if
(
0
==
tID
)
{
tsOfT1
=
tmp_time
;
}
else
if
(
1
==
tID
){
tsOfT2
=
tmp_time
;
tsOfT1
=
tmp_time
;
}
else
if
(
1
==
tID
)
{
tsOfT2
=
tmp_time
;
}
totalMsgs
++
;
totalMsgs
++
;
}
}
pPrint
(
"expect insert rows: T1[%d] T2[%d], actual insert rows: T1[%d] T2[%d]
\n
"
,
g_stConfInfo
.
totalRowsOfPerTbl
,
g_stConfInfo
.
totalRowsOfT2
,
insertedOfT1
,
insertedOfT2
);
pPrint
(
"expect insert rows: T1[%d] T2[%d], actual insert rows: T1[%d] T2[%d]
\n
"
,
g_stConfInfo
.
totalRowsOfPerTbl
,
g_stConfInfo
.
totalRowsOfT2
,
insertedOfT1
,
insertedOfT2
);
taosMemoryFreeClear
(
buffer
);
return
totalMsgs
;
}
void
printParaIntoFile
()
{
// FILE *fp = fopen(g_stConfInfo.resultFileName, "a");
TdFilePtr
pFile
=
taosOpenFile
(
g_stConfInfo
.
resultFileName
,
TD_FILE_CREATE
|
TD_FILE_WRITE
|
TD_FILE_APPEND
|
TD_FILE_STREAM
);
TdFilePtr
pFile
=
taosOpenFile
(
g_stConfInfo
.
resultFileName
,
TD_FILE_CREATE
|
TD_FILE_WRITE
|
TD_FILE_APPEND
|
TD_FILE_STREAM
);
if
(
NULL
==
pFile
)
{
fprintf
(
stderr
,
"Failed to open %s for save result
\n
"
,
g_stConfInfo
.
resultFileName
);
exit
-
1
;
exit
-
1
;
};
g_fp
=
pFile
;
time_t
tTime
=
taosGetTimestampSec
();
time_t
tTime
=
taosGetTimestampSec
();
struct
tm
tm
=
*
localtime
(
&
tTime
);
taosFprintfFile
(
pFile
,
"###################################################################
\n
"
);
taosFprintfFile
(
pFile
,
"# configDir: %s
\n
"
,
configDir
);
taosFprintfFile
(
pFile
,
"# dbName: %s
\n
"
,
g_stConfInfo
.
dbName
);
taosFprintfFile
(
pFile
,
"# stbName: %s
\n
"
,
g_stConfInfo
.
stbName
);
taosFprintfFile
(
pFile
,
"# vnodeWalPath: %s
\n
"
,
g_stConfInfo
.
vnodeWalPath
);
taosFprintfFile
(
pFile
,
"# numOfTables: %d
\n
"
,
g_stConfInfo
.
numOfTables
);
taosFprintfFile
(
pFile
,
"# numOfThreads: %d
\n
"
,
g_stConfInfo
.
numOfThreads
);
taosFprintfFile
(
pFile
,
"# numOfVgroups: %d
\n
"
,
g_stConfInfo
.
numOfVgroups
);
taosFprintfFile
(
pFile
,
"# runMode: %d
\n
"
,
g_stConfInfo
.
runMode
);
taosFprintfFile
(
pFile
,
"# ratio: %f
\n
"
,
g_stConfInfo
.
ratio
);
taosFprintfFile
(
pFile
,
"# numOfColumn: %d
\n
"
,
g_stConfInfo
.
numOfColumn
);
taosFprintfFile
(
pFile
,
"# batchNumOfRow: %d
\n
"
,
g_stConfInfo
.
batchNumOfRow
);
taosFprintfFile
(
pFile
,
"# totalRowsOfPerTbl: %d
\n
"
,
g_stConfInfo
.
totalRowsOfPerTbl
);
taosFprintfFile
(
pFile
,
"# totalRowsOfT2: %d
\n
"
,
g_stConfInfo
.
totalRowsOfT2
);
taosFprintfFile
(
pFile
,
"# configDir: %s
\n
"
,
configDir
);
taosFprintfFile
(
pFile
,
"# dbName: %s
\n
"
,
g_stConfInfo
.
dbName
);
taosFprintfFile
(
pFile
,
"# stbName: %s
\n
"
,
g_stConfInfo
.
stbName
);
taosFprintfFile
(
pFile
,
"# vnodeWalPath: %s
\n
"
,
g_stConfInfo
.
vnodeWalPath
);
taosFprintfFile
(
pFile
,
"# numOfTables: %d
\n
"
,
g_stConfInfo
.
numOfTables
);
taosFprintfFile
(
pFile
,
"# numOfThreads: %d
\n
"
,
g_stConfInfo
.
numOfThreads
);
taosFprintfFile
(
pFile
,
"# numOfVgroups: %d
\n
"
,
g_stConfInfo
.
numOfVgroups
);
taosFprintfFile
(
pFile
,
"# runMode: %d
\n
"
,
g_stConfInfo
.
runMode
);
taosFprintfFile
(
pFile
,
"# ratio: %f
\n
"
,
g_stConfInfo
.
ratio
);
taosFprintfFile
(
pFile
,
"# numOfColumn: %d
\n
"
,
g_stConfInfo
.
numOfColumn
);
taosFprintfFile
(
pFile
,
"# batchNumOfRow: %d
\n
"
,
g_stConfInfo
.
batchNumOfRow
);
taosFprintfFile
(
pFile
,
"# totalRowsOfPerTbl: %d
\n
"
,
g_stConfInfo
.
totalRowsOfPerTbl
);
taosFprintfFile
(
pFile
,
"# totalRowsOfT2: %d
\n
"
,
g_stConfInfo
.
totalRowsOfT2
);
taosFprintfFile
(
pFile
,
"# Test time: %d-%02d-%02d %02d:%02d:%02d
\n
"
,
tm
.
tm_year
+
1900
,
tm
.
tm_mon
+
1
,
tm
.
tm_mday
,
tm
.
tm_hour
,
tm
.
tm_min
,
tm
.
tm_sec
);
tm
.
tm_mday
,
tm
.
tm_hour
,
tm
.
tm_min
,
tm
.
tm_sec
);
taosFprintfFile
(
pFile
,
"###################################################################
\n
"
);
taosFprintfFile
(
pFile
,
"|-------------------------------insert info-----------------------------|--------------------------------consume info---------------------------------|
\n
"
);
taosFprintfFile
(
pFile
,
"|batch size| insert msgs | insert time(s) | msgs/s | walLogSize(MB) | consume msgs | consume time(s) | msgs/s | MB/s | avg msg size(KB) |
\n
"
);
taosFprintfFile
(
pFile
,
"|-------------------------------insert "
"info-----------------------------|--------------------------------consume "
"info---------------------------------|
\n
"
);
taosFprintfFile
(
pFile
,
"|batch size| insert msgs | insert time(s) | msgs/s | walLogSize(MB) | consume msgs | consume "
"time(s) | msgs/s | MB/s | avg msg size(KB) |
\n
"
);
taosFprintfFile
(
g_fp
,
"|%10d"
,
g_stConfInfo
.
batchNumOfRow
);
}
int
main
(
int32_t
argc
,
char
*
argv
[])
{
int
main
(
int32_t
argc
,
char
*
argv
[])
{
parseArgument
(
argc
,
argv
);
printParaIntoFile
();
...
...
@@ -630,70 +635,70 @@ int main(int32_t argc, char *argv[]) {
code
=
init_env
();
if
(
code
!=
0
)
{
fprintf
(
stderr
,
"%% init_env error!
\n
"
);
return
-
1
;
return
-
1
;
}
int32_t
totalMsgs
=
0
;
if
(
g_stConfInfo
.
runMode
!=
TMQ_RUN_ONLY_CONSUME
)
{
int64_t
startTs
=
taosGetTimestampUs
();
if
(
1
==
g_stConfInfo
.
ratio
)
{
totalMsgs
=
syncWriteData
();
totalMsgs
=
syncWriteData
();
}
else
{
totalMsgs
=
syncWriteDataByRatio
();
}
totalMsgs
=
syncWriteDataByRatio
();
}
if
(
totalMsgs
<=
0
)
{
pError
(
"inset data error!
\n
"
);
pError
(
"inset data error!
\n
"
);
return
-
1
;
}
int64_t
endTs
=
taosGetTimestampUs
();
int64_t
delay
=
endTs
-
startTs
;
int32_t
totalRows
=
0
;
if
(
1
==
g_stConfInfo
.
ratio
)
{
totalRows
=
g_stConfInfo
.
totalRowsOfPerTbl
*
g_stConfInfo
.
numOfTables
;
}
else
{
totalRows
=
g_stConfInfo
.
totalRowsOfPerTbl
*
(
1
+
g_stConfInfo
.
ratio
);
}
float
seconds
=
delay
/
1000000
.
0
;
float
rowsSpeed
=
totalRows
/
seconds
;
float
msgsSpeed
=
totalMsgs
/
seconds
;
if
((
0
==
g_stConfInfo
.
simCase
)
&&
(
strlen
(
g_stConfInfo
.
vnodeWalPath
)))
{
walLogSize
=
getDirectorySize
(
g_stConfInfo
.
vnodeWalPath
);
if
(
walLogSize
<=
0
)
{
printf
(
"%s size incorrect!"
,
g_stConfInfo
.
vnodeWalPath
);
exit
(
-
1
);
}
else
{
pPrint
(
".log file size in vnode2/wal: %.3f MBytes
\n
"
,
(
double
)
walLogSize
/
(
1024
*
1024
.
0
));
}
}
if
(
0
==
g_stConfInfo
.
simCase
)
{
pPrint
(
"insert result: %d rows, %d msgs, time:%.3f sec, speed:%.1f rows/second, %.1f msgs/second
\n
"
,
totalRows
,
totalMsgs
,
seconds
,
rowsSpeed
,
msgsSpeed
);
}
taosFprintfFile
(
g_fp
,
"|%10d | %10.3f | %8.2f | %10.3f "
,
totalMsgs
,
seconds
,
msgsSpeed
,
(
double
)
walLogSize
/
(
1024
*
1024
.
0
));
int64_t
endTs
=
taosGetTimestampUs
();
int64_t
delay
=
endTs
-
startTs
;
int32_t
totalRows
=
0
;
if
(
1
==
g_stConfInfo
.
ratio
)
{
totalRows
=
g_stConfInfo
.
totalRowsOfPerTbl
*
g_stConfInfo
.
numOfTables
;
}
else
{
totalRows
=
g_stConfInfo
.
totalRowsOfPerTbl
*
(
1
+
g_stConfInfo
.
ratio
);
}
float
seconds
=
delay
/
1000000
.
0
;
float
rowsSpeed
=
totalRows
/
seconds
;
float
msgsSpeed
=
totalMsgs
/
seconds
;
if
((
0
==
g_stConfInfo
.
simCase
)
&&
(
strlen
(
g_stConfInfo
.
vnodeWalPath
)))
{
walLogSize
=
getDirectorySize
(
g_stConfInfo
.
vnodeWalPath
);
if
(
walLogSize
<=
0
)
{
printf
(
"%s size incorrect!"
,
g_stConfInfo
.
vnodeWalPath
);
exit
(
-
1
);
}
else
{
pPrint
(
".log file size in vnode2/wal: %.3f MBytes
\n
"
,
(
double
)
walLogSize
/
(
1024
*
1024
.
0
));
}
}
if
(
0
==
g_stConfInfo
.
simCase
)
{
pPrint
(
"insert result: %d rows, %d msgs, time:%.3f sec, speed:%.1f rows/second, %.1f msgs/second
\n
"
,
totalRows
,
totalMsgs
,
seconds
,
rowsSpeed
,
msgsSpeed
);
}
taosFprintfFile
(
g_fp
,
"|%10d | %10.3f | %8.2f | %10.3f "
,
totalMsgs
,
seconds
,
msgsSpeed
,
(
double
)
walLogSize
/
(
1024
*
1024
.
0
));
}
if
(
g_stConfInfo
.
runMode
==
TMQ_RUN_ONLY_INSERT
)
{
return
0
;
}
tmq_t
*
tmq
=
build_consumer
();
tmq_t
*
tmq
=
build_consumer
();
tmq_list_t
*
topic_list
=
build_topic_list
();
if
((
NULL
==
tmq
)
||
(
NULL
==
topic_list
)){
if
((
NULL
==
tmq
)
||
(
NULL
==
topic_list
))
{
return
-
1
;
}
perf_loop
(
tmq
,
topic_list
,
totalMsgs
,
walLogSize
);
taosMemoryFreeClear
(
g_pRowValue
);
taosFprintfFile
(
g_fp
,
"
\n
"
);
taosCloseFile
(
&
g_fp
);
taosFprintfFile
(
g_fp
,
"
\n
"
);
taosCloseFile
(
&
g_fp
);
return
0
;
}
tests/test/c/tmqSim.c
浏览文件 @
17ecc24e
...
...
@@ -35,8 +35,8 @@
#define MAX_ROW_STR_LEN (16 * 1024)
typedef
struct
{
int32_t
expectMsgCnt
;
int32_t
consumeMsgCnt
;
int32_t
expectMsgCnt
;
int32_t
consumeMsgCnt
;
TdThread
thread
;
}
SThreadInfo
;
...
...
@@ -45,12 +45,12 @@ typedef struct {
char
dbName
[
32
];
char
topicString
[
256
];
char
keyString
[
1024
];
char
topicString1
[
256
];
char
keyString1
[
1024
];
char
topicString1
[
256
];
char
keyString1
[
1024
];
int32_t
showMsgFlag
;
int32_t
consumeDelay
;
// unit s
int32_t
consumeMsgCnt
;
int32_t
checkMode
;
int32_t
checkMode
;
// save result after parse agrvs
int32_t
numOfTopic
;
...
...
@@ -59,13 +59,13 @@ typedef struct {
int32_t
numOfKey
;
char
key
[
32
][
64
];
char
value
[
32
][
64
];
int32_t
numOfTopic1
;
char
topics1
[
32
][
64
];
int32_t
numOfKey1
;
char
key1
[
32
][
64
];
char
value1
[
32
][
64
];
int32_t
numOfTopic1
;
char
topics1
[
32
][
64
];
int32_t
numOfKey1
;
char
key1
[
32
][
64
];
char
value1
[
32
][
64
];
}
SConfInfo
;
static
SConfInfo
g_stConfInfo
;
...
...
@@ -186,18 +186,18 @@ void parseInputString() {
ltrim
(
g_stConfInfo
.
topics
[
g_stConfInfo
.
numOfTopic
]);
// printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
g_stConfInfo
.
numOfTopic
++
;
token
=
strtok
(
NULL
,
delim
);
}
}
token
=
strtok
(
g_stConfInfo
.
topicString1
,
delim
);
while
(
token
!=
NULL
)
{
//printf("%s\n", token );
strcpy
(
g_stConfInfo
.
topics1
[
g_stConfInfo
.
numOfTopic1
],
token
);
while
(
token
!=
NULL
)
{
//
printf("%s\n", token );
strcpy
(
g_stConfInfo
.
topics1
[
g_stConfInfo
.
numOfTopic1
],
token
);
ltrim
(
g_stConfInfo
.
topics1
[
g_stConfInfo
.
numOfTopic1
]);
//
printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
g_stConfInfo
.
numOfTopic1
++
;
//
printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
g_stConfInfo
.
numOfTopic1
++
;
token
=
strtok
(
NULL
,
delim
);
}
...
...
@@ -214,28 +214,28 @@ void parseInputString() {
// g_stConfInfo.value[g_stConfInfo.numOfKey]);
g_stConfInfo
.
numOfKey
++
;
}
token
=
strtok
(
NULL
,
delim
);
}
token
=
strtok
(
g_stConfInfo
.
keyString1
,
delim
);
while
(
token
!=
NULL
)
{
//printf("%s\n", token );
{
char
*
pstr
=
token
;
ltrim
(
pstr
);
char
*
ret
=
strchr
(
pstr
,
ch
);
memcpy
(
g_stConfInfo
.
key1
[
g_stConfInfo
.
numOfKey1
],
pstr
,
ret
-
pstr
);
strcpy
(
g_stConfInfo
.
value1
[
g_stConfInfo
.
numOfKey1
],
ret
+
1
);
//printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey], g_stConfInfo.value[g_stConfInfo.numOfKey]);
g_stConfInfo
.
numOfKey1
++
;
while
(
token
!=
NULL
)
{
// printf("%s\n", token );
{
char
*
pstr
=
token
;
ltrim
(
pstr
);
char
*
ret
=
strchr
(
pstr
,
ch
);
memcpy
(
g_stConfInfo
.
key1
[
g_stConfInfo
.
numOfKey1
],
pstr
,
ret
-
pstr
);
strcpy
(
g_stConfInfo
.
value1
[
g_stConfInfo
.
numOfKey1
],
ret
+
1
);
// printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey],
// g_stConfInfo.value[g_stConfInfo.numOfKey]);
g_stConfInfo
.
numOfKey1
++
;
}
token
=
strtok
(
NULL
,
delim
);
}
}
static
int
running
=
1
;
/*static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }*/
...
...
@@ -253,6 +253,7 @@ int queryDB(TAOS* taos, char* command) {
}
tmq_t
*
build_consumer
()
{
#if 0
char sqlStr[1024] = {0};
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
...
...
@@ -266,13 +267,19 @@ tmq_t* build_consumer() {
exit(-1);
}
taos_free_result(pRes);
#endif
tmq_conf_t
*
conf
=
tmq_conf_new
();
// tmq_conf_set(conf, "group.id", "tg2");
for
(
int32_t
i
=
0
;
i
<
g_stConfInfo
.
numOfKey
;
i
++
)
{
tmq_conf_set
(
conf
,
g_stConfInfo
.
key
[
i
],
g_stConfInfo
.
value
[
i
]);
}
tmq_t
*
tmq
=
tmq_consumer_new
(
pConn
,
conf
,
NULL
,
0
);
tmq_conf_set
(
conf
,
"td.connect.user"
,
"root"
);
tmq_conf_set
(
conf
,
"td.connect.pass"
,
"taosdata"
);
tmq_conf_set
(
conf
,
"td.connect.db"
,
g_stConfInfo
.
dbName
);
tmq_t
*
tmq
=
tmq_consumer_new1
(
conf
,
NULL
,
0
);
assert
(
tmq
);
tmq_conf_destroy
(
conf
);
return
tmq
;
}
...
...
@@ -285,10 +292,10 @@ tmq_list_t* build_topic_list() {
return
topic_list
;
}
tmq_t
*
build_consumer_x
()
{
#if 0
char sqlStr[1024] = {0};
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
assert(pConn != NULL);
...
...
@@ -296,23 +303,29 @@ tmq_t* build_consumer_x() {
TAOS_RES* pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
taos_free_result
(
pRes
);
exit
(
-
1
);
taos_free_result(pRes);
exit(-1);
}
taos_free_result(pRes);
#endif
tmq_conf_t
*
conf
=
tmq_conf_new
();
//tmq_conf_set(conf, "group.id", "tg2");
//
tmq_conf_set(conf, "group.id", "tg2");
for
(
int32_t
i
=
0
;
i
<
g_stConfInfo
.
numOfKey1
;
i
++
)
{
tmq_conf_set
(
conf
,
g_stConfInfo
.
key1
[
i
],
g_stConfInfo
.
value1
[
i
]);
}
tmq_t
*
tmq
=
tmq_consumer_new
(
pConn
,
conf
,
NULL
,
0
);
tmq_conf_set
(
conf
,
"td.connect.user"
,
"root"
);
tmq_conf_set
(
conf
,
"td.connect.pass"
,
"taosdata"
);
tmq_conf_set
(
conf
,
"td.connect.db"
,
g_stConfInfo
.
dbName
);
tmq_t
*
tmq
=
tmq_consumer_new1
(
conf
,
NULL
,
0
);
assert
(
tmq
);
tmq_conf_destroy
(
conf
);
return
tmq
;
}
tmq_list_t
*
build_topic_list_x
()
{
tmq_list_t
*
topic_list
=
tmq_list_new
();
//tmq_list_append(topic_list, "test_stb_topic_1");
//
tmq_list_append(topic_list, "test_stb_topic_1");
for
(
int32_t
i
=
0
;
i
<
g_stConfInfo
.
numOfTopic1
;
i
++
)
{
tmq_list_append
(
topic_list
,
g_stConfInfo
.
topics1
[
i
]);
}
...
...
@@ -367,9 +380,9 @@ int32_t parallel_consume(tmq_t* tmq, int threadLable) {
if
(
tmqMsg
)
{
totalMsgs
++
;
//
printf("threadFlag: %d, totalMsgs: %d\n", threadLable, totalMsgs);
//
printf("threadFlag: %d, totalMsgs: %d\n", threadLable, totalMsgs);
#if 0
#if 0
TAOS_ROW row;
while (NULL != (row = tmq_get_row(tmqMsg))) {
totalRows++;
...
...
@@ -396,65 +409,63 @@ int32_t parallel_consume(tmq_t* tmq, int threadLable) {
exit
(
-
1
);
}
//printf("%d", totalMsgs); // output to sim for check result
//
printf("%d", totalMsgs); // output to sim for check result
return
totalMsgs
;
}
void
*
threadFunc
(
void
*
param
)
{
void
*
threadFunc
(
void
*
param
)
{
int32_t
totalMsgs
=
0
;
SThreadInfo
*
pInfo
=
(
SThreadInfo
*
)
param
;
SThreadInfo
*
pInfo
=
(
SThreadInfo
*
)
param
;
tmq_t
*
tmq
=
build_consumer_x
();
tmq_t
*
tmq
=
build_consumer_x
();
tmq_list_t
*
topic_list
=
build_topic_list_x
();
if
((
NULL
==
tmq
)
||
(
NULL
==
topic_list
)){
if
((
NULL
==
tmq
)
||
(
NULL
==
topic_list
))
{
return
NULL
;
}
tmq_resp_err_t
err
=
tmq_subscribe
(
tmq
,
topic_list
);
if
(
err
)
{
printf
(
"tmq_subscribe() fail, reason: %s
\n
"
,
tmq_err2str
(
err
));
exit
(
-
1
);
}
//if (0 == g_stConfInfo.consumeMsgCnt) {
// loop_consume(tmq);
//} else {
pInfo
->
consumeMsgCnt
=
parallel_consume
(
tmq
,
1
);
//
if (0 == g_stConfInfo.consumeMsgCnt) {
//
loop_consume(tmq);
//
} else {
pInfo
->
consumeMsgCnt
=
parallel_consume
(
tmq
,
1
);
//}
err
=
tmq_unsubscribe
(
tmq
);
if
(
err
)
{
printf
(
"tmq_unsubscribe() fail, reason: %s
\n
"
,
tmq_err2str
(
err
));
pInfo
->
consumeMsgCnt
=
-
1
;
pInfo
->
consumeMsgCnt
=
-
1
;
return
NULL
;
}
return
NULL
;
}
int
main
(
int32_t
argc
,
char
*
argv
[])
{
parseArgument
(
argc
,
argv
);
parseInputString
();
int32_t
numOfThreads
=
1
;
int32_t
numOfThreads
=
1
;
TdThreadAttr
thattr
;
taosThreadAttrInit
(
&
thattr
);
taosThreadAttrSetDetachState
(
&
thattr
,
PTHREAD_CREATE_JOINABLE
);
SThreadInfo
*
pInfo
=
(
SThreadInfo
*
)
taosMemoryCalloc
(
numOfThreads
,
sizeof
(
SThreadInfo
));
SThreadInfo
*
pInfo
=
(
SThreadInfo
*
)
taosMemoryCalloc
(
numOfThreads
,
sizeof
(
SThreadInfo
));
if
(
g_stConfInfo
.
numOfTopic1
)
{
// pthread_create one thread to consume
for
(
int32_t
i
=
0
;
i
<
numOfThreads
;
++
i
)
{
pInfo
[
i
].
expectMsgCnt
=
0
;
pInfo
[
i
].
consumeMsgCnt
=
0
;
taosThreadCreate
(
&
(
pInfo
[
i
].
thread
),
&
thattr
,
threadFunc
,
(
void
*
)(
pInfo
+
i
));
taosThreadCreate
(
&
(
pInfo
[
i
].
thread
),
&
thattr
,
threadFunc
,
(
void
*
)(
pInfo
+
i
));
}
}
int32_t
totalMsgs
=
0
;
int32_t
totalMsgs
=
0
;
tmq_t
*
tmq
=
build_consumer
();
tmq_list_t
*
topic_list
=
build_topic_list
();
if
((
NULL
==
tmq
)
||
(
NULL
==
topic_list
))
{
...
...
@@ -479,48 +490,48 @@ int main(int32_t argc, char* argv[]) {
exit
(
-
1
);
}
if
(
g_stConfInfo
.
numOfTopic1
)
{
if
(
g_stConfInfo
.
numOfTopic1
)
{
for
(
int32_t
i
=
0
;
i
<
numOfThreads
;
i
++
)
{
taosThreadJoin
(
pInfo
[
i
].
thread
,
NULL
);
}
//
printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt);
if
(
0
==
g_stConfInfo
.
checkMode
)
{
if
((
totalMsgs
+
pInfo
->
consumeMsgCnt
)
==
g_stConfInfo
.
consumeMsgCnt
)
{
printf
(
"success"
);
//
printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt);
if
(
0
==
g_stConfInfo
.
checkMode
)
{
if
((
totalMsgs
+
pInfo
->
consumeMsgCnt
)
==
g_stConfInfo
.
consumeMsgCnt
)
{
printf
(
"success"
);
}
else
{
printf
(
"fail, consumer msg cnt: %d, %d"
,
totalMsgs
,
pInfo
->
consumeMsgCnt
);
}
}
else
if
(
1
==
g_stConfInfo
.
checkMode
)
{
printf
(
"fail, consumer msg cnt: %d, %d"
,
totalMsgs
,
pInfo
->
consumeMsgCnt
);
}
}
else
if
(
1
==
g_stConfInfo
.
checkMode
)
{
if
((
totalMsgs
==
g_stConfInfo
.
consumeMsgCnt
)
&&
(
pInfo
->
consumeMsgCnt
==
g_stConfInfo
.
consumeMsgCnt
))
{
printf
(
"success"
);
printf
(
"success"
);
}
else
{
printf
(
"fail, consumer msg cnt: %d, %d"
,
totalMsgs
,
pInfo
->
consumeMsgCnt
);
}
}
else
if
(
2
==
g_stConfInfo
.
checkMode
)
{
if
((
totalMsgs
+
pInfo
->
consumeMsgCnt
)
==
3
*
g_stConfInfo
.
consumeMsgCnt
)
{
printf
(
"success"
);
printf
(
"fail, consumer msg cnt: %d, %d"
,
totalMsgs
,
pInfo
->
consumeMsgCnt
);
}
}
else
if
(
2
==
g_stConfInfo
.
checkMode
)
{
if
((
totalMsgs
+
pInfo
->
consumeMsgCnt
)
==
3
*
g_stConfInfo
.
consumeMsgCnt
)
{
printf
(
"success"
);
}
else
{
printf
(
"fail, consumer msg cnt: %d, %d"
,
totalMsgs
,
pInfo
->
consumeMsgCnt
);
}
}
else
if
(
3
==
g_stConfInfo
.
checkMode
)
{
printf
(
"fail, consumer msg cnt: %d, %d"
,
totalMsgs
,
pInfo
->
consumeMsgCnt
);
}
}
else
if
(
3
==
g_stConfInfo
.
checkMode
)
{
if
((
totalMsgs
==
2
*
g_stConfInfo
.
consumeMsgCnt
)
&&
(
pInfo
->
consumeMsgCnt
==
2
*
g_stConfInfo
.
consumeMsgCnt
))
{
printf
(
"success"
);
printf
(
"success"
);
}
else
{
printf
(
"fail, consumer msg cnt: %d, %d"
,
totalMsgs
,
pInfo
->
consumeMsgCnt
);
}
}
else
if
(
4
==
g_stConfInfo
.
checkMode
)
{
if
(((
totalMsgs
==
0
)
&&
(
pInfo
->
consumeMsgCnt
==
3
*
g_stConfInfo
.
consumeMsgCnt
))
||
((
pInfo
->
consumeMsgCnt
==
0
)
&&
(
totalMsgs
==
3
*
g_stConfInfo
.
consumeMsgCnt
))
||
((
pInfo
->
consumeMsgCnt
==
g_stConfInfo
.
consumeMsgCnt
)
&&
(
totalMsgs
==
2
*
g_stConfInfo
.
consumeMsgCnt
))
||
((
pInfo
->
consumeMsgCnt
==
2
*
g_stConfInfo
.
consumeMsgCnt
)
&&
(
totalMsgs
==
g_stConfInfo
.
consumeMsgCnt
)))
{
printf
(
"success"
);
printf
(
"fail, consumer msg cnt: %d, %d"
,
totalMsgs
,
pInfo
->
consumeMsgCnt
);
}
}
else
if
(
4
==
g_stConfInfo
.
checkMode
)
{
if
(((
totalMsgs
==
0
)
&&
(
pInfo
->
consumeMsgCnt
==
3
*
g_stConfInfo
.
consumeMsgCnt
))
||
((
pInfo
->
consumeMsgCnt
==
0
)
&&
(
totalMsgs
==
3
*
g_stConfInfo
.
consumeMsgCnt
))
||
((
pInfo
->
consumeMsgCnt
==
g_stConfInfo
.
consumeMsgCnt
)
&&
(
totalMsgs
==
2
*
g_stConfInfo
.
consumeMsgCnt
))
||
((
pInfo
->
consumeMsgCnt
==
2
*
g_stConfInfo
.
consumeMsgCnt
)
&&
(
totalMsgs
==
g_stConfInfo
.
consumeMsgCnt
)))
{
printf
(
"success"
);
}
else
{
printf
(
"fail, consumer msg cnt: %d, %d"
,
totalMsgs
,
pInfo
->
consumeMsgCnt
);
}
}
else
{
printf
(
"fail, check mode unknow. consumer msg cnt: %d, %d"
,
totalMsgs
,
pInfo
->
consumeMsgCnt
);
}
printf
(
"fail, consumer msg cnt: %d, %d"
,
totalMsgs
,
pInfo
->
consumeMsgCnt
);
}
}
else
{
printf
(
"fail, check mode unknow. consumer msg cnt: %d, %d"
,
totalMsgs
,
pInfo
->
consumeMsgCnt
);
}
}
return
0
;
...
...
tests/tsim/src/simExe.c
浏览文件 @
17ecc24e
...
...
@@ -678,7 +678,7 @@ bool simExecuteNativeSqlCommand(SScript *script, char *rest, bool isSlow) {
if
(
tt
<
0
)
tt
=
0
;
#endif
tp
=
localtime
(
&
tt
);
tp
=
taosLocalTime
(
&
tt
,
NULL
);
strftime
(
timeStr
,
64
,
"%y-%m-%d %H:%M:%S"
,
tp
);
if
(
precision
==
TSDB_TIME_PRECISION_MILLI
)
{
sprintf
(
value
,
"%s.%03d"
,
timeStr
,
(
int32_t
)(
*
((
int64_t
*
)
row
[
i
])
%
1000
));
...
...
tools/shell/src/shellEngine.c
浏览文件 @
17ecc24e
...
...
@@ -452,7 +452,7 @@ static char *formatTimestamp(char *buf, int64_t val, int precision) {
}
}
struct
tm
*
ptm
=
localtime
(
&
tt
);
struct
tm
*
ptm
=
taosLocalTime
(
&
tt
,
NULL
);
size_t
pos
=
strftime
(
buf
,
35
,
"%Y-%m-%d %H:%M:%S"
,
ptm
);
if
(
precision
==
TSDB_TIME_PRECISION_NANO
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录