Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
7adb395b
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
7adb395b
编写于
4月 14, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
enh: add connection type for tmq
上级
4473d236
变更
13
展开全部
隐藏空白更改
内联
并排
Showing
13 changed file
with
448 addition
and
401 deletion
+448
-401
example/src/tmq.c
example/src/tmq.c
+6
-1
include/client/taos.h
include/client/taos.h
+2
-2
include/common/tmsg.h
include/common/tmsg.h
+2
-0
source/client/inc/clientInt.h
source/client/inc/clientInt.h
+7
-2
source/client/src/clientHb.c
source/client/src/clientHb.c
+8
-9
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+9
-8
source/client/src/clientMain.c
source/client/src/clientMain.c
+5
-3
source/client/src/clientMsgHandler.c
source/client/src/clientMsgHandler.c
+49
-45
source/client/src/tmq.c
source/client/src/tmq.c
+9
-1
source/common/src/tmsg.c
source/common/src/tmsg.c
+4
-0
source/dnode/mnode/impl/src/mndProfile.c
source/dnode/mnode/impl/src/mndProfile.c
+15
-13
tests/test/c/tmqDemo.c
tests/test/c/tmqDemo.c
+230
-226
tests/test/c/tmqSim.c
tests/test/c/tmqSim.c
+102
-91
未找到文件。
example/src/tmq.c
浏览文件 @
7adb395b
...
...
@@ -144,6 +144,7 @@ void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_
}
tmq_t
*
build_consumer
()
{
#if 0
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
...
...
@@ -152,11 +153,15 @@ tmq_t* build_consumer() {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
#endif
tmq_conf_t
*
conf
=
tmq_conf_new
();
tmq_conf_set
(
conf
,
"group.id"
,
"tg2"
);
tmq_conf_set
(
conf
,
"td.connect.user"
,
"root"
);
tmq_conf_set
(
conf
,
"td.connect.pass"
,
"taosdata"
);
tmq_conf_set
(
conf
,
"td.connect.db"
,
"abc1"
);
tmq_conf_set_offset_commit_cb
(
conf
,
tmq_commit_cb_print
);
tmq_t
*
tmq
=
tmq_consumer_new
(
pConn
,
conf
,
NULL
,
0
);
tmq_t
*
tmq
=
tmq_consumer_new
1
(
conf
,
NULL
,
0
);
return
tmq
;
}
...
...
include/client/taos.h
浏览文件 @
7adb395b
...
...
@@ -247,10 +247,10 @@ DLL_EXPORT tmq_list_t *tmq_list_new();
DLL_EXPORT
int32_t
tmq_list_append
(
tmq_list_t
*
,
const
char
*
);
DLL_EXPORT
void
tmq_list_destroy
(
tmq_list_t
*
);
// will be removed in 3.0
#if 1
DLL_EXPORT
tmq_t
*
tmq_consumer_new
(
void
*
conn
,
tmq_conf_t
*
conf
,
char
*
errstr
,
int32_t
errstrLen
);
#endif
// will replace last one
DLL_EXPORT
tmq_t
*
tmq_consumer_new1
(
tmq_conf_t
*
conf
,
char
*
errstr
,
int32_t
errstrLen
);
DLL_EXPORT
const
char
*
tmq_err2str
(
tmq_resp_err_t
);
...
...
include/common/tmsg.h
浏览文件 @
7adb395b
...
...
@@ -332,6 +332,7 @@ int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp);
void
*
taosDecodeSEpSet
(
void
*
buf
,
SEpSet
*
pEp
);
typedef
struct
{
int8_t
connType
;
int32_t
pid
;
char
app
[
TSDB_APP_NAME_LEN
];
char
db
[
TSDB_DB_NAME_LEN
];
...
...
@@ -346,6 +347,7 @@ typedef struct {
int64_t
clusterId
;
int32_t
connId
;
int8_t
superUser
;
int8_t
connType
;
SEpSet
epSet
;
char
sVersion
[
128
];
}
SConnectRsp
;
...
...
source/client/inc/clientInt.h
浏览文件 @
7adb395b
...
...
@@ -45,6 +45,11 @@ extern "C" {
#define HEARTBEAT_INTERVAL 1500 // ms
enum
{
CONN_TYPE__QUERY
=
1
,
CONN_TYPE__TMQ
,
};
typedef
struct
SAppInstInfo
SAppInstInfo
;
typedef
struct
{
...
...
@@ -132,9 +137,9 @@ typedef struct STscObj {
char
pass
[
TSDB_PASSWORD_LEN
];
char
db
[
TSDB_DB_FNAME_LEN
];
char
ver
[
128
];
int8_t
connType
;
int32_t
acctId
;
uint32_t
connId
;
int32_t
connType
;
uint64_t
id
;
// ref ID returned by taosAddRef
TdThreadMutex
mutex
;
// used to protect the operation on db
int32_t
numOfReqs
;
// number of sqlObj bound to this connection
...
...
@@ -272,7 +277,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet);
void
initMsgHandleFp
();
TAOS
*
taos_connect_internal
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
pass
,
const
char
*
auth
,
const
char
*
db
,
uint16_t
port
);
uint16_t
port
,
int
connType
);
int32_t
parseSql
(
SRequestObj
*
pRequest
,
bool
topicQuery
,
SQuery
**
pQuery
);
int32_t
getPlan
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
,
SQueryPlan
**
pPlan
,
SArray
*
pNodeList
);
...
...
source/client/src/clientHb.c
浏览文件 @
7adb395b
...
...
@@ -23,6 +23,8 @@ static SClientHbMgr clientHbMgr = {0};
static
int32_t
hbCreateThread
();
static
void
hbStopThread
();
static
int32_t
hbMqHbReqHandle
(
SClientHbKey
*
connKey
,
void
*
param
,
SClientHbReq
*
req
)
{
return
0
;
}
static
int32_t
hbMqHbRspHandle
(
SAppHbMgr
*
pAppHbMgr
,
SClientHbRsp
*
pRsp
)
{
return
0
;
}
static
int32_t
hbProcessDBInfoRsp
(
void
*
value
,
int32_t
valueLen
,
struct
SCatalog
*
pCatalog
)
{
...
...
@@ -297,11 +299,10 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
return
TSDB_CODE_SUCCESS
;
}
int32_t
hbMqHbReqHandle
(
SClientHbKey
*
connKey
,
void
*
param
,
SClientHbReq
*
req
)
{
return
0
;
}
void
hbMgrInitMqHbHandle
()
{
clientHbMgr
.
reqHandle
[
HEARTBEAT_TYPE_QUERY
]
=
hbQueryHbReqHandle
;
clientHbMgr
.
reqHandle
[
HEARTBEAT_TYPE_MQ
]
=
hbMqHbReqHandle
;
clientHbMgr
.
rspHandle
[
HEARTBEAT_TYPE_QUERY
]
=
hbQueryHbRspHandle
;
clientHbMgr
.
rspHandle
[
HEARTBEAT_TYPE_MQ
]
=
hbMqHbRspHandle
;
}
...
...
@@ -438,7 +439,7 @@ static int32_t hbCreateThread() {
if
(
taosThreadCreate
(
&
clientHbMgr
.
thread
,
&
thAttr
,
hbThreadFunc
,
NULL
)
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
}
taosThreadAttrDestroy
(
&
thAttr
);
return
0
;
}
...
...
@@ -568,7 +569,7 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo *
int
hbRegisterConn
(
SAppHbMgr
*
pAppHbMgr
,
int32_t
connId
,
int64_t
clusterId
,
int32_t
hbType
)
{
SClientHbKey
connKey
=
{
.
connId
=
connId
,
.
hbType
=
HEARTBEAT_TYPE_QUERY
,
.
hbType
=
hbType
,
};
SHbConnInfo
info
=
{
0
};
...
...
@@ -578,16 +579,14 @@ int hbRegisterConn(SAppHbMgr *pAppHbMgr, int32_t connId, int64_t clusterId, int3
*
pClusterId
=
clusterId
;
info
.
param
=
pClusterId
;
break
;
return
hbRegisterConnImpl
(
pAppHbMgr
,
connKey
,
&
info
)
;
}
case
HEARTBEAT_TYPE_MQ
:
{
break
;
return
0
;
}
default:
break
;
return
0
;
}
return
hbRegisterConnImpl
(
pAppHbMgr
,
connKey
,
&
info
);
}
void
hbDeregisterConn
(
SAppHbMgr
*
pAppHbMgr
,
SClientHbKey
connKey
)
{
...
...
source/client/src/clientImpl.c
浏览文件 @
7adb395b
...
...
@@ -11,7 +11,7 @@
#include "tref.h"
static
int32_t
initEpSetFromCfg
(
const
char
*
firstEp
,
const
char
*
secondEp
,
SCorEpSet
*
pEpSet
);
static
SMsgSendInfo
*
buildConnectMsg
(
SRequestObj
*
pRequest
);
static
SMsgSendInfo
*
buildConnectMsg
(
SRequestObj
*
pRequest
,
int8_t
connType
);
static
void
destroySendMsgInfo
(
SMsgSendInfo
*
pMsgBody
);
static
bool
stringLengthCheck
(
const
char
*
str
,
size_t
maxsize
)
{
...
...
@@ -40,10 +40,10 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i
}
static
STscObj
*
taosConnectImpl
(
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
__taos_async_fn_t
fp
,
void
*
param
,
SAppInstInfo
*
pAppInfo
);
SAppInstInfo
*
pAppInfo
,
int
connType
);
TAOS
*
taos_connect_internal
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
pass
,
const
char
*
auth
,
const
char
*
db
,
uint16_t
port
)
{
uint16_t
port
,
int
connType
)
{
if
(
taos_init
()
!=
TSDB_CODE_SUCCESS
)
{
return
NULL
;
}
...
...
@@ -111,7 +111,7 @@ TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass,
taosThreadMutexUnlock
(
&
appInfo
.
mutex
);
taosMemoryFreeClear
(
key
);
return
taosConnectImpl
(
user
,
&
secretEncrypt
[
0
],
localDb
,
NULL
,
NULL
,
*
pInst
);
return
taosConnectImpl
(
user
,
&
secretEncrypt
[
0
],
localDb
,
NULL
,
NULL
,
*
pInst
,
connType
);
}
int32_t
buildRequest
(
STscObj
*
pTscObj
,
const
char
*
sql
,
int
sqlLen
,
SRequestObj
**
pRequest
)
{
...
...
@@ -425,7 +425,7 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe
}
STscObj
*
taosConnectImpl
(
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
__taos_async_fn_t
fp
,
void
*
param
,
SAppInstInfo
*
pAppInfo
)
{
SAppInstInfo
*
pAppInfo
,
int
connType
)
{
STscObj
*
pTscObj
=
createTscObj
(
user
,
auth
,
db
,
pAppInfo
);
if
(
NULL
==
pTscObj
)
{
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
...
...
@@ -439,7 +439,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
return
NULL
;
}
SMsgSendInfo
*
body
=
buildConnectMsg
(
pRequest
);
SMsgSendInfo
*
body
=
buildConnectMsg
(
pRequest
,
connType
);
int64_t
transporterId
=
0
;
asyncSendMsgToServer
(
pTscObj
->
pAppInfo
->
pTransporter
,
&
pTscObj
->
pAppInfo
->
mgmtEp
.
epSet
,
&
transporterId
,
body
);
...
...
@@ -462,7 +462,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
return
pTscObj
;
}
static
SMsgSendInfo
*
buildConnectMsg
(
SRequestObj
*
pRequest
)
{
static
SMsgSendInfo
*
buildConnectMsg
(
SRequestObj
*
pRequest
,
int8_t
connType
)
{
SMsgSendInfo
*
pMsgSendInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SMsgSendInfo
));
if
(
pMsgSendInfo
==
NULL
)
{
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
...
...
@@ -485,6 +485,7 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
}
taosMemoryFreeClear
(
db
);
connectReq
.
connType
=
connType
;
connectReq
.
pid
=
htonl
(
appInfo
.
pid
);
connectReq
.
startTime
=
htobe64
(
appInfo
.
startTime
);
tstrncpy
(
connectReq
.
app
,
appInfo
.
appName
,
sizeof
(
connectReq
.
app
));
...
...
@@ -570,7 +571,7 @@ TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, cons
return
NULL
;
}
return
taos_connect_internal
(
ip
,
user
,
NULL
,
auth
,
db
,
port
);
return
taos_connect_internal
(
ip
,
user
,
NULL
,
auth
,
db
,
port
,
CONN_TYPE__QUERY
);
}
TAOS
*
taos_connect_l
(
const
char
*
ip
,
int
ipLen
,
const
char
*
user
,
int
userLen
,
const
char
*
pass
,
int
passLen
,
...
...
source/client/src/clientMain.c
浏览文件 @
7adb395b
...
...
@@ -87,7 +87,7 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha
pass
=
TSDB_DEFAULT_PASS
;
}
return
taos_connect_internal
(
ip
,
user
,
pass
,
NULL
,
db
,
port
);
return
taos_connect_internal
(
ip
,
user
,
pass
,
NULL
,
db
,
port
,
CONN_TYPE__QUERY
);
}
void
taos_close
(
TAOS
*
taos
)
{
...
...
@@ -124,8 +124,10 @@ const char *taos_errstr(TAOS_RES *res) {
}
void
taos_free_result
(
TAOS_RES
*
res
)
{
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
destroyRequest
(
pRequest
);
if
(
TD_RES_QUERY
(
res
))
{
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
destroyRequest
(
pRequest
);
}
}
int
taos_field_count
(
TAOS_RES
*
res
)
{
...
...
source/client/src/clientMsgHandler.c
浏览文件 @
7adb395b
...
...
@@ -13,13 +13,13 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tdef.h"
#include "tname.h"
#include "catalog.h"
#include "clientInt.h"
#include "clientLog.h"
#include "
catalog
.h"
#include "
os
.h"
#include "query.h"
#include "tdef.h"
#include "tname.h"
int32_t
(
*
handleRequestRspFp
[
TDMT_MAX
])(
void
*
,
const
SDataBuf
*
pMsg
,
int32_t
code
);
...
...
@@ -69,9 +69,9 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
pTscObj
->
pAppInfo
->
clusterId
=
connectRsp
.
clusterId
;
atomic_add_fetch_64
(
&
pTscObj
->
pAppInfo
->
numOfConns
,
1
);
pTscObj
->
connType
=
HEARTBEAT_TYPE_QUERY
;
pTscObj
->
connType
=
connectRsp
.
connType
;
hbRegisterConn
(
pTscObj
->
pAppInfo
->
pAppHbMgr
,
connectRsp
.
connId
,
connectRsp
.
clusterId
,
HEARTBEAT_TYPE_QUERY
);
hbRegisterConn
(
pTscObj
->
pAppInfo
->
pAppHbMgr
,
connectRsp
.
connId
,
connectRsp
.
clusterId
,
connectRsp
.
connType
);
// pRequest->body.resInfo.pRspMsg = pMsg->pData;
tscDebug
(
"0x%"
PRIx64
" clusterId:%"
PRId64
", totalConn:%"
PRId64
,
pRequest
->
requestId
,
connectRsp
.
clusterId
,
...
...
@@ -82,13 +82,13 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
return
0
;
}
SMsgSendInfo
*
buildMsgInfoImpl
(
SRequestObj
*
pRequest
)
{
SMsgSendInfo
*
buildMsgInfoImpl
(
SRequestObj
*
pRequest
)
{
SMsgSendInfo
*
pMsgSendInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SMsgSendInfo
));
pMsgSendInfo
->
requestObjRefId
=
pRequest
->
self
;
pMsgSendInfo
->
requestId
=
pRequest
->
requestId
;
pMsgSendInfo
->
param
=
pRequest
;
pMsgSendInfo
->
msgType
=
pRequest
->
type
;
pMsgSendInfo
->
requestId
=
pRequest
->
requestId
;
pMsgSendInfo
->
param
=
pRequest
;
pMsgSendInfo
->
msgType
=
pRequest
->
type
;
if
(
pRequest
->
type
==
TDMT_MND_SHOW_RETRIEVE
||
pRequest
->
type
==
TDMT_VND_SHOW_TABLES_FETCH
)
{
if
(
pRequest
->
type
==
TDMT_MND_SHOW_RETRIEVE
)
{
...
...
@@ -119,7 +119,9 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj *pRequest) {
pMsgSendInfo
->
msgInfo
=
pRequest
->
body
.
requestMsg
;
}
pMsgSendInfo
->
fp
=
(
handleRequestRspFp
[
TMSG_INDEX
(
pRequest
->
type
)]
==
NULL
)
?
genericRspCallback
:
handleRequestRspFp
[
TMSG_INDEX
(
pRequest
->
type
)];
pMsgSendInfo
->
fp
=
(
handleRequestRspFp
[
TMSG_INDEX
(
pRequest
->
type
)]
==
NULL
)
?
genericRspCallback
:
handleRequestRspFp
[
TMSG_INDEX
(
pRequest
->
type
)];
return
pMsgSendInfo
;
}
...
...
@@ -133,7 +135,7 @@ int32_t processShowRsp(void* param, const SDataBuf* pMsg, int32_t code) {
SShowRsp
showRsp
=
{
0
};
tDeserializeSShowRsp
(
pMsg
->
pData
,
pMsg
->
len
,
&
showRsp
);
STableMetaRsp
*
pMetaMsg
=
&
showRsp
.
tableMeta
;
STableMetaRsp
*
pMetaMsg
=
&
showRsp
.
tableMeta
;
taosMemoryFreeClear
(
pRequest
->
body
.
resInfo
.
pRspMsg
);
pRequest
->
body
.
resInfo
.
pRspMsg
=
pMsg
->
pData
;
...
...
@@ -159,7 +161,7 @@ int32_t processShowRsp(void* param, const SDataBuf* pMsg, int32_t code) {
if
(
pRequest
->
type
==
TDMT_VND_SHOW_TABLES
)
{
SShowReqInfo
*
pShowInfo
=
&
pRequest
->
body
.
showInfo
;
int32_t
index
=
pShowInfo
->
currentIndex
;
int32_t
index
=
pShowInfo
->
currentIndex
;
SVgroupInfo
*
pInfo
=
taosArrayGet
(
pShowInfo
->
pArray
,
index
);
pShowInfo
->
vgId
=
pInfo
->
vgId
;
}
...
...
@@ -169,8 +171,8 @@ int32_t processShowRsp(void* param, const SDataBuf* pMsg, int32_t code) {
}
int32_t
processRetrieveMnodeRsp
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
SRequestObj
*
pRequest
=
param
;
SReqResultInfo
*
pResInfo
=
&
pRequest
->
body
.
resInfo
;
SRequestObj
*
pRequest
=
param
;
SReqResultInfo
*
pResInfo
=
&
pRequest
->
body
.
resInfo
;
taosMemoryFreeClear
(
pResInfo
->
pRspMsg
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -181,19 +183,19 @@ int32_t processRetrieveMnodeRsp(void* param, const SDataBuf* pMsg, int32_t code)
assert
(
pMsg
->
len
>=
sizeof
(
SRetrieveTableRsp
));
SRetrieveTableRsp
*
pRetrieve
=
(
SRetrieveTableRsp
*
)
pMsg
->
pData
;
pRetrieve
->
numOfRows
=
htonl
(
pRetrieve
->
numOfRows
);
pRetrieve
->
precision
=
htons
(
pRetrieve
->
precision
);
SRetrieveTableRsp
*
pRetrieve
=
(
SRetrieveTableRsp
*
)
pMsg
->
pData
;
pRetrieve
->
numOfRows
=
htonl
(
pRetrieve
->
numOfRows
);
pRetrieve
->
precision
=
htons
(
pRetrieve
->
precision
);
pResInfo
->
pRspMsg
=
pMsg
->
pData
;
pResInfo
->
pRspMsg
=
pMsg
->
pData
;
pResInfo
->
numOfRows
=
pRetrieve
->
numOfRows
;
pResInfo
->
pData
=
pRetrieve
->
data
;
pResInfo
->
pData
=
pRetrieve
->
data
;
pResInfo
->
completed
=
pRetrieve
->
completed
;
pResInfo
->
current
=
0
;
// setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows);
// setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows);
tscDebug
(
"0x%"
PRIx64
" numOfRows:%d, complete:%d, qId:0x%"
PRIx64
,
pRequest
->
self
,
pRetrieve
->
numOfRows
,
tscDebug
(
"0x%"
PRIx64
" numOfRows:%d, complete:%d, qId:0x%"
PRIx64
,
pRequest
->
self
,
pRetrieve
->
numOfRows
,
pRetrieve
->
completed
,
pRequest
->
body
.
showInfo
.
execId
);
tsem_post
(
&
pRequest
->
body
.
rspSem
);
...
...
@@ -214,20 +216,20 @@ int32_t processRetrieveVndRsp(void* param, const SDataBuf* pMsg, int32_t code) {
assert
(
pMsg
->
len
>=
sizeof
(
SRetrieveTableRsp
));
pResInfo
->
pRspMsg
=
pMsg
->
pData
;
pResInfo
->
pRspMsg
=
pMsg
->
pData
;
SVShowTablesFetchRsp
*
pFetchRsp
=
(
SVShowTablesFetchRsp
*
)
pMsg
->
pData
;
pFetchRsp
->
numOfRows
=
htonl
(
pFetchRsp
->
numOfRows
);
pFetchRsp
->
precision
=
htons
(
pFetchRsp
->
precision
);
SVShowTablesFetchRsp
*
pFetchRsp
=
(
SVShowTablesFetchRsp
*
)
pMsg
->
pData
;
pFetchRsp
->
numOfRows
=
htonl
(
pFetchRsp
->
numOfRows
);
pFetchRsp
->
precision
=
htons
(
pFetchRsp
->
precision
);
pResInfo
->
pRspMsg
=
pMsg
->
pData
;
pResInfo
->
pRspMsg
=
pMsg
->
pData
;
pResInfo
->
numOfRows
=
pFetchRsp
->
numOfRows
;
pResInfo
->
pData
=
pFetchRsp
->
data
;
pResInfo
->
pData
=
pFetchRsp
->
data
;
pResInfo
->
current
=
0
;
// setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows);
// setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows);
tscDebug
(
"0x%"
PRIx64
" numOfRows:%d, complete:%d, qId:0x%"
PRIx64
,
pRequest
->
self
,
pFetchRsp
->
numOfRows
,
tscDebug
(
"0x%"
PRIx64
" numOfRows:%d, complete:%d, qId:0x%"
PRIx64
,
pRequest
->
self
,
pFetchRsp
->
numOfRows
,
pFetchRsp
->
completed
,
pRequest
->
body
.
showInfo
.
execId
);
tsem_post
(
&
pRequest
->
body
.
rspSem
);
...
...
@@ -251,18 +253,19 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
if
(
TSDB_CODE_MND_DB_NOT_EXIST
==
code
)
{
SUseDbRsp
usedbRsp
=
{
0
};
tDeserializeSUseDbRsp
(
pMsg
->
pData
,
pMsg
->
len
,
&
usedbRsp
);
struct
SCatalog
*
pCatalog
=
NULL
;
struct
SCatalog
*
pCatalog
=
NULL
;
if
(
usedbRsp
.
vgVersion
>=
0
)
{
int32_t
code
=
catalogGetHandle
(
pRequest
->
pTscObj
->
pAppInfo
->
clusterId
,
&
pCatalog
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tscWarn
(
"catalogGetHandle failed, clusterId:%"
PRIx64
", error:%s"
,
pRequest
->
pTscObj
->
pAppInfo
->
clusterId
,
tstrerror
(
code
));
tscWarn
(
"catalogGetHandle failed, clusterId:%"
PRIx64
", error:%s"
,
pRequest
->
pTscObj
->
pAppInfo
->
clusterId
,
tstrerror
(
code
));
}
else
{
catalogRemoveDB
(
pCatalog
,
usedbRsp
.
db
,
usedbRsp
.
uid
);
}
}
tFreeSUsedbRsp
(
&
usedbRsp
);
tFreeSUsedbRsp
(
&
usedbRsp
);
}
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -276,7 +279,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
tDeserializeSUseDbRsp
(
pMsg
->
pData
,
pMsg
->
len
,
&
usedbRsp
);
SName
name
=
{
0
};
tNameFromString
(
&
name
,
usedbRsp
.
db
,
T_NAME_ACCT
|
T_NAME_DB
);
tNameFromString
(
&
name
,
usedbRsp
.
db
,
T_NAME_ACCT
|
T_NAME_DB
);
SUseDbOutput
output
=
{
0
};
code
=
queryBuildUseDbOutput
(
&
output
,
&
usedbRsp
);
...
...
@@ -288,11 +291,12 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
tscError
(
"failed to build use db output since %s"
,
terrstr
());
}
else
{
struct
SCatalog
*
pCatalog
=
NULL
;
struct
SCatalog
*
pCatalog
=
NULL
;
int32_t
code
=
catalogGetHandle
(
pRequest
->
pTscObj
->
pAppInfo
->
clusterId
,
&
pCatalog
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tscWarn
(
"catalogGetHandle failed, clusterId:%"
PRIx64
", error:%s"
,
pRequest
->
pTscObj
->
pAppInfo
->
clusterId
,
tstrerror
(
code
));
tscWarn
(
"catalogGetHandle failed, clusterId:%"
PRIx64
", error:%s"
,
pRequest
->
pTscObj
->
pAppInfo
->
clusterId
,
tstrerror
(
code
));
}
else
{
catalogUpdateDBVgInfo
(
pCatalog
,
output
.
db
,
output
.
dbId
,
output
.
dbVgroup
);
}
...
...
@@ -419,14 +423,14 @@ void initMsgHandleFp() {
tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_DATABASE] = tscProcessShowCreateRsp;
#endif
handleRequestRspFp
[
TMSG_INDEX
(
TDMT_MND_CONNECT
)]
=
processConnectRsp
;
handleRequestRspFp
[
TMSG_INDEX
(
TDMT_MND_SHOW
)]
=
processShowRsp
;
handleRequestRspFp
[
TMSG_INDEX
(
TDMT_MND_CONNECT
)]
=
processConnectRsp
;
handleRequestRspFp
[
TMSG_INDEX
(
TDMT_MND_SHOW
)]
=
processShowRsp
;
handleRequestRspFp
[
TMSG_INDEX
(
TDMT_MND_SHOW_RETRIEVE
)]
=
processRetrieveMnodeRsp
;
handleRequestRspFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_DB
)]
=
processCreateDbRsp
;
handleRequestRspFp
[
TMSG_INDEX
(
TDMT_MND_USE_DB
)]
=
processUseDbRsp
;
handleRequestRspFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_STB
)]
=
processCreateTableRsp
;
handleRequestRspFp
[
TMSG_INDEX
(
TDMT_MND_DROP_DB
)]
=
processDropDbRsp
;
handleRequestRspFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_DB
)]
=
processCreateDbRsp
;
handleRequestRspFp
[
TMSG_INDEX
(
TDMT_MND_USE_DB
)]
=
processUseDbRsp
;
handleRequestRspFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_STB
)]
=
processCreateTableRsp
;
handleRequestRspFp
[
TMSG_INDEX
(
TDMT_MND_DROP_DB
)]
=
processDropDbRsp
;
handleRequestRspFp
[
TMSG_INDEX
(
TDMT_VND_SHOW_TABLES
)]
=
processShowRsp
;
handleRequestRspFp
[
TMSG_INDEX
(
TDMT_VND_SHOW_TABLES_FETCH
)]
=
processRetrieveVndRsp
;
handleRequestRspFp
[
TMSG_INDEX
(
TDMT_VND_SHOW_TABLES
)]
=
processShowRsp
;
handleRequestRspFp
[
TMSG_INDEX
(
TDMT_VND_SHOW_TABLES_FETCH
)]
=
processRetrieveVndRsp
;
}
source/client/src/tmq.c
浏览文件 @
7adb395b
...
...
@@ -357,7 +357,15 @@ tmq_t* tmq_consumer_new1(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
if
(
pTmq
==
NULL
)
{
return
NULL
;
}
pTmq
->
pTscObj
=
taos_connect
(
conf
->
ip
,
conf
->
user
,
conf
->
pass
,
conf
->
db
,
conf
->
port
);
const
char
*
user
=
conf
->
user
==
NULL
?
TSDB_DEFAULT_USER
:
conf
->
user
;
const
char
*
pass
=
conf
->
pass
==
NULL
?
TSDB_DEFAULT_PASS
:
conf
->
pass
;
ASSERT
(
user
);
ASSERT
(
pass
);
ASSERT
(
conf
->
db
);
pTmq
->
pTscObj
=
taos_connect_internal
(
conf
->
ip
,
user
,
pass
,
NULL
,
conf
->
db
,
conf
->
port
,
CONN_TYPE__TMQ
);
if
(
pTmq
->
pTscObj
==
NULL
)
return
NULL
;
pTmq
->
inWaiting
=
0
;
pTmq
->
status
=
0
;
...
...
source/common/src/tmsg.c
浏览文件 @
7adb395b
...
...
@@ -2532,6 +2532,7 @@ int32_t tSerializeSConnectReq(void *buf, int32_t bufLen, SConnectReq *pReq) {
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_ENCODER
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
connType
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
pid
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
app
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
db
)
<
0
)
return
-
1
;
...
...
@@ -2548,6 +2549,7 @@ int32_t tDeserializeSConnectReq(void *buf, int32_t bufLen, SConnectReq *pReq) {
tCoderInit
(
&
decoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_DECODER
);
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
connType
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
pid
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
app
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
db
)
<
0
)
return
-
1
;
...
...
@@ -2567,6 +2569,7 @@ int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
if
(
tEncodeI64
(
&
encoder
,
pRsp
->
clusterId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pRsp
->
connId
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pRsp
->
superUser
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pRsp
->
connType
)
<
0
)
return
-
1
;
if
(
tEncodeSEpSet
(
&
encoder
,
&
pRsp
->
epSet
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pRsp
->
sVersion
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
...
...
@@ -2585,6 +2588,7 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
if
(
tDecodeI64
(
&
decoder
,
&
pRsp
->
clusterId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pRsp
->
connId
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pRsp
->
superUser
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pRsp
->
connType
)
<
0
)
return
-
1
;
if
(
tDecodeSEpSet
(
&
decoder
,
&
pRsp
->
epSet
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pRsp
->
sVersion
)
<
0
)
return
-
1
;
tEndDecode
(
&
decoder
);
...
...
source/dnode/mnode/impl/src/mndProfile.c
浏览文件 @
7adb395b
...
...
@@ -23,13 +23,14 @@
#include "tglobal.h"
#include "version.h"
#define QUERY_ID_SIZE 20
#define QUERY_OBJ_ID_SIZE 18
#define QUERY_ID_SIZE
20
#define QUERY_OBJ_ID_SIZE
18
#define SUBQUERY_INFO_SIZE 6
#define QUERY_SAVE_SIZE 20
#define QUERY_SAVE_SIZE
20
typedef
struct
{
int32_t
id
;
int8_t
connType
;
char
user
[
TSDB_USER_LEN
];
char
app
[
TSDB_APP_NAME_LEN
];
// app name that invokes taosc
int64_t
appStartTimeMs
;
// app start time
...
...
@@ -44,8 +45,8 @@ typedef struct {
SQueryDesc
*
pQueries
;
}
SConnObj
;
static
SConnObj
*
mndCreateConn
(
SMnode
*
pMnode
,
const
char
*
user
,
uint32_t
ip
,
uint16_t
port
,
int32_t
pid
,
const
char
*
app
,
int64_t
startTime
);
static
SConnObj
*
mndCreateConn
(
SMnode
*
pMnode
,
const
char
*
user
,
int8_t
connType
,
uint32_t
ip
,
uint16_t
port
,
int32_t
pid
,
const
char
*
app
,
int64_t
startTime
);
static
void
mndFreeConn
(
SConnObj
*
pConn
);
static
SConnObj
*
mndAcquireConn
(
SMnode
*
pMnode
,
int32_t
connId
);
static
void
mndReleaseConn
(
SMnode
*
pMnode
,
SConnObj
*
pConn
);
...
...
@@ -93,8 +94,8 @@ void mndCleanupProfile(SMnode *pMnode) {
}
}
static
SConnObj
*
mndCreateConn
(
SMnode
*
pMnode
,
const
char
*
user
,
uint32_t
ip
,
uint16_t
port
,
int32_t
pid
,
const
char
*
app
,
int64_t
startTime
)
{
static
SConnObj
*
mndCreateConn
(
SMnode
*
pMnode
,
const
char
*
user
,
int8_t
connType
,
uint32_t
ip
,
uint16_t
port
,
int32_t
pid
,
const
char
*
app
,
int64_t
startTime
)
{
SProfileMgmt
*
pMgmt
=
&
pMnode
->
profileMgmt
;
int32_t
connId
=
atomic_add_fetch_32
(
&
pMgmt
->
connId
,
1
);
...
...
@@ -102,6 +103,7 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, uint32_t ip, ui
if
(
startTime
==
0
)
startTime
=
taosGetTimestampMs
();
SConnObj
connObj
=
{.
id
=
connId
,
.
connType
=
connType
,
.
appStartTimeMs
=
startTime
,
.
pid
=
pid
,
.
ip
=
ip
,
...
...
@@ -159,8 +161,8 @@ static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) {
}
void
*
mndGetNextConn
(
SMnode
*
pMnode
,
SCacheIter
*
pIter
)
{
SConnObj
*
pConn
=
NULL
;
bool
hasNext
=
taosCacheIterNext
(
pIter
);
SConnObj
*
pConn
=
NULL
;
bool
hasNext
=
taosCacheIterNext
(
pIter
);
if
(
hasNext
)
{
size_t
dataLen
=
0
;
pConn
=
taosCacheIterGetData
(
pIter
,
&
dataLen
);
...
...
@@ -210,8 +212,8 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) {
}
}
pConn
=
mndCreateConn
(
pMnode
,
pReq
->
user
,
pReq
->
clientIp
,
pReq
->
clientPort
,
connReq
.
pid
,
connReq
.
app
,
connReq
.
startTime
);
pConn
=
mndCreateConn
(
pMnode
,
pReq
->
user
,
connReq
.
connType
,
pReq
->
clientIp
,
pReq
->
clientPort
,
connReq
.
pid
,
connReq
.
app
,
connReq
.
startTime
);
if
(
pConn
==
NULL
)
{
mError
(
"user:%s, failed to login from %s while create connection since %s"
,
pReq
->
user
,
ip
,
terrstr
());
goto
CONN_OVER
;
...
...
@@ -222,6 +224,7 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) {
connectRsp
.
superUser
=
pUser
->
superUser
;
connectRsp
.
clusterId
=
pMnode
->
clusterId
;
connectRsp
.
connId
=
pConn
->
id
;
connectRsp
.
connType
=
connReq
.
connType
;
snprintf
(
connectRsp
.
sVersion
,
sizeof
(
connectRsp
.
sVersion
),
"ver:%s
\n
build:%s
\n
gitinfo:%s"
,
version
,
buildinfo
,
gitinfo
);
...
...
@@ -343,7 +346,6 @@ static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) {
return
-
1
;
}
SClientHbBatchRsp
batchRsp
=
{
0
};
batchRsp
.
rsps
=
taosArrayInit
(
0
,
sizeof
(
SClientHbRsp
));
...
...
@@ -917,4 +919,4 @@ static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) {
int32_t
mndGetNumOfConnections
(
SMnode
*
pMnode
)
{
SProfileMgmt
*
pMgmt
=
&
pMnode
->
profileMgmt
;
return
taosCacheGetNumOfObj
(
pMgmt
->
cache
);
}
\ No newline at end of file
}
tests/test/c/tmqDemo.c
浏览文件 @
7adb395b
此差异已折叠。
点击以展开。
tests/test/c/tmqSim.c
浏览文件 @
7adb395b
...
...
@@ -35,8 +35,8 @@
#define MAX_ROW_STR_LEN (16 * 1024)
typedef
struct
{
int32_t
expectMsgCnt
;
int32_t
consumeMsgCnt
;
int32_t
expectMsgCnt
;
int32_t
consumeMsgCnt
;
TdThread
thread
;
}
SThreadInfo
;
...
...
@@ -45,12 +45,12 @@ typedef struct {
char
dbName
[
32
];
char
topicString
[
256
];
char
keyString
[
1024
];
char
topicString1
[
256
];
char
keyString1
[
1024
];
char
topicString1
[
256
];
char
keyString1
[
1024
];
int32_t
showMsgFlag
;
int32_t
consumeDelay
;
// unit s
int32_t
consumeMsgCnt
;
int32_t
checkMode
;
int32_t
checkMode
;
// save result after parse agrvs
int32_t
numOfTopic
;
...
...
@@ -59,13 +59,13 @@ typedef struct {
int32_t
numOfKey
;
char
key
[
32
][
64
];
char
value
[
32
][
64
];
int32_t
numOfTopic1
;
char
topics1
[
32
][
64
];
int32_t
numOfKey1
;
char
key1
[
32
][
64
];
char
value1
[
32
][
64
];
int32_t
numOfTopic1
;
char
topics1
[
32
][
64
];
int32_t
numOfKey1
;
char
key1
[
32
][
64
];
char
value1
[
32
][
64
];
}
SConfInfo
;
static
SConfInfo
g_stConfInfo
;
...
...
@@ -186,18 +186,18 @@ void parseInputString() {
ltrim
(
g_stConfInfo
.
topics
[
g_stConfInfo
.
numOfTopic
]);
// printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
g_stConfInfo
.
numOfTopic
++
;
token
=
strtok
(
NULL
,
delim
);
}
}
token
=
strtok
(
g_stConfInfo
.
topicString1
,
delim
);
while
(
token
!=
NULL
)
{
//printf("%s\n", token );
strcpy
(
g_stConfInfo
.
topics1
[
g_stConfInfo
.
numOfTopic1
],
token
);
while
(
token
!=
NULL
)
{
//
printf("%s\n", token );
strcpy
(
g_stConfInfo
.
topics1
[
g_stConfInfo
.
numOfTopic1
],
token
);
ltrim
(
g_stConfInfo
.
topics1
[
g_stConfInfo
.
numOfTopic1
]);
//
printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
g_stConfInfo
.
numOfTopic1
++
;
//
printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
g_stConfInfo
.
numOfTopic1
++
;
token
=
strtok
(
NULL
,
delim
);
}
...
...
@@ -214,28 +214,28 @@ void parseInputString() {
// g_stConfInfo.value[g_stConfInfo.numOfKey]);
g_stConfInfo
.
numOfKey
++
;
}
token
=
strtok
(
NULL
,
delim
);
}
token
=
strtok
(
g_stConfInfo
.
keyString1
,
delim
);
while
(
token
!=
NULL
)
{
//printf("%s\n", token );
{
char
*
pstr
=
token
;
ltrim
(
pstr
);
char
*
ret
=
strchr
(
pstr
,
ch
);
memcpy
(
g_stConfInfo
.
key1
[
g_stConfInfo
.
numOfKey1
],
pstr
,
ret
-
pstr
);
strcpy
(
g_stConfInfo
.
value1
[
g_stConfInfo
.
numOfKey1
],
ret
+
1
);
//printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey], g_stConfInfo.value[g_stConfInfo.numOfKey]);
g_stConfInfo
.
numOfKey1
++
;
while
(
token
!=
NULL
)
{
// printf("%s\n", token );
{
char
*
pstr
=
token
;
ltrim
(
pstr
);
char
*
ret
=
strchr
(
pstr
,
ch
);
memcpy
(
g_stConfInfo
.
key1
[
g_stConfInfo
.
numOfKey1
],
pstr
,
ret
-
pstr
);
strcpy
(
g_stConfInfo
.
value1
[
g_stConfInfo
.
numOfKey1
],
ret
+
1
);
// printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey],
// g_stConfInfo.value[g_stConfInfo.numOfKey]);
g_stConfInfo
.
numOfKey1
++
;
}
token
=
strtok
(
NULL
,
delim
);
}
}
static
int
running
=
1
;
/*static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }*/
...
...
@@ -253,6 +253,7 @@ int queryDB(TAOS* taos, char* command) {
}
tmq_t
*
build_consumer
()
{
#if 0
char sqlStr[1024] = {0};
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
...
...
@@ -266,13 +267,19 @@ tmq_t* build_consumer() {
exit(-1);
}
taos_free_result(pRes);
#endif
tmq_conf_t
*
conf
=
tmq_conf_new
();
// tmq_conf_set(conf, "group.id", "tg2");
for
(
int32_t
i
=
0
;
i
<
g_stConfInfo
.
numOfKey
;
i
++
)
{
tmq_conf_set
(
conf
,
g_stConfInfo
.
key
[
i
],
g_stConfInfo
.
value
[
i
]);
}
tmq_t
*
tmq
=
tmq_consumer_new
(
pConn
,
conf
,
NULL
,
0
);
tmq_conf_set
(
conf
,
"td.connect.user"
,
"root"
);
tmq_conf_set
(
conf
,
"td.connect.pass"
,
"taosdata"
);
tmq_conf_set
(
conf
,
"td.connect.db"
,
g_stConfInfo
.
dbName
);
tmq_t
*
tmq
=
tmq_consumer_new1
(
conf
,
NULL
,
0
);
assert
(
tmq
);
tmq_conf_destroy
(
conf
);
return
tmq
;
}
...
...
@@ -285,10 +292,10 @@ tmq_list_t* build_topic_list() {
return
topic_list
;
}
tmq_t
*
build_consumer_x
()
{
#if 0
char sqlStr[1024] = {0};
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
assert(pConn != NULL);
...
...
@@ -296,23 +303,29 @@ tmq_t* build_consumer_x() {
TAOS_RES* pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
taos_free_result
(
pRes
);
exit
(
-
1
);
taos_free_result(pRes);
exit(-1);
}
taos_free_result(pRes);
#endif
tmq_conf_t
*
conf
=
tmq_conf_new
();
//tmq_conf_set(conf, "group.id", "tg2");
//
tmq_conf_set(conf, "group.id", "tg2");
for
(
int32_t
i
=
0
;
i
<
g_stConfInfo
.
numOfKey1
;
i
++
)
{
tmq_conf_set
(
conf
,
g_stConfInfo
.
key1
[
i
],
g_stConfInfo
.
value1
[
i
]);
}
tmq_t
*
tmq
=
tmq_consumer_new
(
pConn
,
conf
,
NULL
,
0
);
tmq_conf_set
(
conf
,
"td.connect.user"
,
"root"
);
tmq_conf_set
(
conf
,
"td.connect.pass"
,
"taosdata"
);
tmq_conf_set
(
conf
,
"td.connect.db"
,
g_stConfInfo
.
dbName
);
tmq_t
*
tmq
=
tmq_consumer_new1
(
conf
,
NULL
,
0
);
assert
(
tmq
);
tmq_conf_destroy
(
conf
);
return
tmq
;
}
tmq_list_t
*
build_topic_list_x
()
{
tmq_list_t
*
topic_list
=
tmq_list_new
();
//tmq_list_append(topic_list, "test_stb_topic_1");
//
tmq_list_append(topic_list, "test_stb_topic_1");
for
(
int32_t
i
=
0
;
i
<
g_stConfInfo
.
numOfTopic1
;
i
++
)
{
tmq_list_append
(
topic_list
,
g_stConfInfo
.
topics1
[
i
]);
}
...
...
@@ -367,9 +380,9 @@ int32_t parallel_consume(tmq_t* tmq, int threadLable) {
if
(
tmqMsg
)
{
totalMsgs
++
;
//
printf("threadFlag: %d, totalMsgs: %d\n", threadLable, totalMsgs);
//
printf("threadFlag: %d, totalMsgs: %d\n", threadLable, totalMsgs);
#if 0
#if 0
TAOS_ROW row;
while (NULL != (row = tmq_get_row(tmqMsg))) {
totalRows++;
...
...
@@ -396,65 +409,63 @@ int32_t parallel_consume(tmq_t* tmq, int threadLable) {
exit
(
-
1
);
}
//printf("%d", totalMsgs); // output to sim for check result
//
printf("%d", totalMsgs); // output to sim for check result
return
totalMsgs
;
}
void
*
threadFunc
(
void
*
param
)
{
void
*
threadFunc
(
void
*
param
)
{
int32_t
totalMsgs
=
0
;
SThreadInfo
*
pInfo
=
(
SThreadInfo
*
)
param
;
SThreadInfo
*
pInfo
=
(
SThreadInfo
*
)
param
;
tmq_t
*
tmq
=
build_consumer_x
();
tmq_t
*
tmq
=
build_consumer_x
();
tmq_list_t
*
topic_list
=
build_topic_list_x
();
if
((
NULL
==
tmq
)
||
(
NULL
==
topic_list
)){
if
((
NULL
==
tmq
)
||
(
NULL
==
topic_list
))
{
return
NULL
;
}
tmq_resp_err_t
err
=
tmq_subscribe
(
tmq
,
topic_list
);
if
(
err
)
{
printf
(
"tmq_subscribe() fail, reason: %s
\n
"
,
tmq_err2str
(
err
));
exit
(
-
1
);
}
//if (0 == g_stConfInfo.consumeMsgCnt) {
// loop_consume(tmq);
//} else {
pInfo
->
consumeMsgCnt
=
parallel_consume
(
tmq
,
1
);
//
if (0 == g_stConfInfo.consumeMsgCnt) {
//
loop_consume(tmq);
//
} else {
pInfo
->
consumeMsgCnt
=
parallel_consume
(
tmq
,
1
);
//}
err
=
tmq_unsubscribe
(
tmq
);
if
(
err
)
{
printf
(
"tmq_unsubscribe() fail, reason: %s
\n
"
,
tmq_err2str
(
err
));
pInfo
->
consumeMsgCnt
=
-
1
;
pInfo
->
consumeMsgCnt
=
-
1
;
return
NULL
;
}
return
NULL
;
}
int
main
(
int32_t
argc
,
char
*
argv
[])
{
parseArgument
(
argc
,
argv
);
parseInputString
();
int32_t
numOfThreads
=
1
;
int32_t
numOfThreads
=
1
;
TdThreadAttr
thattr
;
taosThreadAttrInit
(
&
thattr
);
taosThreadAttrSetDetachState
(
&
thattr
,
PTHREAD_CREATE_JOINABLE
);
SThreadInfo
*
pInfo
=
(
SThreadInfo
*
)
taosMemoryCalloc
(
numOfThreads
,
sizeof
(
SThreadInfo
));
SThreadInfo
*
pInfo
=
(
SThreadInfo
*
)
taosMemoryCalloc
(
numOfThreads
,
sizeof
(
SThreadInfo
));
if
(
g_stConfInfo
.
numOfTopic1
)
{
// pthread_create one thread to consume
for
(
int32_t
i
=
0
;
i
<
numOfThreads
;
++
i
)
{
pInfo
[
i
].
expectMsgCnt
=
0
;
pInfo
[
i
].
consumeMsgCnt
=
0
;
taosThreadCreate
(
&
(
pInfo
[
i
].
thread
),
&
thattr
,
threadFunc
,
(
void
*
)(
pInfo
+
i
));
taosThreadCreate
(
&
(
pInfo
[
i
].
thread
),
&
thattr
,
threadFunc
,
(
void
*
)(
pInfo
+
i
));
}
}
int32_t
totalMsgs
=
0
;
int32_t
totalMsgs
=
0
;
tmq_t
*
tmq
=
build_consumer
();
tmq_list_t
*
topic_list
=
build_topic_list
();
if
((
NULL
==
tmq
)
||
(
NULL
==
topic_list
))
{
...
...
@@ -479,48 +490,48 @@ int main(int32_t argc, char* argv[]) {
exit
(
-
1
);
}
if
(
g_stConfInfo
.
numOfTopic1
)
{
if
(
g_stConfInfo
.
numOfTopic1
)
{
for
(
int32_t
i
=
0
;
i
<
numOfThreads
;
i
++
)
{
taosThreadJoin
(
pInfo
[
i
].
thread
,
NULL
);
}
//
printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt);
if
(
0
==
g_stConfInfo
.
checkMode
)
{
if
((
totalMsgs
+
pInfo
->
consumeMsgCnt
)
==
g_stConfInfo
.
consumeMsgCnt
)
{
printf
(
"success"
);
//
printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt);
if
(
0
==
g_stConfInfo
.
checkMode
)
{
if
((
totalMsgs
+
pInfo
->
consumeMsgCnt
)
==
g_stConfInfo
.
consumeMsgCnt
)
{
printf
(
"success"
);
}
else
{
printf
(
"fail, consumer msg cnt: %d, %d"
,
totalMsgs
,
pInfo
->
consumeMsgCnt
);
}
}
else
if
(
1
==
g_stConfInfo
.
checkMode
)
{
printf
(
"fail, consumer msg cnt: %d, %d"
,
totalMsgs
,
pInfo
->
consumeMsgCnt
);
}
}
else
if
(
1
==
g_stConfInfo
.
checkMode
)
{
if
((
totalMsgs
==
g_stConfInfo
.
consumeMsgCnt
)
&&
(
pInfo
->
consumeMsgCnt
==
g_stConfInfo
.
consumeMsgCnt
))
{
printf
(
"success"
);
printf
(
"success"
);
}
else
{
printf
(
"fail, consumer msg cnt: %d, %d"
,
totalMsgs
,
pInfo
->
consumeMsgCnt
);
}
}
else
if
(
2
==
g_stConfInfo
.
checkMode
)
{
if
((
totalMsgs
+
pInfo
->
consumeMsgCnt
)
==
3
*
g_stConfInfo
.
consumeMsgCnt
)
{
printf
(
"success"
);
printf
(
"fail, consumer msg cnt: %d, %d"
,
totalMsgs
,
pInfo
->
consumeMsgCnt
);
}
}
else
if
(
2
==
g_stConfInfo
.
checkMode
)
{
if
((
totalMsgs
+
pInfo
->
consumeMsgCnt
)
==
3
*
g_stConfInfo
.
consumeMsgCnt
)
{
printf
(
"success"
);
}
else
{
printf
(
"fail, consumer msg cnt: %d, %d"
,
totalMsgs
,
pInfo
->
consumeMsgCnt
);
}
}
else
if
(
3
==
g_stConfInfo
.
checkMode
)
{
printf
(
"fail, consumer msg cnt: %d, %d"
,
totalMsgs
,
pInfo
->
consumeMsgCnt
);
}
}
else
if
(
3
==
g_stConfInfo
.
checkMode
)
{
if
((
totalMsgs
==
2
*
g_stConfInfo
.
consumeMsgCnt
)
&&
(
pInfo
->
consumeMsgCnt
==
2
*
g_stConfInfo
.
consumeMsgCnt
))
{
printf
(
"success"
);
printf
(
"success"
);
}
else
{
printf
(
"fail, consumer msg cnt: %d, %d"
,
totalMsgs
,
pInfo
->
consumeMsgCnt
);
}
}
else
if
(
4
==
g_stConfInfo
.
checkMode
)
{
if
(((
totalMsgs
==
0
)
&&
(
pInfo
->
consumeMsgCnt
==
3
*
g_stConfInfo
.
consumeMsgCnt
))
||
((
pInfo
->
consumeMsgCnt
==
0
)
&&
(
totalMsgs
==
3
*
g_stConfInfo
.
consumeMsgCnt
))
||
((
pInfo
->
consumeMsgCnt
==
g_stConfInfo
.
consumeMsgCnt
)
&&
(
totalMsgs
==
2
*
g_stConfInfo
.
consumeMsgCnt
))
||
((
pInfo
->
consumeMsgCnt
==
2
*
g_stConfInfo
.
consumeMsgCnt
)
&&
(
totalMsgs
==
g_stConfInfo
.
consumeMsgCnt
)))
{
printf
(
"success"
);
printf
(
"fail, consumer msg cnt: %d, %d"
,
totalMsgs
,
pInfo
->
consumeMsgCnt
);
}
}
else
if
(
4
==
g_stConfInfo
.
checkMode
)
{
if
(((
totalMsgs
==
0
)
&&
(
pInfo
->
consumeMsgCnt
==
3
*
g_stConfInfo
.
consumeMsgCnt
))
||
((
pInfo
->
consumeMsgCnt
==
0
)
&&
(
totalMsgs
==
3
*
g_stConfInfo
.
consumeMsgCnt
))
||
((
pInfo
->
consumeMsgCnt
==
g_stConfInfo
.
consumeMsgCnt
)
&&
(
totalMsgs
==
2
*
g_stConfInfo
.
consumeMsgCnt
))
||
((
pInfo
->
consumeMsgCnt
==
2
*
g_stConfInfo
.
consumeMsgCnt
)
&&
(
totalMsgs
==
g_stConfInfo
.
consumeMsgCnt
)))
{
printf
(
"success"
);
}
else
{
printf
(
"fail, consumer msg cnt: %d, %d"
,
totalMsgs
,
pInfo
->
consumeMsgCnt
);
}
}
else
{
printf
(
"fail, check mode unknow. consumer msg cnt: %d, %d"
,
totalMsgs
,
pInfo
->
consumeMsgCnt
);
}
printf
(
"fail, consumer msg cnt: %d, %d"
,
totalMsgs
,
pInfo
->
consumeMsgCnt
);
}
}
else
{
printf
(
"fail, check mode unknow. consumer msg cnt: %d, %d"
,
totalMsgs
,
pInfo
->
consumeMsgCnt
);
}
}
return
0
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录