Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
s920243400
Rocketmq
提交
6c5a9507
R
Rocketmq
项目概览
s920243400
/
Rocketmq
与 Fork 源项目一致
Fork自
Apache RocketMQ / Rocketmq
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
6c5a9507
编写于
12月 06, 2017
作者:
Y
yukon
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[ROCKETMQ-323] Release semaphore after callback being finished for async invoke
上级
8c303104
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
98 addition
and
2 deletion
+98
-2
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
...apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+5
-2
remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java
...he/rocketmq/remoting/netty/NettyRemotingAbstractTest.java
+93
-0
未找到文件。
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
浏览文件 @
6c5a9507
...
@@ -257,14 +257,13 @@ public abstract class NettyRemotingAbstract {
...
@@ -257,14 +257,13 @@ public abstract class NettyRemotingAbstract {
if
(
responseFuture
!=
null
)
{
if
(
responseFuture
!=
null
)
{
responseFuture
.
setResponseCommand
(
cmd
);
responseFuture
.
setResponseCommand
(
cmd
);
responseFuture
.
release
();
responseTable
.
remove
(
opaque
);
responseTable
.
remove
(
opaque
);
if
(
responseFuture
.
getInvokeCallback
()
!=
null
)
{
if
(
responseFuture
.
getInvokeCallback
()
!=
null
)
{
executeInvokeCallback
(
responseFuture
);
executeInvokeCallback
(
responseFuture
);
}
else
{
}
else
{
responseFuture
.
putResponse
(
cmd
);
responseFuture
.
putResponse
(
cmd
);
responseFuture
.
release
();
}
}
}
else
{
}
else
{
log
.
warn
(
"receive response, but not matched any request, "
+
RemotingHelper
.
parseChannelRemoteAddr
(
ctx
.
channel
()));
log
.
warn
(
"receive response, but not matched any request, "
+
RemotingHelper
.
parseChannelRemoteAddr
(
ctx
.
channel
()));
...
@@ -287,6 +286,8 @@ public abstract class NettyRemotingAbstract {
...
@@ -287,6 +286,8 @@ public abstract class NettyRemotingAbstract {
responseFuture
.
executeInvokeCallback
();
responseFuture
.
executeInvokeCallback
();
}
catch
(
Throwable
e
)
{
}
catch
(
Throwable
e
)
{
log
.
warn
(
"execute callback in executor exception, and callback throw"
,
e
);
log
.
warn
(
"execute callback in executor exception, and callback throw"
,
e
);
}
finally
{
responseFuture
.
release
();
}
}
}
}
});
});
...
@@ -303,6 +304,8 @@ public abstract class NettyRemotingAbstract {
...
@@ -303,6 +304,8 @@ public abstract class NettyRemotingAbstract {
responseFuture
.
executeInvokeCallback
();
responseFuture
.
executeInvokeCallback
();
}
catch
(
Throwable
e
)
{
}
catch
(
Throwable
e
)
{
log
.
warn
(
"executeInvokeCallback Exception"
,
e
);
log
.
warn
(
"executeInvokeCallback Exception"
,
e
);
}
finally
{
responseFuture
.
release
();
}
}
}
}
}
}
...
...
remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java
0 → 100644
浏览文件 @
6c5a9507
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.remoting.netty
;
import
java.util.concurrent.Semaphore
;
import
org.apache.rocketmq.remoting.InvokeCallback
;
import
org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce
;
import
org.apache.rocketmq.remoting.protocol.RemotingCommand
;
import
org.junit.Test
;
import
org.junit.runner.RunWith
;
import
org.mockito.Spy
;
import
org.mockito.junit.MockitoJUnitRunner
;
import
static
org
.
assertj
.
core
.
api
.
Assertions
.
assertThat
;
import
static
org
.
mockito
.
Mockito
.
when
;
@RunWith
(
MockitoJUnitRunner
.
class
)
public
class
NettyRemotingAbstractTest
{
@Spy
private
NettyRemotingAbstract
remotingAbstract
=
new
NettyRemotingClient
(
new
NettyClientConfig
());
@Test
public
void
testProcessResponseCommand
()
throws
InterruptedException
{
final
Semaphore
semaphore
=
new
Semaphore
(
0
);
ResponseFuture
responseFuture
=
new
ResponseFuture
(
1
,
3000
,
new
InvokeCallback
()
{
@Override
public
void
operationComplete
(
final
ResponseFuture
responseFuture
)
{
assertThat
(
semaphore
.
availablePermits
()).
isEqualTo
(
0
);
}
},
new
SemaphoreReleaseOnlyOnce
(
semaphore
));
remotingAbstract
.
responseTable
.
putIfAbsent
(
1
,
responseFuture
);
RemotingCommand
response
=
RemotingCommand
.
createResponseCommand
(
0
,
"Foo"
);
response
.
setOpaque
(
1
);
remotingAbstract
.
processResponseCommand
(
null
,
response
);
// Acquire the release permit after call back
semaphore
.
acquire
(
1
);
assertThat
(
semaphore
.
availablePermits
()).
isEqualTo
(
0
);
}
@Test
public
void
testProcessResponseCommand_NullCallBack
()
throws
InterruptedException
{
final
Semaphore
semaphore
=
new
Semaphore
(
0
);
ResponseFuture
responseFuture
=
new
ResponseFuture
(
1
,
3000
,
null
,
new
SemaphoreReleaseOnlyOnce
(
semaphore
));
remotingAbstract
.
responseTable
.
putIfAbsent
(
1
,
responseFuture
);
RemotingCommand
response
=
RemotingCommand
.
createResponseCommand
(
0
,
"Foo"
);
response
.
setOpaque
(
1
);
remotingAbstract
.
processResponseCommand
(
null
,
response
);
assertThat
(
semaphore
.
availablePermits
()).
isEqualTo
(
1
);
}
@Test
public
void
testProcessResponseCommand_RunCallBackInCurrentThread
()
throws
InterruptedException
{
final
Semaphore
semaphore
=
new
Semaphore
(
0
);
ResponseFuture
responseFuture
=
new
ResponseFuture
(
1
,
3000
,
new
InvokeCallback
()
{
@Override
public
void
operationComplete
(
final
ResponseFuture
responseFuture
)
{
assertThat
(
semaphore
.
availablePermits
()).
isEqualTo
(
0
);
}
},
new
SemaphoreReleaseOnlyOnce
(
semaphore
));
remotingAbstract
.
responseTable
.
putIfAbsent
(
1
,
responseFuture
);
when
(
remotingAbstract
.
getCallbackExecutor
()).
thenReturn
(
null
);
RemotingCommand
response
=
RemotingCommand
.
createResponseCommand
(
0
,
"Foo"
);
response
.
setOpaque
(
1
);
remotingAbstract
.
processResponseCommand
(
null
,
response
);
// Acquire the release permit after call back finished in current thread
semaphore
.
acquire
(
1
);
assertThat
(
semaphore
.
availablePermits
()).
isEqualTo
(
0
);
}
}
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录