Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
7ae8787c
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
7ae8787c
编写于
7月 04, 2022
作者:
dengyihao
提交者:
GitHub
7月 04, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #14511 from taosdata/enh/killQuery
enh: enhance kill query processing
上级
cce9f262
f213212f
变更
17
隐藏空白更改
内联
并排
Showing
17 changed file
with
540 addition
and
215 deletion
+540
-215
include/libs/scheduler/scheduler.h
include/libs/scheduler/scheduler.h
+1
-1
include/libs/transport/trpc.h
include/libs/transport/trpc.h
+9
-6
source/client/inc/clientInt.h
source/client/inc/clientInt.h
+17
-12
source/client/src/clientEnv.c
source/client/src/clientEnv.c
+57
-10
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+39
-16
source/client/src/clientMain.c
source/client/src/clientMain.c
+35
-41
source/libs/scheduler/src/schJob.c
source/libs/scheduler/src/schJob.c
+5
-18
source/libs/scheduler/src/schRemote.c
source/libs/scheduler/src/schRemote.c
+5
-1
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+15
-8
source/libs/scheduler/test/schedulerTests.cpp
source/libs/scheduler/test/schedulerTests.cpp
+5
-5
source/libs/transport/inc/transComm.h
source/libs/transport/inc/transComm.h
+12
-9
source/libs/transport/inc/transportInt.h
source/libs/transport/inc/transportInt.h
+1
-1
source/libs/transport/src/trans.c
source/libs/transport/src/trans.c
+12
-6
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+30
-17
source/libs/transport/src/transComm.c
source/libs/transport/src/transComm.c
+21
-19
source/libs/transport/src/transSvr.c
source/libs/transport/src/transSvr.c
+19
-19
tests/script/api/stopquery.c
tests/script/api/stopquery.c
+257
-26
未找到文件。
include/libs/scheduler/scheduler.h
浏览文件 @
7ae8787c
...
...
@@ -130,7 +130,7 @@ void schedulerStopQueryHb(void *pTrans);
* Free the query job
* @param pJob
*/
void
schedulerFreeJob
(
int64_t
job
,
int32_t
errCode
);
void
schedulerFreeJob
(
int64_t
*
job
,
int32_t
errCode
);
void
schedulerDestroy
(
void
);
...
...
include/libs/transport/trpc.h
浏览文件 @
7ae8787c
...
...
@@ -110,12 +110,15 @@ typedef struct {
}
SRpcCtx
;
int32_t
rpcInit
();
void
rpcCleanup
();
void
*
rpcOpen
(
const
SRpcInit
*
pRpc
);
void
rpcClose
(
void
*
);
void
*
rpcMallocCont
(
int32_t
contLen
);
void
rpcFreeCont
(
void
*
pCont
);
void
*
rpcReallocCont
(
void
*
ptr
,
int32_t
contLen
);
void
rpcCleanup
();
void
*
rpcOpen
(
const
SRpcInit
*
pRpc
);
void
rpcClose
(
void
*
);
void
rpcCloseImpl
(
void
*
);
void
*
rpcMallocCont
(
int32_t
contLen
);
void
rpcFreeCont
(
void
*
pCont
);
void
*
rpcReallocCont
(
void
*
ptr
,
int32_t
contLen
);
// Because taosd supports multi-process mode
// These functions should not be used on the server side
...
...
source/client/inc/clientInt.h
浏览文件 @
7ae8787c
...
...
@@ -65,7 +65,7 @@ enum {
typedef
struct
SAppInstInfo
SAppInstInfo
;
typedef
struct
{
char
*
key
;
char
*
key
;
// statistics
int32_t
reportCnt
;
int32_t
connKeyCnt
;
...
...
@@ -177,14 +177,14 @@ typedef struct SReqResultInfo {
}
SReqResultInfo
;
typedef
struct
SRequestSendRecvBody
{
tsem_t
rspSem
;
// not used now
__taos_async_fn_t
queryFp
;
__taos_async_fn_t
fetchFp
;
void
*
param
;
SDataBuf
requestMsg
;
int64_t
queryJob
;
// query job, created according to sql query DAG.
int32_t
subplanNum
;
SReqResultInfo
resInfo
;
tsem_t
rspSem
;
// not used now
__taos_async_fn_t
queryFp
;
__taos_async_fn_t
fetchFp
;
void
*
param
;
SDataBuf
requestMsg
;
int64_t
queryJob
;
// query job, created according to sql query DAG.
int32_t
subplanNum
;
SReqResultInfo
resInfo
;
}
SRequestSendRecvBody
;
typedef
struct
{
...
...
@@ -284,6 +284,7 @@ static FORCE_INLINE SReqResultInfo* tscGetCurResInfo(TAOS_RES* res) {
extern
SAppInfo
appInfo
;
extern
int32_t
clientReqRefPool
;
extern
int32_t
clientConnRefPool
;
extern
void
*
tscQhandle
;
__async_send_cb_fn_t
getMsgRspHandle
(
int32_t
msgType
);
...
...
@@ -301,7 +302,7 @@ void destroyRequest(SRequestObj* pRequest);
SRequestObj
*
acquireRequest
(
int64_t
rid
);
int32_t
releaseRequest
(
int64_t
rid
);
int32_t
removeRequest
(
int64_t
rid
);
void
doDestroyRequest
(
void
*
p
);
void
doDestroyRequest
(
void
*
p
);
char
*
getDbOfConnection
(
STscObj
*
pObj
);
void
setConnectionDB
(
STscObj
*
pTscObj
,
const
char
*
db
);
...
...
@@ -336,8 +337,9 @@ int hbHandleRsp(SClientHbBatchRsp* hbRsp);
// cluster level
SAppHbMgr
*
appHbMgrInit
(
SAppInstInfo
*
pAppInstInfo
,
char
*
key
);
void
appHbMgrCleanup
(
void
);
void
hbRemoveAppHbMrg
(
SAppHbMgr
**
pAppHbMgr
);
void
closeAllRequests
(
SHashObj
*
pRequests
);
void
hbRemoveAppHbMrg
(
SAppHbMgr
**
pAppHbMgr
);
void
destroyAllRequests
(
SHashObj
*
pRequests
);
void
stopAllRequests
(
SHashObj
*
pRequests
);
// conn level
int
hbRegisterConn
(
SAppHbMgr
*
pAppHbMgr
,
int64_t
tscRefId
,
int64_t
clusterId
,
int8_t
connType
);
...
...
@@ -356,6 +358,9 @@ int32_t removeMeta(STscObj* pTscObj, SArray* tbList); // todo move to clie
int32_t
handleAlterTbExecRes
(
void
*
res
,
struct
SCatalog
*
pCatalog
);
// todo move to xxx
bool
qnodeRequired
(
SRequestObj
*
pRequest
);
void
initTscQhandle
();
void
cleanupTscQhandle
();
#ifdef __cplusplus
}
#endif
...
...
source/client/src/clientEnv.c
浏览文件 @
7ae8787c
...
...
@@ -25,6 +25,7 @@
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"
#include "tsched.h"
#include "ttime.h"
#define TSC_VAR_NOT_RELEASE 1
...
...
@@ -34,9 +35,20 @@ SAppInfo appInfo;
int32_t
clientReqRefPool
=
-
1
;
int32_t
clientConnRefPool
=
-
1
;
void
*
tscQhandle
=
NULL
;
static
TdThreadOnce
tscinit
=
PTHREAD_ONCE_INIT
;
volatile
int32_t
tscInitRes
=
0
;
void
initTscQhandle
()
{
// init handle
tscQhandle
=
taosInitScheduler
(
4096
,
5
,
"tsc"
);
}
void
cleanupTscQhandle
()
{
// destroy handle
taosCleanUpScheduler
(
tscQhandle
);
}
static
int32_t
registerRequest
(
SRequestObj
*
pRequest
)
{
STscObj
*
pTscObj
=
acquireTscObj
(
pRequest
->
pTscObj
->
id
);
if
(
NULL
==
pTscObj
)
{
...
...
@@ -121,12 +133,31 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
return
pDnodeConn
;
}
void
closeAllRequests
(
SHashObj
*
pRequests
)
{
void
destroyAllRequests
(
SHashObj
*
pRequests
)
{
void
*
pIter
=
taosHashIterate
(
pRequests
,
NULL
);
while
(
pIter
!=
NULL
)
{
int64_t
*
rid
=
pIter
;
SRequestObj
*
pRequest
=
acquireRequest
(
*
rid
);
if
(
pRequest
)
{
destroyRequest
(
pRequest
);
releaseRequest
(
*
rid
);
}
pIter
=
taosHashIterate
(
pRequests
,
pIter
);
}
}
void
stopAllRequests
(
SHashObj
*
pRequests
)
{
void
*
pIter
=
taosHashIterate
(
pRequests
,
NULL
);
while
(
pIter
!=
NULL
)
{
int64_t
*
rid
=
pIter
;
removeRequest
(
*
rid
);
SRequestObj
*
pRequest
=
acquireRequest
(
*
rid
);
if
(
pRequest
)
{
taos_stop_query
(
pRequest
);
releaseRequest
(
*
rid
);
}
pIter
=
taosHashIterate
(
pRequests
,
pIter
);
}
...
...
@@ -153,12 +184,18 @@ void destroyAppInst(SAppInstInfo *pAppInfo) {
}
void
destroyTscObj
(
void
*
pObj
)
{
if
(
NULL
==
pObj
)
{
return
;
}
STscObj
*
pTscObj
=
pObj
;
int64_t
tscId
=
pTscObj
->
id
;
tscTrace
(
"begin to destroy tscObj %"
PRIx64
" p:%p"
,
tscId
,
pTscObj
);
SClientHbKey
connKey
=
{.
tscRid
=
pTscObj
->
id
,
.
connType
=
pTscObj
->
connType
};
hbDeregisterConn
(
pTscObj
->
pAppInfo
->
pAppHbMgr
,
connKey
);
int64_t
connNum
=
atomic_sub_fetch_64
(
&
pTscObj
->
pAppInfo
->
numOfConns
,
1
);
close
AllRequests
(
pTscObj
->
pRequests
);
destroy
AllRequests
(
pTscObj
->
pRequests
);
schedulerStopQueryHb
(
pTscObj
->
pAppInfo
->
pTransporter
);
tscDebug
(
"connObj 0x%"
PRIx64
" p:%p destroyed, remain inst totalConn:%"
PRId64
,
pTscObj
->
id
,
pTscObj
,
pTscObj
->
pAppInfo
->
numOfConns
);
...
...
@@ -167,7 +204,9 @@ void destroyTscObj(void *pObj) {
destroyAppInst
(
pTscObj
->
pAppInfo
);
}
taosThreadMutexDestroy
(
&
pTscObj
->
mutex
);
taosMemoryFreeClear
(
pTscObj
);
taosMemoryFree
(
pTscObj
);
tscTrace
(
"end to destroy tscObj %"
PRIx64
" p:%p"
,
tscId
,
pTscObj
);
}
void
*
createTscObj
(
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
int32_t
connType
,
SAppInstInfo
*
pAppInfo
)
{
...
...
@@ -261,14 +300,18 @@ int32_t releaseRequest(int64_t rid) { return taosReleaseRef(clientReqRefPool, ri
int32_t
removeRequest
(
int64_t
rid
)
{
return
taosRemoveRef
(
clientReqRefPool
,
rid
);
}
void
doDestroyRequest
(
void
*
p
)
{
assert
(
p
!=
NULL
);
if
(
NULL
==
p
)
{
return
;
}
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
p
;
int64_t
reqId
=
pRequest
->
self
;
tscTrace
(
"begin to destroy request %"
PRIx64
" p:%p"
,
reqId
,
pRequest
);
taosHashRemove
(
pRequest
->
pTscObj
->
pRequests
,
&
pRequest
->
self
,
sizeof
(
pRequest
->
self
));
if
(
pRequest
->
body
.
queryJob
!=
0
)
{
schedulerFreeJob
(
pRequest
->
body
.
queryJob
,
0
);
}
schedulerFreeJob
(
&
pRequest
->
body
.
queryJob
,
0
);
taosMemoryFreeClear
(
pRequest
->
msgBuf
);
taosMemoryFreeClear
(
pRequest
->
sqlstr
);
...
...
@@ -284,7 +327,9 @@ void doDestroyRequest(void *p) {
if
(
pRequest
->
self
)
{
deregisterRequest
(
pRequest
);
}
taosMemoryFreeClear
(
pRequest
);
taosMemoryFree
(
pRequest
);
tscTrace
(
"end to destroy request %"
PRIx64
" p:%p"
,
reqId
,
pRequest
);
}
void
destroyRequest
(
SRequestObj
*
pRequest
)
{
...
...
@@ -292,6 +337,8 @@ void destroyRequest(SRequestObj *pRequest) {
return
;
}
taos_stop_query
(
pRequest
);
removeRequest
(
pRequest
->
self
);
}
...
...
@@ -299,7 +346,7 @@ void taos_init_imp(void) {
// In the APIs of other program language, taos_cleanup is not available yet.
// So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning.
atexit
(
taos_cleanup
);
initTscQhandle
();
errno
=
TSDB_CODE_SUCCESS
;
taosSeedRand
(
taosGetTimestampSec
());
...
...
source/client/src/clientImpl.c
浏览文件 @
7ae8787c
...
...
@@ -25,6 +25,7 @@
#include "tmsgtype.h"
#include "tpagedbuf.h"
#include "tref.h"
#include "tsched.h"
static
int32_t
initEpSetFromCfg
(
const
char
*
firstEp
,
const
char
*
secondEp
,
SCorEpSet
*
pEpSet
);
static
SMsgSendInfo
*
buildConnectMsg
(
SRequestObj
*
pRequest
);
...
...
@@ -56,14 +57,14 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i
}
bool
chkRequestKilled
(
void
*
param
)
{
bool
killed
=
false
;
bool
killed
=
false
;
SRequestObj
*
pRequest
=
acquireRequest
((
int64_t
)
param
);
if
(
NULL
==
pRequest
||
pRequest
->
killed
)
{
killed
=
true
;
}
releaseRequest
((
int64_t
)
param
);
return
killed
;
}
...
...
@@ -645,9 +646,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
pRequest
->
body
.
resInfo
.
execRes
=
res
.
res
;
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
pRequest
->
body
.
queryJob
!=
0
)
{
schedulerFreeJob
(
pRequest
->
body
.
queryJob
,
0
);
}
schedulerFreeJob
(
&
pRequest
->
body
.
queryJob
,
0
);
pRequest
->
code
=
code
;
terrno
=
code
;
...
...
@@ -658,9 +657,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
TDMT_VND_CREATE_TABLE
==
pRequest
->
type
)
{
pRequest
->
body
.
resInfo
.
numOfRows
=
res
.
numOfRows
;
if
(
pRequest
->
body
.
queryJob
!=
0
)
{
schedulerFreeJob
(
pRequest
->
body
.
queryJob
,
0
);
}
schedulerFreeJob
(
&
pRequest
->
body
.
queryJob
,
0
);
}
pRequest
->
code
=
res
.
code
;
...
...
@@ -769,7 +766,7 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
code
=
handleSubmitExecRes
(
pRequest
,
pRes
->
res
,
pCatalog
,
&
epset
);
break
;
}
case
TDMT_SCH_QUERY
:
case
TDMT_SCH_QUERY
:
case
TDMT_SCH_MERGE_QUERY
:
{
code
=
handleQueryExecRes
(
pRequest
,
pRes
->
res
,
pCatalog
,
&
epset
);
break
;
...
...
@@ -792,10 +789,7 @@ void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) {
TDMT_VND_CREATE_TABLE
==
pRequest
->
type
)
{
pRequest
->
body
.
resInfo
.
numOfRows
=
pResult
->
numOfRows
;
if
(
pRequest
->
body
.
queryJob
!=
0
)
{
schedulerFreeJob
(
pRequest
->
body
.
queryJob
,
0
);
pRequest
->
body
.
queryJob
=
0
;
}
schedulerFreeJob
(
&
pRequest
->
body
.
queryJob
,
0
);
}
taosMemoryFree
(
pResult
);
...
...
@@ -1239,7 +1233,16 @@ void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg,
}
}
void
processMsgFromServer
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
typedef
struct
SchedArg
{
SRpcMsg
msg
;
SEpSet
*
pEpset
;
}
SchedArg
;
void
doProcessMsgFromServer
(
SSchedMsg
*
schedMsg
)
{
SchedArg
*
arg
=
(
SchedArg
*
)
schedMsg
->
ahandle
;
SRpcMsg
*
pMsg
=
&
arg
->
msg
;
SEpSet
*
pEpSet
=
arg
->
pEpset
;
SMsgSendInfo
*
pSendInfo
=
(
SMsgSendInfo
*
)
pMsg
->
info
.
ahandle
;
assert
(
pMsg
->
info
.
ahandle
!=
NULL
);
STscObj
*
pTscObj
=
NULL
;
...
...
@@ -1272,7 +1275,8 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
updateTargetEpSet
(
pSendInfo
,
pTscObj
,
pMsg
,
pEpSet
);
SDataBuf
buf
=
{.
msgType
=
pMsg
->
msgType
,
.
len
=
pMsg
->
contLen
,
.
pData
=
NULL
,
.
handle
=
pMsg
->
info
.
handle
,
.
pEpSet
=
pEpSet
};
SDataBuf
buf
=
{
.
msgType
=
pMsg
->
msgType
,
.
len
=
pMsg
->
contLen
,
.
pData
=
NULL
,
.
handle
=
pMsg
->
info
.
handle
,
.
pEpSet
=
pEpSet
};
if
(
pMsg
->
contLen
>
0
)
{
buf
.
pData
=
taosMemoryCalloc
(
1
,
pMsg
->
contLen
);
...
...
@@ -1287,6 +1291,25 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
pSendInfo
->
fp
(
pSendInfo
->
param
,
&
buf
,
pMsg
->
code
);
rpcFreeCont
(
pMsg
->
pCont
);
destroySendMsgInfo
(
pSendInfo
);
taosMemoryFree
(
arg
);
}
void
processMsgFromServer
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SSchedMsg
schedMsg
=
{
0
};
SEpSet
*
tEpSet
=
pEpSet
!=
NULL
?
taosMemoryCalloc
(
1
,
sizeof
(
SEpSet
))
:
NULL
;
if
(
tEpSet
!=
NULL
)
{
*
tEpSet
=
*
pEpSet
;
}
SchedArg
*
arg
=
taosMemoryCalloc
(
1
,
sizeof
(
SchedArg
));
arg
->
msg
=
*
pMsg
;
arg
->
pEpset
=
tEpSet
;
schedMsg
.
fp
=
doProcessMsgFromServer
;
schedMsg
.
ahandle
=
arg
;
taosScheduleTask
(
tscQhandle
,
&
schedMsg
);
}
TAOS
*
taos_connect_auth
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
uint16_t
port
)
{
...
...
@@ -1415,7 +1438,7 @@ void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertU
pParam
=
taosMemoryCalloc
(
1
,
sizeof
(
SSyncQueryParam
));
tsem_init
(
&
pParam
->
sem
,
0
,
0
);
}
// convert ucs4 to native multi-bytes string
pResultInfo
->
convertUcs4
=
convertUcs4
;
...
...
source/client/src/clientMain.c
浏览文件 @
7ae8787c
...
...
@@ -47,11 +47,9 @@ int taos_options(TSDB_OPTION option, const void *arg, ...) {
atomic_store_32
(
&
lock
,
0
);
return
ret
;
}
// this function may be called by user or system, or by both simultaneously.
void
taos_cleanup
(
void
)
{
tscInfo
(
"start to cleanup client environment"
);
tscInfo
(
"start to cleanup client environment"
);
if
(
atomic_val_compare_exchange_32
(
&
sentinel
,
TSC_VAR_NOT_RELEASE
,
TSC_VAR_RELEASED
)
!=
TSC_VAR_NOT_RELEASE
)
{
return
;
}
...
...
@@ -74,8 +72,8 @@ void taos_cleanup(void) {
catalogDestroy
();
schedulerDestroy
();
cleanupTscQhandle
();
rpcCleanup
();
tscInfo
(
"all local resources released"
);
taosCleanupCfg
();
taosCloseLog
();
...
...
@@ -108,7 +106,7 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha
if
(
pObj
)
{
int64_t
*
rid
=
taosMemoryCalloc
(
1
,
sizeof
(
int64_t
));
*
rid
=
pObj
->
id
;
return
(
TAOS
*
)
rid
;
return
(
TAOS
*
)
rid
;
}
return
NULL
;
...
...
@@ -196,10 +194,10 @@ void taos_kill_query(TAOS *taos) {
if
(
NULL
==
taos
)
{
return
;
}
int64_t
rid
=
*
(
int64_t
*
)
taos
;
STscObj
*
pTscObj
=
acquireTscObj
(
rid
);
close
AllRequests
(
pTscObj
->
pRequests
);
int64_t
rid
=
*
(
int64_t
*
)
taos
;
STscObj
*
pTscObj
=
acquireTscObj
(
rid
);
stop
AllRequests
(
pTscObj
->
pRequests
);
releaseTscObj
(
rid
);
}
...
...
@@ -244,7 +242,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
#endif
}
else
if
(
TD_RES_TMQ
(
res
))
{
SMqRspObj
*
msg
=
((
SMqRspObj
*
)
res
);
SMqRspObj
*
msg
=
((
SMqRspObj
*
)
res
);
SReqResultInfo
*
pResultInfo
;
if
(
msg
->
resIter
==
-
1
)
{
pResultInfo
=
tmqGetNextResInfo
(
res
,
true
);
...
...
@@ -420,7 +418,7 @@ int taos_affected_rows(TAOS_RES *res) {
return
0
;
}
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
SReqResultInfo
*
pResInfo
=
&
pRequest
->
body
.
resInfo
;
return
pResInfo
->
numOfRows
;
}
...
...
@@ -480,9 +478,7 @@ void taos_stop_query(TAOS_RES *res) {
return
;
}
if
(
pRequest
->
body
.
queryJob
)
{
schedulerFreeJob
(
pRequest
->
body
.
queryJob
,
TSDB_CODE_TSC_QUERY_KILLED
);
}
schedulerFreeJob
(
&
pRequest
->
body
.
queryJob
,
TSDB_CODE_TSC_QUERY_KILLED
);
tscDebug
(
"request %"
PRIx64
" killed"
,
pRequest
->
requestId
);
}
...
...
@@ -606,7 +602,7 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
}
SReqResultInfo
*
pResInfo
=
tscGetCurResInfo
(
res
);
TAOS_FIELD
*
pField
=
&
pResInfo
->
userFields
[
columnIndex
];
TAOS_FIELD
*
pField
=
&
pResInfo
->
userFields
[
columnIndex
];
if
(
!
IS_VAR_DATA_TYPE
(
pField
->
type
))
{
return
0
;
}
...
...
@@ -650,8 +646,8 @@ const char *taos_get_server_info(TAOS *taos) {
typedef
struct
SqlParseWrapper
{
SParseContext
*
pCtx
;
SCatalogReq
catalogReq
;
SRequestObj
*
pRequest
;
SQuery
*
pQuery
;
SRequestObj
*
pRequest
;
SQuery
*
pQuery
;
}
SqlParseWrapper
;
static
void
destorySqlParseWrapper
(
SqlParseWrapper
*
pWrapper
)
{
...
...
@@ -672,8 +668,8 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
tscDebug
(
"enter meta callback, code %s"
,
tstrerror
(
code
));
SqlParseWrapper
*
pWrapper
=
(
SqlParseWrapper
*
)
param
;
SQuery
*
pQuery
=
pWrapper
->
pQuery
;
SRequestObj
*
pRequest
=
pWrapper
->
pRequest
;
SQuery
*
pQuery
=
pWrapper
->
pQuery
;
SRequestObj
*
pRequest
=
pWrapper
->
pRequest
;
if
(
code
==
TSDB_CODE_SUCCESS
)
{
code
=
qAnalyseSqlSemantic
(
pWrapper
->
pCtx
,
&
pWrapper
->
catalogReq
,
pResultMeta
,
pQuery
);
...
...
@@ -722,31 +718,29 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) {
return
TSDB_CODE_OUT_OF_MEMORY
;
}
**
pCxt
=
(
SParseContext
){
.
requestId
=
pRequest
->
requestId
,
.
requestRid
=
pRequest
->
self
,
.
acctId
=
pTscObj
->
acctId
,
.
db
=
pRequest
->
pDb
,
.
topicQuery
=
false
,
.
pSql
=
pRequest
->
sqlstr
,
.
sqlLen
=
pRequest
->
sqlLen
,
.
pMsg
=
pRequest
->
msgBuf
,
.
msgLen
=
ERROR_MSG_BUF_DEFAULT_SIZE
,
.
pTransporter
=
pTscObj
->
pAppInfo
->
pTransporter
,
.
pStmtCb
=
NULL
,
.
pUser
=
pTscObj
->
user
,
.
schemalessType
=
pTscObj
->
schemalessType
,
.
isSuperUser
=
(
0
==
strcmp
(
pTscObj
->
user
,
TSDB_DEFAULT_USER
)),
.
async
=
true
,
.
svrVer
=
pTscObj
->
sVer
,
.
nodeOffline
=
(
pTscObj
->
pAppInfo
->
onlineDnodes
<
pTscObj
->
pAppInfo
->
totalDnodes
)
};
**
pCxt
=
(
SParseContext
){.
requestId
=
pRequest
->
requestId
,
.
requestRid
=
pRequest
->
self
,
.
acctId
=
pTscObj
->
acctId
,
.
db
=
pRequest
->
pDb
,
.
topicQuery
=
false
,
.
pSql
=
pRequest
->
sqlstr
,
.
sqlLen
=
pRequest
->
sqlLen
,
.
pMsg
=
pRequest
->
msgBuf
,
.
msgLen
=
ERROR_MSG_BUF_DEFAULT_SIZE
,
.
pTransporter
=
pTscObj
->
pAppInfo
->
pTransporter
,
.
pStmtCb
=
NULL
,
.
pUser
=
pTscObj
->
user
,
.
schemalessType
=
pTscObj
->
schemalessType
,
.
isSuperUser
=
(
0
==
strcmp
(
pTscObj
->
user
,
TSDB_DEFAULT_USER
)),
.
async
=
true
,
.
svrVer
=
pTscObj
->
sVer
,
.
nodeOffline
=
(
pTscObj
->
pAppInfo
->
onlineDnodes
<
pTscObj
->
pAppInfo
->
totalDnodes
)};
return
TSDB_CODE_SUCCESS
;
}
void
doAsyncQuery
(
SRequestObj
*
pRequest
,
bool
updateMetaForce
)
{
SParseContext
*
pCxt
=
NULL
;
STscObj
*
pTscObj
=
pRequest
->
pTscObj
;
STscObj
*
pTscObj
=
pRequest
->
pTscObj
;
int32_t
code
=
0
;
if
(
pRequest
->
retry
++
>
REQUEST_TOTAL_EXEC_TIMES
)
{
...
...
@@ -911,10 +905,10 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
return
terrno
;
}
int64_t
rid
=
*
(
int64_t
*
)
taos
;
int64_t
rid
=
*
(
int64_t
*
)
taos
;
const
int32_t
MAX_TABLE_NAME_LENGTH
=
12
*
1024
*
1024
;
// 12MB list
int32_t
code
=
0
;
SRequestObj
*
pRequest
=
NULL
;
SRequestObj
*
pRequest
=
NULL
;
SCatalogReq
catalogReq
=
{
0
};
if
(
NULL
==
tableNameList
)
{
...
...
source/libs/scheduler/src/schJob.c
浏览文件 @
7ae8787c
...
...
@@ -184,9 +184,7 @@ FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
}
if
((
*
pJob
->
chkKillFp
)(
pJob
->
chkKillParam
))
{
schUpdateJobStatus
(
pJob
,
JOB_TASK_STATUS_DROPPING
);
schUpdateJobErrCode
(
pJob
,
TSDB_CODE_TSC_QUERY_KILLED
);
return
true
;
}
...
...
@@ -811,14 +809,6 @@ int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
*/
int32_t
schTaskCheckSetRetry
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
errCode
,
bool
*
needRetry
)
{
int8_t
status
=
0
;
if
(
schJobNeedToStop
(
pJob
,
&
status
))
{
*
needRetry
=
false
;
SCH_TASK_DLOG
(
"task no more retry cause of job status, job status:%s"
,
jobTaskStatusStr
(
status
));
return
TSDB_CODE_SUCCESS
;
}
if
(
TSDB_CODE_SCH_TIMEOUT_ERROR
==
errCode
)
{
pTask
->
maxExecTimes
++
;
if
(
pTask
->
timeoutUsec
<
SCH_MAX_TASK_TIMEOUT_USEC
)
{
...
...
@@ -1277,7 +1267,7 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList) {
for
(
int32_t
i
=
0
;
i
<
taskNum
;
++
i
)
{
STaskStatus
*
taskStatus
=
taosArrayGet
(
pStatusList
,
i
);
qDebug
(
"QID:%"
PRIx64
",TID:0x%"
PRIx64
",EID:%d task status in server: %s"
,
qDebug
(
"QID:
0x
%"
PRIx64
",TID:0x%"
PRIx64
",EID:%d task status in server: %s"
,
taskStatus
->
queryId
,
taskStatus
->
taskId
,
taskStatus
->
execId
,
jobTaskStatusStr
(
taskStatus
->
status
));
SSchJob
*
pJob
=
schAcquireJob
(
taskStatus
->
refId
);
...
...
@@ -1495,6 +1485,8 @@ void schFreeJobImpl(void *job) {
uint64_t
queryId
=
pJob
->
queryId
;
int64_t
refId
=
pJob
->
refId
;
qDebug
(
"QID:0x%"
PRIx64
" begin to free sch job, refId:0x%"
PRIx64
", pointer:%p"
,
queryId
,
refId
,
pJob
);
if
(
pJob
->
status
==
JOB_TASK_STATUS_EXECUTING
)
{
schCancelJob
(
pJob
);
}
...
...
@@ -1535,12 +1527,12 @@ void schFreeJobImpl(void *job) {
taosMemoryFreeClear
(
pJob
->
resData
);
taosMemoryFree
(
pJob
);
qDebug
(
"QID:0x%"
PRIx64
" sch job freed, refId:0x%"
PRIx64
", pointer:%p"
,
queryId
,
refId
,
pJob
);
int32_t
jobNum
=
atomic_sub_fetch_32
(
&
schMgmt
.
jobNum
,
1
);
if
(
jobNum
==
0
)
{
schCloseJobRef
();
}
qDebug
(
"QID:0x%"
PRIx64
" sch job freed, refId:0x%"
PRIx64
", pointer:%p"
,
queryId
,
refId
,
pJob
);
}
int32_t
schLaunchStaticExplainJob
(
SSchedulerReq
*
pReq
,
SSchJob
*
pJob
,
bool
sync
)
{
...
...
@@ -1687,11 +1679,6 @@ _return:
int32_t
schDoTaskRedirect
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SDataBuf
*
pData
,
int32_t
rspCode
)
{
int32_t
code
=
0
;
int8_t
status
=
0
;
if
(
schJobNeedToStop
(
pJob
,
&
status
))
{
SCH_TASK_ELOG
(
"redirect will no continue cause of job status %s"
,
jobTaskStatusStr
(
status
));
SCH_RET
(
atomic_load_32
(
&
pJob
->
errCode
));
}
if
((
pTask
->
execId
+
1
)
>=
pTask
->
maxExecTimes
)
{
SCH_TASK_DLOG
(
"task no more retry since reach max try times, execId:%d"
,
pTask
->
execId
);
...
...
source/libs/scheduler/src/schRemote.c
浏览文件 @
7ae8787c
...
...
@@ -402,12 +402,16 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
goto
_return
;
}
code
=
schHandleResponseMsg
(
pJob
,
pTask
,
msgType
,
pMsg
->
pData
,
pMsg
->
len
,
rspCode
);
schHandleResponseMsg
(
pJob
,
pTask
,
msgType
,
pMsg
->
pData
,
pMsg
->
len
,
rspCode
);
pMsg
->
pData
=
NULL
;
_return:
if
(
pTask
)
{
if
(
code
)
{
schProcessOnTaskFailure
(
pJob
,
pTask
,
code
);
}
SCH_UNLOCK_TASK
(
pTask
);
}
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
7ae8787c
...
...
@@ -225,26 +225,33 @@ void schedulerStopQueryHb(void *pTrans) {
schCleanClusterHb
(
pTrans
);
}
void
schedulerFreeJob
(
int64_t
job
,
int32_t
errCode
)
{
SSchJob
*
pJob
=
schAcquireJob
(
job
);
void
schedulerFreeJob
(
int64_t
*
job
,
int32_t
errCode
)
{
if
(
0
==
*
job
)
{
return
;
}
SSchJob
*
pJob
=
schAcquireJob
(
*
job
);
if
(
NULL
==
pJob
)
{
qError
(
"acquire job from jobRef list failed, may be dropped, jobId:0x%"
PRIx64
,
job
);
qError
(
"acquire sch job failed, may be dropped, jobId:0x%"
PRIx64
,
*
job
);
*
job
=
0
;
return
;
}
int32_t
code
=
schProcessOnJobDropped
(
pJob
,
errCode
);
if
(
TSDB_CODE_SCH_JOB_IS_DROPPING
==
code
)
{
SCH_JOB_DLOG
(
"sch job is already dropping, refId:0x%"
PRIx64
,
job
);
SCH_JOB_DLOG
(
"sch job is already dropping, refId:0x%"
PRIx64
,
*
job
);
*
job
=
0
;
return
;
}
SCH_JOB_DLOG
(
"start to remove job from jobRef list, refId:0x%"
PRIx64
,
job
);
SCH_JOB_DLOG
(
"start to remove job from jobRef list, refId:0x%"
PRIx64
,
*
job
);
if
(
taosRemoveRef
(
schMgmt
.
jobRef
,
job
))
{
SCH_JOB_ELOG
(
"remove job from job list failed, refId:0x%"
PRIx64
,
job
);
if
(
taosRemoveRef
(
schMgmt
.
jobRef
,
*
job
))
{
SCH_JOB_ELOG
(
"remove job from job list failed, refId:0x%"
PRIx64
,
*
job
);
}
schReleaseJob
(
job
);
schReleaseJob
(
*
job
);
*
job
=
0
;
}
void
schedulerDestroy
(
void
)
{
...
...
source/libs/scheduler/test/schedulerTests.cpp
浏览文件 @
7ae8787c
...
...
@@ -457,7 +457,7 @@ void schtFreeQueryJob(int32_t freeThread) {
int64_t
job
=
queryJobRefId
;
if
(
job
&&
atomic_val_compare_exchange_64
(
&
queryJobRefId
,
job
,
0
))
{
schedulerFreeJob
(
job
,
0
);
schedulerFreeJob
(
&
job
,
0
);
if
(
freeThread
)
{
if
(
++
freeNum
%
schtTestPrintNum
==
0
)
{
printf
(
"FreeNum:%d
\n
"
,
freeNum
);
...
...
@@ -724,7 +724,7 @@ TEST(queryTest, normalCase) {
schReleaseJob
(
job
);
schedulerFreeJob
(
job
,
0
);
schedulerFreeJob
(
&
job
,
0
);
schtFreeQueryDag
(
&
dag
);
...
...
@@ -828,7 +828,7 @@ TEST(queryTest, readyFirstCase) {
schReleaseJob
(
job
);
schedulerFreeJob
(
job
,
0
);
schedulerFreeJob
(
&
job
,
0
);
schtFreeQueryDag
(
&
dag
);
...
...
@@ -940,7 +940,7 @@ TEST(queryTest, flowCtrlCase) {
schReleaseJob
(
job
);
schedulerFreeJob
(
job
,
0
);
schedulerFreeJob
(
&
job
,
0
);
schtFreeQueryDag
(
&
dag
);
...
...
@@ -994,7 +994,7 @@ TEST(insertTest, normalCase) {
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
res
.
numOfRows
,
20
);
schedulerFreeJob
(
insertJobRefId
,
0
);
schedulerFreeJob
(
&
insertJobRefId
,
0
);
schedulerDestroy
();
}
...
...
source/libs/transport/inc/transComm.h
浏览文件 @
7ae8787c
...
...
@@ -253,7 +253,7 @@ int transAsyncSend(SAsyncPool* pool, queue* mq);
do { \
if (id > 0) { \
tTrace("handle step1"); \
SExHandle* exh2 = transAcquireExHandle(
id);
\
SExHandle* exh2 = transAcquireExHandle(
transGetRefMgt(), id);
\
if (exh2 == NULL || id != exh2->refId) { \
tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, \
exh2 ? exh2->refId : 0, id); \
...
...
@@ -261,7 +261,7 @@ int transAsyncSend(SAsyncPool* pool, queue* mq);
} \
} else if (id == 0) { \
tTrace("handle step2"); \
SExHandle* exh2 = transAcquireExHandle(
id);
\
SExHandle* exh2 = transAcquireExHandle(
transGetRefMgt(), id);
\
if (exh2 == NULL || id == exh2->refId) { \
tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, id, \
exh2 ? exh2->refId : 0); \
...
...
@@ -391,13 +391,16 @@ void transThreadOnce();
void
transInit
();
void
transCleanup
();
int32_t
transOpenExHandleMgt
(
int
size
);
void
transCloseExHandleMgt
();
int64_t
transAddExHandle
(
void
*
p
);
int32_t
transRemoveExHandle
(
int64_t
refId
);
SExHandle
*
transAcquireExHandle
(
int64_t
refId
);
int32_t
transReleaseExHandle
(
int64_t
refId
);
void
transDestoryExHandle
(
void
*
handle
);
int32_t
transOpenRefMgt
(
int
size
,
void
(
*
func
)(
void
*
));
void
transCloseRefMgt
(
int32_t
refMgt
);
int64_t
transAddExHandle
(
int32_t
refMgt
,
void
*
p
);
int32_t
transRemoveExHandle
(
int32_t
refMgt
,
int64_t
refId
);
void
*
transAcquireExHandle
(
int32_t
refMgt
,
int64_t
refId
);
int32_t
transReleaseExHandle
(
int32_t
refMgt
,
int64_t
refId
);
void
transDestoryExHandle
(
void
*
handle
);
int32_t
transGetRefMgt
();
int32_t
transGetInstMgt
();
#ifdef __cplusplus
}
...
...
source/libs/transport/inc/transportInt.h
浏览文件 @
7ae8787c
...
...
@@ -57,7 +57,7 @@ typedef struct {
void
*
parent
;
void
*
tcphandle
;
// returned handle from TCP initialization
int
32_t
refMgt
;
int
64_t
refId
;
TdThreadMutex
mutex
;
}
SRpcInfo
;
...
...
source/libs/transport/src/trans.c
浏览文件 @
7ae8787c
...
...
@@ -76,16 +76,23 @@ void* rpcOpen(const SRpcInit* pInit) {
if
(
pInit
->
user
)
{
memcpy
(
pRpc
->
user
,
pInit
->
user
,
strlen
(
pInit
->
user
));
}
return
pRpc
;
int64_t
refId
=
transAddExHandle
(
transGetInstMgt
(),
pRpc
);
transAcquireExHandle
(
transGetInstMgt
(),
refId
);
pRpc
->
refId
=
refId
;
return
(
void
*
)
refId
;
}
void
rpcClose
(
void
*
arg
)
{
tInfo
(
"start to close rpc"
);
transRemoveExHandle
(
transGetInstMgt
(),
(
int64_t
)
arg
);
transReleaseExHandle
(
transGetInstMgt
(),
(
int64_t
)
arg
);
tInfo
(
"finish to close rpc"
);
return
;
}
void
rpcCloseImpl
(
void
*
arg
)
{
SRpcInfo
*
pRpc
=
(
SRpcInfo
*
)
arg
;
(
*
taosCloseHandle
[
pRpc
->
connType
])(
pRpc
->
tcphandle
);
taosMemoryFree
(
pRpc
);
tInfo
(
"finish to close rpc"
);
return
;
}
void
*
rpcMallocCont
(
int32_t
contLen
)
{
...
...
@@ -140,11 +147,10 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
transSendRecv
(
shandle
,
pEpSet
,
pMsg
,
pRsp
);
}
void
rpcSendResponse
(
const
SRpcMsg
*
pMsg
)
{
transSendResponse
(
pMsg
);
}
void
rpcSendResponse
(
const
SRpcMsg
*
pMsg
)
{
transSendResponse
(
pMsg
);
}
int32_t
rpcGetConnInfo
(
void
*
thandle
,
SRpcConnInfo
*
pInfo
)
{
return
0
;
}
void
rpcRefHandle
(
void
*
handle
,
int8_t
type
)
{
assert
(
type
==
TAOS_CONN_SERVER
||
type
==
TAOS_CONN_CLIENT
);
(
*
taosRefHandle
[
type
])(
handle
);
...
...
source/libs/transport/src/transCli.c
浏览文件 @
7ae8787c
...
...
@@ -47,6 +47,7 @@ typedef struct SCliMsg {
queue
q
;
STransMsgType
type
;
int64_t
refId
;
uint64_t
st
;
int
sent
;
//(0: no send, 1: alread sent)
}
SCliMsg
;
...
...
@@ -505,13 +506,13 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
}
static
void
allocConnRef
(
SCliConn
*
conn
,
bool
update
)
{
if
(
update
)
{
transRemoveExHandle
(
conn
->
refId
);
transRemoveExHandle
(
transGetRefMgt
(),
conn
->
refId
);
conn
->
refId
=
-
1
;
}
SExHandle
*
exh
=
taosMemoryCalloc
(
1
,
sizeof
(
SExHandle
));
exh
->
handle
=
conn
;
exh
->
pThrd
=
conn
->
hostThrd
;
exh
->
refId
=
transAddExHandle
(
exh
);
exh
->
refId
=
transAddExHandle
(
transGetRefMgt
(),
exh
);
conn
->
refId
=
exh
->
refId
;
}
static
void
addConnToPool
(
void
*
pool
,
SCliConn
*
conn
)
{
...
...
@@ -604,16 +605,14 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
tTrace
(
"%s conn %p remove from conn pool"
,
CONN_GET_INST_LABEL
(
conn
),
conn
);
QUEUE_REMOVE
(
&
conn
->
conn
);
QUEUE_INIT
(
&
conn
->
conn
);
transRemoveExHandle
(
conn
->
refId
);
transRemoveExHandle
(
transGetRefMgt
(),
conn
->
refId
);
conn
->
refId
=
-
1
;
if
(
clear
)
{
if
(
!
uv_is_closing
((
uv_handle_t
*
)
conn
->
stream
))
{
uv_read_stop
(
conn
->
stream
);
uv_close
((
uv_handle_t
*
)
conn
->
stream
,
cliDestroy
);
}
//} else {
// cliDestroy((uv_handle_t*)conn->stream);
//}
}
}
static
void
cliDestroy
(
uv_handle_t
*
handle
)
{
...
...
@@ -622,7 +621,7 @@ static void cliDestroy(uv_handle_t* handle) {
}
SCliConn
*
conn
=
handle
->
data
;
transRemoveExHandle
(
conn
->
refId
);
transRemoveExHandle
(
transGetRefMgt
(),
conn
->
refId
);
taosMemoryFree
(
conn
->
ip
);
conn
->
stream
->
data
=
NULL
;
taosMemoryFree
(
conn
->
stream
);
...
...
@@ -638,7 +637,6 @@ static bool cliHandleNoResp(SCliConn* conn) {
SCliMsg
*
pMsg
=
transQueueGet
(
&
conn
->
cliMsgs
,
0
);
if
(
REQUEST_NO_RESP
(
&
pMsg
->
msg
))
{
transQueuePop
(
&
conn
->
cliMsgs
);
// taosArrayRemove(msgs, 0);
destroyCmsg
(
pMsg
);
res
=
true
;
}
...
...
@@ -749,7 +747,7 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) {
}
static
void
cliHandleRelease
(
SCliMsg
*
pMsg
,
SCliThrd
*
pThrd
)
{
int64_t
refId
=
(
int64_t
)(
pMsg
->
msg
.
info
.
handle
);
SExHandle
*
exh
=
transAcquireExHandle
(
refId
);
SExHandle
*
exh
=
transAcquireExHandle
(
transGetRefMgt
(),
refId
);
if
(
exh
==
NULL
)
{
tDebug
(
"%"
PRId64
" already release"
,
refId
);
}
...
...
@@ -775,14 +773,14 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) {
SCliConn
*
conn
=
NULL
;
int64_t
refId
=
(
int64_t
)(
pMsg
->
msg
.
info
.
handle
);
if
(
refId
!=
0
)
{
SExHandle
*
exh
=
transAcquireExHandle
(
refId
);
SExHandle
*
exh
=
transAcquireExHandle
(
transGetRefMgt
(),
refId
);
if
(
exh
==
NULL
)
{
*
ignore
=
true
;
destroyCmsg
(
pMsg
);
return
NULL
;
}
else
{
conn
=
exh
->
handle
;
transReleaseExHandle
(
refId
);
transReleaseExHandle
(
transGetRefMgt
(),
refId
);
}
return
conn
;
};
...
...
@@ -984,6 +982,7 @@ void cliSendQuit(SCliThrd* thrd) {
}
void
cliWalkCb
(
uv_handle_t
*
handle
,
void
*
arg
)
{
if
(
!
uv_is_closing
(
handle
))
{
uv_read_stop
((
uv_stream_t
*
)
handle
);
uv_close
(
handle
,
cliDestroy
);
}
}
...
...
@@ -1073,7 +1072,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
if
(
retry
)
{
pMsg
->
sent
=
0
;
pCtx
->
retryCnt
+=
1
;
if
(
code
==
TSDB_CODE_RPC_NETWORK_UNAVAIL
)
{
if
(
code
==
TSDB_CODE_RPC_NETWORK_UNAVAIL
||
code
==
TSDB_CODE_RPC_BROKEN_LINK
)
{
cliCompareAndSwap
(
&
pCtx
->
retryLimit
,
TRANS_RETRY_COUNT_LIMIT
,
EPSET_GET_SIZE
(
&
pCtx
->
epSet
)
*
3
);
if
(
pCtx
->
retryCnt
<
pCtx
->
retryLimit
)
{
transUnrefCliHandle
(
pConn
);
...
...
@@ -1161,12 +1160,12 @@ void transUnrefCliHandle(void* handle) {
}
SCliThrd
*
transGetWorkThrdFromHandle
(
int64_t
handle
)
{
SCliThrd
*
pThrd
=
NULL
;
SExHandle
*
exh
=
transAcquireExHandle
(
handle
);
SExHandle
*
exh
=
transAcquireExHandle
(
transGetRefMgt
(),
handle
);
if
(
exh
==
NULL
)
{
return
NULL
;
}
pThrd
=
exh
->
pThrd
;
transReleaseExHandle
(
handle
);
transReleaseExHandle
(
transGetRefMgt
(),
handle
);
return
pThrd
;
}
SCliThrd
*
transGetWorkThrd
(
STrans
*
trans
,
int64_t
handle
)
{
...
...
@@ -1193,10 +1192,13 @@ void transReleaseCliHandle(void* handle) {
}
void
transSendRequest
(
void
*
shandle
,
const
SEpSet
*
pEpSet
,
STransMsg
*
pReq
,
STransCtx
*
ctx
)
{
STrans
*
pTransInst
=
(
STrans
*
)
shandle
;
STrans
*
pTransInst
=
(
STrans
*
)
transAcquireExHandle
(
transGetInstMgt
(),
(
int64_t
)
shandle
);
if
(
pTransInst
==
NULL
)
return
;
SCliThrd
*
pThrd
=
transGetWorkThrd
(
pTransInst
,
(
int64_t
)
pReq
->
info
.
handle
);
if
(
pThrd
==
NULL
)
{
transFreeMsg
(
pReq
->
pCont
);
transReleaseExHandle
(
transGetInstMgt
(),
(
int64_t
)
shandle
);
return
;
}
...
...
@@ -1217,19 +1219,24 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra
cliMsg
->
msg
=
*
pReq
;
cliMsg
->
st
=
taosGetTimestampUs
();
cliMsg
->
type
=
Normal
;
cliMsg
->
refId
=
(
int64_t
)
shandle
;
STraceId
*
trace
=
&
pReq
->
info
.
traceId
;
tGTrace
(
"%s send request at thread:%08"
PRId64
", dst: %s:%d, app:%p"
,
transLabel
(
pTransInst
),
pThrd
->
pid
,
EPSET_GET_INUSE_IP
(
&
pCtx
->
epSet
),
EPSET_GET_INUSE_PORT
(
&
pCtx
->
epSet
),
pReq
->
info
.
ahandle
);
ASSERT
(
transAsyncSend
(
pThrd
->
asyncPool
,
&
(
cliMsg
->
q
))
==
0
);
transReleaseExHandle
(
transGetInstMgt
(),
(
int64_t
)
shandle
);
return
;
}
void
transSendRecv
(
void
*
shandle
,
const
SEpSet
*
pEpSet
,
STransMsg
*
pReq
,
STransMsg
*
pRsp
)
{
STrans
*
pTransInst
=
(
STrans
*
)
shandle
;
STrans
*
pTransInst
=
(
STrans
*
)
transAcquireExHandle
(
transGetInstMgt
(),
(
int64_t
)
shandle
);
if
(
pTransInst
==
NULL
)
return
;
SCliThrd
*
pThrd
=
transGetWorkThrd
(
pTransInst
,
(
int64_t
)
pReq
->
info
.
handle
);
if
(
pThrd
==
NULL
)
{
transFreeMsg
(
pReq
->
pCont
);
transReleaseExHandle
(
transGetInstMgt
(),
(
int64_t
)
shandle
);
return
;
}
tsem_t
*
sem
=
taosMemoryCalloc
(
1
,
sizeof
(
tsem_t
));
...
...
@@ -1250,6 +1257,7 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM
cliMsg
->
msg
=
*
pReq
;
cliMsg
->
st
=
taosGetTimestampUs
();
cliMsg
->
type
=
Normal
;
cliMsg
->
refId
=
(
int64_t
)
shandle
;
STraceId
*
trace
=
&
pReq
->
info
.
traceId
;
tGTrace
(
"%s send request at thread:%08"
PRId64
", dst: %s:%d, app:%p"
,
transLabel
(
pTransInst
),
pThrd
->
pid
,
...
...
@@ -1259,13 +1267,16 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM
tsem_wait
(
sem
);
tsem_destroy
(
sem
);
taosMemoryFree
(
sem
);
transReleaseExHandle
(
transGetInstMgt
(),
(
int64_t
)
shandle
);
return
;
}
/*
*
**/
void
transSetDefaultAddr
(
void
*
shandle
,
const
char
*
ip
,
const
char
*
fqdn
)
{
STrans
*
pTransInst
=
shandle
;
STrans
*
pTransInst
=
(
STrans
*
)
transAcquireExHandle
(
transGetInstMgt
(),
(
int64_t
)
shandle
);
if
(
pTransInst
==
NULL
)
return
;
SCvtAddr
cvtAddr
=
{
0
};
if
(
ip
!=
NULL
&&
fqdn
!=
NULL
)
{
...
...
@@ -1280,11 +1291,13 @@ void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
SCliMsg
*
cliMsg
=
taosMemoryCalloc
(
1
,
sizeof
(
SCliMsg
));
cliMsg
->
ctx
=
pCtx
;
cliMsg
->
type
=
Update
;
cliMsg
->
refId
=
(
int64_t
)
shandle
;
SCliThrd
*
thrd
=
((
SCliObj
*
)
pTransInst
->
tcphandle
)
->
pThreadObj
[
i
];
tDebug
(
"%s update epset at thread:%08"
PRId64
""
,
pTransInst
->
label
,
thrd
->
pid
);
transAsyncSend
(
thrd
->
asyncPool
,
&
(
cliMsg
->
q
));
}
transReleaseExHandle
(
transGetInstMgt
(),
(
int64_t
)
shandle
);
}
#endif
source/libs/transport/src/transComm.c
浏览文件 @
7ae8787c
...
...
@@ -19,6 +19,7 @@
static
TdThreadOnce
transModuleInit
=
PTHREAD_ONCE_INIT
;
static
int32_t
refMgt
;
static
int32_t
instMgt
;
int
transAuthenticateMsg
(
void
*
pMsg
,
int
msgLen
,
void
*
pAuth
,
void
*
pKey
)
{
T_MD5_CTX
context
;
...
...
@@ -479,53 +480,54 @@ bool transEpSetIsEqual(SEpSet* a, SEpSet* b) {
}
return
true
;
}
static
int32_t
transGetRefMgt
()
{
//
return
refMgt
;
}
static
void
transInitEnv
()
{
refMgt
=
transOpenExHandleMgt
(
50000
);
refMgt
=
transOpenRefMgt
(
50000
,
transDestoryExHandle
);
instMgt
=
taosOpenRef
(
50
,
rpcCloseImpl
);
uv_os_setenv
(
"UV_TCP_SINGLE_ACCEPT"
,
"1"
);
}
static
void
transDestroyEnv
()
{
// close ref
transClose
ExHandleMgt
(
);
transCloseRefMgt
(
refMgt
);
transClose
RefMgt
(
instMgt
);
}
void
transInit
()
{
// init env
taosThreadOnce
(
&
transModuleInit
,
transInitEnv
);
}
int32_t
transGetRefMgt
()
{
return
refMgt
;
}
int32_t
transGetInstMgt
()
{
return
instMgt
;
}
void
transCleanup
()
{
// clean env
transDestroyEnv
();
}
int32_t
transOpen
ExHandleMgt
(
int
size
)
{
int32_t
transOpen
RefMgt
(
int
size
,
void
(
*
func
)(
void
*
)
)
{
// added into once later
return
taosOpenRef
(
size
,
transDestoryExHandle
);
return
taosOpenRef
(
size
,
func
);
}
void
transClose
ExHandleMgt
(
)
{
void
transClose
RefMgt
(
int32_t
mgt
)
{
// close ref
taosCloseRef
(
transGetRefMgt
()
);
taosCloseRef
(
mgt
);
}
int64_t
transAddExHandle
(
void
*
p
)
{
int64_t
transAddExHandle
(
int32_t
refMgt
,
void
*
p
)
{
// acquire extern handle
return
taosAddRef
(
transGetRefMgt
()
,
p
);
return
taosAddRef
(
refMgt
,
p
);
}
int32_t
transRemoveExHandle
(
int64_t
refId
)
{
int32_t
transRemoveExHandle
(
int
32_t
refMgt
,
int
64_t
refId
)
{
// acquire extern handle
return
taosRemoveRef
(
transGetRefMgt
()
,
refId
);
return
taosRemoveRef
(
refMgt
,
refId
);
}
SExHandle
*
transAcquireExHandle
(
int64_t
refId
)
{
void
*
transAcquireExHandle
(
int32_t
refMgt
,
int64_t
refId
)
{
// acquire extern handle
return
(
SExHandle
*
)
taosAcquireRef
(
transGetRefMgt
()
,
refId
);
return
(
void
*
)
taosAcquireRef
(
refMgt
,
refId
);
}
int32_t
transReleaseExHandle
(
int64_t
refId
)
{
int32_t
transReleaseExHandle
(
int
32_t
refMgt
,
int
64_t
refId
)
{
// release extern handle
return
taosReleaseRef
(
transGetRefMgt
()
,
refId
);
return
taosReleaseRef
(
refMgt
,
refId
);
}
void
transDestoryExHandle
(
void
*
handle
)
{
if
(
handle
==
NULL
)
{
...
...
source/libs/transport/src/transSvr.c
浏览文件 @
7ae8787c
...
...
@@ -261,7 +261,7 @@ static void uvHandleReq(SSvrConn* pConn) {
// 2. once send out data, cli conn released to conn pool immediately
// 3. not mixed with persist
transMsg
.
info
.
ahandle
=
(
void
*
)
pHead
->
ahandle
;
transMsg
.
info
.
handle
=
(
void
*
)
transAcquireExHandle
(
pConn
->
refId
);
transMsg
.
info
.
handle
=
(
void
*
)
transAcquireExHandle
(
transGetRefMgt
(),
pConn
->
refId
);
transMsg
.
info
.
refId
=
pConn
->
refId
;
transMsg
.
info
.
traceId
=
pHead
->
traceId
;
...
...
@@ -279,7 +279,7 @@ static void uvHandleReq(SSvrConn* pConn) {
pConnInfo
->
clientPort
=
ntohs
(
pConn
->
addr
.
sin_port
);
tstrncpy
(
pConnInfo
->
user
,
pConn
->
user
,
sizeof
(
pConnInfo
->
user
));
transReleaseExHandle
(
pConn
->
refId
);
transReleaseExHandle
(
transGetRefMgt
(),
pConn
->
refId
);
STrans
*
pTransInst
=
pConn
->
pTransInst
;
(
*
pTransInst
->
cfp
)(
pTransInst
->
parent
,
&
transMsg
,
NULL
);
...
...
@@ -507,15 +507,15 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
SExHandle
*
exh1
=
transMsg
.
info
.
handle
;
int64_t
refId
=
transMsg
.
info
.
refId
;
SExHandle
*
exh2
=
transAcquireExHandle
(
refId
);
SExHandle
*
exh2
=
transAcquireExHandle
(
transGetRefMgt
(),
refId
);
if
(
exh2
==
NULL
||
exh1
!=
exh2
)
{
tTrace
(
"handle except msg %p, ignore it"
,
exh1
);
transReleaseExHandle
(
refId
);
transReleaseExHandle
(
transGetRefMgt
(),
refId
);
destroySmsg
(
msg
);
continue
;
}
msg
->
pConn
=
exh1
->
handle
;
transReleaseExHandle
(
refId
);
transReleaseExHandle
(
transGetRefMgt
(),
refId
);
(
*
transAsyncHandle
[
msg
->
type
])(
msg
,
pThrd
);
}
}
...
...
@@ -757,8 +757,8 @@ static SSvrConn* createConn(void* hThrd) {
SExHandle
*
exh
=
taosMemoryMalloc
(
sizeof
(
SExHandle
));
exh
->
handle
=
pConn
;
exh
->
pThrd
=
pThrd
;
exh
->
refId
=
transAddExHandle
(
exh
);
transAcquireExHandle
(
exh
->
refId
);
exh
->
refId
=
transAddExHandle
(
transGetRefMgt
(),
exh
);
transAcquireExHandle
(
transGetRefMgt
(),
exh
->
refId
);
pConn
->
refId
=
exh
->
refId
;
transRefSrvHandle
(
pConn
);
...
...
@@ -789,14 +789,14 @@ static void destroyConnRegArg(SSvrConn* conn) {
}
}
static
int
reallocConnRef
(
SSvrConn
*
conn
)
{
transReleaseExHandle
(
conn
->
refId
);
transRemoveExHandle
(
conn
->
refId
);
transReleaseExHandle
(
transGetRefMgt
(),
conn
->
refId
);
transRemoveExHandle
(
transGetRefMgt
(),
conn
->
refId
);
// avoid app continue to send msg on invalid handle
SExHandle
*
exh
=
taosMemoryMalloc
(
sizeof
(
SExHandle
));
exh
->
handle
=
conn
;
exh
->
pThrd
=
conn
->
hostThrd
;
exh
->
refId
=
transAddExHandle
(
exh
);
transAcquireExHandle
(
exh
->
refId
);
exh
->
refId
=
transAddExHandle
(
transGetRefMgt
(),
exh
);
transAcquireExHandle
(
transGetRefMgt
(),
exh
->
refId
);
conn
->
refId
=
exh
->
refId
;
return
0
;
...
...
@@ -808,8 +808,8 @@ static void uvDestroyConn(uv_handle_t* handle) {
}
SWorkThrd
*
thrd
=
conn
->
hostThrd
;
transReleaseExHandle
(
conn
->
refId
);
transRemoveExHandle
(
conn
->
refId
);
transReleaseExHandle
(
transGetRefMgt
(),
conn
->
refId
);
transRemoveExHandle
(
transGetRefMgt
(),
conn
->
refId
);
tDebug
(
"%s conn %p destroy"
,
transLabel
(
thrd
->
pTransInst
),
conn
);
transQueueDestroy
(
&
conn
->
srvMsgs
);
...
...
@@ -1046,11 +1046,11 @@ void transReleaseSrvHandle(void* handle) {
tTrace
(
"%s conn %p start to release"
,
transLabel
(
pThrd
->
pTransInst
),
exh
->
handle
);
transAsyncSend
(
pThrd
->
asyncPool
,
&
m
->
q
);
transReleaseExHandle
(
refId
);
transReleaseExHandle
(
transGetRefMgt
(),
refId
);
return
;
_return1:
tTrace
(
"handle %p failed to send to release handle"
,
exh
);
transReleaseExHandle
(
refId
);
transReleaseExHandle
(
transGetRefMgt
(),
refId
);
return
;
_return2:
tTrace
(
"handle %p failed to send to release handle"
,
exh
);
...
...
@@ -1075,12 +1075,12 @@ void transSendResponse(const STransMsg* msg) {
STraceId
*
trace
=
(
STraceId
*
)
&
msg
->
info
.
traceId
;
tGTrace
(
"conn %p start to send resp (1/2)"
,
exh
->
handle
);
transAsyncSend
(
pThrd
->
asyncPool
,
&
m
->
q
);
transReleaseExHandle
(
refId
);
transReleaseExHandle
(
transGetRefMgt
(),
refId
);
return
;
_return1:
tTrace
(
"handle %p failed to send resp"
,
exh
);
rpcFreeCont
(
msg
->
pCont
);
transReleaseExHandle
(
refId
);
transReleaseExHandle
(
transGetRefMgt
(),
refId
);
return
;
_return2:
tTrace
(
"handle %p failed to send resp"
,
exh
);
...
...
@@ -1104,13 +1104,13 @@ void transRegisterMsg(const STransMsg* msg) {
tTrace
(
"%s conn %p start to register brokenlink callback"
,
transLabel
(
pThrd
->
pTransInst
),
exh
->
handle
);
transAsyncSend
(
pThrd
->
asyncPool
,
&
m
->
q
);
transReleaseExHandle
(
refId
);
transReleaseExHandle
(
transGetRefMgt
(),
refId
);
return
;
_return1:
tTrace
(
"handle %p failed to register brokenlink"
,
exh
);
rpcFreeCont
(
msg
->
pCont
);
transReleaseExHandle
(
refId
);
transReleaseExHandle
(
transGetRefMgt
(),
refId
);
return
;
_return2:
tTrace
(
"handle %p failed to register brokenlink"
,
exh
);
...
...
tests/script/api/stopquery.c
浏览文件 @
7ae8787c
...
...
@@ -52,6 +52,7 @@ typedef struct {
typedef
struct
SSP_CB_PARAM
{
TAOS
*
taos
;
bool
fetch
;
bool
free
;
int32_t
*
end
;
}
SSP_CB_PARAM
;
...
...
@@ -156,7 +157,58 @@ void sqCloseQueryCb(void *param, TAOS_RES *pRes, int code) {
}
}
int
sqSyncStopQuery
(
bool
fetch
)
{
void
sqKillFetchCb
(
void
*
param
,
TAOS_RES
*
pRes
,
int
numOfRows
)
{
SSP_CB_PARAM
*
qParam
=
(
SSP_CB_PARAM
*
)
param
;
taos_kill_query
(
qParam
->
taos
);
*
qParam
->
end
=
1
;
}
void
sqKillQueryCb
(
void
*
param
,
TAOS_RES
*
pRes
,
int
code
)
{
SSP_CB_PARAM
*
qParam
=
(
SSP_CB_PARAM
*
)
param
;
if
(
code
==
0
&&
pRes
)
{
if
(
qParam
->
fetch
)
{
taos_fetch_rows_a
(
pRes
,
sqKillFetchCb
,
param
);
}
else
{
taos_kill_query
(
qParam
->
taos
);
*
qParam
->
end
=
1
;
}
}
else
{
sqExit
(
"select"
,
taos_errstr
(
pRes
));
}
}
void
sqAsyncFetchCb
(
void
*
param
,
TAOS_RES
*
pRes
,
int
numOfRows
)
{
SSP_CB_PARAM
*
qParam
=
(
SSP_CB_PARAM
*
)
param
;
if
(
numOfRows
>
0
)
{
taos_fetch_rows_a
(
pRes
,
sqAsyncFetchCb
,
param
);
}
else
{
*
qParam
->
end
=
1
;
if
(
qParam
->
free
)
{
taos_free_result
(
pRes
);
}
}
}
void
sqAsyncQueryCb
(
void
*
param
,
TAOS_RES
*
pRes
,
int
code
)
{
SSP_CB_PARAM
*
qParam
=
(
SSP_CB_PARAM
*
)
param
;
if
(
code
==
0
&&
pRes
)
{
if
(
qParam
->
fetch
)
{
taos_fetch_rows_a
(
pRes
,
sqAsyncFetchCb
,
param
);
}
else
{
if
(
qParam
->
free
)
{
taos_free_result
(
pRes
);
}
*
qParam
->
end
=
1
;
}
}
else
{
sqExit
(
"select"
,
taos_errstr
(
pRes
));
}
}
int
sqStopSyncQuery
(
bool
fetch
)
{
CASE_ENTER
();
for
(
int32_t
i
=
0
;
i
<
runTimes
;
++
i
)
{
char
sql
[
1024
]
=
{
0
};
...
...
@@ -189,7 +241,7 @@ int sqSyncStopQuery(bool fetch) {
CASE_LEAVE
();
}
int
sq
AsyncStop
Query
(
bool
fetch
)
{
int
sq
StopAsync
Query
(
bool
fetch
)
{
CASE_ENTER
();
for
(
int32_t
i
=
0
;
i
<
runTimes
;
++
i
)
{
char
sql
[
1024
]
=
{
0
};
...
...
@@ -219,7 +271,7 @@ int sqAsyncStopQuery(bool fetch) {
CASE_LEAVE
();
}
int
sq
SyncFree
Query
(
bool
fetch
)
{
int
sq
FreeSync
Query
(
bool
fetch
)
{
CASE_ENTER
();
for
(
int32_t
i
=
0
;
i
<
runTimes
;
++
i
)
{
char
sql
[
1024
]
=
{
0
};
...
...
@@ -250,7 +302,7 @@ int sqSyncFreeQuery(bool fetch) {
CASE_LEAVE
();
}
int
sq
AsyncFree
Query
(
bool
fetch
)
{
int
sq
FreeAsync
Query
(
bool
fetch
)
{
CASE_ENTER
();
for
(
int32_t
i
=
0
;
i
<
runTimes
;
++
i
)
{
char
sql
[
1024
]
=
{
0
};
...
...
@@ -280,7 +332,7 @@ int sqAsyncFreeQuery(bool fetch) {
CASE_LEAVE
();
}
int
sq
SyncClose
Query
(
bool
fetch
)
{
int
sq
CloseSync
Query
(
bool
fetch
)
{
CASE_ENTER
();
for
(
int32_t
i
=
0
;
i
<
runTimes
;
++
i
)
{
char
sql
[
1024
]
=
{
0
};
...
...
@@ -310,7 +362,7 @@ int sqSyncCloseQuery(bool fetch) {
CASE_LEAVE
();
}
int
sq
AsyncClose
Query
(
bool
fetch
)
{
int
sq
CloseAsync
Query
(
bool
fetch
)
{
CASE_ENTER
();
for
(
int32_t
i
=
0
;
i
<
runTimes
;
++
i
)
{
char
sql
[
1024
]
=
{
0
};
...
...
@@ -360,9 +412,39 @@ void *syncQueryThreadFp(void *arg) {
taos_fetch_row
(
pRes
);
}
taos_free_result
(
pRes
);
if
(
qParam
->
free
)
{
taos_free_result
(
pRes
);
}
}
void
*
asyncQueryThreadFp
(
void
*
arg
)
{
SSP_CB_PARAM
*
qParam
=
(
SSP_CB_PARAM
*
)
arg
;
char
sql
[
1024
]
=
{
0
};
int32_t
code
=
0
;
TAOS
*
taos
=
taos_connect
(
hostName
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
taos
==
NULL
)
sqExit
(
"taos_connect"
,
taos_errstr
(
NULL
));
qParam
->
taos
=
taos
;
sprintf
(
sql
,
"reset query cache"
);
sqExecSQLE
(
taos
,
sql
);
sprintf
(
sql
,
"use %s"
,
dbName
);
sqExecSQLE
(
taos
,
sql
);
sprintf
(
sql
,
"select * from %s"
,
tbName
);
int32_t
qEnd
=
0
;
SSP_CB_PARAM
param
=
{
0
};
param
.
fetch
=
qParam
->
fetch
;
param
.
end
=
&
qEnd
;
taos_query_a
(
taos
,
sql
,
sqAsyncQueryCb
,
&
param
);
while
(
0
==
qEnd
)
{
usleep
(
5000
);
}
}
void
*
closeThreadFp
(
void
*
arg
)
{
SSP_CB_PARAM
*
qParam
=
(
SSP_CB_PARAM
*
)
arg
;
while
(
true
)
{
...
...
@@ -376,7 +458,22 @@ void *closeThreadFp(void *arg) {
}
int
sqConSyncCloseQuery
(
bool
fetch
)
{
void
*
killThreadFp
(
void
*
arg
)
{
SSP_CB_PARAM
*
qParam
=
(
SSP_CB_PARAM
*
)
arg
;
while
(
true
)
{
if
(
qParam
->
taos
)
{
usleep
(
rand
()
%
10000
);
taos_kill_query
(
qParam
->
taos
);
break
;
}
usleep
(
1
);
}
}
int
sqConCloseSyncQuery
(
bool
fetch
)
{
CASE_ENTER
();
pthread_t
qid
,
cid
;
for
(
int32_t
i
=
0
;
i
<
runTimes
;
++
i
)
{
...
...
@@ -391,26 +488,160 @@ int sqConSyncCloseQuery(bool fetch) {
CASE_LEAVE
();
}
int
sqConCloseAsyncQuery
(
bool
fetch
)
{
CASE_ENTER
();
pthread_t
qid
,
cid
;
for
(
int32_t
i
=
0
;
i
<
runTimes
;
++
i
)
{
SSP_CB_PARAM
param
=
{
0
};
param
.
fetch
=
fetch
;
pthread_create
(
&
qid
,
NULL
,
asyncQueryThreadFp
,
(
void
*
)
&
param
);
pthread_create
(
&
cid
,
NULL
,
closeThreadFp
,
(
void
*
)
&
param
);
pthread_join
(
qid
,
NULL
);
pthread_join
(
cid
,
NULL
);
}
CASE_LEAVE
();
}
int
sqKillSyncQuery
(
bool
fetch
)
{
CASE_ENTER
();
for
(
int32_t
i
=
0
;
i
<
runTimes
;
++
i
)
{
char
sql
[
1024
]
=
{
0
};
int32_t
code
=
0
;
TAOS
*
taos
=
taos_connect
(
hostName
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
taos
==
NULL
)
sqExit
(
"taos_connect"
,
taos_errstr
(
NULL
));
sprintf
(
sql
,
"reset query cache"
);
sqExecSQL
(
taos
,
sql
);
sprintf
(
sql
,
"use %s"
,
dbName
);
sqExecSQL
(
taos
,
sql
);
sprintf
(
sql
,
"select * from %s"
,
tbName
);
TAOS_RES
*
pRes
=
taos_query
(
taos
,
sql
);
code
=
taos_errno
(
pRes
);
if
(
code
)
{
sqExit
(
"taos_query"
,
taos_errstr
(
pRes
));
}
if
(
fetch
)
{
taos_fetch_row
(
pRes
);
}
taos_kill_query
(
taos
);
taos_close
(
taos
);
}
CASE_LEAVE
();
}
int
sqKillAsyncQuery
(
bool
fetch
)
{
CASE_ENTER
();
for
(
int32_t
i
=
0
;
i
<
runTimes
;
++
i
)
{
char
sql
[
1024
]
=
{
0
};
int32_t
code
=
0
;
TAOS
*
taos
=
taos_connect
(
hostName
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
taos
==
NULL
)
sqExit
(
"taos_connect"
,
taos_errstr
(
NULL
));
sprintf
(
sql
,
"reset query cache"
);
sqExecSQL
(
taos
,
sql
);
sprintf
(
sql
,
"use %s"
,
dbName
);
sqExecSQL
(
taos
,
sql
);
sprintf
(
sql
,
"select * from %s"
,
tbName
);
int32_t
qEnd
=
0
;
SSP_CB_PARAM
param
=
{
0
};
param
.
fetch
=
fetch
;
param
.
end
=
&
qEnd
;
param
.
taos
=
taos
;
taos_query_a
(
taos
,
sql
,
sqKillQueryCb
,
&
param
);
while
(
0
==
qEnd
)
{
usleep
(
5000
);
}
taos_close
(
taos
);
}
CASE_LEAVE
();
}
int
sqConKillSyncQuery
(
bool
fetch
)
{
CASE_ENTER
();
pthread_t
qid
,
cid
;
for
(
int32_t
i
=
0
;
i
<
runTimes
;
++
i
)
{
SSP_CB_PARAM
param
=
{
0
};
param
.
fetch
=
fetch
;
pthread_create
(
&
qid
,
NULL
,
syncQueryThreadFp
,
(
void
*
)
&
param
);
pthread_create
(
&
cid
,
NULL
,
killThreadFp
,
(
void
*
)
&
param
);
pthread_join
(
qid
,
NULL
);
pthread_join
(
cid
,
NULL
);
}
CASE_LEAVE
();
}
int
sqConKillAsyncQuery
(
bool
fetch
)
{
CASE_ENTER
();
pthread_t
qid
,
cid
;
for
(
int32_t
i
=
0
;
i
<
runTimes
;
++
i
)
{
SSP_CB_PARAM
param
=
{
0
};
param
.
fetch
=
fetch
;
pthread_create
(
&
qid
,
NULL
,
asyncQueryThreadFp
,
(
void
*
)
&
param
);
pthread_create
(
&
cid
,
NULL
,
killThreadFp
,
(
void
*
)
&
param
);
pthread_join
(
qid
,
NULL
);
pthread_join
(
cid
,
NULL
);
}
CASE_LEAVE
();
}
void
sqRunAllCase
(
void
)
{
/*
sqSyncStopQuery(false);
sqSyncStopQuery(true);
sqAsyncStopQuery(false);
sqAsyncStopQuery(true);
sqSyncFreeQuery(false);
sqSyncFreeQuery(true);
sqAsyncFreeQuery(false);
sqAsyncFreeQuery(true);
sqSyncCloseQuery(false);
sqSyncCloseQuery(true);
sqAsyncCloseQuery(false);
sqAsyncCloseQuery(true);
*/
sqConSyncCloseQuery
(
false
);
sqConSyncCloseQuery
(
true
);
sqStopSyncQuery(false);
sqStopSyncQuery(true);
sqStopAsyncQuery(false);
sqStopAsyncQuery(true);
sqFreeSyncQuery(false);
sqFreeSyncQuery(true);
sqFreeAsyncQuery(false);
sqFreeAsyncQuery(true);
sqCloseSyncQuery(false);
sqCloseSyncQuery(true);
sqCloseAsyncQuery(false);
sqCloseAsyncQuery(true);
sqConCloseSyncQuery(false);
sqConCloseSyncQuery(true);
sqConCloseAsyncQuery(false);
sqConCloseAsyncQuery(true);
*/
#if 0
sqKillSyncQuery(false);
sqKillSyncQuery(true);
sqKillAsyncQuery(false);
sqKillAsyncQuery(true);
#endif
//sqConKillSyncQuery(false);
sqConKillSyncQuery
(
true
);
#if 0
sqConKillAsyncQuery(false);
sqConKillAsyncQuery(true);
#endif
int32_t
l
=
5
;
while
(
l
)
{
printf
(
"%d
\n
"
,
l
--
);
sleep
(
1
);
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录