Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
e800f2f5
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
e800f2f5
编写于
9月 14, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
9月 14, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #16815 from taosdata/enh/handleSchedError
enh(tsc): handle schedule error
上级
51085cec
2b8b1ebc
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
19 addition
and
16 deletion
+19
-16
include/util/tsched.h
include/util/tsched.h
+2
-4
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+6
-1
source/libs/qcom/src/queryUtil.c
source/libs/qcom/src/queryUtil.c
+1
-4
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+3
-1
source/util/src/tsched.c
source/util/src/tsched.c
+7
-6
未找到文件。
include/util/tsched.h
浏览文件 @
e800f2f5
...
...
@@ -31,7 +31,6 @@ typedef struct SSchedMsg {
void
*
thandle
;
}
SSchedMsg
;
typedef
struct
{
char
label
[
TSDB_LABEL_LEN
];
tsem_t
emptySem
;
...
...
@@ -48,7 +47,6 @@ typedef struct {
void
*
pTimer
;
}
SSchedQueue
;
/**
* Create a thread-safe ring-buffer based task queue and return the instance. A thread
* pool will be created to consume the messages in the queue.
...
...
@@ -57,7 +55,7 @@ typedef struct {
* @param label the label of the queue
* @return the created queue scheduler
*/
void
*
taosInitScheduler
(
int32_t
capacity
,
int32_t
numOfThreads
,
const
char
*
label
,
SSchedQueue
*
pSched
);
void
*
taosInitScheduler
(
int32_t
capacity
,
int32_t
numOfThreads
,
const
char
*
label
,
SSchedQueue
*
pSched
);
/**
* Create a thread-safe ring-buffer based task queue and return the instance.
...
...
@@ -83,7 +81,7 @@ void taosCleanUpScheduler(void *queueScheduler);
* @param queueScheduler the queue scheduler instance
* @param pMsg the message for the task
*/
void
taosScheduleTask
(
void
*
queueScheduler
,
SSchedMsg
*
pMsg
);
int
taosScheduleTask
(
void
*
queueScheduler
,
SSchedMsg
*
pMsg
);
#ifdef __cplusplus
}
...
...
source/client/src/clientImpl.c
浏览文件 @
e800f2f5
...
...
@@ -1399,7 +1399,12 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
arg
->
msg
=
*
pMsg
;
arg
->
pEpset
=
tEpSet
;
taosAsyncExec
(
doProcessMsgFromServer
,
arg
,
NULL
);
if
(
0
!=
taosAsyncExec
(
doProcessMsgFromServer
,
arg
,
NULL
))
{
tscError
(
"failed to sched msg to tsc, tsc ready to quit"
);
rpcFreeCont
(
pMsg
->
pCont
);
taosMemoryFree
(
arg
->
pEpset
);
taosMemoryFree
(
arg
);
}
}
TAOS
*
taos_connect_auth
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
uint16_t
port
)
{
...
...
source/libs/qcom/src/queryUtil.c
浏览文件 @
e800f2f5
...
...
@@ -134,8 +134,7 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code)
schedMsg
.
thandle
=
execParam
;
schedMsg
.
msg
=
code
;
taosScheduleTask
(
&
pTaskQueue
,
&
schedMsg
);
return
0
;
return
taosScheduleTask
(
&
pTaskQueue
,
&
schedMsg
);
}
void
destroySendMsgInfo
(
SMsgSendInfo
*
pMsgBody
)
{
...
...
@@ -472,5 +471,3 @@ int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) {
return
TSDB_CODE_SUCCESS
;
}
source/libs/transport/src/transCli.c
浏览文件 @
e800f2f5
...
...
@@ -374,10 +374,12 @@ void cliHandleResp(SCliConn* conn) {
if
(
pCtx
==
NULL
&&
CONN_NO_PERSIST_BY_APP
(
conn
))
{
tDebug
(
"%s except, conn %p read while cli ignore it"
,
CONN_GET_INST_LABEL
(
conn
),
conn
);
transFreeMsg
(
transMsg
.
pCont
);
return
;
}
if
(
CONN_RELEASE_BY_SERVER
(
conn
)
&&
transMsg
.
info
.
ahandle
==
NULL
)
{
tDebug
(
"%s except, conn %p read while cli ignore it"
,
CONN_GET_INST_LABEL
(
conn
),
conn
);
transFreeMsg
(
transMsg
.
pCont
);
return
;
}
...
...
@@ -393,7 +395,7 @@ void cliHandleResp(SCliConn* conn) {
}
if
(
CONN_NO_PERSIST_BY_APP
(
conn
))
{
addConnToPool
(
pThrd
->
pool
,
conn
);
return
addConnToPool
(
pThrd
->
pool
,
conn
);
}
uv_read_start
((
uv_stream_t
*
)
conn
->
stream
,
cliAllocRecvBufferCb
,
cliRecvCb
);
...
...
source/util/src/tsched.c
浏览文件 @
e800f2f5
...
...
@@ -149,18 +149,18 @@ void *taosProcessSchedQueue(void *scheduler) {
return
NULL
;
}
void
taosScheduleTask
(
void
*
queueScheduler
,
SSchedMsg
*
pMsg
)
{
int
taosScheduleTask
(
void
*
queueScheduler
,
SSchedMsg
*
pMsg
)
{
SSchedQueue
*
pSched
=
(
SSchedQueue
*
)
queueScheduler
;
int32_t
ret
=
0
;
if
(
pSched
==
NULL
)
{
uError
(
"sched is not ready, msg:%p is dropped"
,
pMsg
);
return
;
return
-
1
;
}
if
(
atomic_load_8
(
&
pSched
->
stop
))
{
uError
(
"sched is already stopped, msg:%p is dropped"
,
pMsg
);
return
;
return
-
1
;
}
if
((
ret
=
tsem_wait
(
&
pSched
->
emptySem
))
!=
0
)
{
...
...
@@ -185,6 +185,7 @@ void taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) {
uFatal
(
"post %s fullSem failed(%s)"
,
pSched
->
label
,
strerror
(
errno
));
ASSERT
(
0
);
}
return
ret
;
}
void
taosCleanUpScheduler
(
void
*
param
)
{
...
...
@@ -192,11 +193,11 @@ void taosCleanUpScheduler(void *param) {
if
(
pSched
==
NULL
)
return
;
uDebug
(
"start to cleanup %s schedQsueue"
,
pSched
->
label
);
atomic_store_8
(
&
pSched
->
stop
,
1
);
taosMsleep
(
200
);
for
(
int32_t
i
=
0
;
i
<
pSched
->
numOfThreads
;
++
i
)
{
if
(
taosCheckPthreadValid
(
pSched
->
qthread
[
i
]))
{
tsem_post
(
&
pSched
->
fullSem
);
...
...
@@ -220,7 +221,7 @@ void taosCleanUpScheduler(void *param) {
if
(
pSched
->
queue
)
taosMemoryFree
(
pSched
->
queue
);
if
(
pSched
->
qthread
)
taosMemoryFree
(
pSched
->
qthread
);
//taosMemoryFree(pSched);
//
taosMemoryFree(pSched);
}
// for debug purpose, dump the scheduler status every 1min.
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录