Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
e4de1da8
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
e4de1da8
编写于
7月 02, 2022
作者:
dengyihao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add task queue
上级
09eceed6
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
103 addition
and
61 deletion
+103
-61
source/client/inc/clientInt.h
source/client/inc/clientInt.h
+16
-12
source/client/src/clientEnv.c
source/client/src/clientEnv.c
+18
-6
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+36
-6
source/client/src/clientMain.c
source/client/src/clientMain.c
+33
-37
未找到文件。
source/client/inc/clientInt.h
浏览文件 @
e4de1da8
...
...
@@ -65,7 +65,7 @@ enum {
typedef
struct
SAppInstInfo
SAppInstInfo
;
typedef
struct
{
char
*
key
;
char
*
key
;
// statistics
int32_t
reportCnt
;
int32_t
connKeyCnt
;
...
...
@@ -177,14 +177,14 @@ typedef struct SReqResultInfo {
}
SReqResultInfo
;
typedef
struct
SRequestSendRecvBody
{
tsem_t
rspSem
;
// not used now
__taos_async_fn_t
queryFp
;
__taos_async_fn_t
fetchFp
;
void
*
param
;
SDataBuf
requestMsg
;
int64_t
queryJob
;
// query job, created according to sql query DAG.
int32_t
subplanNum
;
SReqResultInfo
resInfo
;
tsem_t
rspSem
;
// not used now
__taos_async_fn_t
queryFp
;
__taos_async_fn_t
fetchFp
;
void
*
param
;
SDataBuf
requestMsg
;
int64_t
queryJob
;
// query job, created according to sql query DAG.
int32_t
subplanNum
;
SReqResultInfo
resInfo
;
}
SRequestSendRecvBody
;
typedef
struct
{
...
...
@@ -284,6 +284,7 @@ static FORCE_INLINE SReqResultInfo* tscGetCurResInfo(TAOS_RES* res) {
extern
SAppInfo
appInfo
;
extern
int32_t
clientReqRefPool
;
extern
int32_t
clientConnRefPool
;
extern
void
*
tscQhandle
;
__async_send_cb_fn_t
getMsgRspHandle
(
int32_t
msgType
);
...
...
@@ -301,7 +302,7 @@ void destroyRequest(SRequestObj* pRequest);
SRequestObj
*
acquireRequest
(
int64_t
rid
);
int32_t
releaseRequest
(
int64_t
rid
);
int32_t
removeRequest
(
int64_t
rid
);
void
doDestroyRequest
(
void
*
p
);
void
doDestroyRequest
(
void
*
p
);
char
*
getDbOfConnection
(
STscObj
*
pObj
);
void
setConnectionDB
(
STscObj
*
pTscObj
,
const
char
*
db
);
...
...
@@ -336,8 +337,8 @@ int hbHandleRsp(SClientHbBatchRsp* hbRsp);
// cluster level
SAppHbMgr
*
appHbMgrInit
(
SAppInstInfo
*
pAppInstInfo
,
char
*
key
);
void
appHbMgrCleanup
(
void
);
void
hbRemoveAppHbMrg
(
SAppHbMgr
**
pAppHbMgr
);
void
closeAllRequests
(
SHashObj
*
pRequests
);
void
hbRemoveAppHbMrg
(
SAppHbMgr
**
pAppHbMgr
);
void
closeAllRequests
(
SHashObj
*
pRequests
);
// conn level
int
hbRegisterConn
(
SAppHbMgr
*
pAppHbMgr
,
int64_t
tscRefId
,
int64_t
clusterId
,
int8_t
connType
);
...
...
@@ -356,6 +357,9 @@ int32_t removeMeta(STscObj* pTscObj, SArray* tbList); // todo move to clie
int32_t
handleAlterTbExecRes
(
void
*
res
,
struct
SCatalog
*
pCatalog
);
// todo move to xxx
bool
qnodeRequired
(
SRequestObj
*
pRequest
);
void
initTscQhandle
();
void
cleanupTscQhandle
();
#ifdef __cplusplus
}
#endif
...
...
source/client/src/clientEnv.c
浏览文件 @
e4de1da8
...
...
@@ -25,6 +25,7 @@
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"
#include "tsched.h"
#include "ttime.h"
#define TSC_VAR_NOT_RELEASE 1
...
...
@@ -34,9 +35,20 @@ SAppInfo appInfo;
int32_t
clientReqRefPool
=
-
1
;
int32_t
clientConnRefPool
=
-
1
;
void
*
tscQhandle
=
NULL
;
static
TdThreadOnce
tscinit
=
PTHREAD_ONCE_INIT
;
volatile
int32_t
tscInitRes
=
0
;
void
initTscQhandle
()
{
// init handle
tscQhandle
=
taosInitScheduler
(
4096
,
5
,
"tsc"
);
}
void
cleanupTscQhandle
()
{
// destroy handle
taosCleanUpScheduler
(
tscQhandle
);
}
static
int32_t
registerRequest
(
SRequestObj
*
pRequest
)
{
STscObj
*
pTscObj
=
acquireTscObj
(
pRequest
->
pTscObj
->
id
);
if
(
NULL
==
pTscObj
)
{
...
...
@@ -156,9 +168,9 @@ void destroyTscObj(void *pObj) {
if
(
NULL
==
pObj
)
{
return
;
}
STscObj
*
pTscObj
=
pObj
;
int64_t
tscId
=
pTscObj
->
id
;
int64_t
tscId
=
pTscObj
->
id
;
tscDebug
(
"begin to destroy tscObj %"
PRIx64
" p:%p"
,
tscId
,
pTscObj
);
SClientHbKey
connKey
=
{.
tscRid
=
pTscObj
->
id
,
.
connType
=
pTscObj
->
connType
};
...
...
@@ -272,11 +284,11 @@ void doDestroyRequest(void *p) {
if
(
NULL
==
p
)
{
return
;
}
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
p
;
int64_t
reqId
=
pRequest
->
self
;
int64_t
reqId
=
pRequest
->
self
;
tscDebug
(
"begin to destroy request %"
PRIx64
" p:%p"
,
reqId
,
pRequest
);
taosHashRemove
(
pRequest
->
pTscObj
->
pRequests
,
&
pRequest
->
self
,
sizeof
(
pRequest
->
self
));
if
(
pRequest
->
body
.
queryJob
!=
0
)
{
...
...
@@ -314,7 +326,7 @@ void taos_init_imp(void) {
// In the APIs of other program language, taos_cleanup is not available yet.
// So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning.
atexit
(
taos_cleanup
);
initTscQhandle
();
errno
=
TSDB_CODE_SUCCESS
;
taosSeedRand
(
taosGetTimestampSec
());
...
...
source/client/src/clientImpl.c
浏览文件 @
e4de1da8
...
...
@@ -25,6 +25,7 @@
#include "tmsgtype.h"
#include "tpagedbuf.h"
#include "tref.h"
#include "tsched.h"
static
int32_t
initEpSetFromCfg
(
const
char
*
firstEp
,
const
char
*
secondEp
,
SCorEpSet
*
pEpSet
);
static
SMsgSendInfo
*
buildConnectMsg
(
SRequestObj
*
pRequest
);
...
...
@@ -56,14 +57,14 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i
}
bool
chkRequestKilled
(
void
*
param
)
{
bool
killed
=
false
;
bool
killed
=
false
;
SRequestObj
*
pRequest
=
acquireRequest
((
int64_t
)
param
);
if
(
NULL
==
pRequest
||
pRequest
->
killed
)
{
killed
=
true
;
}
releaseRequest
((
int64_t
)
param
);
return
killed
;
}
...
...
@@ -769,7 +770,7 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
code
=
handleSubmitExecRes
(
pRequest
,
pRes
->
res
,
pCatalog
,
&
epset
);
break
;
}
case
TDMT_SCH_QUERY
:
case
TDMT_SCH_QUERY
:
case
TDMT_SCH_MERGE_QUERY
:
{
code
=
handleQueryExecRes
(
pRequest
,
pRes
->
res
,
pCatalog
,
&
epset
);
break
;
...
...
@@ -1236,7 +1237,16 @@ void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg,
}
}
void
processMsgFromServer
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
typedef
struct
SchedArg
{
SRpcMsg
msg
;
SEpSet
*
pEpset
;
}
SchedArg
;
void
doProcessMsgFromServer
(
SSchedMsg
*
schedMsg
)
{
SchedArg
*
arg
=
(
SchedArg
*
)
schedMsg
->
ahandle
;
SRpcMsg
*
pMsg
=
&
arg
->
msg
;
SEpSet
*
pEpSet
=
arg
->
pEpset
;
SMsgSendInfo
*
pSendInfo
=
(
SMsgSendInfo
*
)
pMsg
->
info
.
ahandle
;
assert
(
pMsg
->
info
.
ahandle
!=
NULL
);
STscObj
*
pTscObj
=
NULL
;
...
...
@@ -1269,7 +1279,8 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
updateTargetEpSet
(
pSendInfo
,
pTscObj
,
pMsg
,
pEpSet
);
SDataBuf
buf
=
{.
msgType
=
pMsg
->
msgType
,
.
len
=
pMsg
->
contLen
,
.
pData
=
NULL
,
.
handle
=
pMsg
->
info
.
handle
,
.
pEpSet
=
pEpSet
};
SDataBuf
buf
=
{
.
msgType
=
pMsg
->
msgType
,
.
len
=
pMsg
->
contLen
,
.
pData
=
NULL
,
.
handle
=
pMsg
->
info
.
handle
,
.
pEpSet
=
pEpSet
};
if
(
pMsg
->
contLen
>
0
)
{
buf
.
pData
=
taosMemoryCalloc
(
1
,
pMsg
->
contLen
);
...
...
@@ -1284,6 +1295,25 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
pSendInfo
->
fp
(
pSendInfo
->
param
,
&
buf
,
pMsg
->
code
);
rpcFreeCont
(
pMsg
->
pCont
);
destroySendMsgInfo
(
pSendInfo
);
taosMemoryFree
(
arg
);
}
void
processMsgFromServer
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SSchedMsg
schedMsg
=
{
0
};
SEpSet
*
tEpSet
=
pEpSet
!=
NULL
?
taosMemoryCalloc
(
1
,
sizeof
(
SEpSet
))
:
NULL
;
if
(
tEpSet
!=
NULL
)
{
*
tEpSet
=
*
pEpSet
;
}
SchedArg
*
arg
=
taosMemoryCalloc
(
1
,
sizeof
(
SchedArg
));
arg
->
msg
=
*
pMsg
;
arg
->
pEpset
=
tEpSet
;
schedMsg
.
fp
=
doProcessMsgFromServer
;
schedMsg
.
ahandle
=
arg
;
taosScheduleTask
(
tscQhandle
,
&
schedMsg
);
}
TAOS
*
taos_connect_auth
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
uint16_t
port
)
{
...
...
@@ -1412,7 +1442,7 @@ void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertU
pParam
=
taosMemoryCalloc
(
1
,
sizeof
(
SSyncQueryParam
));
tsem_init
(
&
pParam
->
sem
,
0
,
0
);
}
// convert ucs4 to native multi-bytes string
pResultInfo
->
convertUcs4
=
convertUcs4
;
...
...
source/client/src/clientMain.c
浏览文件 @
e4de1da8
...
...
@@ -47,11 +47,9 @@ int taos_options(TSDB_OPTION option, const void *arg, ...) {
atomic_store_32
(
&
lock
,
0
);
return
ret
;
}
// this function may be called by user or system, or by both simultaneously.
void
taos_cleanup
(
void
)
{
tscInfo
(
"start to cleanup client environment"
);
tscInfo
(
"start to cleanup client environment"
);
if
(
atomic_val_compare_exchange_32
(
&
sentinel
,
TSC_VAR_NOT_RELEASE
,
TSC_VAR_RELEASED
)
!=
TSC_VAR_NOT_RELEASE
)
{
return
;
}
...
...
@@ -74,8 +72,8 @@ void taos_cleanup(void) {
catalogDestroy
();
schedulerDestroy
();
cleanupTscQhandle
();
rpcCleanup
();
tscInfo
(
"all local resources released"
);
taosCleanupCfg
();
taosCloseLog
();
...
...
@@ -108,7 +106,7 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha
if
(
pObj
)
{
int64_t
*
rid
=
taosMemoryCalloc
(
1
,
sizeof
(
int64_t
));
*
rid
=
pObj
->
id
;
return
(
TAOS
*
)
rid
;
return
(
TAOS
*
)
rid
;
}
return
NULL
;
...
...
@@ -196,9 +194,9 @@ void taos_kill_query(TAOS *taos) {
if
(
NULL
==
taos
)
{
return
;
}
int64_t
rid
=
*
(
int64_t
*
)
taos
;
STscObj
*
pTscObj
=
acquireTscObj
(
rid
);
int64_t
rid
=
*
(
int64_t
*
)
taos
;
STscObj
*
pTscObj
=
acquireTscObj
(
rid
);
closeAllRequests
(
pTscObj
->
pRequests
);
releaseTscObj
(
rid
);
}
...
...
@@ -244,7 +242,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
#endif
}
else
if
(
TD_RES_TMQ
(
res
))
{
SMqRspObj
*
msg
=
((
SMqRspObj
*
)
res
);
SMqRspObj
*
msg
=
((
SMqRspObj
*
)
res
);
SReqResultInfo
*
pResultInfo
;
if
(
msg
->
resIter
==
-
1
)
{
pResultInfo
=
tmqGetNextResInfo
(
res
,
true
);
...
...
@@ -420,7 +418,7 @@ int taos_affected_rows(TAOS_RES *res) {
return
0
;
}
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
SReqResultInfo
*
pResInfo
=
&
pRequest
->
body
.
resInfo
;
return
pResInfo
->
numOfRows
;
}
...
...
@@ -606,7 +604,7 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
}
SReqResultInfo
*
pResInfo
=
tscGetCurResInfo
(
res
);
TAOS_FIELD
*
pField
=
&
pResInfo
->
userFields
[
columnIndex
];
TAOS_FIELD
*
pField
=
&
pResInfo
->
userFields
[
columnIndex
];
if
(
!
IS_VAR_DATA_TYPE
(
pField
->
type
))
{
return
0
;
}
...
...
@@ -650,8 +648,8 @@ const char *taos_get_server_info(TAOS *taos) {
typedef
struct
SqlParseWrapper
{
SParseContext
*
pCtx
;
SCatalogReq
catalogReq
;
SRequestObj
*
pRequest
;
SQuery
*
pQuery
;
SRequestObj
*
pRequest
;
SQuery
*
pQuery
;
}
SqlParseWrapper
;
static
void
destorySqlParseWrapper
(
SqlParseWrapper
*
pWrapper
)
{
...
...
@@ -672,8 +670,8 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
tscDebug
(
"enter meta callback, code %s"
,
tstrerror
(
code
));
SqlParseWrapper
*
pWrapper
=
(
SqlParseWrapper
*
)
param
;
SQuery
*
pQuery
=
pWrapper
->
pQuery
;
SRequestObj
*
pRequest
=
pWrapper
->
pRequest
;
SQuery
*
pQuery
=
pWrapper
->
pQuery
;
SRequestObj
*
pRequest
=
pWrapper
->
pRequest
;
if
(
code
==
TSDB_CODE_SUCCESS
)
{
code
=
qAnalyseSqlSemantic
(
pWrapper
->
pCtx
,
&
pWrapper
->
catalogReq
,
pResultMeta
,
pQuery
);
...
...
@@ -722,31 +720,29 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) {
return
TSDB_CODE_OUT_OF_MEMORY
;
}
**
pCxt
=
(
SParseContext
){
.
requestId
=
pRequest
->
requestId
,
.
requestRid
=
pRequest
->
self
,
.
acctId
=
pTscObj
->
acctId
,
.
db
=
pRequest
->
pDb
,
.
topicQuery
=
false
,
.
pSql
=
pRequest
->
sqlstr
,
.
sqlLen
=
pRequest
->
sqlLen
,
.
pMsg
=
pRequest
->
msgBuf
,
.
msgLen
=
ERROR_MSG_BUF_DEFAULT_SIZE
,
.
pTransporter
=
pTscObj
->
pAppInfo
->
pTransporter
,
.
pStmtCb
=
NULL
,
.
pUser
=
pTscObj
->
user
,
.
schemalessType
=
pTscObj
->
schemalessType
,
.
isSuperUser
=
(
0
==
strcmp
(
pTscObj
->
user
,
TSDB_DEFAULT_USER
)),
.
async
=
true
,
.
svrVer
=
pTscObj
->
sVer
,
.
nodeOffline
=
(
pTscObj
->
pAppInfo
->
onlineDnodes
<
pTscObj
->
pAppInfo
->
totalDnodes
)
};
**
pCxt
=
(
SParseContext
){.
requestId
=
pRequest
->
requestId
,
.
requestRid
=
pRequest
->
self
,
.
acctId
=
pTscObj
->
acctId
,
.
db
=
pRequest
->
pDb
,
.
topicQuery
=
false
,
.
pSql
=
pRequest
->
sqlstr
,
.
sqlLen
=
pRequest
->
sqlLen
,
.
pMsg
=
pRequest
->
msgBuf
,
.
msgLen
=
ERROR_MSG_BUF_DEFAULT_SIZE
,
.
pTransporter
=
pTscObj
->
pAppInfo
->
pTransporter
,
.
pStmtCb
=
NULL
,
.
pUser
=
pTscObj
->
user
,
.
schemalessType
=
pTscObj
->
schemalessType
,
.
isSuperUser
=
(
0
==
strcmp
(
pTscObj
->
user
,
TSDB_DEFAULT_USER
)),
.
async
=
true
,
.
svrVer
=
pTscObj
->
sVer
,
.
nodeOffline
=
(
pTscObj
->
pAppInfo
->
onlineDnodes
<
pTscObj
->
pAppInfo
->
totalDnodes
)};
return
TSDB_CODE_SUCCESS
;
}
void
doAsyncQuery
(
SRequestObj
*
pRequest
,
bool
updateMetaForce
)
{
SParseContext
*
pCxt
=
NULL
;
STscObj
*
pTscObj
=
pRequest
->
pTscObj
;
STscObj
*
pTscObj
=
pRequest
->
pTscObj
;
int32_t
code
=
0
;
if
(
pRequest
->
retry
++
>
REQUEST_TOTAL_EXEC_TIMES
)
{
...
...
@@ -911,10 +907,10 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
return
terrno
;
}
int64_t
rid
=
*
(
int64_t
*
)
taos
;
int64_t
rid
=
*
(
int64_t
*
)
taos
;
const
int32_t
MAX_TABLE_NAME_LENGTH
=
12
*
1024
*
1024
;
// 12MB list
int32_t
code
=
0
;
SRequestObj
*
pRequest
=
NULL
;
SRequestObj
*
pRequest
=
NULL
;
SCatalogReq
catalogReq
=
{
0
};
if
(
NULL
==
tableNameList
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录