Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
817a319a
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
817a319a
编写于
7月 01, 2022
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
enh: kill query
上级
b2be5169
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
240 addition
and
92 deletion
+240
-92
include/libs/scheduler/scheduler.h
include/libs/scheduler/scheduler.h
+1
-1
source/client/inc/clientInt.h
source/client/inc/clientInt.h
+2
-1
source/client/src/clientEnv.c
source/client/src/clientEnv.c
+32
-12
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+3
-10
source/client/src/clientMain.c
source/client/src/clientMain.c
+3
-5
source/libs/scheduler/src/schJob.c
source/libs/scheduler/src/schJob.c
+1
-16
source/libs/scheduler/src/schRemote.c
source/libs/scheduler/src/schRemote.c
+5
-1
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+15
-8
source/libs/scheduler/test/schedulerTests.cpp
source/libs/scheduler/test/schedulerTests.cpp
+5
-5
tests/script/api/stopquery.c
tests/script/api/stopquery.c
+173
-33
未找到文件。
include/libs/scheduler/scheduler.h
浏览文件 @
817a319a
...
...
@@ -130,7 +130,7 @@ void schedulerStopQueryHb(void *pTrans);
* Free the query job
* @param pJob
*/
void
schedulerFreeJob
(
int64_t
job
,
int32_t
errCode
);
void
schedulerFreeJob
(
int64_t
*
job
,
int32_t
errCode
);
void
schedulerDestroy
(
void
);
...
...
source/client/inc/clientInt.h
浏览文件 @
817a319a
...
...
@@ -337,7 +337,8 @@ int hbHandleRsp(SClientHbBatchRsp* hbRsp);
SAppHbMgr
*
appHbMgrInit
(
SAppInstInfo
*
pAppInstInfo
,
char
*
key
);
void
appHbMgrCleanup
(
void
);
void
hbRemoveAppHbMrg
(
SAppHbMgr
**
pAppHbMgr
);
void
closeAllRequests
(
SHashObj
*
pRequests
);
void
destroyAllRequests
(
SHashObj
*
pRequests
);
void
stopAllRequests
(
SHashObj
*
pRequests
);
// conn level
int
hbRegisterConn
(
SAppHbMgr
*
pAppHbMgr
,
int64_t
tscRefId
,
int64_t
clusterId
,
int8_t
connType
);
...
...
source/client/src/clientEnv.c
浏览文件 @
817a319a
...
...
@@ -121,17 +121,37 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
return
pDnodeConn
;
}
void
close
AllRequests
(
SHashObj
*
pRequests
)
{
void
destroy
AllRequests
(
SHashObj
*
pRequests
)
{
void
*
pIter
=
taosHashIterate
(
pRequests
,
NULL
);
while
(
pIter
!=
NULL
)
{
int64_t
*
rid
=
pIter
;
removeRequest
(
*
rid
);
SRequestObj
*
pRequest
=
acquireRequest
(
*
rid
);
if
(
pRequest
)
{
destroyRequest
(
pRequest
);
releaseRequest
(
*
rid
);
}
pIter
=
taosHashIterate
(
pRequests
,
pIter
);
}
}
void
stopAllRequests
(
SHashObj
*
pRequests
)
{
void
*
pIter
=
taosHashIterate
(
pRequests
,
NULL
);
while
(
pIter
!=
NULL
)
{
int64_t
*
rid
=
pIter
;
SRequestObj
*
pRequest
=
acquireRequest
(
*
rid
);
if
(
pRequest
)
{
taos_stop_query
(
pRequest
);
releaseRequest
(
*
rid
);
}
pIter
=
taosHashIterate
(
pRequests
,
pIter
);
}
}
void
destroyAppInst
(
SAppInstInfo
*
pAppInfo
)
{
tscDebug
(
"destroy app inst mgr %p"
,
pAppInfo
);
...
...
@@ -159,12 +179,12 @@ void destroyTscObj(void *pObj) {
STscObj
*
pTscObj
=
pObj
;
int64_t
tscId
=
pTscObj
->
id
;
tsc
Debug
(
"begin to destroy tscObj %"
PRIx64
" p:%p"
,
tscId
,
pTscObj
);
tsc
Trace
(
"begin to destroy tscObj %"
PRIx64
" p:%p"
,
tscId
,
pTscObj
);
SClientHbKey
connKey
=
{.
tscRid
=
pTscObj
->
id
,
.
connType
=
pTscObj
->
connType
};
hbDeregisterConn
(
pTscObj
->
pAppInfo
->
pAppHbMgr
,
connKey
);
int64_t
connNum
=
atomic_sub_fetch_64
(
&
pTscObj
->
pAppInfo
->
numOfConns
,
1
);
close
AllRequests
(
pTscObj
->
pRequests
);
destroy
AllRequests
(
pTscObj
->
pRequests
);
schedulerStopQueryHb
(
pTscObj
->
pAppInfo
->
pTransporter
);
tscDebug
(
"connObj 0x%"
PRIx64
" p:%p destroyed, remain inst totalConn:%"
PRId64
,
pTscObj
->
id
,
pTscObj
,
pTscObj
->
pAppInfo
->
numOfConns
);
...
...
@@ -173,9 +193,9 @@ void destroyTscObj(void *pObj) {
destroyAppInst
(
pTscObj
->
pAppInfo
);
}
taosThreadMutexDestroy
(
&
pTscObj
->
mutex
);
taosMemoryFree
Clear
(
pTscObj
);
taosMemoryFree
(
pTscObj
);
tsc
Debug
(
"end to destroy tscObj %"
PRIx64
" p:%p"
,
tscId
,
pTscObj
);
tsc
Trace
(
"end to destroy tscObj %"
PRIx64
" p:%p"
,
tscId
,
pTscObj
);
}
void
*
createTscObj
(
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
int32_t
connType
,
SAppInstInfo
*
pAppInfo
)
{
...
...
@@ -275,13 +295,11 @@ void doDestroyRequest(void *p) {
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
p
;
int64_t
reqId
=
pRequest
->
self
;
tsc
Debug
(
"begin to destroy request %"
PRIx64
" p:%p"
,
reqId
,
pRequest
);
tsc
Trace
(
"begin to destroy request %"
PRIx64
" p:%p"
,
reqId
,
pRequest
);
taosHashRemove
(
pRequest
->
pTscObj
->
pRequests
,
&
pRequest
->
self
,
sizeof
(
pRequest
->
self
));
if
(
pRequest
->
body
.
queryJob
!=
0
)
{
schedulerFreeJob
(
pRequest
->
body
.
queryJob
,
0
);
}
schedulerFreeJob
(
&
pRequest
->
body
.
queryJob
,
0
);
taosMemoryFreeClear
(
pRequest
->
msgBuf
);
taosMemoryFreeClear
(
pRequest
->
sqlstr
);
...
...
@@ -297,9 +315,9 @@ void doDestroyRequest(void *p) {
if
(
pRequest
->
self
)
{
deregisterRequest
(
pRequest
);
}
taosMemoryFree
Clear
(
pRequest
);
taosMemoryFree
(
pRequest
);
tsc
Debug
(
"end to destroy request %"
PRIx64
" p:%p"
,
reqId
,
pRequest
);
tsc
Trace
(
"end to destroy request %"
PRIx64
" p:%p"
,
reqId
,
pRequest
);
}
void
destroyRequest
(
SRequestObj
*
pRequest
)
{
...
...
@@ -307,6 +325,8 @@ void destroyRequest(SRequestObj *pRequest) {
return
;
}
taos_stop_query
(
pRequest
);
removeRequest
(
pRequest
->
self
);
}
...
...
source/client/src/clientImpl.c
浏览文件 @
817a319a
...
...
@@ -645,9 +645,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
pRequest
->
body
.
resInfo
.
execRes
=
res
.
res
;
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
pRequest
->
body
.
queryJob
!=
0
)
{
schedulerFreeJob
(
pRequest
->
body
.
queryJob
,
0
);
}
schedulerFreeJob
(
&
pRequest
->
body
.
queryJob
,
0
);
pRequest
->
code
=
code
;
terrno
=
code
;
...
...
@@ -658,9 +656,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
TDMT_VND_CREATE_TABLE
==
pRequest
->
type
)
{
pRequest
->
body
.
resInfo
.
numOfRows
=
res
.
numOfRows
;
if
(
pRequest
->
body
.
queryJob
!=
0
)
{
schedulerFreeJob
(
pRequest
->
body
.
queryJob
,
0
);
}
schedulerFreeJob
(
&
pRequest
->
body
.
queryJob
,
0
);
}
pRequest
->
code
=
res
.
code
;
...
...
@@ -791,10 +787,7 @@ void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) {
TDMT_VND_CREATE_TABLE
==
pRequest
->
type
)
{
pRequest
->
body
.
resInfo
.
numOfRows
=
pResult
->
numOfRows
;
if
(
pRequest
->
body
.
queryJob
!=
0
)
{
schedulerFreeJob
(
pRequest
->
body
.
queryJob
,
0
);
pRequest
->
body
.
queryJob
=
0
;
}
schedulerFreeJob
(
&
pRequest
->
body
.
queryJob
,
0
);
}
tscDebug
(
"0x%"
PRIx64
" enter scheduler exec cb, code:%d - %s, reqId:0x%"
PRIx64
,
pRequest
->
self
,
code
,
...
...
source/client/src/clientMain.c
浏览文件 @
817a319a
...
...
@@ -196,10 +196,10 @@ void taos_kill_query(TAOS *taos) {
if
(
NULL
==
taos
)
{
return
;
}
int64_t
rid
=
*
(
int64_t
*
)
taos
;
int64_t
rid
=
*
(
int64_t
*
)
taos
;
STscObj
*
pTscObj
=
acquireTscObj
(
rid
);
close
AllRequests
(
pTscObj
->
pRequests
);
stop
AllRequests
(
pTscObj
->
pRequests
);
releaseTscObj
(
rid
);
}
...
...
@@ -480,9 +480,7 @@ void taos_stop_query(TAOS_RES *res) {
return
;
}
if
(
pRequest
->
body
.
queryJob
)
{
schedulerFreeJob
(
pRequest
->
body
.
queryJob
,
TSDB_CODE_TSC_QUERY_KILLED
);
}
schedulerFreeJob
(
&
pRequest
->
body
.
queryJob
,
TSDB_CODE_TSC_QUERY_KILLED
);
tscDebug
(
"request %"
PRIx64
" killed"
,
pRequest
->
requestId
);
}
...
...
source/libs/scheduler/src/schJob.c
浏览文件 @
817a319a
...
...
@@ -184,9 +184,7 @@ FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
}
if
((
*
pJob
->
chkKillFp
)(
pJob
->
chkKillParam
))
{
schUpdateJobStatus
(
pJob
,
JOB_TASK_STATUS_DROPPING
);
schUpdateJobErrCode
(
pJob
,
TSDB_CODE_TSC_QUERY_KILLED
);
return
true
;
}
...
...
@@ -811,14 +809,6 @@ int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
*/
int32_t
schTaskCheckSetRetry
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
errCode
,
bool
*
needRetry
)
{
int8_t
status
=
0
;
if
(
schJobNeedToStop
(
pJob
,
&
status
))
{
*
needRetry
=
false
;
SCH_TASK_DLOG
(
"task no more retry cause of job status, job status:%s"
,
jobTaskStatusStr
(
status
));
return
TSDB_CODE_SUCCESS
;
}
if
(
TSDB_CODE_SCH_TIMEOUT_ERROR
==
errCode
)
{
pTask
->
maxExecTimes
++
;
if
(
pTask
->
timeoutUsec
<
SCH_MAX_TASK_TIMEOUT_USEC
)
{
...
...
@@ -1277,7 +1267,7 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList) {
for
(
int32_t
i
=
0
;
i
<
taskNum
;
++
i
)
{
STaskStatus
*
taskStatus
=
taosArrayGet
(
pStatusList
,
i
);
qDebug
(
"QID:%"
PRIx64
",TID:0x%"
PRIx64
",EID:%d task status in server: %s"
,
qDebug
(
"QID:
0x
%"
PRIx64
",TID:0x%"
PRIx64
",EID:%d task status in server: %s"
,
taskStatus
->
queryId
,
taskStatus
->
taskId
,
taskStatus
->
execId
,
jobTaskStatusStr
(
taskStatus
->
status
));
SSchJob
*
pJob
=
schAcquireJob
(
taskStatus
->
refId
);
...
...
@@ -1689,11 +1679,6 @@ _return:
int32_t
schDoTaskRedirect
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SDataBuf
*
pData
,
int32_t
rspCode
)
{
int32_t
code
=
0
;
int8_t
status
=
0
;
if
(
schJobNeedToStop
(
pJob
,
&
status
))
{
SCH_TASK_ELOG
(
"redirect will no continue cause of job status %s"
,
jobTaskStatusStr
(
status
));
SCH_RET
(
atomic_load_32
(
&
pJob
->
errCode
));
}
if
((
pTask
->
execId
+
1
)
>=
pTask
->
maxExecTimes
)
{
SCH_TASK_DLOG
(
"task no more retry since reach max try times, execId:%d"
,
pTask
->
execId
);
...
...
source/libs/scheduler/src/schRemote.c
浏览文件 @
817a319a
...
...
@@ -401,12 +401,16 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
goto
_return
;
}
code
=
schHandleResponseMsg
(
pJob
,
pTask
,
msgType
,
pMsg
->
pData
,
pMsg
->
len
,
rspCode
);
schHandleResponseMsg
(
pJob
,
pTask
,
msgType
,
pMsg
->
pData
,
pMsg
->
len
,
rspCode
);
pMsg
->
pData
=
NULL
;
_return:
if
(
pTask
)
{
if
(
code
)
{
schProcessOnTaskFailure
(
pJob
,
pTask
,
code
);
}
SCH_UNLOCK_TASK
(
pTask
);
}
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
817a319a
...
...
@@ -225,26 +225,33 @@ void schedulerStopQueryHb(void *pTrans) {
schCleanClusterHb
(
pTrans
);
}
void
schedulerFreeJob
(
int64_t
job
,
int32_t
errCode
)
{
SSchJob
*
pJob
=
schAcquireJob
(
job
);
void
schedulerFreeJob
(
int64_t
*
job
,
int32_t
errCode
)
{
if
(
0
==
*
job
)
{
return
;
}
SSchJob
*
pJob
=
schAcquireJob
(
*
job
);
if
(
NULL
==
pJob
)
{
qError
(
"acquire job from jobRef list failed, may be dropped, jobId:0x%"
PRIx64
,
job
);
qError
(
"acquire sch job failed, may be dropped, jobId:0x%"
PRIx64
,
*
job
);
*
job
=
0
;
return
;
}
int32_t
code
=
schProcessOnJobDropped
(
pJob
,
errCode
);
if
(
TSDB_CODE_SCH_JOB_IS_DROPPING
==
code
)
{
SCH_JOB_DLOG
(
"sch job is already dropping, refId:0x%"
PRIx64
,
job
);
SCH_JOB_DLOG
(
"sch job is already dropping, refId:0x%"
PRIx64
,
*
job
);
*
job
=
0
;
return
;
}
SCH_JOB_DLOG
(
"start to remove job from jobRef list, refId:0x%"
PRIx64
,
job
);
SCH_JOB_DLOG
(
"start to remove job from jobRef list, refId:0x%"
PRIx64
,
*
job
);
if
(
taosRemoveRef
(
schMgmt
.
jobRef
,
job
))
{
SCH_JOB_ELOG
(
"remove job from job list failed, refId:0x%"
PRIx64
,
job
);
if
(
taosRemoveRef
(
schMgmt
.
jobRef
,
*
job
))
{
SCH_JOB_ELOG
(
"remove job from job list failed, refId:0x%"
PRIx64
,
*
job
);
}
schReleaseJob
(
job
);
schReleaseJob
(
*
job
);
*
job
=
0
;
}
void
schedulerDestroy
(
void
)
{
...
...
source/libs/scheduler/test/schedulerTests.cpp
浏览文件 @
817a319a
...
...
@@ -457,7 +457,7 @@ void schtFreeQueryJob(int32_t freeThread) {
int64_t
job
=
queryJobRefId
;
if
(
job
&&
atomic_val_compare_exchange_64
(
&
queryJobRefId
,
job
,
0
))
{
schedulerFreeJob
(
job
,
0
);
schedulerFreeJob
(
&
job
,
0
);
if
(
freeThread
)
{
if
(
++
freeNum
%
schtTestPrintNum
==
0
)
{
printf
(
"FreeNum:%d
\n
"
,
freeNum
);
...
...
@@ -724,7 +724,7 @@ TEST(queryTest, normalCase) {
schReleaseJob
(
job
);
schedulerFreeJob
(
job
,
0
);
schedulerFreeJob
(
&
job
,
0
);
schtFreeQueryDag
(
&
dag
);
...
...
@@ -828,7 +828,7 @@ TEST(queryTest, readyFirstCase) {
schReleaseJob
(
job
);
schedulerFreeJob
(
job
,
0
);
schedulerFreeJob
(
&
job
,
0
);
schtFreeQueryDag
(
&
dag
);
...
...
@@ -940,7 +940,7 @@ TEST(queryTest, flowCtrlCase) {
schReleaseJob
(
job
);
schedulerFreeJob
(
job
,
0
);
schedulerFreeJob
(
&
job
,
0
);
schtFreeQueryDag
(
&
dag
);
...
...
@@ -994,7 +994,7 @@ TEST(insertTest, normalCase) {
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
res
.
numOfRows
,
20
);
schedulerFreeJob
(
insertJobRefId
,
0
);
schedulerFreeJob
(
&
insertJobRefId
,
0
);
schedulerDestroy
();
}
...
...
tests/script/api/stopquery.c
浏览文件 @
817a319a
...
...
@@ -52,6 +52,7 @@ typedef struct {
typedef
struct
SSP_CB_PARAM
{
TAOS
*
taos
;
bool
fetch
;
bool
free
;
int32_t
*
end
;
}
SSP_CB_PARAM
;
...
...
@@ -177,8 +178,37 @@ void sqKillQueryCb(void *param, TAOS_RES *pRes, int code) {
}
}
void
sqAsyncFetchCb
(
void
*
param
,
TAOS_RES
*
pRes
,
int
numOfRows
)
{
SSP_CB_PARAM
*
qParam
=
(
SSP_CB_PARAM
*
)
param
;
if
(
numOfRows
>
0
)
{
taos_fetch_rows_a
(
pRes
,
sqAsyncFetchCb
,
param
);
}
else
{
*
qParam
->
end
=
1
;
if
(
qParam
->
free
)
{
taos_free_result
(
pRes
);
}
}
}
int
sqSyncStopQuery
(
bool
fetch
)
{
void
sqAsyncQueryCb
(
void
*
param
,
TAOS_RES
*
pRes
,
int
code
)
{
SSP_CB_PARAM
*
qParam
=
(
SSP_CB_PARAM
*
)
param
;
if
(
code
==
0
&&
pRes
)
{
if
(
qParam
->
fetch
)
{
taos_fetch_rows_a
(
pRes
,
sqAsyncFetchCb
,
param
);
}
else
{
if
(
qParam
->
free
)
{
taos_free_result
(
pRes
);
}
*
qParam
->
end
=
1
;
}
}
else
{
sqExit
(
"select"
,
taos_errstr
(
pRes
));
}
}
int
sqStopSyncQuery
(
bool
fetch
)
{
CASE_ENTER
();
for
(
int32_t
i
=
0
;
i
<
runTimes
;
++
i
)
{
char
sql
[
1024
]
=
{
0
};
...
...
@@ -211,7 +241,7 @@ int sqSyncStopQuery(bool fetch) {
CASE_LEAVE
();
}
int
sq
AsyncStop
Query
(
bool
fetch
)
{
int
sq
StopAsync
Query
(
bool
fetch
)
{
CASE_ENTER
();
for
(
int32_t
i
=
0
;
i
<
runTimes
;
++
i
)
{
char
sql
[
1024
]
=
{
0
};
...
...
@@ -241,7 +271,7 @@ int sqAsyncStopQuery(bool fetch) {
CASE_LEAVE
();
}
int
sq
SyncFree
Query
(
bool
fetch
)
{
int
sq
FreeSync
Query
(
bool
fetch
)
{
CASE_ENTER
();
for
(
int32_t
i
=
0
;
i
<
runTimes
;
++
i
)
{
char
sql
[
1024
]
=
{
0
};
...
...
@@ -272,7 +302,7 @@ int sqSyncFreeQuery(bool fetch) {
CASE_LEAVE
();
}
int
sq
AsyncFree
Query
(
bool
fetch
)
{
int
sq
FreeAsync
Query
(
bool
fetch
)
{
CASE_ENTER
();
for
(
int32_t
i
=
0
;
i
<
runTimes
;
++
i
)
{
char
sql
[
1024
]
=
{
0
};
...
...
@@ -302,7 +332,7 @@ int sqAsyncFreeQuery(bool fetch) {
CASE_LEAVE
();
}
int
sq
SyncClose
Query
(
bool
fetch
)
{
int
sq
CloseSync
Query
(
bool
fetch
)
{
CASE_ENTER
();
for
(
int32_t
i
=
0
;
i
<
runTimes
;
++
i
)
{
char
sql
[
1024
]
=
{
0
};
...
...
@@ -332,7 +362,7 @@ int sqSyncCloseQuery(bool fetch) {
CASE_LEAVE
();
}
int
sq
AsyncClose
Query
(
bool
fetch
)
{
int
sq
CloseAsync
Query
(
bool
fetch
)
{
CASE_ENTER
();
for
(
int32_t
i
=
0
;
i
<
runTimes
;
++
i
)
{
char
sql
[
1024
]
=
{
0
};
...
...
@@ -382,9 +412,39 @@ void *syncQueryThreadFp(void *arg) {
taos_fetch_row
(
pRes
);
}
taos_free_result
(
pRes
);
if
(
qParam
->
free
)
{
taos_free_result
(
pRes
);
}
}
void
*
asyncQueryThreadFp
(
void
*
arg
)
{
SSP_CB_PARAM
*
qParam
=
(
SSP_CB_PARAM
*
)
arg
;
char
sql
[
1024
]
=
{
0
};
int32_t
code
=
0
;
TAOS
*
taos
=
taos_connect
(
hostName
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
taos
==
NULL
)
sqExit
(
"taos_connect"
,
taos_errstr
(
NULL
));
qParam
->
taos
=
taos
;
sprintf
(
sql
,
"reset query cache"
);
sqExecSQLE
(
taos
,
sql
);
sprintf
(
sql
,
"use %s"
,
dbName
);
sqExecSQLE
(
taos
,
sql
);
sprintf
(
sql
,
"select * from %s"
,
tbName
);
int32_t
qEnd
=
0
;
SSP_CB_PARAM
param
=
{
0
};
param
.
fetch
=
qParam
->
fetch
;
param
.
end
=
&
qEnd
;
taos_query_a
(
taos
,
sql
,
sqAsyncQueryCb
,
&
param
);
while
(
0
==
qEnd
)
{
usleep
(
5000
);
}
}
void
*
closeThreadFp
(
void
*
arg
)
{
SSP_CB_PARAM
*
qParam
=
(
SSP_CB_PARAM
*
)
arg
;
while
(
true
)
{
...
...
@@ -398,7 +458,22 @@ void *closeThreadFp(void *arg) {
}
int
sqConSyncCloseQuery
(
bool
fetch
)
{
void
*
killThreadFp
(
void
*
arg
)
{
SSP_CB_PARAM
*
qParam
=
(
SSP_CB_PARAM
*
)
arg
;
while
(
true
)
{
if
(
qParam
->
taos
)
{
usleep
(
rand
()
%
10000
);
taos_kill_query
(
qParam
->
taos
);
break
;
}
usleep
(
1
);
}
}
int
sqConCloseSyncQuery
(
bool
fetch
)
{
CASE_ENTER
();
pthread_t
qid
,
cid
;
for
(
int32_t
i
=
0
;
i
<
runTimes
;
++
i
)
{
...
...
@@ -413,7 +488,23 @@ int sqConSyncCloseQuery(bool fetch) {
CASE_LEAVE
();
}
int
sqSyncKillQuery
(
bool
fetch
)
{
int
sqConCloseAsyncQuery
(
bool
fetch
)
{
CASE_ENTER
();
pthread_t
qid
,
cid
;
for
(
int32_t
i
=
0
;
i
<
runTimes
;
++
i
)
{
SSP_CB_PARAM
param
=
{
0
};
param
.
fetch
=
fetch
;
pthread_create
(
&
qid
,
NULL
,
asyncQueryThreadFp
,
(
void
*
)
&
param
);
pthread_create
(
&
cid
,
NULL
,
closeThreadFp
,
(
void
*
)
&
param
);
pthread_join
(
qid
,
NULL
);
pthread_join
(
cid
,
NULL
);
}
CASE_LEAVE
();
}
int
sqKillSyncQuery
(
bool
fetch
)
{
CASE_ENTER
();
for
(
int32_t
i
=
0
;
i
<
runTimes
;
++
i
)
{
char
sql
[
1024
]
=
{
0
};
...
...
@@ -445,7 +536,7 @@ int sqSyncKillQuery(bool fetch) {
CASE_LEAVE
();
}
int
sq
AsyncKill
Query
(
bool
fetch
)
{
int
sq
KillAsync
Query
(
bool
fetch
)
{
CASE_ENTER
();
for
(
int32_t
i
=
0
;
i
<
runTimes
;
++
i
)
{
char
sql
[
1024
]
=
{
0
};
...
...
@@ -465,6 +556,7 @@ int sqAsyncKillQuery(bool fetch) {
SSP_CB_PARAM
param
=
{
0
};
param
.
fetch
=
fetch
;
param
.
end
=
&
qEnd
;
param
.
taos
=
taos
;
taos_query_a
(
taos
,
sql
,
sqKillQueryCb
,
&
param
);
while
(
0
==
qEnd
)
{
usleep
(
5000
);
...
...
@@ -475,33 +567,81 @@ int sqAsyncKillQuery(bool fetch) {
CASE_LEAVE
();
}
int
sqConKillSyncQuery
(
bool
fetch
)
{
CASE_ENTER
();
pthread_t
qid
,
cid
;
for
(
int32_t
i
=
0
;
i
<
runTimes
;
++
i
)
{
SSP_CB_PARAM
param
=
{
0
};
param
.
fetch
=
fetch
;
pthread_create
(
&
qid
,
NULL
,
syncQueryThreadFp
,
(
void
*
)
&
param
);
pthread_create
(
&
cid
,
NULL
,
killThreadFp
,
(
void
*
)
&
param
);
pthread_join
(
qid
,
NULL
);
pthread_join
(
cid
,
NULL
);
}
CASE_LEAVE
();
}
int
sqConKillAsyncQuery
(
bool
fetch
)
{
CASE_ENTER
();
pthread_t
qid
,
cid
;
for
(
int32_t
i
=
0
;
i
<
runTimes
;
++
i
)
{
SSP_CB_PARAM
param
=
{
0
};
param
.
fetch
=
fetch
;
pthread_create
(
&
qid
,
NULL
,
asyncQueryThreadFp
,
(
void
*
)
&
param
);
pthread_create
(
&
cid
,
NULL
,
killThreadFp
,
(
void
*
)
&
param
);
pthread_join
(
qid
,
NULL
);
pthread_join
(
cid
,
NULL
);
}
CASE_LEAVE
();
}
void
sqRunAllCase
(
void
)
{
/*
sqSyncStopQuery(false);
sqSyncStopQuery(true);
sqAsyncStopQuery(false);
sqAsyncStopQuery(true);
sqSyncFreeQuery(false);
sqSyncFreeQuery(true);
sqAsyncFreeQuery(false);
sqAsyncFreeQuery(true);
sqSyncCloseQuery(false);
sqSyncCloseQuery(true);
sqAsyncCloseQuery(false);
sqAsyncCloseQuery(true);
*/
sqConSyncCloseQuery
(
false
);
/*
sqConSyncCloseQuery(true);
sqSyncKillQuery(false);
sqSyncKillQuery(true);
sqAsyncKillQuery(false);
sqAsyncKillQuery(true);
sqStopSyncQuery(false);
sqStopSyncQuery(true);
sqStopAsyncQuery(false);
sqStopAsyncQuery(true);
sqFreeSyncQuery(false);
sqFreeSyncQuery(true);
sqFreeAsyncQuery(false);
sqFreeAsyncQuery(true);
sqCloseSyncQuery(false);
sqCloseSyncQuery(true);
sqCloseAsyncQuery(false);
sqCloseAsyncQuery(true);
sqConCloseSyncQuery(false);
sqConCloseSyncQuery(true);
sqConCloseAsyncQuery(false);
sqConCloseAsyncQuery(true);
*/
#if 0
sqKillSyncQuery(false);
sqKillSyncQuery(true);
sqKillAsyncQuery(false);
sqKillAsyncQuery(true);
#endif
//sqConKillSyncQuery(false);
sqConKillSyncQuery
(
true
);
#if 0
sqConKillAsyncQuery(false);
sqConKillAsyncQuery(true);
#endif
int32_t
l
=
5
;
while
(
l
)
{
printf
(
"%d
\n
"
,
l
--
);
sleep
(
1
);
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录