Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ad9731bd
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
ad9731bd
编写于
4月 12, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
merge from 3.0
上级
db796f9e
变更
4
显示空白变更内容
内联
并排
Showing
4 changed file
with
58 addition
and
54 deletion
+58
-54
source/client/inc/clientInt.h
source/client/inc/clientInt.h
+3
-2
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+42
-42
source/client/src/clientMain.c
source/client/src/clientMain.c
+3
-3
source/client/src/tmq.c
source/client/src/tmq.c
+10
-7
未找到文件。
source/client/inc/clientInt.h
浏览文件 @
ad9731bd
...
...
@@ -282,9 +282,10 @@ int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj*
void
*
doFetchRow
(
SRequestObj
*
pRequest
,
bool
setupOneRowPtr
,
bool
convertUcs4
);
void
doSetOneRowPtr
(
SReqResultInfo
*
pResultInfo
);
int32_t
setResultDataPtr
(
SReqResultInfo
*
pResultInfo
,
TAOS_FIELD
*
pFields
,
int32_t
numOfCols
,
int32_t
numOfRows
,
bool
convertUcs4
);
int32_t
setResultDataPtr
(
SReqResultInfo
*
pResultInfo
,
TAOS_FIELD
*
pFields
,
int32_t
numOfCols
,
int32_t
numOfRows
,
bool
convertUcs4
);
void
setResSchemaInfo
(
SReqResultInfo
*
pResInfo
,
const
SSchema
*
pSchema
,
int32_t
numOfCols
);
int32_t
setQueryResultFromRsp
(
SReqResultInfo
*
pResultInfo
,
const
SRetrieveTableRsp
*
pRsp
);
int32_t
setQueryResultFromRsp
(
SReqResultInfo
*
pResultInfo
,
const
SRetrieveTableRsp
*
pRsp
,
bool
convertUcs4
);
// --- heartbeat
// global, called by mgmt
...
...
source/client/src/clientImpl.c
浏览文件 @
ad9731bd
...
...
@@ -41,7 +41,6 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i
static
STscObj
*
taosConnectImpl
(
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
__taos_async_fn_t
fp
,
void
*
param
,
SAppInstInfo
*
pAppInfo
);
static
void
setResSchemaInfo
(
SReqResultInfo
*
pResInfo
,
const
SSchema
*
pSchema
,
int32_t
numOfCols
);
TAOS
*
taos_connect_internal
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
pass
,
const
char
*
auth
,
const
char
*
db
,
uint16_t
port
)
{
...
...
@@ -210,13 +209,11 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
int32_t
getPlan
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
,
SQueryPlan
**
pPlan
,
SArray
*
pNodeList
)
{
pRequest
->
type
=
pQuery
->
msgType
;
SPlanContext
cxt
=
{
.
queryId
=
pRequest
->
requestId
,
SPlanContext
cxt
=
{.
queryId
=
pRequest
->
requestId
,
.
acctId
=
pRequest
->
pTscObj
->
acctId
,
.
mgmtEpSet
=
getEpSet_s
(
&
pRequest
->
pTscObj
->
pAppInfo
->
mgmtEp
),
.
pAstRoot
=
pQuery
->
pRoot
,
.
showRewrite
=
pQuery
->
showRewrite
};
.
showRewrite
=
pQuery
->
showRewrite
};
int32_t
code
=
qCreateQueryPlan
(
&
cxt
,
pPlan
,
pNodeList
);
if
(
code
!=
0
)
{
return
code
;
...
...
@@ -253,7 +250,8 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
void
*
pTransporter
=
pRequest
->
pTscObj
->
pAppInfo
->
pTransporter
;
SQueryResult
res
=
{.
code
=
0
,
.
numOfRows
=
0
,
.
msgSize
=
ERROR_MSG_BUF_DEFAULT_SIZE
,
.
msg
=
pRequest
->
msgBuf
};
int32_t
code
=
schedulerExecJob
(
pTransporter
,
pNodeList
,
pDag
,
&
pRequest
->
body
.
queryJob
,
pRequest
->
sqlstr
,
pRequest
->
metric
.
start
,
&
res
);
int32_t
code
=
schedulerExecJob
(
pTransporter
,
pNodeList
,
pDag
,
&
pRequest
->
body
.
queryJob
,
pRequest
->
sqlstr
,
pRequest
->
metric
.
start
,
&
res
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
pRequest
->
body
.
queryJob
!=
0
)
{
schedulerFreeJob
(
pRequest
->
body
.
queryJob
);
...
...
@@ -319,7 +317,7 @@ SRequestObj* execQueryImpl(STscObj* pTscObj, const char* sql, int sqlLen) {
}
int32_t
refreshMeta
(
STscObj
*
pTscObj
,
SRequestObj
*
pRequest
)
{
SCatalog
*
pCatalog
=
NULL
;
SCatalog
*
pCatalog
=
NULL
;
int32_t
code
=
0
;
int32_t
dbNum
=
taosArrayGetSize
(
pRequest
->
dbList
);
int32_t
tblNum
=
taosArrayGetSize
(
pRequest
->
tableList
);
...
...
@@ -336,7 +334,7 @@ int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
SEpSet
epset
=
getEpSet_s
(
&
pTscObj
->
pAppInfo
->
mgmtEp
);
for
(
int32_t
i
=
0
;
i
<
dbNum
;
++
i
)
{
char
*
dbFName
=
taosArrayGet
(
pRequest
->
dbList
,
i
);
char
*
dbFName
=
taosArrayGet
(
pRequest
->
dbList
,
i
);
code
=
catalogRefreshDBVgInfo
(
pCatalog
,
pTscObj
->
pAppInfo
->
pTransporter
,
&
epset
,
dbFName
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -345,7 +343,7 @@ int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
}
for
(
int32_t
i
=
0
;
i
<
tblNum
;
++
i
)
{
SName
*
tableName
=
taosArrayGet
(
pRequest
->
tableList
,
i
);
SName
*
tableName
=
taosArrayGet
(
pRequest
->
tableList
,
i
);
code
=
catalogRefreshTableMeta
(
pCatalog
,
pTscObj
->
pAppInfo
->
pTransporter
,
&
epset
,
tableName
,
-
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -356,7 +354,6 @@ int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
return
code
;
}
SRequestObj
*
execQuery
(
STscObj
*
pTscObj
,
const
char
*
sql
,
int
sqlLen
)
{
SRequestObj
*
pRequest
=
NULL
;
int32_t
retryNum
=
0
;
...
...
@@ -508,7 +505,8 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
}
bool
persistConnForSpecificMsg
(
void
*
parenct
,
tmsg_t
msgType
)
{
return
msgType
==
TDMT_VND_QUERY_RSP
||
msgType
==
TDMT_VND_FETCH_RSP
||
msgType
==
TDMT_VND_RES_READY_RSP
||
msgType
==
TDMT_VND_QUERY_HEARTBEAT_RSP
;
return
msgType
==
TDMT_VND_QUERY_RSP
||
msgType
==
TDMT_VND_FETCH_RSP
||
msgType
==
TDMT_VND_RES_READY_RSP
||
msgType
==
TDMT_VND_QUERY_HEARTBEAT_RSP
;
}
void
processMsgFromServer
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
...
...
@@ -535,10 +533,10 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
int32_t
elapsed
=
pRequest
->
metric
.
rsp
-
pRequest
->
metric
.
start
;
if
(
pMsg
->
code
==
TSDB_CODE_SUCCESS
)
{
tscDebug
(
"0x%"
PRIx64
" message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%"
PRIx64
,
pRequest
->
self
,
TMSG_INFO
(
pMsg
->
msgType
),
tstrerror
(
pMsg
->
code
),
pMsg
->
contLen
,
elapsed
/
1000
,
pRequest
->
requestId
);
TMSG_INFO
(
pMsg
->
msgType
),
tstrerror
(
pMsg
->
code
),
pMsg
->
contLen
,
elapsed
/
1000
,
pRequest
->
requestId
);
}
else
{
tscError
(
"0x%"
PRIx64
" SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%"
PRIx64
,
pRequest
->
self
,
TMSG_INFO
(
pMsg
->
msgType
),
tstrerror
(
pMsg
->
code
),
pMsg
->
contLen
,
elapsed
/
1000
,
pRequest
->
requestId
);
TMSG_INFO
(
pMsg
->
msgType
),
tstrerror
(
pMsg
->
code
),
pMsg
->
contLen
,
elapsed
/
1000
,
pRequest
->
requestId
);
}
taosReleaseRef
(
clientReqRefPool
,
pSendInfo
->
requestObjRefId
);
...
...
@@ -769,7 +767,8 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
return
TSDB_CODE_SUCCESS
;
}
int32_t
setResultDataPtr
(
SReqResultInfo
*
pResultInfo
,
TAOS_FIELD
*
pFields
,
int32_t
numOfCols
,
int32_t
numOfRows
,
bool
convertUcs4
)
{
int32_t
setResultDataPtr
(
SReqResultInfo
*
pResultInfo
,
TAOS_FIELD
*
pFields
,
int32_t
numOfCols
,
int32_t
numOfRows
,
bool
convertUcs4
)
{
assert
(
numOfCols
>
0
&&
pFields
!=
NULL
&&
pResultInfo
!=
NULL
);
if
(
numOfRows
==
0
)
{
return
TSDB_CODE_SUCCESS
;
...
...
@@ -850,5 +849,6 @@ int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableR
// TODO handle the compressed case
pResultInfo
->
totalRows
+=
pResultInfo
->
numOfRows
;
return
setResultDataPtr
(
pResultInfo
,
pResultInfo
->
fields
,
pResultInfo
->
numOfCols
,
pResultInfo
->
numOfRows
,
convertUcs4
);
return
setResultDataPtr
(
pResultInfo
,
pResultInfo
->
fields
,
pResultInfo
->
numOfCols
,
pResultInfo
->
numOfRows
,
convertUcs4
);
}
source/client/src/clientMain.c
浏览文件 @
ad9731bd
...
...
@@ -168,7 +168,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
return
NULL
;
}
return
doFetchRow
(
pRequest
,
true
);
return
doFetchRow
(
pRequest
,
true
,
false
);
}
else
if
(
TD_RES_TMQ
(
res
))
{
tmq_message_t
*
msg
=
((
tmq_message_t
*
)
res
);
...
...
@@ -430,7 +430,7 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) {
return
0
;
}
doFetchRow
(
pRequest
,
false
);
doFetchRow
(
pRequest
,
false
,
false
);
// TODO refactor
SReqResultInfo
*
pResultInfo
=
&
pRequest
->
body
.
resInfo
;
...
...
@@ -465,7 +465,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
return
0
;
}
doFetchRow
(
pRequest
,
false
);
doFetchRow
(
pRequest
,
false
,
false
);
SReqResultInfo
*
pResultInfo
=
&
pRequest
->
body
.
resInfo
;
...
...
source/client/src/tmq.c
浏览文件 @
ad9731bd
...
...
@@ -835,7 +835,8 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
if
(
msgEpoch
<
tmqEpoch
)
{
/*printf("discard rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch);*/
/*tsem_post(&tmq->rspSem);*/
tscWarn
(
"msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d"
,
pParam
->
vgId
,
msgEpoch
,
tmqEpoch
);
tscWarn
(
"msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d"
,
pParam
->
vgId
,
msgEpoch
,
tmqEpoch
);
return
0
;
}
...
...
@@ -872,8 +873,8 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
}
memcpy
(
pRsp
,
pMsg
->
pData
,
sizeof
(
SMqRspHead
));
tDecodeSMqPollRsp
(
POINTER_SHIFT
(
pMsg
->
pData
,
sizeof
(
SMqRspHead
)),
&
pRsp
->
msg
);
pRsp
->
iter
.
curBlock
=
0
;
pRsp
->
iter
.
curRow
=
0
;
/*pRsp->iter.curBlock = 0;*/
/*pRsp->iter.curRow = 0;*/
// TODO: alloc mem
/*pRsp->*/
/*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/
...
...
@@ -885,8 +886,8 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
}
#endif
tscDebug
(
"consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld"
,
tmq
->
consumerId
,
pParam
->
pVg
->
vgId
,
pRsp
->
msg
.
reqOffset
,
pRsp
->
msg
.
rspOffset
);
tscDebug
(
"consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld"
,
tmq
->
consumerId
,
pParam
->
pVg
->
vgId
,
pRsp
->
msg
.
r
eqOffset
,
pRsp
->
msg
.
r
spOffset
);
pRsp
->
vg
=
pParam
->
pVg
;
taosWriteQitem
(
tmq
->
mqueue
,
pRsp
);
...
...
@@ -907,7 +908,8 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
bool
set
=
false
;
int32_t
topicNumGet
=
taosArrayGetSize
(
pRsp
->
topics
);
char
vgKey
[
TSDB_TOPIC_FNAME_LEN
+
22
];
tscDebug
(
"consumer %ld update ep epoch %d to epoch %d, topic num: %d"
,
tmq
->
consumerId
,
tmq
->
epoch
,
epoch
,
topicNumGet
);
tscDebug
(
"consumer %ld update ep epoch %d to epoch %d, topic num: %d"
,
tmq
->
consumerId
,
tmq
->
epoch
,
epoch
,
topicNumGet
);
SArray
*
newTopics
=
taosArrayInit
(
topicNumGet
,
sizeof
(
SMqClientTopic
));
if
(
newTopics
==
NULL
)
{
return
false
;
...
...
@@ -1275,7 +1277,8 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
int64_t
transporterId
=
0
;
/*printf("send poll\n");*/
atomic_add_fetch_32
(
&
tmq
->
waitingRequest
,
1
);
tscDebug
(
"consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu"
,
tmq
->
consumerId
,
pTopic
->
topicName
,
pVg
->
vgId
,
tmq
->
epoch
,
pVg
->
currentOffset
,
pReq
->
reqId
);
tscDebug
(
"consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu"
,
tmq
->
consumerId
,
pTopic
->
topicName
,
pVg
->
vgId
,
tmq
->
epoch
,
pVg
->
currentOffset
,
pReq
->
reqId
);
/*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/
asyncSendMsgToServer
(
tmq
->
pTscObj
->
pAppInfo
->
pTransporter
,
&
pVg
->
epSet
,
&
transporterId
,
sendInfo
);
pVg
->
pollCnt
++
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录