Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d17dd905
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
d17dd905
编写于
1月 28, 2022
作者:
L
Liu Jicong
提交者:
GitHub
1月 28, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #10084 from taosdata/feature/tq
refactor: move tmq out of clientImpl
上级
f06a6c9a
5bb010ef
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
734 addition
and
706 deletion
+734
-706
include/client/taos.h
include/client/taos.h
+1
-1
source/client/inc/clientInt.h
source/client/inc/clientInt.h
+15
-0
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+0
-676
source/client/src/tmq.c
source/client/src/tmq.c
+691
-0
source/client/test/CMakeLists.txt
source/client/test/CMakeLists.txt
+1
-1
source/client/test/clientTests.cpp
source/client/test/clientTests.cpp
+3
-3
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+23
-25
未找到文件。
include/client/taos.h
浏览文件 @
d17dd905
...
...
@@ -220,7 +220,7 @@ DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* c
DLL_EXPORT
TAOS_RES
*
taos_create_topic
(
TAOS
*
taos
,
const
char
*
name
,
const
char
*
sql
,
int
sqlLen
);
DLL_EXPORT
tmq_t
*
t
aos
_consumer_new
(
void
*
conn
,
tmq_conf_t
*
conf
,
char
*
errstr
,
int32_t
errstrLen
);
DLL_EXPORT
tmq_t
*
t
mq
_consumer_new
(
void
*
conn
,
tmq_conf_t
*
conf
,
char
*
errstr
,
int32_t
errstrLen
);
DLL_EXPORT
TAOS_RES
*
tmq_subscribe
(
tmq_t
*
tmq
,
tmq_list_t
*
topic_list
);
...
...
source/client/inc/clientInt.h
浏览文件 @
d17dd905
...
...
@@ -30,6 +30,16 @@ extern "C" {
#include "tmsgtype.h"
#include "trpc.h"
#include "query.h"
#include "parser.h"
#define CHECK_CODE_GOTO(expr, label) \
do { \
int32_t code = expr; \
if (TSDB_CODE_SUCCESS != code) { \
terrno = code; \
goto label; \
} \
} while (0)
#define HEARTBEAT_INTERVAL 1500 // ms
...
...
@@ -219,6 +229,11 @@ void *doFetchRow(SRequestObj* pRequest);
void
setResultDataPtr
(
SReqResultInfo
*
pResultInfo
,
TAOS_FIELD
*
pFields
,
int32_t
numOfCols
,
int32_t
numOfRows
);
int32_t
buildRequest
(
STscObj
*
pTscObj
,
const
char
*
sql
,
int
sqlLen
,
SRequestObj
**
pRequest
);
int32_t
parseSql
(
SRequestObj
*
pRequest
,
SQueryNode
**
pQuery
);
// --- heartbeat
// global, called by mgmt
int
hbMgrInit
();
...
...
source/client/src/clientImpl.c
浏览文件 @
d17dd905
...
...
@@ -12,102 +12,6 @@
#include "tpagedfile.h"
#include "tref.h"
#define CHECK_CODE_GOTO(expr, label) \
do { \
int32_t code = expr; \
if (TSDB_CODE_SUCCESS != code) { \
terrno = code; \
goto label; \
} \
} while (0)
typedef
struct
SMqClientVg
{
// statistics
int64_t
pollCnt
;
// offset
int64_t
committedOffset
;
int64_t
currentOffset
;
//connection info
int32_t
vgId
;
SEpSet
epSet
;
}
SMqClientVg
;
typedef
struct
SMqClientTopic
{
// subscribe info
int32_t
sqlLen
;
char
*
sql
;
char
*
topicName
;
int64_t
topicId
;
int32_t
nextVgIdx
;
SArray
*
vgs
;
//SArray<SMqClientVg>
}
SMqClientTopic
;
struct
tmq_resp_err_t
{
int32_t
code
;
};
struct
tmq_topic_vgroup_t
{
char
*
topic
;
int32_t
vgId
;
int64_t
commitOffset
;
};
struct
tmq_topic_vgroup_list_t
{
int32_t
cnt
;
int32_t
size
;
tmq_topic_vgroup_t
*
elems
;
};
typedef
struct
SMqConsumeCbParam
{
tmq_t
*
tmq
;
SMqClientVg
*
pVg
;
tmq_message_t
**
retMsg
;
}
SMqConsumeCbParam
;
struct
tmq_conf_t
{
char
clientId
[
256
];
char
groupId
[
256
];
char
*
ip
;
uint16_t
port
;
tmq_commit_cb
*
commit_cb
;
};
struct
tmq_message_t
{
SMqConsumeRsp
rsp
;
};
tmq_conf_t
*
tmq_conf_new
()
{
tmq_conf_t
*
conf
=
calloc
(
1
,
sizeof
(
tmq_conf_t
));
return
conf
;
}
int32_t
tmq_conf_set
(
tmq_conf_t
*
conf
,
const
char
*
key
,
const
char
*
value
)
{
if
(
strcmp
(
key
,
"group.id"
)
==
0
)
{
strcpy
(
conf
->
groupId
,
value
);
}
if
(
strcmp
(
key
,
"client.id"
)
==
0
)
{
strcpy
(
conf
->
clientId
,
value
);
}
return
0
;
}
struct
tmq_t
{
char
groupId
[
256
];
char
clientId
[
256
];
SRWLatch
lock
;
int64_t
consumerId
;
int64_t
epoch
;
int64_t
status
;
tsem_t
rspSem
;
STscObj
*
pTscObj
;
tmq_commit_cb
*
commit_cb
;
int32_t
nextTopicIdx
;
SArray
*
clientTopics
;
//SArray<SMqClientTopic>
//stat
int64_t
pollCnt
;
};
static
int32_t
initEpSetFromCfg
(
const
char
*
firstEp
,
const
char
*
secondEp
,
SCorEpSet
*
pEpSet
);
static
SMsgSendInfo
*
buildConnectMsg
(
SRequestObj
*
pRequest
);
static
void
destroySendMsgInfo
(
SMsgSendInfo
*
pMsgBody
);
...
...
@@ -345,586 +249,6 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList)
return
schedulerAsyncExecJob
(
pTransporter
,
pNodeList
,
pDag
,
pRequest
->
sqlstr
,
&
pRequest
->
body
.
pQueryJob
);
}
tmq_t
*
taos_consumer_new
(
void
*
conn
,
tmq_conf_t
*
conf
,
char
*
errstr
,
int32_t
errstrLen
)
{
tmq_t
*
pTmq
=
calloc
(
sizeof
(
tmq_t
),
1
);
if
(
pTmq
==
NULL
)
{
return
NULL
;
}
pTmq
->
pTscObj
=
(
STscObj
*
)
conn
;
pTmq
->
status
=
0
;
pTmq
->
pollCnt
=
0
;
pTmq
->
epoch
=
0
;
taosInitRWLatch
(
&
pTmq
->
lock
);
strcpy
(
pTmq
->
clientId
,
conf
->
clientId
);
strcpy
(
pTmq
->
groupId
,
conf
->
groupId
);
pTmq
->
commit_cb
=
conf
->
commit_cb
;
tsem_init
(
&
pTmq
->
rspSem
,
0
,
0
);
pTmq
->
consumerId
=
generateRequestId
()
&
((
uint64_t
)
-
1
>>
1
);
pTmq
->
clientTopics
=
taosArrayInit
(
0
,
sizeof
(
SMqClientTopic
));
return
pTmq
;
}
struct
tmq_list_t
{
int32_t
cnt
;
int32_t
tot
;
char
*
elems
[];
};
tmq_list_t
*
tmq_list_new
()
{
tmq_list_t
*
ptr
=
malloc
(
sizeof
(
tmq_list_t
)
+
8
*
sizeof
(
char
*
));
if
(
ptr
==
NULL
)
{
return
ptr
;
}
ptr
->
cnt
=
0
;
ptr
->
tot
=
8
;
return
ptr
;
}
int32_t
tmq_list_append
(
tmq_list_t
*
ptr
,
char
*
src
)
{
if
(
ptr
->
cnt
>=
ptr
->
tot
-
1
)
return
-
1
;
ptr
->
elems
[
ptr
->
cnt
]
=
strdup
(
src
);
ptr
->
cnt
++
;
return
0
;
}
int32_t
tmq_null_cb
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
if
(
code
==
0
)
{
//
}
//
return
0
;
}
TAOS_RES
*
tmq_subscribe
(
tmq_t
*
tmq
,
tmq_list_t
*
topic_list
)
{
SRequestObj
*
pRequest
=
NULL
;
int32_t
sz
=
topic_list
->
cnt
;
//destroy ex
taosArrayDestroy
(
tmq
->
clientTopics
);
tmq
->
clientTopics
=
taosArrayInit
(
sz
,
sizeof
(
SMqClientTopic
));
SCMSubscribeReq
req
;
req
.
topicNum
=
sz
;
req
.
consumerId
=
tmq
->
consumerId
;
req
.
consumerGroup
=
strdup
(
tmq
->
groupId
);
req
.
topicNames
=
taosArrayInit
(
sz
,
sizeof
(
void
*
));
for
(
int
i
=
0
;
i
<
sz
;
i
++
)
{
char
*
topicName
=
topic_list
->
elems
[
i
];
SName
name
=
{
0
};
char
*
dbName
=
getDbOfConnection
(
tmq
->
pTscObj
);
tNameSetDbName
(
&
name
,
tmq
->
pTscObj
->
acctId
,
dbName
,
strlen
(
dbName
));
tNameFromString
(
&
name
,
topicName
,
T_NAME_TABLE
);
char
*
topicFname
=
calloc
(
1
,
TSDB_TOPIC_FNAME_LEN
);
if
(
topicFname
==
NULL
)
{
}
tNameExtractFullName
(
&
name
,
topicFname
);
tscDebug
(
"subscribe topic: %s"
,
topicFname
);
SMqClientTopic
topic
=
{
.
nextVgIdx
=
0
,
.
sql
=
NULL
,
.
sqlLen
=
0
,
.
topicId
=
0
,
.
topicName
=
topicFname
,
.
vgs
=
NULL
};
topic
.
vgs
=
taosArrayInit
(
0
,
sizeof
(
SMqClientVg
));
taosArrayPush
(
tmq
->
clientTopics
,
&
topic
);
/*SMqClientTopic topic = {*/
/*.*/
/*};*/
taosArrayPush
(
req
.
topicNames
,
&
topicFname
);
}
int
tlen
=
tSerializeSCMSubscribeReq
(
NULL
,
&
req
);
void
*
buf
=
malloc
(
tlen
);
if
(
buf
==
NULL
)
{
goto
_return
;
}
void
*
abuf
=
buf
;
tSerializeSCMSubscribeReq
(
&
abuf
,
&
req
);
/*printf("formatted: %s\n", dagStr);*/
pRequest
=
createRequest
(
tmq
->
pTscObj
,
NULL
,
NULL
,
TDMT_MND_SUBSCRIBE
);
if
(
pRequest
==
NULL
)
{
tscError
(
"failed to malloc sqlObj"
);
}
pRequest
->
body
.
requestMsg
=
(
SDataBuf
){
.
pData
=
buf
,
.
len
=
tlen
};
SMsgSendInfo
*
sendInfo
=
buildMsgInfoImpl
(
pRequest
);
/*sendInfo->fp*/
SEpSet
epSet
=
getEpSet_s
(
&
tmq
->
pTscObj
->
pAppInfo
->
mgmtEp
);
int64_t
transporterId
=
0
;
asyncSendMsgToServer
(
tmq
->
pTscObj
->
pAppInfo
->
pTransporter
,
&
epSet
,
&
transporterId
,
sendInfo
);
tsem_wait
(
&
pRequest
->
body
.
rspSem
);
_return:
/*if (sendInfo != NULL) {*/
/*destroySendMsgInfo(sendInfo);*/
/*}*/
if
(
pRequest
!=
NULL
&&
terrno
!=
TSDB_CODE_SUCCESS
)
{
pRequest
->
code
=
terrno
;
}
return
pRequest
;
}
void
tmq_conf_set_offset_commit_cb
(
tmq_conf_t
*
conf
,
tmq_commit_cb
*
cb
)
{
conf
->
commit_cb
=
cb
;
}
SArray
*
tmqGetConnInfo
(
SClientHbKey
connKey
,
void
*
param
)
{
tmq_t
*
pTmq
=
(
void
*
)
param
;
SArray
*
pArray
=
taosArrayInit
(
0
,
sizeof
(
SKv
));
if
(
pArray
==
NULL
)
{
return
NULL
;
}
SKv
kv
=
{
0
};
kv
.
key
=
HEARTBEAT_KEY_MQ_TMP
;
SMqHbMsg
*
pMqHb
=
malloc
(
sizeof
(
SMqHbMsg
));
if
(
pMqHb
==
NULL
)
{
return
pArray
;
}
pMqHb
->
consumerId
=
connKey
.
connId
;
SArray
*
clientTopics
=
pTmq
->
clientTopics
;
int
sz
=
taosArrayGetSize
(
clientTopics
);
for
(
int
i
=
0
;
i
<
sz
;
i
++
)
{
SMqClientTopic
*
pCTopic
=
taosArrayGet
(
clientTopics
,
i
);
/*if (pCTopic->vgId == -1) {*/
/*pMqHb->status = 1;*/
/*break;*/
/*}*/
}
kv
.
value
=
pMqHb
;
kv
.
valueLen
=
sizeof
(
SMqHbMsg
);
taosArrayPush
(
pArray
,
&
kv
);
return
pArray
;
}
tmq_t
*
tmqCreateConsumerImpl
(
TAOS
*
conn
,
tmq_conf_t
*
conf
)
{
tmq_t
*
pTmq
=
malloc
(
sizeof
(
tmq_t
));
if
(
pTmq
==
NULL
)
{
return
NULL
;
}
strcpy
(
pTmq
->
groupId
,
conf
->
groupId
);
strcpy
(
pTmq
->
clientId
,
conf
->
clientId
);
pTmq
->
pTscObj
=
(
STscObj
*
)
conn
;
pTmq
->
pTscObj
->
connType
=
HEARTBEAT_TYPE_MQ
;
return
pTmq
;
}
TAOS_RES
*
taos_create_topic
(
TAOS
*
taos
,
const
char
*
topicName
,
const
char
*
sql
,
int
sqlLen
)
{
STscObj
*
pTscObj
=
(
STscObj
*
)
taos
;
SRequestObj
*
pRequest
=
NULL
;
SQueryNode
*
pQueryNode
=
NULL
;
char
*
pStr
=
NULL
;
terrno
=
TSDB_CODE_SUCCESS
;
if
(
taos
==
NULL
||
topicName
==
NULL
||
sql
==
NULL
)
{
tscError
(
"invalid parameters for creating topic, connObj:%p, topic name:%s, sql:%s"
,
taos
,
topicName
,
sql
);
terrno
=
TSDB_CODE_TSC_INVALID_INPUT
;
goto
_return
;
}
if
(
strlen
(
topicName
)
>=
TSDB_TOPIC_NAME_LEN
)
{
tscError
(
"topic name too long, max length:%d"
,
TSDB_TOPIC_NAME_LEN
-
1
);
terrno
=
TSDB_CODE_TSC_INVALID_INPUT
;
goto
_return
;
}
if
(
sqlLen
>
TSDB_MAX_ALLOWED_SQL_LEN
)
{
tscError
(
"sql string exceeds max length:%d"
,
TSDB_MAX_ALLOWED_SQL_LEN
);
terrno
=
TSDB_CODE_TSC_EXCEED_SQL_LIMIT
;
goto
_return
;
}
tscDebug
(
"start to create topic, %s"
,
topicName
);
CHECK_CODE_GOTO
(
buildRequest
(
pTscObj
,
sql
,
sqlLen
,
&
pRequest
),
_return
);
CHECK_CODE_GOTO
(
parseSql
(
pRequest
,
&
pQueryNode
),
_return
);
SQueryStmtInfo
*
pQueryStmtInfo
=
(
SQueryStmtInfo
*
)
pQueryNode
;
pQueryStmtInfo
->
info
.
continueQuery
=
true
;
// todo check for invalid sql statement and return with error code
SSchema
*
schema
=
NULL
;
int32_t
numOfCols
=
0
;
CHECK_CODE_GOTO
(
qCreateQueryDag
(
pQueryNode
,
&
pRequest
->
body
.
pDag
,
&
schema
,
&
numOfCols
,
NULL
,
pRequest
->
requestId
),
_return
);
pStr
=
qDagToString
(
pRequest
->
body
.
pDag
);
if
(
pStr
==
NULL
)
{
goto
_return
;
}
printf
(
"%s
\n
"
,
pStr
);
// The topic should be related to a database that the queried table is belonged to.
SName
name
=
{
0
};
char
dbName
[
TSDB_DB_FNAME_LEN
]
=
{
0
};
tNameGetFullDbName
(
&
((
SQueryStmtInfo
*
)
pQueryNode
)
->
pTableMetaInfo
[
0
]
->
name
,
dbName
);
tNameFromString
(
&
name
,
dbName
,
T_NAME_ACCT
|
T_NAME_DB
);
tNameFromString
(
&
name
,
topicName
,
T_NAME_TABLE
);
char
topicFname
[
TSDB_TOPIC_FNAME_LEN
]
=
{
0
};
tNameExtractFullName
(
&
name
,
topicFname
);
SCMCreateTopicReq
req
=
{
.
name
=
(
char
*
)
topicFname
,
.
igExists
=
1
,
.
physicalPlan
=
(
char
*
)
pStr
,
.
sql
=
(
char
*
)
sql
,
.
logicalPlan
=
"no logic plan"
,
};
int
tlen
=
tSerializeSCMCreateTopicReq
(
NULL
,
&
req
);
void
*
buf
=
malloc
(
tlen
);
if
(
buf
==
NULL
)
{
goto
_return
;
}
void
*
abuf
=
buf
;
tSerializeSCMCreateTopicReq
(
&
abuf
,
&
req
);
/*printf("formatted: %s\n", dagStr);*/
pRequest
->
body
.
requestMsg
=
(
SDataBuf
){
.
pData
=
buf
,
.
len
=
tlen
};
pRequest
->
type
=
TDMT_MND_CREATE_TOPIC
;
SMsgSendInfo
*
sendInfo
=
buildMsgInfoImpl
(
pRequest
);
SEpSet
epSet
=
getEpSet_s
(
&
pTscObj
->
pAppInfo
->
mgmtEp
);
int64_t
transporterId
=
0
;
asyncSendMsgToServer
(
pTscObj
->
pAppInfo
->
pTransporter
,
&
epSet
,
&
transporterId
,
sendInfo
);
tsem_wait
(
&
pRequest
->
body
.
rspSem
);
_return:
qDestroyQuery
(
pQueryNode
);
/*if (sendInfo != NULL) {*/
/*destroySendMsgInfo(sendInfo);*/
/*}*/
if
(
pRequest
!=
NULL
&&
terrno
!=
TSDB_CODE_SUCCESS
)
{
pRequest
->
code
=
terrno
;
}
return
pRequest
;
}
static
char
*
formatTimestamp
(
char
*
buf
,
int64_t
val
,
int
precision
)
{
time_t
tt
;
int32_t
ms
=
0
;
if
(
precision
==
TSDB_TIME_PRECISION_NANO
)
{
tt
=
(
time_t
)(
val
/
1000000000
);
ms
=
val
%
1000000000
;
}
else
if
(
precision
==
TSDB_TIME_PRECISION_MICRO
)
{
tt
=
(
time_t
)(
val
/
1000000
);
ms
=
val
%
1000000
;
}
else
{
tt
=
(
time_t
)(
val
/
1000
);
ms
=
val
%
1000
;
}
/* comment out as it make testcases like select_with_tags.sim fail.
but in windows, this may cause the call to localtime crash if tt < 0,
need to find a better solution.
if (tt < 0) {
tt = 0;
}
*/
#ifdef WINDOWS
if
(
tt
<
0
)
tt
=
0
;
#endif
if
(
tt
<=
0
&&
ms
<
0
)
{
tt
--
;
if
(
precision
==
TSDB_TIME_PRECISION_NANO
)
{
ms
+=
1000000000
;
}
else
if
(
precision
==
TSDB_TIME_PRECISION_MICRO
)
{
ms
+=
1000000
;
}
else
{
ms
+=
1000
;
}
}
struct
tm
*
ptm
=
localtime
(
&
tt
);
size_t
pos
=
strftime
(
buf
,
35
,
"%Y-%m-%d %H:%M:%S"
,
ptm
);
if
(
precision
==
TSDB_TIME_PRECISION_NANO
)
{
sprintf
(
buf
+
pos
,
".%09d"
,
ms
);
}
else
if
(
precision
==
TSDB_TIME_PRECISION_MICRO
)
{
sprintf
(
buf
+
pos
,
".%06d"
,
ms
);
}
else
{
sprintf
(
buf
+
pos
,
".%03d"
,
ms
);
}
return
buf
;
}
int32_t
tmq_poll_cb_inner
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
if
(
code
==
-
1
)
{
printf
(
"msg discard
\n
"
);
return
0
;
}
char
pBuf
[
128
];
SMqConsumeCbParam
*
pParam
=
(
SMqConsumeCbParam
*
)
param
;
SMqClientVg
*
pVg
=
pParam
->
pVg
;
SMqConsumeRsp
rsp
;
tDecodeSMqConsumeRsp
(
pMsg
->
pData
,
&
rsp
);
if
(
rsp
.
numOfTopics
==
0
)
{
/*printf("no data\n");*/
return
0
;
}
int32_t
colNum
=
rsp
.
schemas
->
nCols
;
pVg
->
currentOffset
=
rsp
.
rspOffset
;
/*printf("rsp offset: %ld\n", rsp.rspOffset);*/
/*printf("-----msg begin----\n");*/
printf
(
"|"
);
for
(
int32_t
i
=
0
;
i
<
colNum
;
i
++
)
{
if
(
i
==
0
)
printf
(
" %25s |"
,
rsp
.
schemas
->
pSchema
[
i
].
name
);
else
printf
(
" %15s |"
,
rsp
.
schemas
->
pSchema
[
i
].
name
);
}
printf
(
"
\n
"
);
printf
(
"===============================================
\n
"
);
int32_t
sz
=
taosArrayGetSize
(
rsp
.
pBlockData
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SSDataBlock
*
pDataBlock
=
taosArrayGet
(
rsp
.
pBlockData
,
i
);
int32_t
rows
=
pDataBlock
->
info
.
rows
;
for
(
int32_t
j
=
0
;
j
<
rows
;
j
++
)
{
printf
(
"|"
);
for
(
int32_t
k
=
0
;
k
<
colNum
;
k
++
)
{
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
k
);
void
*
var
=
POINTER_SHIFT
(
pColInfoData
->
pData
,
j
*
pColInfoData
->
info
.
bytes
);
switch
(
pColInfoData
->
info
.
type
)
{
case
TSDB_DATA_TYPE_TIMESTAMP
:
formatTimestamp
(
pBuf
,
*
(
uint64_t
*
)
var
,
TSDB_TIME_PRECISION_MILLI
);
printf
(
" %25s |"
,
pBuf
);
break
;
case
TSDB_DATA_TYPE_INT
:
case
TSDB_DATA_TYPE_UINT
:
printf
(
" %15u |"
,
*
(
uint32_t
*
)
var
);
break
;
}
}
printf
(
"
\n
"
);
}
}
/*printf("\n-----msg end------\n");*/
return
0
;
}
typedef
struct
SMqAskEpCbParam
{
tmq_t
*
tmq
;
int32_t
wait
;
}
SMqAskEpCbParam
;
int32_t
tmq_ask_ep_cb
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
SMqAskEpCbParam
*
pParam
=
(
SMqAskEpCbParam
*
)
param
;
tmq_t
*
tmq
=
pParam
->
tmq
;
if
(
code
!=
0
)
{
printf
(
"get topic endpoint error, not ready, wait:%d
\n
"
,
pParam
->
wait
);
if
(
pParam
->
wait
)
{
tsem_post
(
&
tmq
->
rspSem
);
}
return
0
;
}
tscDebug
(
"tmq ask ep cb called"
);
bool
set
=
false
;
SMqCMGetSubEpRsp
rsp
;
tDecodeSMqCMGetSubEpRsp
(
pMsg
->
pData
,
&
rsp
);
int32_t
sz
=
taosArrayGetSize
(
rsp
.
topics
);
// TODO: lock
/*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/
/*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/
if
(
rsp
.
epoch
!=
tmq
->
epoch
)
{
//TODO
if
(
tmq
->
clientTopics
)
taosArrayDestroy
(
tmq
->
clientTopics
);
tmq
->
clientTopics
=
taosArrayInit
(
sz
,
sizeof
(
SMqClientTopic
));
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqClientTopic
topic
=
{
0
};
SMqSubTopicEp
*
pTopicEp
=
taosArrayGet
(
rsp
.
topics
,
i
);
topic
.
topicName
=
strdup
(
pTopicEp
->
topic
);
int32_t
vgSz
=
taosArrayGetSize
(
pTopicEp
->
vgs
);
topic
.
vgs
=
taosArrayInit
(
vgSz
,
sizeof
(
SMqClientVg
));
for
(
int32_t
j
=
0
;
j
<
vgSz
;
j
++
)
{
SMqSubVgEp
*
pVgEp
=
taosArrayGet
(
pTopicEp
->
vgs
,
j
);
SMqClientVg
clientVg
=
{
.
pollCnt
=
0
,
.
committedOffset
=
-
1
,
.
currentOffset
=
-
1
,
.
vgId
=
pVgEp
->
vgId
,
.
epSet
=
pVgEp
->
epSet
};
taosArrayPush
(
topic
.
vgs
,
&
clientVg
);
set
=
true
;
}
taosArrayPush
(
tmq
->
clientTopics
,
&
topic
);
}
tmq
->
epoch
=
rsp
.
epoch
;
}
if
(
set
)
{
atomic_store_64
(
&
tmq
->
status
,
1
);
}
// unlock
/*tsem_post(&tmq->rspSem);*/
if
(
pParam
->
wait
)
{
tsem_post
(
&
tmq
->
rspSem
);
}
free
(
pParam
);
return
0
;
}
int32_t
tmqAsyncAskEp
(
tmq_t
*
tmq
,
bool
wait
)
{
int32_t
tlen
=
sizeof
(
SMqCMGetSubEpReq
);
SMqCMGetSubEpReq
*
buf
=
malloc
(
tlen
);
if
(
buf
==
NULL
)
{
tscError
(
"failed to malloc get subscribe ep buf"
);
}
buf
->
consumerId
=
htobe64
(
tmq
->
consumerId
);
strcpy
(
buf
->
cgroup
,
tmq
->
groupId
);
SRequestObj
*
pRequest
=
createRequest
(
tmq
->
pTscObj
,
NULL
,
NULL
,
TDMT_MND_GET_SUB_EP
);
if
(
pRequest
==
NULL
)
{
tscError
(
"failed to malloc subscribe ep request"
);
}
pRequest
->
body
.
requestMsg
=
(
SDataBuf
){
.
pData
=
buf
,
.
len
=
tlen
};
SMqAskEpCbParam
*
pParam
=
malloc
(
sizeof
(
SMqAskEpCbParam
));
if
(
pParam
==
NULL
)
{
free
(
buf
);
goto
END
;
}
pParam
->
tmq
=
tmq
;
pParam
->
wait
=
wait
;
SMsgSendInfo
*
sendInfo
=
buildMsgInfoImpl
(
pRequest
);
sendInfo
->
requestObjRefId
=
0
;
sendInfo
->
param
=
pParam
;
sendInfo
->
fp
=
tmq_ask_ep_cb
;
SEpSet
epSet
=
getEpSet_s
(
&
tmq
->
pTscObj
->
pAppInfo
->
mgmtEp
);
int64_t
transporterId
=
0
;
asyncSendMsgToServer
(
tmq
->
pTscObj
->
pAppInfo
->
pTransporter
,
&
epSet
,
&
transporterId
,
sendInfo
);
END:
if
(
wait
)
tsem_wait
(
&
tmq
->
rspSem
);
return
0
;
}
SMqConsumeReq
*
tmqBuildConsumeReqImpl
(
tmq_t
*
tmq
,
int64_t
blocking_time
,
int32_t
type
,
SMqClientTopic
*
pTopic
,
SMqClientVg
**
ppVg
)
{
SMqConsumeReq
*
pReq
=
malloc
(
sizeof
(
SMqConsumeReq
));
if
(
pReq
==
NULL
)
{
return
NULL
;
}
pReq
->
reqType
=
type
;
pReq
->
blockingTime
=
blocking_time
;
pReq
->
consumerId
=
tmq
->
consumerId
;
strcpy
(
pReq
->
cgroup
,
tmq
->
groupId
);
tmq
->
nextTopicIdx
=
(
tmq
->
nextTopicIdx
+
1
)
%
taosArrayGetSize
(
tmq
->
clientTopics
);
strcpy
(
pReq
->
topic
,
pTopic
->
topicName
);
pTopic
->
nextVgIdx
=
(
pTopic
->
nextVgIdx
+
1
%
taosArrayGetSize
(
pTopic
->
vgs
));
SMqClientVg
*
pVg
=
taosArrayGet
(
pTopic
->
vgs
,
pTopic
->
nextVgIdx
);
pReq
->
offset
=
pVg
->
currentOffset
+
1
;
*
ppVg
=
pVg
;
pReq
->
head
.
vgId
=
htonl
(
pVg
->
vgId
);
pReq
->
head
.
contLen
=
htonl
(
sizeof
(
SMqConsumeReq
));
return
pReq
;
}
tmq_message_t
*
tmq_consumer_poll
(
tmq_t
*
tmq
,
int64_t
blocking_time
)
{
tmq_message_t
*
tmq_message
=
NULL
;
int64_t
status
=
atomic_load_64
(
&
tmq
->
status
);
tmqAsyncAskEp
(
tmq
,
status
==
0
||
taosArrayGetSize
(
tmq
->
clientTopics
));
/*if (blocking_time < 0) blocking_time = 500;*/
blocking_time
=
1000
;
if
(
taosArrayGetSize
(
tmq
->
clientTopics
)
==
0
)
{
tscDebug
(
"consumer:%ld poll but not assigned"
,
tmq
->
consumerId
);
usleep
(
blocking_time
*
1000
);
return
NULL
;
}
SMqClientTopic
*
pTopic
=
taosArrayGet
(
tmq
->
clientTopics
,
tmq
->
nextTopicIdx
);
if
(
taosArrayGetSize
(
pTopic
->
vgs
)
==
0
)
{
usleep
(
blocking_time
*
1000
);
return
NULL
;
}
SMqClientVg
*
pVg
=
NULL
;
SMqConsumeReq
*
pReq
=
tmqBuildConsumeReqImpl
(
tmq
,
blocking_time
,
TMQ_REQ_TYPE_CONSUME_ONLY
,
pTopic
,
&
pVg
);
if
(
pReq
==
NULL
)
{
usleep
(
blocking_time
*
1000
);
return
NULL
;
}
SMqConsumeCbParam
*
param
=
malloc
(
sizeof
(
SMqConsumeCbParam
));
if
(
param
==
NULL
)
{
usleep
(
blocking_time
*
1000
);
return
NULL
;
}
param
->
tmq
=
tmq
;
param
->
retMsg
=
&
tmq_message
;
param
->
pVg
=
pVg
;
SRequestObj
*
pRequest
=
createRequest
(
tmq
->
pTscObj
,
NULL
,
NULL
,
TDMT_VND_CONSUME
);
pRequest
->
body
.
requestMsg
=
(
SDataBuf
){
.
pData
=
pReq
,
.
len
=
sizeof
(
SMqConsumeReq
)
};
SMsgSendInfo
*
sendInfo
=
buildMsgInfoImpl
(
pRequest
);
sendInfo
->
requestObjRefId
=
0
;
sendInfo
->
param
=
param
;
sendInfo
->
fp
=
tmq_poll_cb_inner
;
/*printf("req offset: %ld\n", pReq->offset);*/
int64_t
transporterId
=
0
;
asyncSendMsgToServer
(
tmq
->
pTscObj
->
pAppInfo
->
pTransporter
,
&
pVg
->
epSet
,
&
transporterId
,
sendInfo
);
tmq
->
pollCnt
++
;
usleep
(
blocking_time
*
1000
);
return
tmq_message
;
/*tsem_wait(&pRequest->body.rspSem);*/
/*if (body != NULL) {*/
/*destroySendMsgInfo(body);*/
/*}*/
/*if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {*/
/*pRequest->code = terrno;*/
/*}*/
/*return pRequest;*/
}
tmq_resp_err_t
*
tmq_commit
(
tmq_t
*
tmq
,
tmq_topic_vgroup_list_t
*
tmq_topic_vgroup_list
,
int32_t
async
)
{
SMqConsumeReq
req
=
{
0
};
return
NULL
;
}
void
tmq_message_destroy
(
tmq_message_t
*
tmq_message
)
{
if
(
tmq_message
==
NULL
)
return
;
}
TAOS_RES
*
taos_query_l
(
TAOS
*
taos
,
const
char
*
sql
,
int
sqlLen
)
{
STscObj
*
pTscObj
=
(
STscObj
*
)
taos
;
if
(
sqlLen
>
(
size_t
)
TSDB_MAX_ALLOWED_SQL_LEN
)
{
...
...
source/client/src/tmq.c
0 → 100644
浏览文件 @
d17dd905
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
#include "planner.h"
#include "scheduler.h"
#include "tdef.h"
#include "tep.h"
#include "tglobal.h"
#include "tmsgtype.h"
#include "tnote.h"
#include "tpagedfile.h"
#include "tref.h"
typedef
struct
SMqClientVg
{
// statistics
int64_t
pollCnt
;
// offset
int64_t
committedOffset
;
int64_t
currentOffset
;
//connection info
int32_t
vgId
;
SEpSet
epSet
;
}
SMqClientVg
;
typedef
struct
SMqClientTopic
{
// subscribe info
int32_t
sqlLen
;
char
*
sql
;
char
*
topicName
;
int64_t
topicId
;
int32_t
nextVgIdx
;
SArray
*
vgs
;
//SArray<SMqClientVg>
}
SMqClientTopic
;
typedef
struct
SMqAskEpCbParam
{
tmq_t
*
tmq
;
int32_t
wait
;
}
SMqAskEpCbParam
;
struct
tmq_resp_err_t
{
int32_t
code
;
};
struct
tmq_topic_vgroup_t
{
char
*
topic
;
int32_t
vgId
;
int64_t
commitOffset
;
};
struct
tmq_topic_vgroup_list_t
{
int32_t
cnt
;
int32_t
size
;
tmq_topic_vgroup_t
*
elems
;
};
typedef
struct
SMqConsumeCbParam
{
tmq_t
*
tmq
;
SMqClientVg
*
pVg
;
tmq_message_t
**
retMsg
;
}
SMqConsumeCbParam
;
struct
tmq_conf_t
{
char
clientId
[
256
];
char
groupId
[
256
];
char
*
ip
;
uint16_t
port
;
tmq_commit_cb
*
commit_cb
;
};
struct
tmq_message_t
{
SMqConsumeRsp
rsp
;
};
tmq_conf_t
*
tmq_conf_new
()
{
tmq_conf_t
*
conf
=
calloc
(
1
,
sizeof
(
tmq_conf_t
));
return
conf
;
}
int32_t
tmq_conf_set
(
tmq_conf_t
*
conf
,
const
char
*
key
,
const
char
*
value
)
{
if
(
strcmp
(
key
,
"group.id"
)
==
0
)
{
strcpy
(
conf
->
groupId
,
value
);
}
if
(
strcmp
(
key
,
"client.id"
)
==
0
)
{
strcpy
(
conf
->
clientId
,
value
);
}
return
0
;
}
struct
tmq_t
{
char
groupId
[
256
];
char
clientId
[
256
];
SRWLatch
lock
;
int64_t
consumerId
;
int64_t
epoch
;
int64_t
status
;
tsem_t
rspSem
;
STscObj
*
pTscObj
;
tmq_commit_cb
*
commit_cb
;
int32_t
nextTopicIdx
;
SArray
*
clientTopics
;
//SArray<SMqClientTopic>
//stat
int64_t
pollCnt
;
};
struct
tmq_list_t
{
int32_t
cnt
;
int32_t
tot
;
char
*
elems
[];
};
tmq_list_t
*
tmq_list_new
()
{
tmq_list_t
*
ptr
=
malloc
(
sizeof
(
tmq_list_t
)
+
8
*
sizeof
(
char
*
));
if
(
ptr
==
NULL
)
{
return
ptr
;
}
ptr
->
cnt
=
0
;
ptr
->
tot
=
8
;
return
ptr
;
}
int32_t
tmq_list_append
(
tmq_list_t
*
ptr
,
char
*
src
)
{
if
(
ptr
->
cnt
>=
ptr
->
tot
-
1
)
return
-
1
;
ptr
->
elems
[
ptr
->
cnt
]
=
strdup
(
src
);
ptr
->
cnt
++
;
return
0
;
}
tmq_t
*
tmq_consumer_new
(
void
*
conn
,
tmq_conf_t
*
conf
,
char
*
errstr
,
int32_t
errstrLen
)
{
tmq_t
*
pTmq
=
calloc
(
sizeof
(
tmq_t
),
1
);
if
(
pTmq
==
NULL
)
{
return
NULL
;
}
pTmq
->
pTscObj
=
(
STscObj
*
)
conn
;
pTmq
->
status
=
0
;
pTmq
->
pollCnt
=
0
;
pTmq
->
epoch
=
0
;
taosInitRWLatch
(
&
pTmq
->
lock
);
strcpy
(
pTmq
->
clientId
,
conf
->
clientId
);
strcpy
(
pTmq
->
groupId
,
conf
->
groupId
);
pTmq
->
commit_cb
=
conf
->
commit_cb
;
tsem_init
(
&
pTmq
->
rspSem
,
0
,
0
);
pTmq
->
consumerId
=
generateRequestId
()
&
((
uint64_t
)
-
1
>>
1
);
pTmq
->
clientTopics
=
taosArrayInit
(
0
,
sizeof
(
SMqClientTopic
));
return
pTmq
;
}
TAOS_RES
*
tmq_subscribe
(
tmq_t
*
tmq
,
tmq_list_t
*
topic_list
)
{
SRequestObj
*
pRequest
=
NULL
;
int32_t
sz
=
topic_list
->
cnt
;
//destroy ex
taosArrayDestroy
(
tmq
->
clientTopics
);
tmq
->
clientTopics
=
taosArrayInit
(
sz
,
sizeof
(
SMqClientTopic
));
SCMSubscribeReq
req
;
req
.
topicNum
=
sz
;
req
.
consumerId
=
tmq
->
consumerId
;
req
.
consumerGroup
=
strdup
(
tmq
->
groupId
);
req
.
topicNames
=
taosArrayInit
(
sz
,
sizeof
(
void
*
));
for
(
int
i
=
0
;
i
<
sz
;
i
++
)
{
char
*
topicName
=
topic_list
->
elems
[
i
];
SName
name
=
{
0
};
char
*
dbName
=
getDbOfConnection
(
tmq
->
pTscObj
);
tNameSetDbName
(
&
name
,
tmq
->
pTscObj
->
acctId
,
dbName
,
strlen
(
dbName
));
tNameFromString
(
&
name
,
topicName
,
T_NAME_TABLE
);
char
*
topicFname
=
calloc
(
1
,
TSDB_TOPIC_FNAME_LEN
);
if
(
topicFname
==
NULL
)
{
}
tNameExtractFullName
(
&
name
,
topicFname
);
tscDebug
(
"subscribe topic: %s"
,
topicFname
);
SMqClientTopic
topic
=
{
.
nextVgIdx
=
0
,
.
sql
=
NULL
,
.
sqlLen
=
0
,
.
topicId
=
0
,
.
topicName
=
topicFname
,
.
vgs
=
NULL
};
topic
.
vgs
=
taosArrayInit
(
0
,
sizeof
(
SMqClientVg
));
taosArrayPush
(
tmq
->
clientTopics
,
&
topic
);
/*SMqClientTopic topic = {*/
/*.*/
/*};*/
taosArrayPush
(
req
.
topicNames
,
&
topicFname
);
}
int
tlen
=
tSerializeSCMSubscribeReq
(
NULL
,
&
req
);
void
*
buf
=
malloc
(
tlen
);
if
(
buf
==
NULL
)
{
goto
_return
;
}
void
*
abuf
=
buf
;
tSerializeSCMSubscribeReq
(
&
abuf
,
&
req
);
/*printf("formatted: %s\n", dagStr);*/
pRequest
=
createRequest
(
tmq
->
pTscObj
,
NULL
,
NULL
,
TDMT_MND_SUBSCRIBE
);
if
(
pRequest
==
NULL
)
{
tscError
(
"failed to malloc sqlObj"
);
}
pRequest
->
body
.
requestMsg
=
(
SDataBuf
){
.
pData
=
buf
,
.
len
=
tlen
};
SMsgSendInfo
*
sendInfo
=
buildMsgInfoImpl
(
pRequest
);
/*sendInfo->fp*/
SEpSet
epSet
=
getEpSet_s
(
&
tmq
->
pTscObj
->
pAppInfo
->
mgmtEp
);
int64_t
transporterId
=
0
;
asyncSendMsgToServer
(
tmq
->
pTscObj
->
pAppInfo
->
pTransporter
,
&
epSet
,
&
transporterId
,
sendInfo
);
tsem_wait
(
&
pRequest
->
body
.
rspSem
);
_return:
/*if (sendInfo != NULL) {*/
/*destroySendMsgInfo(sendInfo);*/
/*}*/
if
(
pRequest
!=
NULL
&&
terrno
!=
TSDB_CODE_SUCCESS
)
{
pRequest
->
code
=
terrno
;
}
return
pRequest
;
}
void
tmq_conf_set_offset_commit_cb
(
tmq_conf_t
*
conf
,
tmq_commit_cb
*
cb
)
{
conf
->
commit_cb
=
cb
;
}
SArray
*
tmqGetConnInfo
(
SClientHbKey
connKey
,
void
*
param
)
{
tmq_t
*
pTmq
=
(
void
*
)
param
;
SArray
*
pArray
=
taosArrayInit
(
0
,
sizeof
(
SKv
));
if
(
pArray
==
NULL
)
{
return
NULL
;
}
SKv
kv
=
{
0
};
kv
.
key
=
HEARTBEAT_KEY_MQ_TMP
;
SMqHbMsg
*
pMqHb
=
malloc
(
sizeof
(
SMqHbMsg
));
if
(
pMqHb
==
NULL
)
{
return
pArray
;
}
pMqHb
->
consumerId
=
connKey
.
connId
;
SArray
*
clientTopics
=
pTmq
->
clientTopics
;
int
sz
=
taosArrayGetSize
(
clientTopics
);
for
(
int
i
=
0
;
i
<
sz
;
i
++
)
{
SMqClientTopic
*
pCTopic
=
taosArrayGet
(
clientTopics
,
i
);
/*if (pCTopic->vgId == -1) {*/
/*pMqHb->status = 1;*/
/*break;*/
/*}*/
}
kv
.
value
=
pMqHb
;
kv
.
valueLen
=
sizeof
(
SMqHbMsg
);
taosArrayPush
(
pArray
,
&
kv
);
return
pArray
;
}
tmq_t
*
tmqCreateConsumerImpl
(
TAOS
*
conn
,
tmq_conf_t
*
conf
)
{
tmq_t
*
pTmq
=
malloc
(
sizeof
(
tmq_t
));
if
(
pTmq
==
NULL
)
{
return
NULL
;
}
strcpy
(
pTmq
->
groupId
,
conf
->
groupId
);
strcpy
(
pTmq
->
clientId
,
conf
->
clientId
);
pTmq
->
pTscObj
=
(
STscObj
*
)
conn
;
pTmq
->
pTscObj
->
connType
=
HEARTBEAT_TYPE_MQ
;
return
pTmq
;
}
TAOS_RES
*
taos_create_topic
(
TAOS
*
taos
,
const
char
*
topicName
,
const
char
*
sql
,
int
sqlLen
)
{
STscObj
*
pTscObj
=
(
STscObj
*
)
taos
;
SRequestObj
*
pRequest
=
NULL
;
SQueryNode
*
pQueryNode
=
NULL
;
char
*
pStr
=
NULL
;
terrno
=
TSDB_CODE_SUCCESS
;
if
(
taos
==
NULL
||
topicName
==
NULL
||
sql
==
NULL
)
{
tscError
(
"invalid parameters for creating topic, connObj:%p, topic name:%s, sql:%s"
,
taos
,
topicName
,
sql
);
terrno
=
TSDB_CODE_TSC_INVALID_INPUT
;
goto
_return
;
}
if
(
strlen
(
topicName
)
>=
TSDB_TOPIC_NAME_LEN
)
{
tscError
(
"topic name too long, max length:%d"
,
TSDB_TOPIC_NAME_LEN
-
1
);
terrno
=
TSDB_CODE_TSC_INVALID_INPUT
;
goto
_return
;
}
if
(
sqlLen
>
TSDB_MAX_ALLOWED_SQL_LEN
)
{
tscError
(
"sql string exceeds max length:%d"
,
TSDB_MAX_ALLOWED_SQL_LEN
);
terrno
=
TSDB_CODE_TSC_EXCEED_SQL_LIMIT
;
goto
_return
;
}
tscDebug
(
"start to create topic, %s"
,
topicName
);
CHECK_CODE_GOTO
(
buildRequest
(
pTscObj
,
sql
,
sqlLen
,
&
pRequest
),
_return
);
CHECK_CODE_GOTO
(
parseSql
(
pRequest
,
&
pQueryNode
),
_return
);
SQueryStmtInfo
*
pQueryStmtInfo
=
(
SQueryStmtInfo
*
)
pQueryNode
;
pQueryStmtInfo
->
info
.
continueQuery
=
true
;
// todo check for invalid sql statement and return with error code
SSchema
*
schema
=
NULL
;
int32_t
numOfCols
=
0
;
CHECK_CODE_GOTO
(
qCreateQueryDag
(
pQueryNode
,
&
pRequest
->
body
.
pDag
,
&
schema
,
&
numOfCols
,
NULL
,
pRequest
->
requestId
),
_return
);
pStr
=
qDagToString
(
pRequest
->
body
.
pDag
);
if
(
pStr
==
NULL
)
{
goto
_return
;
}
printf
(
"%s
\n
"
,
pStr
);
// The topic should be related to a database that the queried table is belonged to.
SName
name
=
{
0
};
char
dbName
[
TSDB_DB_FNAME_LEN
]
=
{
0
};
tNameGetFullDbName
(
&
((
SQueryStmtInfo
*
)
pQueryNode
)
->
pTableMetaInfo
[
0
]
->
name
,
dbName
);
tNameFromString
(
&
name
,
dbName
,
T_NAME_ACCT
|
T_NAME_DB
);
tNameFromString
(
&
name
,
topicName
,
T_NAME_TABLE
);
char
topicFname
[
TSDB_TOPIC_FNAME_LEN
]
=
{
0
};
tNameExtractFullName
(
&
name
,
topicFname
);
SCMCreateTopicReq
req
=
{
.
name
=
(
char
*
)
topicFname
,
.
igExists
=
1
,
.
physicalPlan
=
(
char
*
)
pStr
,
.
sql
=
(
char
*
)
sql
,
.
logicalPlan
=
"no logic plan"
,
};
int
tlen
=
tSerializeSCMCreateTopicReq
(
NULL
,
&
req
);
void
*
buf
=
malloc
(
tlen
);
if
(
buf
==
NULL
)
{
goto
_return
;
}
void
*
abuf
=
buf
;
tSerializeSCMCreateTopicReq
(
&
abuf
,
&
req
);
/*printf("formatted: %s\n", dagStr);*/
pRequest
->
body
.
requestMsg
=
(
SDataBuf
){
.
pData
=
buf
,
.
len
=
tlen
};
pRequest
->
type
=
TDMT_MND_CREATE_TOPIC
;
SMsgSendInfo
*
sendInfo
=
buildMsgInfoImpl
(
pRequest
);
SEpSet
epSet
=
getEpSet_s
(
&
pTscObj
->
pAppInfo
->
mgmtEp
);
int64_t
transporterId
=
0
;
asyncSendMsgToServer
(
pTscObj
->
pAppInfo
->
pTransporter
,
&
epSet
,
&
transporterId
,
sendInfo
);
tsem_wait
(
&
pRequest
->
body
.
rspSem
);
_return:
qDestroyQuery
(
pQueryNode
);
/*if (sendInfo != NULL) {*/
/*destroySendMsgInfo(sendInfo);*/
/*}*/
if
(
pRequest
!=
NULL
&&
terrno
!=
TSDB_CODE_SUCCESS
)
{
pRequest
->
code
=
terrno
;
}
return
pRequest
;
}
static
char
*
formatTimestamp
(
char
*
buf
,
int64_t
val
,
int
precision
)
{
time_t
tt
;
int32_t
ms
=
0
;
if
(
precision
==
TSDB_TIME_PRECISION_NANO
)
{
tt
=
(
time_t
)(
val
/
1000000000
);
ms
=
val
%
1000000000
;
}
else
if
(
precision
==
TSDB_TIME_PRECISION_MICRO
)
{
tt
=
(
time_t
)(
val
/
1000000
);
ms
=
val
%
1000000
;
}
else
{
tt
=
(
time_t
)(
val
/
1000
);
ms
=
val
%
1000
;
}
/* comment out as it make testcases like select_with_tags.sim fail.
but in windows, this may cause the call to localtime crash if tt < 0,
need to find a better solution.
if (tt < 0) {
tt = 0;
}
*/
#ifdef WINDOWS
if
(
tt
<
0
)
tt
=
0
;
#endif
if
(
tt
<=
0
&&
ms
<
0
)
{
tt
--
;
if
(
precision
==
TSDB_TIME_PRECISION_NANO
)
{
ms
+=
1000000000
;
}
else
if
(
precision
==
TSDB_TIME_PRECISION_MICRO
)
{
ms
+=
1000000
;
}
else
{
ms
+=
1000
;
}
}
struct
tm
*
ptm
=
localtime
(
&
tt
);
size_t
pos
=
strftime
(
buf
,
35
,
"%Y-%m-%d %H:%M:%S"
,
ptm
);
if
(
precision
==
TSDB_TIME_PRECISION_NANO
)
{
sprintf
(
buf
+
pos
,
".%09d"
,
ms
);
}
else
if
(
precision
==
TSDB_TIME_PRECISION_MICRO
)
{
sprintf
(
buf
+
pos
,
".%06d"
,
ms
);
}
else
{
sprintf
(
buf
+
pos
,
".%03d"
,
ms
);
}
return
buf
;
}
int32_t
tmq_poll_cb_inner
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
if
(
code
==
-
1
)
{
printf
(
"msg discard
\n
"
);
return
0
;
}
char
pBuf
[
128
];
SMqConsumeCbParam
*
pParam
=
(
SMqConsumeCbParam
*
)
param
;
SMqClientVg
*
pVg
=
pParam
->
pVg
;
SMqConsumeRsp
rsp
;
tDecodeSMqConsumeRsp
(
pMsg
->
pData
,
&
rsp
);
if
(
rsp
.
numOfTopics
==
0
)
{
/*printf("no data\n");*/
return
0
;
}
int32_t
colNum
=
rsp
.
schemas
->
nCols
;
pVg
->
currentOffset
=
rsp
.
rspOffset
;
/*printf("rsp offset: %ld\n", rsp.rspOffset);*/
/*printf("-----msg begin----\n");*/
printf
(
"|"
);
for
(
int32_t
i
=
0
;
i
<
colNum
;
i
++
)
{
if
(
i
==
0
)
printf
(
" %25s |"
,
rsp
.
schemas
->
pSchema
[
i
].
name
);
else
printf
(
" %15s |"
,
rsp
.
schemas
->
pSchema
[
i
].
name
);
}
printf
(
"
\n
"
);
printf
(
"===============================================
\n
"
);
int32_t
sz
=
taosArrayGetSize
(
rsp
.
pBlockData
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SSDataBlock
*
pDataBlock
=
taosArrayGet
(
rsp
.
pBlockData
,
i
);
int32_t
rows
=
pDataBlock
->
info
.
rows
;
for
(
int32_t
j
=
0
;
j
<
rows
;
j
++
)
{
printf
(
"|"
);
for
(
int32_t
k
=
0
;
k
<
colNum
;
k
++
)
{
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
k
);
void
*
var
=
POINTER_SHIFT
(
pColInfoData
->
pData
,
j
*
pColInfoData
->
info
.
bytes
);
switch
(
pColInfoData
->
info
.
type
)
{
case
TSDB_DATA_TYPE_TIMESTAMP
:
formatTimestamp
(
pBuf
,
*
(
uint64_t
*
)
var
,
TSDB_TIME_PRECISION_MILLI
);
printf
(
" %25s |"
,
pBuf
);
break
;
case
TSDB_DATA_TYPE_INT
:
case
TSDB_DATA_TYPE_UINT
:
printf
(
" %15u |"
,
*
(
uint32_t
*
)
var
);
break
;
}
}
printf
(
"
\n
"
);
}
}
/*printf("\n-----msg end------\n");*/
return
0
;
}
int32_t
tmq_ask_ep_cb
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
SMqAskEpCbParam
*
pParam
=
(
SMqAskEpCbParam
*
)
param
;
tmq_t
*
tmq
=
pParam
->
tmq
;
if
(
code
!=
0
)
{
printf
(
"get topic endpoint error, not ready, wait:%d
\n
"
,
pParam
->
wait
);
if
(
pParam
->
wait
)
{
tsem_post
(
&
tmq
->
rspSem
);
}
return
0
;
}
tscDebug
(
"tmq ask ep cb called"
);
bool
set
=
false
;
SMqCMGetSubEpRsp
rsp
;
tDecodeSMqCMGetSubEpRsp
(
pMsg
->
pData
,
&
rsp
);
int32_t
sz
=
taosArrayGetSize
(
rsp
.
topics
);
// TODO: lock
/*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/
/*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/
if
(
rsp
.
epoch
!=
tmq
->
epoch
)
{
//TODO
if
(
tmq
->
clientTopics
)
taosArrayDestroy
(
tmq
->
clientTopics
);
tmq
->
clientTopics
=
taosArrayInit
(
sz
,
sizeof
(
SMqClientTopic
));
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqClientTopic
topic
=
{
0
};
SMqSubTopicEp
*
pTopicEp
=
taosArrayGet
(
rsp
.
topics
,
i
);
topic
.
topicName
=
strdup
(
pTopicEp
->
topic
);
int32_t
vgSz
=
taosArrayGetSize
(
pTopicEp
->
vgs
);
topic
.
vgs
=
taosArrayInit
(
vgSz
,
sizeof
(
SMqClientVg
));
for
(
int32_t
j
=
0
;
j
<
vgSz
;
j
++
)
{
SMqSubVgEp
*
pVgEp
=
taosArrayGet
(
pTopicEp
->
vgs
,
j
);
SMqClientVg
clientVg
=
{
.
pollCnt
=
0
,
.
committedOffset
=
-
1
,
.
currentOffset
=
-
1
,
.
vgId
=
pVgEp
->
vgId
,
.
epSet
=
pVgEp
->
epSet
};
taosArrayPush
(
topic
.
vgs
,
&
clientVg
);
set
=
true
;
}
taosArrayPush
(
tmq
->
clientTopics
,
&
topic
);
}
tmq
->
epoch
=
rsp
.
epoch
;
}
if
(
set
)
{
atomic_store_64
(
&
tmq
->
status
,
1
);
}
// unlock
/*tsem_post(&tmq->rspSem);*/
if
(
pParam
->
wait
)
{
tsem_post
(
&
tmq
->
rspSem
);
}
free
(
pParam
);
return
0
;
}
int32_t
tmqAsyncAskEp
(
tmq_t
*
tmq
,
bool
wait
)
{
int32_t
tlen
=
sizeof
(
SMqCMGetSubEpReq
);
SMqCMGetSubEpReq
*
buf
=
malloc
(
tlen
);
if
(
buf
==
NULL
)
{
tscError
(
"failed to malloc get subscribe ep buf"
);
}
buf
->
consumerId
=
htobe64
(
tmq
->
consumerId
);
strcpy
(
buf
->
cgroup
,
tmq
->
groupId
);
SRequestObj
*
pRequest
=
createRequest
(
tmq
->
pTscObj
,
NULL
,
NULL
,
TDMT_MND_GET_SUB_EP
);
if
(
pRequest
==
NULL
)
{
tscError
(
"failed to malloc subscribe ep request"
);
}
pRequest
->
body
.
requestMsg
=
(
SDataBuf
){
.
pData
=
buf
,
.
len
=
tlen
};
SMqAskEpCbParam
*
pParam
=
malloc
(
sizeof
(
SMqAskEpCbParam
));
if
(
pParam
==
NULL
)
{
free
(
buf
);
goto
END
;
}
pParam
->
tmq
=
tmq
;
pParam
->
wait
=
wait
;
SMsgSendInfo
*
sendInfo
=
buildMsgInfoImpl
(
pRequest
);
sendInfo
->
requestObjRefId
=
0
;
sendInfo
->
param
=
pParam
;
sendInfo
->
fp
=
tmq_ask_ep_cb
;
SEpSet
epSet
=
getEpSet_s
(
&
tmq
->
pTscObj
->
pAppInfo
->
mgmtEp
);
int64_t
transporterId
=
0
;
asyncSendMsgToServer
(
tmq
->
pTscObj
->
pAppInfo
->
pTransporter
,
&
epSet
,
&
transporterId
,
sendInfo
);
END:
if
(
wait
)
tsem_wait
(
&
tmq
->
rspSem
);
return
0
;
}
SMqConsumeReq
*
tmqBuildConsumeReqImpl
(
tmq_t
*
tmq
,
int64_t
blocking_time
,
int32_t
type
,
SMqClientTopic
*
pTopic
,
SMqClientVg
**
ppVg
)
{
SMqConsumeReq
*
pReq
=
malloc
(
sizeof
(
SMqConsumeReq
));
if
(
pReq
==
NULL
)
{
return
NULL
;
}
pReq
->
reqType
=
type
;
pReq
->
blockingTime
=
blocking_time
;
pReq
->
consumerId
=
tmq
->
consumerId
;
strcpy
(
pReq
->
cgroup
,
tmq
->
groupId
);
tmq
->
nextTopicIdx
=
(
tmq
->
nextTopicIdx
+
1
)
%
taosArrayGetSize
(
tmq
->
clientTopics
);
strcpy
(
pReq
->
topic
,
pTopic
->
topicName
);
pTopic
->
nextVgIdx
=
(
pTopic
->
nextVgIdx
+
1
%
taosArrayGetSize
(
pTopic
->
vgs
));
SMqClientVg
*
pVg
=
taosArrayGet
(
pTopic
->
vgs
,
pTopic
->
nextVgIdx
);
pReq
->
offset
=
pVg
->
currentOffset
+
1
;
*
ppVg
=
pVg
;
pReq
->
head
.
vgId
=
htonl
(
pVg
->
vgId
);
pReq
->
head
.
contLen
=
htonl
(
sizeof
(
SMqConsumeReq
));
return
pReq
;
}
tmq_message_t
*
tmq_consumer_poll
(
tmq_t
*
tmq
,
int64_t
blocking_time
)
{
tmq_message_t
*
tmq_message
=
NULL
;
int64_t
status
=
atomic_load_64
(
&
tmq
->
status
);
tmqAsyncAskEp
(
tmq
,
status
==
0
||
taosArrayGetSize
(
tmq
->
clientTopics
));
/*if (blocking_time < 0) blocking_time = 500;*/
blocking_time
=
1000
;
if
(
taosArrayGetSize
(
tmq
->
clientTopics
)
==
0
)
{
tscDebug
(
"consumer:%ld poll but not assigned"
,
tmq
->
consumerId
);
usleep
(
blocking_time
*
1000
);
return
NULL
;
}
SMqClientTopic
*
pTopic
=
taosArrayGet
(
tmq
->
clientTopics
,
tmq
->
nextTopicIdx
);
if
(
taosArrayGetSize
(
pTopic
->
vgs
)
==
0
)
{
usleep
(
blocking_time
*
1000
);
return
NULL
;
}
SMqClientVg
*
pVg
=
NULL
;
SMqConsumeReq
*
pReq
=
tmqBuildConsumeReqImpl
(
tmq
,
blocking_time
,
TMQ_REQ_TYPE_CONSUME_ONLY
,
pTopic
,
&
pVg
);
if
(
pReq
==
NULL
)
{
usleep
(
blocking_time
*
1000
);
return
NULL
;
}
SMqConsumeCbParam
*
param
=
malloc
(
sizeof
(
SMqConsumeCbParam
));
if
(
param
==
NULL
)
{
usleep
(
blocking_time
*
1000
);
return
NULL
;
}
param
->
tmq
=
tmq
;
param
->
retMsg
=
&
tmq_message
;
param
->
pVg
=
pVg
;
SRequestObj
*
pRequest
=
createRequest
(
tmq
->
pTscObj
,
NULL
,
NULL
,
TDMT_VND_CONSUME
);
pRequest
->
body
.
requestMsg
=
(
SDataBuf
){
.
pData
=
pReq
,
.
len
=
sizeof
(
SMqConsumeReq
)
};
SMsgSendInfo
*
sendInfo
=
buildMsgInfoImpl
(
pRequest
);
sendInfo
->
requestObjRefId
=
0
;
sendInfo
->
param
=
param
;
sendInfo
->
fp
=
tmq_poll_cb_inner
;
/*printf("req offset: %ld\n", pReq->offset);*/
int64_t
transporterId
=
0
;
asyncSendMsgToServer
(
tmq
->
pTscObj
->
pAppInfo
->
pTransporter
,
&
pVg
->
epSet
,
&
transporterId
,
sendInfo
);
tmq
->
pollCnt
++
;
usleep
(
blocking_time
*
1000
);
return
tmq_message
;
/*tsem_wait(&pRequest->body.rspSem);*/
/*if (body != NULL) {*/
/*destroySendMsgInfo(body);*/
/*}*/
/*if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {*/
/*pRequest->code = terrno;*/
/*}*/
/*return pRequest;*/
}
tmq_resp_err_t
*
tmq_commit
(
tmq_t
*
tmq
,
tmq_topic_vgroup_list_t
*
tmq_topic_vgroup_list
,
int32_t
async
)
{
SMqConsumeReq
req
=
{
0
};
return
NULL
;
}
void
tmq_message_destroy
(
tmq_message_t
*
tmq_message
)
{
if
(
tmq_message
==
NULL
)
return
;
}
static
void
destroySendMsgInfo
(
SMsgSendInfo
*
pMsgBody
)
{
assert
(
pMsgBody
!=
NULL
);
tfree
(
pMsgBody
->
msgInfo
.
pData
);
tfree
(
pMsgBody
);
}
source/client/test/CMakeLists.txt
浏览文件 @
d17dd905
...
...
@@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE
(
clientTest
${
SOURCE_LIST
}
)
TARGET_LINK_LIBRARIES
(
clientTest
PUBLIC os util common transport gtest taos qcom
PUBLIC os util common transport
parser catalog scheduler function
gtest taos qcom
)
TARGET_INCLUDE_DIRECTORIES
(
...
...
source/client/test/clientTests.cpp
浏览文件 @
d17dd905
...
...
@@ -625,7 +625,7 @@ TEST(testCase, tmq_subscribe_ctb_Test) {
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg1");
tmq_t* tmq = t
aos
_consumer_new(pConn, conf, NULL, 0);
tmq_t* tmq = t
mq
_consumer_new(pConn, conf, NULL, 0);
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "test_ctb_topic_1");
...
...
@@ -637,6 +637,7 @@ TEST(testCase, tmq_subscribe_ctb_Test) {
//if (msg == NULL) break;
}
}
#endif
TEST
(
testCase
,
tmq_subscribe_stb_Test
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
...
...
@@ -650,7 +651,7 @@ TEST(testCase, tmq_subscribe_stb_Test) {
tmq_conf_t
*
conf
=
tmq_conf_new
();
tmq_conf_set
(
conf
,
"group.id"
,
"tg2"
);
tmq_t* tmq = t
aos
_consumer_new(pConn, conf, NULL, 0);
tmq_t
*
tmq
=
t
mq
_consumer_new
(
pConn
,
conf
,
NULL
,
0
);
tmq_list_t
*
topic_list
=
tmq_list_new
();
tmq_list_append
(
topic_list
,
"test_stb_topic_1"
);
...
...
@@ -668,7 +669,6 @@ TEST(testCase, tmq_consume_Test) {
TEST
(
testCase
,
tmq_commit_TEST
)
{
}
#endif
#if 0
TEST(testCase, projection_query_tables) {
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
d17dd905
...
...
@@ -40,7 +40,6 @@ typedef struct STqCfg {
int32_t
reserved
;
}
STqCfg
;
typedef
struct
SVnodeCfg
{
int32_t
vgId
;
SDnode
*
pDnode
;
...
...
@@ -69,18 +68,18 @@ typedef struct {
}
SVnodeOpt
;
typedef
struct
STqReadHandle
{
int64_t
ver
;
uint64_t
tbUid
;
SHashObj
*
tbIdHash
;
const
SSubmitMsg
*
pMsg
;
SSubmitBlk
*
pBlock
;
SSubmitMsgIter
msgIter
;
SSubmitBlkIter
blkIter
;
SMeta
*
pVnodeMeta
;
SArray
*
pColIdList
;
//
SArray<int32_t>
int32_t
sver
;
SSchemaWrapper
*
pSchemaWrapper
;
STSchema
*
pSchema
;
int64_t
ver
;
uint64_t
tbUid
;
SHashObj
*
tbIdHash
;
const
SSubmitMsg
*
pMsg
;
SSubmitBlk
*
pBlock
;
SSubmitMsgIter
msgIter
;
SSubmitBlkIter
blkIter
;
SMeta
*
pVnodeMeta
;
SArray
*
pColIdList
;
//
SArray<int32_t>
int32_t
sver
;
SSchemaWrapper
*
pSchemaWrapper
;
STSchema
*
pSchema
;
}
STqReadHandle
;
/* ------------------------ SVnode ------------------------ */
...
...
@@ -202,35 +201,34 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
/* ------------------------- TQ QUERY -------------------------- */
STqReadHandle
*
tqInitSubmitMsgScanner
(
SMeta
*
pMeta
);
STqReadHandle
*
tqInitSubmitMsgScanner
(
SMeta
*
pMeta
);
static
FORCE_INLINE
void
tqReadHandleSetColIdList
(
STqReadHandle
*
pReadHandle
,
SArray
*
pColIdList
)
{
static
FORCE_INLINE
void
tqReadHandleSetColIdList
(
STqReadHandle
*
pReadHandle
,
SArray
*
pColIdList
)
{
pReadHandle
->
pColIdList
=
pColIdList
;
}
//static FORCE_INLINE void tqReadHandleSetTbUid(STqReadHandle* pHandle, const SArray* pTableIdList) {
//
pHandle->tbUid = pTableIdList;
//
static FORCE_INLINE void tqReadHandleSetTbUid(STqReadHandle* pHandle, const SArray* pTableIdList) {
//
pHandle->tbUid = pTableIdList;
//}
static
FORCE_INLINE
int
tqReadHandleSetTbUidList
(
STqReadHandle
*
pHandle
,
const
SArray
*
tbUidList
)
{
static
FORCE_INLINE
int
tqReadHandleSetTbUidList
(
STqReadHandle
*
pHandle
,
const
SArray
*
tbUidList
)
{
pHandle
->
tbIdHash
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_UBIGINT
),
true
,
HASH_NO_LOCK
);
if
(
pHandle
->
tbIdHash
==
NULL
)
{
return
-
1
;
}
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
tbUidList
);
i
++
)
{
int64_t
*
pKey
=
(
int64_t
*
)
taosArrayGet
(
tbUidList
,
i
);
int64_t
*
pKey
=
(
int64_t
*
)
taosArrayGet
(
tbUidList
,
i
);
taosHashPut
(
pHandle
->
tbIdHash
,
pKey
,
sizeof
(
int64_t
),
NULL
,
0
);
//pHandle->tbUid = tbUid;
//
pHandle->tbUid = tbUid;
}
return
0
;
}
void
tqReadHandleSetMsg
(
STqReadHandle
*
pHandle
,
SSubmitMsg
*
pMsg
,
int64_t
ver
);
bool
tqNextDataBlock
(
STqReadHandle
*
pHandle
);
int
tqRetrieveDataBlockInfo
(
STqReadHandle
*
pHandle
,
SDataBlockInfo
*
pBlockInfo
);
void
tqReadHandleSetMsg
(
STqReadHandle
*
pHandle
,
SSubmitMsg
*
pMsg
,
int64_t
ver
);
bool
tqNextDataBlock
(
STqReadHandle
*
pHandle
);
int
tqRetrieveDataBlockInfo
(
STqReadHandle
*
pHandle
,
SDataBlockInfo
*
pBlockInfo
);
// return SArray<SColumnInfoData>
SArray
*
tqRetrieveDataBlock
(
STqReadHandle
*
pHandle
);
SArray
*
tqRetrieveDataBlock
(
STqReadHandle
*
pHandle
);
#ifdef __cplusplus
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录