Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
878401c1
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
878401c1
编写于
2月 17, 2022
作者:
dengyihao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix query crash on super table
上级
cc5ae34d
变更
9
显示空白变更内容
内联
并排
Showing
9 changed file
with
127 addition
and
109 deletion
+127
-109
include/libs/qcom/query.h
include/libs/qcom/query.h
+1
-0
source/client/src/clientHb.c
source/client/src/clientHb.c
+1
-1
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+94
-94
source/client/src/clientMsgHandler.c
source/client/src/clientMsgHandler.c
+2
-0
source/client/src/tmq.c
source/client/src/tmq.c
+3
-3
source/libs/qcom/src/queryUtil.c
source/libs/qcom/src/queryUtil.c
+1
-1
source/libs/scheduler/inc/schedulerInt.h
source/libs/scheduler/inc/schedulerInt.h
+1
-0
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+13
-2
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+11
-8
未找到文件。
include/libs/qcom/query.h
浏览文件 @
878401c1
...
...
@@ -112,6 +112,7 @@ typedef struct STableMetaOutput {
typedef
struct
SDataBuf
{
void
*
pData
;
uint32_t
len
;
void
*
handle
;
}
SDataBuf
;
typedef
int32_t
(
*
__async_send_cb_fn_t
)(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
);
...
...
source/client/src/clientHb.c
浏览文件 @
878401c1
...
...
@@ -444,7 +444,7 @@ static void* hbThreadFunc(void* param) {
}
void
*
abuf
=
buf
;
tSerializeSClientHbBatchReq
(
&
abuf
,
pReq
);
SMsgSendInfo
*
pInfo
=
malloc
(
sizeof
(
SMsgSendInfo
));
SMsgSendInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SMsgSendInfo
));
if
(
pInfo
==
NULL
)
{
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
tFreeClientHbBatchReq
(
pReq
,
false
);
...
...
source/client/src/clientImpl.c
浏览文件 @
878401c1
...
...
@@ -12,8 +12,8 @@
#include "tpagedfile.h"
#include "tref.h"
static
int32_t
initEpSetFromCfg
(
const
char
*
firstEp
,
const
char
*
secondEp
,
SCorEpSet
*
pEpSet
);
static
SMsgSendInfo
*
buildConnectMsg
(
SRequestObj
*
pRequest
);
static
int32_t
initEpSetFromCfg
(
const
char
*
firstEp
,
const
char
*
secondEp
,
SCorEpSet
*
pEpSet
);
static
SMsgSendInfo
*
buildConnectMsg
(
SRequestObj
*
pRequest
);
static
void
destroySendMsgInfo
(
SMsgSendInfo
*
pMsgBody
);
static
void
setQueryResultFromRsp
(
SReqResultInfo
*
pResultInfo
,
const
SRetrieveTableRsp
*
pRsp
);
...
...
@@ -30,17 +30,11 @@ static bool stringLengthCheck(const char* str, size_t maxsize) {
return
true
;
}
static
bool
validateUserName
(
const
char
*
user
)
{
return
stringLengthCheck
(
user
,
TSDB_USER_LEN
-
1
);
}
static
bool
validateUserName
(
const
char
*
user
)
{
return
stringLengthCheck
(
user
,
TSDB_USER_LEN
-
1
);
}
static
bool
validatePassword
(
const
char
*
passwd
)
{
return
stringLengthCheck
(
passwd
,
TSDB_PASSWORD_LEN
-
1
);
}
static
bool
validatePassword
(
const
char
*
passwd
)
{
return
stringLengthCheck
(
passwd
,
TSDB_PASSWORD_LEN
-
1
);
}
static
bool
validateDbName
(
const
char
*
db
)
{
return
stringLengthCheck
(
db
,
TSDB_DB_NAME_LEN
-
1
);
}
static
bool
validateDbName
(
const
char
*
db
)
{
return
stringLengthCheck
(
db
,
TSDB_DB_NAME_LEN
-
1
);
}
static
char
*
getClusterKey
(
const
char
*
user
,
const
char
*
auth
,
const
char
*
ip
,
int32_t
port
)
{
char
key
[
512
]
=
{
0
};
...
...
@@ -48,10 +42,12 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i
return
strdup
(
key
);
}
static
STscObj
*
taosConnectImpl
(
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
__taos_async_fn_t
fp
,
void
*
param
,
SAppInstInfo
*
pAppInfo
);
static
STscObj
*
taosConnectImpl
(
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
__taos_async_fn_t
fp
,
void
*
param
,
SAppInstInfo
*
pAppInfo
);
static
void
setResSchemaInfo
(
SReqResultInfo
*
pResInfo
,
const
SSchema
*
pSchema
,
int32_t
numOfCols
);
TAOS
*
taos_connect_internal
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
pass
,
const
char
*
auth
,
const
char
*
db
,
uint16_t
port
)
{
TAOS
*
taos_connect_internal
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
pass
,
const
char
*
auth
,
const
char
*
db
,
uint16_t
port
)
{
if
(
taos_init
()
!=
TSDB_CODE_SUCCESS
)
{
return
NULL
;
}
...
...
@@ -63,7 +59,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
char
localDb
[
TSDB_DB_NAME_LEN
]
=
{
0
};
if
(
db
!=
NULL
)
{
if
(
!
validateDbName
(
db
))
{
if
(
!
validateDbName
(
db
))
{
terrno
=
TSDB_CODE_TSC_INVALID_DB_LENGTH
;
return
NULL
;
}
...
...
@@ -79,7 +75,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
return
NULL
;
}
taosEncryptPass_c
((
uint8_t
*
)
pass
,
strlen
(
pass
),
secretEncrypt
);
taosEncryptPass_c
((
uint8_t
*
)
pass
,
strlen
(
pass
),
secretEncrypt
);
}
else
{
tstrncpy
(
secretEncrypt
,
auth
,
tListLen
(
secretEncrypt
));
}
...
...
@@ -122,7 +118,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
return
taosConnectImpl
(
user
,
&
secretEncrypt
[
0
],
localDb
,
NULL
,
NULL
,
*
pInst
);
}
int32_t
buildRequest
(
STscObj
*
pTscObj
,
const
char
*
sql
,
int
sqlLen
,
SRequestObj
**
pRequest
)
{
int32_t
buildRequest
(
STscObj
*
pTscObj
,
const
char
*
sql
,
int
sqlLen
,
SRequestObj
**
pRequest
)
{
*
pRequest
=
createRequest
(
pTscObj
,
NULL
,
NULL
,
TSDB_SQL_SELECT
);
if
(
*
pRequest
==
NULL
)
{
tscError
(
"failed to malloc sqlObj"
);
...
...
@@ -131,7 +127,7 @@ int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj*
(
*
pRequest
)
->
sqlstr
=
malloc
(
sqlLen
+
1
);
if
((
*
pRequest
)
->
sqlstr
==
NULL
)
{
tscError
(
"0x%"
PRIx64
" failed to prepare sql string buffer"
,
(
*
pRequest
)
->
self
);
tscError
(
"0x%"
PRIx64
" failed to prepare sql string buffer"
,
(
*
pRequest
)
->
self
);
(
*
pRequest
)
->
msgBuf
=
strdup
(
"failed to prepare sql string buffer"
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
...
...
@@ -140,7 +136,7 @@ int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj*
(
*
pRequest
)
->
sqlstr
[
sqlLen
]
=
0
;
(
*
pRequest
)
->
sqlLen
=
sqlLen
;
tscDebugL
(
"0x%"
PRIx64
" SQL: %s, reqId:0x%"
PRIx64
,
(
*
pRequest
)
->
self
,
(
*
pRequest
)
->
sqlstr
,
(
*
pRequest
)
->
requestId
);
tscDebugL
(
"0x%"
PRIx64
" SQL: %s, reqId:0x%"
PRIx64
,
(
*
pRequest
)
->
self
,
(
*
pRequest
)
->
sqlstr
,
(
*
pRequest
)
->
requestId
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -174,7 +170,7 @@ int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) {
int32_t
execDdlQuery
(
SRequestObj
*
pRequest
,
SQueryNode
*
pQuery
)
{
SDclStmtInfo
*
pDcl
=
(
SDclStmtInfo
*
)
pQuery
;
pRequest
->
type
=
pDcl
->
msgType
;
pRequest
->
body
.
requestMsg
=
(
SDataBuf
){.
pData
=
pDcl
->
pMsg
,
.
len
=
pDcl
->
msgLen
};
pRequest
->
body
.
requestMsg
=
(
SDataBuf
){.
pData
=
pDcl
->
pMsg
,
.
len
=
pDcl
->
msgLen
,
.
handle
=
NULL
};
STscObj
*
pTscObj
=
pRequest
->
pTscObj
;
SMsgSendInfo
*
pSendMsg
=
buildMsgInfoImpl
(
pRequest
);
...
...
@@ -250,9 +246,9 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList)
return
schedulerAsyncExecJob
(
pTransporter
,
pNodeList
,
pDag
,
pRequest
->
sqlstr
,
&
pRequest
->
body
.
pQueryJob
);
}
TAOS_RES
*
taos_query_l
(
TAOS
*
taos
,
const
char
*
sql
,
int
sqlLen
)
{
STscObj
*
pTscObj
=
(
STscObj
*
)
taos
;
if
(
sqlLen
>
(
size_t
)
TSDB_MAX_ALLOWED_SQL_LEN
)
{
TAOS_RES
*
taos_query_l
(
TAOS
*
taos
,
const
char
*
sql
,
int
sqlLen
)
{
STscObj
*
pTscObj
=
(
STscObj
*
)
taos
;
if
(
sqlLen
>
(
size_t
)
TSDB_MAX_ALLOWED_SQL_LEN
)
{
tscError
(
"sql string exceeds max length:%d"
,
TSDB_MAX_ALLOWED_SQL_LEN
);
terrno
=
TSDB_CODE_TSC_EXCEED_SQL_LIMIT
;
return
NULL
;
...
...
@@ -260,9 +256,9 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
nPrintTsc
(
"%s"
,
sql
)
SRequestObj
*
pRequest
=
NULL
;
SQueryNode
*
pQueryNode
=
NULL
;
SArray
*
pNodeList
=
taosArrayInit
(
4
,
sizeof
(
struct
SQueryNodeAddr
));
SRequestObj
*
pRequest
=
NULL
;
SQueryNode
*
pQueryNode
=
NULL
;
SArray
*
pNodeList
=
taosArrayInit
(
4
,
sizeof
(
struct
SQueryNodeAddr
));
terrno
=
TSDB_CODE_SUCCESS
;
CHECK_CODE_GOTO
(
buildRequest
(
pTscObj
,
sql
,
sqlLen
,
&
pRequest
),
_return
);
...
...
@@ -286,11 +282,11 @@ _return:
return
pRequest
;
}
int
initEpSetFromCfg
(
const
char
*
firstEp
,
const
char
*
secondEp
,
SCorEpSet
*
pEpSet
)
{
int
initEpSetFromCfg
(
const
char
*
firstEp
,
const
char
*
secondEp
,
SCorEpSet
*
pEpSet
)
{
pEpSet
->
version
=
0
;
// init mnode ip set
SEpSet
*
mgmtEpSet
=
&
(
pEpSet
->
epSet
);
SEpSet
*
mgmtEpSet
=
&
(
pEpSet
->
epSet
);
mgmtEpSet
->
numOfEps
=
0
;
mgmtEpSet
->
inUse
=
0
;
...
...
@@ -322,14 +318,15 @@ int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSe
return
0
;
}
STscObj
*
taosConnectImpl
(
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
__taos_async_fn_t
fp
,
void
*
param
,
SAppInstInfo
*
pAppInfo
)
{
STscObj
*
pTscObj
=
createTscObj
(
user
,
auth
,
db
,
pAppInfo
);
STscObj
*
taosConnectImpl
(
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
__taos_async_fn_t
fp
,
void
*
param
,
SAppInstInfo
*
pAppInfo
)
{
STscObj
*
pTscObj
=
createTscObj
(
user
,
auth
,
db
,
pAppInfo
);
if
(
NULL
==
pTscObj
)
{
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
pTscObj
;
}
SRequestObj
*
pRequest
=
createRequest
(
pTscObj
,
fp
,
param
,
TDMT_MND_CONNECT
);
SRequestObj
*
pRequest
=
createRequest
(
pTscObj
,
fp
,
param
,
TDMT_MND_CONNECT
);
if
(
pRequest
==
NULL
)
{
destroyTscObj
(
pTscObj
);
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
...
...
@@ -343,22 +340,24 @@ STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, __t
tsem_wait
(
&
pRequest
->
body
.
rspSem
);
if
(
pRequest
->
code
!=
TSDB_CODE_SUCCESS
)
{
const
char
*
errorMsg
=
(
pRequest
->
code
==
TSDB_CODE_RPC_FQDN_ERROR
)
?
taos_errstr
(
pRequest
)
:
tstrerror
(
pRequest
->
code
);
const
char
*
errorMsg
=
(
pRequest
->
code
==
TSDB_CODE_RPC_FQDN_ERROR
)
?
taos_errstr
(
pRequest
)
:
tstrerror
(
pRequest
->
code
);
printf
(
"failed to connect to server, reason: %s
\n\n
"
,
errorMsg
);
destroyRequest
(
pRequest
);
taos_close
(
pTscObj
);
pTscObj
=
NULL
;
}
else
{
tscDebug
(
"0x%"
PRIx64
" connection is opening, connId:%d, dnodeConn:%p, reqId:0x%"
PRIx64
,
pTscObj
->
id
,
pTscObj
->
connId
,
pTscObj
->
pAppInfo
->
pTransporter
,
pRequest
->
requestId
);
tscDebug
(
"0x%"
PRIx64
" connection is opening, connId:%d, dnodeConn:%p, reqId:0x%"
PRIx64
,
pTscObj
->
id
,
pTscObj
->
connId
,
pTscObj
->
pAppInfo
->
pTransporter
,
pRequest
->
requestId
);
destroyRequest
(
pRequest
);
}
return
pTscObj
;
}
static
SMsgSendInfo
*
buildConnectMsg
(
SRequestObj
*
pRequest
)
{
SMsgSendInfo
*
pMsgSendInfo
=
calloc
(
1
,
sizeof
(
SMsgSendInfo
));
static
SMsgSendInfo
*
buildConnectMsg
(
SRequestObj
*
pRequest
)
{
SMsgSendInfo
*
pMsgSendInfo
=
calloc
(
1
,
sizeof
(
SMsgSendInfo
));
if
(
pMsgSendInfo
==
NULL
)
{
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
NULL
;
...
...
@@ -371,14 +370,14 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest) {
pMsgSendInfo
->
fp
=
handleRequestRspFp
[
TMSG_INDEX
(
pMsgSendInfo
->
msgType
)];
pMsgSendInfo
->
param
=
pRequest
;
SConnectReq
*
pConnect
=
calloc
(
1
,
sizeof
(
SConnectReq
));
SConnectReq
*
pConnect
=
calloc
(
1
,
sizeof
(
SConnectReq
));
if
(
pConnect
==
NULL
)
{
tfree
(
pMsgSendInfo
);
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
NULL
;
}
STscObj
*
pObj
=
pRequest
->
pTscObj
;
STscObj
*
pObj
=
pRequest
->
pTscObj
;
char
*
db
=
getDbOfConnection
(
pObj
);
if
(
db
!=
NULL
)
{
...
...
@@ -401,17 +400,17 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
}
void
processMsgFromServer
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SMsgSendInfo
*
pSendInfo
=
(
SMsgSendInfo
*
)
pMsg
->
ahandle
;
SMsgSendInfo
*
pSendInfo
=
(
SMsgSendInfo
*
)
pMsg
->
ahandle
;
assert
(
pMsg
->
ahandle
!=
NULL
);
if
(
pSendInfo
->
requestObjRefId
!=
0
)
{
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
taosAcquireRef
(
clientReqRefPool
,
pSendInfo
->
requestObjRefId
);
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
taosAcquireRef
(
clientReqRefPool
,
pSendInfo
->
requestObjRefId
);
assert
(
pRequest
->
self
==
pSendInfo
->
requestObjRefId
);
pRequest
->
metric
.
rsp
=
taosGetTimestampMs
();
pRequest
->
code
=
pMsg
->
code
;
STscObj
*
pTscObj
=
pRequest
->
pTscObj
;
STscObj
*
pTscObj
=
pRequest
->
pTscObj
;
if
(
pEpSet
)
{
if
(
!
isEpsetEqual
(
&
pTscObj
->
pAppInfo
->
mgmtEp
.
epSet
,
pEpSet
))
{
updateEpSet_s
(
&
pTscObj
->
pAppInfo
->
mgmtEp
,
pEpSet
);
...
...
@@ -424,17 +423,17 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
*/
int32_t
elapsed
=
pRequest
->
metric
.
rsp
-
pRequest
->
metric
.
start
;
if
(
pMsg
->
code
==
TSDB_CODE_SUCCESS
)
{
tscDebug
(
"0x%"
PRIx64
" message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%"
PRIx64
,
pRequest
->
self
,
tscDebug
(
"0x%"
PRIx64
" message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%"
PRIx64
,
pRequest
->
self
,
TMSG_INFO
(
pMsg
->
msgType
),
tstrerror
(
pMsg
->
code
),
pMsg
->
contLen
,
elapsed
,
pRequest
->
requestId
);
}
else
{
tscError
(
"0x%"
PRIx64
" SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%"
PRIx64
,
pRequest
->
self
,
tscError
(
"0x%"
PRIx64
" SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%"
PRIx64
,
pRequest
->
self
,
TMSG_INFO
(
pMsg
->
msgType
),
tstrerror
(
pMsg
->
code
),
pMsg
->
contLen
,
elapsed
,
pRequest
->
requestId
);
}
taosReleaseRef
(
clientReqRefPool
,
pSendInfo
->
requestObjRefId
);
}
SDataBuf
buf
=
{.
len
=
pMsg
->
contLen
,
.
pData
=
NULL
};
SDataBuf
buf
=
{.
len
=
pMsg
->
contLen
,
.
pData
=
NULL
,
.
handle
=
pMsg
->
handle
};
if
(
pMsg
->
contLen
>
0
)
{
buf
.
pData
=
calloc
(
1
,
pMsg
->
contLen
);
...
...
@@ -451,7 +450,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
destroySendMsgInfo
(
pSendInfo
);
}
TAOS
*
taos_connect_auth
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
uint16_t
port
)
{
TAOS
*
taos_connect_auth
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
uint16_t
port
)
{
tscDebug
(
"try to connect to %s:%u by auth, user:%s db:%s"
,
ip
,
port
,
user
,
db
);
if
(
user
==
NULL
)
{
user
=
TSDB_DEFAULT_USER
;
...
...
@@ -465,7 +464,8 @@ TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, cons
return
taos_connect_internal
(
ip
,
user
,
NULL
,
auth
,
db
,
port
);
}
TAOS
*
taos_connect_l
(
const
char
*
ip
,
int
ipLen
,
const
char
*
user
,
int
userLen
,
const
char
*
pass
,
int
passLen
,
const
char
*
db
,
int
dbLen
,
uint16_t
port
)
{
TAOS
*
taos_connect_l
(
const
char
*
ip
,
int
ipLen
,
const
char
*
user
,
int
userLen
,
const
char
*
pass
,
int
passLen
,
const
char
*
db
,
int
dbLen
,
uint16_t
port
)
{
char
ipStr
[
TSDB_EP_LEN
]
=
{
0
};
char
dbStr
[
TSDB_DB_NAME_LEN
]
=
{
0
};
char
userStr
[
TSDB_USER_LEN
]
=
{
0
};
...
...
@@ -492,15 +492,15 @@ void* doFetchRow(SRequestObj* pRequest) {
}
SReqResultInfo
*
pResInfo
=
&
pRequest
->
body
.
resInfo
;
int32_t
code
=
schedulerFetchRows
(
pRequest
->
body
.
pQueryJob
,
(
void
**
)
&
pResInfo
->
pData
);
int32_t
code
=
schedulerFetchRows
(
pRequest
->
body
.
pQueryJob
,
(
void
**
)
&
pResInfo
->
pData
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pRequest
->
code
=
code
;
return
NULL
;
}
setQueryResultFromRsp
(
&
pRequest
->
body
.
resInfo
,
(
SRetrieveTableRsp
*
)
pResInfo
->
pData
);
tscDebug
(
"0x%"
PRIx64
" fetch results, numOfRows:%d total Rows:%"
PRId64
", complete:%d, reqId:0x%"
PRIx64
,
pRequest
->
self
,
pResInfo
->
numOfRows
,
pResInfo
->
totalRows
,
pResInfo
->
completed
,
pRequest
->
requestId
);
tscDebug
(
"0x%"
PRIx64
" fetch results, numOfRows:%d total Rows:%"
PRId64
", complete:%d, reqId:0x%"
PRIx64
,
pRe
quest
->
self
,
pResInfo
->
numOfRows
,
pRe
sInfo
->
totalRows
,
pResInfo
->
completed
,
pRequest
->
requestId
);
if
(
pResultInfo
->
numOfRows
==
0
)
{
return
NULL
;
...
...
@@ -535,7 +535,7 @@ void* doFetchRow(SRequestObj* pRequest) {
epSet
=
pVgroupInfo
->
epset
;
int64_t
transporterId
=
0
;
STscObj
*
pTscObj
=
pRequest
->
pTscObj
;
STscObj
*
pTscObj
=
pRequest
->
pTscObj
;
asyncSendMsgToServer
(
pTscObj
->
pAppInfo
->
pTransporter
,
&
epSet
,
&
transporterId
,
body
);
tsem_wait
(
&
pRequest
->
body
.
rspSem
);
...
...
@@ -551,7 +551,7 @@ void* doFetchRow(SRequestObj* pRequest) {
SMsgSendInfo
*
body
=
buildMsgInfoImpl
(
pRequest
);
int64_t
transporterId
=
0
;
STscObj
*
pTscObj
=
pRequest
->
pTscObj
;
STscObj
*
pTscObj
=
pRequest
->
pTscObj
;
asyncSendMsgToServer
(
pTscObj
->
pAppInfo
->
pTransporter
,
&
epSet
,
&
transporterId
,
body
);
tsem_wait
(
&
pRequest
->
body
.
rspSem
);
...
...
@@ -564,7 +564,7 @@ void* doFetchRow(SRequestObj* pRequest) {
_return:
for
(
int32_t
i
=
0
;
i
<
pResultInfo
->
numOfCols
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pResultInfo
->
numOfCols
;
++
i
)
{
pResultInfo
->
row
[
i
]
=
pResultInfo
->
pCol
[
i
]
+
pResultInfo
->
fields
[
i
].
bytes
*
pResultInfo
->
current
;
if
(
IS_VAR_DATA_TYPE
(
pResultInfo
->
fields
[
i
].
type
))
{
pResultInfo
->
length
[
i
]
=
varDataLen
(
pResultInfo
->
row
[
i
]);
...
...
@@ -596,14 +596,14 @@ void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t
int32_t
offset
=
0
;
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
pResultInfo
->
length
[
i
]
=
pResultInfo
->
fields
[
i
].
bytes
;
pResultInfo
->
row
[
i
]
=
(
char
*
)
(
pResultInfo
->
pData
+
offset
*
pResultInfo
->
numOfRows
);
pResultInfo
->
row
[
i
]
=
(
char
*
)
(
pResultInfo
->
pData
+
offset
*
pResultInfo
->
numOfRows
);
pResultInfo
->
pCol
[
i
]
=
pResultInfo
->
row
[
i
];
offset
+=
pResultInfo
->
fields
[
i
].
bytes
;
}
}
char
*
getDbOfConnection
(
STscObj
*
pObj
)
{
char
*
p
=
NULL
;
char
*
p
=
NULL
;
pthread_mutex_lock
(
&
pObj
->
mutex
);
size_t
len
=
strlen
(
pObj
->
db
);
if
(
len
>
0
)
{
...
...
@@ -624,8 +624,8 @@ void setConnectionDB(STscObj* pTscObj, const char* db) {
void
setQueryResultFromRsp
(
SReqResultInfo
*
pResultInfo
,
const
SRetrieveTableRsp
*
pRsp
)
{
assert
(
pResultInfo
!=
NULL
&&
pRsp
!=
NULL
);
pResultInfo
->
pRspMsg
=
(
const
char
*
)
pRsp
;
pResultInfo
->
pData
=
(
void
*
)
pRsp
->
data
;
pResultInfo
->
pRspMsg
=
(
const
char
*
)
pRsp
;
pResultInfo
->
pData
=
(
void
*
)
pRsp
->
data
;
pResultInfo
->
numOfRows
=
htonl
(
pRsp
->
numOfRows
);
pResultInfo
->
current
=
0
;
pResultInfo
->
completed
=
(
pRsp
->
completed
==
1
);
...
...
source/client/src/clientMsgHandler.c
浏览文件 @
878401c1
...
...
@@ -105,6 +105,7 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj *pRequest) {
pRetrieveMsg
->
showId
=
htobe64
(
pRequest
->
body
.
showInfo
.
execId
);
pMsgSendInfo
->
msgInfo
.
pData
=
pRetrieveMsg
;
pMsgSendInfo
->
msgInfo
.
len
=
sizeof
(
SRetrieveTableReq
);
pMsgSendInfo
->
msgInfo
.
handle
=
NULL
;
}
else
{
SVShowTablesFetchReq
*
pFetchMsg
=
calloc
(
1
,
sizeof
(
SVShowTablesFetchReq
));
if
(
pFetchMsg
==
NULL
)
{
...
...
@@ -116,6 +117,7 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj *pRequest) {
pMsgSendInfo
->
msgInfo
.
pData
=
pFetchMsg
;
pMsgSendInfo
->
msgInfo
.
len
=
sizeof
(
SVShowTablesFetchReq
);
pMsgSendInfo
->
msgInfo
.
handle
=
NULL
;
}
}
else
{
assert
(
pRequest
!=
NULL
);
...
...
source/client/src/tmq.c
浏览文件 @
878401c1
...
...
@@ -252,7 +252,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
SMqSubscribeCbParam
param
=
{.
rspErr
=
TMQ_RESP_ERR__SUCCESS
,
.
tmq
=
tmq
};
tsem_init
(
&
param
.
rspSem
,
0
,
0
);
pRequest
->
body
.
requestMsg
=
(
SDataBuf
){.
pData
=
buf
,
.
len
=
tlen
};
pRequest
->
body
.
requestMsg
=
(
SDataBuf
){.
pData
=
buf
,
.
len
=
tlen
,
.
handle
=
NULL
};
SMsgSendInfo
*
sendInfo
=
buildMsgInfoImpl
(
pRequest
);
sendInfo
->
param
=
&
param
;
...
...
@@ -381,7 +381,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
tSerializeSCMCreateTopicReq
(
&
abuf
,
&
req
);
/*printf("formatted: %s\n", dagStr);*/
pRequest
->
body
.
requestMsg
=
(
SDataBuf
){.
pData
=
buf
,
.
len
=
tlen
};
pRequest
->
body
.
requestMsg
=
(
SDataBuf
){.
pData
=
buf
,
.
len
=
tlen
,
.
handle
=
NULL
};
pRequest
->
type
=
TDMT_MND_CREATE_TOPIC
;
SMsgSendInfo
*
sendInfo
=
buildMsgInfoImpl
(
pRequest
);
...
...
@@ -686,7 +686,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
tsem_init
(
&
param
->
rspSem
,
0
,
0
);
SRequestObj
*
pRequest
=
createRequest
(
tmq
->
pTscObj
,
NULL
,
NULL
,
TDMT_VND_CONSUME
);
pRequest
->
body
.
requestMsg
=
(
SDataBuf
){.
pData
=
pReq
,
.
len
=
sizeof
(
SMqConsumeReq
)};
pRequest
->
body
.
requestMsg
=
(
SDataBuf
){.
pData
=
pReq
,
.
len
=
sizeof
(
SMqConsumeReq
)
,
.
handle
=
NULL
};
SMsgSendInfo
*
sendInfo
=
buildMsgInfoImpl
(
pRequest
);
sendInfo
->
requestObjRefId
=
0
;
...
...
source/libs/qcom/src/queryUtil.c
浏览文件 @
878401c1
...
...
@@ -137,7 +137,7 @@ int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransp
.
pCont
=
pMsg
,
.
contLen
=
pInfo
->
msgInfo
.
len
,
.
ahandle
=
(
void
*
)
pInfo
,
.
handle
=
NULL
,
.
handle
=
pInfo
->
msgInfo
.
handle
,
.
code
=
0
};
...
...
source/libs/scheduler/inc/schedulerInt.h
浏览文件 @
878401c1
...
...
@@ -95,6 +95,7 @@ typedef struct SSchTask {
int32_t
childReady
;
// child task ready number
SArray
*
children
;
// the datasource tasks,from which to fetch the result, element is SQueryTask*
SArray
*
parents
;
// the data destination tasks, get data from current task, element is SQueryTask*
void
*
handle
;
// task send handle
}
SSchTask
;
typedef
struct
SSchJobAttr
{
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
878401c1
...
...
@@ -18,6 +18,10 @@
#include "query.h"
#include "catalog.h"
typedef
struct
SSchTrans
{
void
*
transInst
;
void
*
transHandle
;
}
SSchTrans
;
static
SSchedulerMgmt
schMgmt
=
{
0
};
uint64_t
schGenTaskId
(
void
)
{
...
...
@@ -932,6 +936,7 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in
pTask
=
*
task
;
SCH_TASK_DLOG
(
"rsp msg received, type:%s, code:%s"
,
TMSG_INFO
(
msgType
),
tstrerror
(
rspCode
));
pTask
->
handle
=
pMsg
->
handle
;
SCH_ERR_JRET
(
schHandleResponseMsg
(
pJob
,
pTask
,
msgType
,
pMsg
->
pData
,
pMsg
->
len
,
rspCode
));
_return:
...
...
@@ -1000,6 +1005,9 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
int32_t
schAsyncSendMsg
(
void
*
transport
,
SEpSet
*
epSet
,
uint64_t
qId
,
uint64_t
tId
,
int32_t
msgType
,
void
*
msg
,
uint32_t
msgSize
)
{
int32_t
code
=
0
;
SSchTrans
*
trans
=
(
SSchTrans
*
)
transport
;
SMsgSendInfo
*
pMsgSendInfo
=
calloc
(
1
,
sizeof
(
SMsgSendInfo
));
if
(
NULL
==
pMsgSendInfo
)
{
qError
(
"QID:%"
PRIx64
",TID:%"
PRIx64
" calloc %d failed"
,
qId
,
tId
,
(
int32_t
)
sizeof
(
SMsgSendInfo
));
...
...
@@ -1018,14 +1026,16 @@ int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t t
param
->
queryId
=
qId
;
param
->
taskId
=
tId
;
pMsgSendInfo
->
param
=
param
;
pMsgSendInfo
->
msgInfo
.
pData
=
msg
;
pMsgSendInfo
->
msgInfo
.
len
=
msgSize
;
pMsgSendInfo
->
msgInfo
.
handle
=
trans
->
transHandle
;
pMsgSendInfo
->
msgType
=
msgType
;
pMsgSendInfo
->
fp
=
fp
;
int64_t
transporterId
=
0
;
code
=
asyncSendMsgToServer
(
trans
por
t
,
epSet
,
&
transporterId
,
pMsgSendInfo
);
code
=
asyncSendMsgToServer
(
trans
->
transIns
t
,
epSet
,
&
transporterId
,
pMsgSendInfo
);
if
(
code
)
{
SCH_ERR_JRET
(
code
);
}
...
...
@@ -1149,7 +1159,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
atomic_store_32
(
&
pTask
->
lastMsgType
,
msgType
);
SCH_ERR_JRET
(
schAsyncSendMsg
(
pJob
->
transport
,
&
epSet
,
pJob
->
queryId
,
pTask
->
taskId
,
msgType
,
msg
,
msgSize
));
SSchTrans
trans
=
{.
transInst
=
pJob
->
transport
,
.
transHandle
=
pTask
->
handle
};
SCH_ERR_JRET
(
schAsyncSendMsg
(
&
trans
,
&
epSet
,
pJob
->
queryId
,
pTask
->
taskId
,
msgType
,
msg
,
msgSize
));
if
(
isCandidateAddr
)
{
SCH_ERR_RET
(
schRecordTaskExecNode
(
pJob
,
pTask
,
addr
));
...
...
source/libs/transport/src/transCli.c
浏览文件 @
878401c1
...
...
@@ -133,15 +133,18 @@ static void clientHandleResp(SCliConn* conn) {
rpcMsg
.
msgType
=
pHead
->
msgType
;
rpcMsg
.
ahandle
=
pCtx
->
ahandle
;
if
(
rpcMsg
.
msgType
==
TDMT_VND_QUERY_RSP
||
rpcMsg
.
msgType
==
TDMT_VND_FETCH_RSP
)
{
if
(
rpcMsg
.
msgType
==
TDMT_VND_QUERY_RSP
||
rpcMsg
.
msgType
==
TDMT_VND_FETCH_RSP
||
rpcMsg
.
msgType
==
TDMT_VND_RES_READY
)
{
rpcMsg
.
handle
=
conn
;
conn
->
persist
=
1
;
tDebug
(
"client conn %p persist by app"
,
conn
);
}
tDebug
(
"client conn %p %s received from %s:%d, local info: %s:%d"
,
conn
,
TMSG_INFO
(
pHead
->
msgType
),
inet_ntoa
(
conn
->
addr
.
sin_addr
),
ntohs
(
conn
->
addr
.
sin_port
),
inet_ntoa
(
conn
->
locaddr
.
sin_addr
),
ntohs
(
conn
->
locaddr
.
sin_port
));
conn
->
secured
=
pHead
->
secured
;
if
(
conn
->
push
!=
NULL
&&
conn
->
ctnRdCnt
!=
0
)
{
(
*
conn
->
push
->
callback
)(
conn
->
push
->
arg
,
&
rpcMsg
);
conn
->
push
=
NULL
;
...
...
@@ -156,7 +159,6 @@ static void clientHandleResp(SCliConn* conn) {
}
}
conn
->
ctnRdCnt
+=
1
;
conn
->
secured
=
pHead
->
secured
;
// buf's mem alread translated to rpcMsg.pCont
transClearBuffer
(
&
conn
->
readBuf
);
...
...
@@ -166,16 +168,14 @@ static void clientHandleResp(SCliConn* conn) {
SCliThrdObj
*
pThrd
=
conn
->
hostThrd
;
// user owns conn->persist = 1
if
(
conn
->
push
==
NULL
||
conn
->
persist
==
0
)
{
if
(
conn
->
push
==
NULL
&&
conn
->
persist
==
0
)
{
addConnToPool
(
pThrd
->
pool
,
pCtx
->
ip
,
pCtx
->
port
,
conn
);
}
destroyCmsg
(
conn
->
data
);
conn
->
data
=
NULL
;
}
// start thread's timer of conn pool if not active
if
(
!
uv_is_active
((
uv_handle_t
*
)
pThrd
->
timer
)
&&
pRpc
->
idleTime
>
0
)
{
uv_timer_start
((
uv_timer_t
*
)
pThrd
->
timer
,
clientTimeoutCb
,
CONN_PERSIST_TIME
(
pRpc
->
idleTime
)
/
2
,
0
);
//
uv_timer_start((uv_timer_t*)pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
}
}
static
void
clientHandleExcept
(
SCliConn
*
pConn
)
{
...
...
@@ -330,6 +330,9 @@ static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_b
}
static
void
clientReadCb
(
uv_stream_t
*
handle
,
ssize_t
nread
,
const
uv_buf_t
*
buf
)
{
// impl later
if
(
handle
->
data
==
NULL
)
{
return
;
}
SCliConn
*
conn
=
handle
->
data
;
SConnBuffer
*
pBuf
=
&
conn
->
readBuf
;
if
(
nread
>
0
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录