提交 6da093a6 编写于 作者: U Ufuk Celebi

[distributed runtime] Notify about error when handing in channel

上级 256b2ee3
...@@ -52,20 +52,22 @@ class PartitionRequestClientFactory { ...@@ -52,20 +52,22 @@ class PartitionRequestClientFactory {
Object entry; Object entry;
PartitionRequestClient client = null; PartitionRequestClient client = null;
while(client == null) { while (client == null) {
entry = clients.get(remoteAddress); entry = clients.get(remoteAddress);
if (entry != null) { if (entry != null) {
// Existing channel or connecting channel // Existing channel or connecting channel
if (entry instanceof PartitionRequestClient) { if (entry instanceof PartitionRequestClient) {
client = (PartitionRequestClient) entry; client = (PartitionRequestClient) entry;
} else { }
else {
ConnectingChannel future = (ConnectingChannel) entry; ConnectingChannel future = (ConnectingChannel) entry;
client = future.waitForChannel(); client = future.waitForChannel();
clients.replace(remoteAddress, future, client); clients.replace(remoteAddress, future, client);
} }
} else { }
else {
// No channel yet. Create one, but watch out for a race. // No channel yet. Create one, but watch out for a race.
// We create a "connecting future" and atomically add it to the map. // We create a "connecting future" and atomically add it to the map.
// Only the thread that really added it establishes the channel. // Only the thread that really added it establishes the channel.
...@@ -79,18 +81,20 @@ class PartitionRequestClientFactory { ...@@ -79,18 +81,20 @@ class PartitionRequestClientFactory {
client = connectingChannel.waitForChannel(); client = connectingChannel.waitForChannel();
clients.replace(remoteAddress, connectingChannel, client); clients.replace(remoteAddress, connectingChannel, client);
} else if (old instanceof ConnectingChannel) { }
else if (old instanceof ConnectingChannel) {
client = ((ConnectingChannel) old).waitForChannel(); client = ((ConnectingChannel) old).waitForChannel();
clients.replace(remoteAddress, old, client); clients.replace(remoteAddress, old, client);
} else { }
else {
client = (PartitionRequestClient) old; client = (PartitionRequestClient) old;
} }
} }
// Make sure to increment the reference count before handing a client // Make sure to increment the reference count before handing a client
// out to ensure correct bookkeeping for channel closing. // out to ensure correct bookkeeping for channel closing.
if(!client.incrementReferenceCounter()){ if (!client.incrementReferenceCounter()) {
destroyPartitionRequestClient(remoteAddress, client); destroyPartitionRequestClient(remoteAddress, client);
client = null; client = null;
} }
...@@ -102,7 +106,7 @@ class PartitionRequestClientFactory { ...@@ -102,7 +106,7 @@ class PartitionRequestClientFactory {
public void closeOpenChannelConnections(RemoteAddress remoteAddress) { public void closeOpenChannelConnections(RemoteAddress remoteAddress) {
Object entry = clients.get(remoteAddress); Object entry = clients.get(remoteAddress);
if(entry instanceof ConnectingChannel) { if (entry instanceof ConnectingChannel) {
ConnectingChannel channel = (ConnectingChannel) entry; ConnectingChannel channel = (ConnectingChannel) entry;
if (channel.dispose()) { if (channel.dispose()) {
...@@ -141,8 +145,9 @@ class PartitionRequestClientFactory { ...@@ -141,8 +145,9 @@ class PartitionRequestClientFactory {
boolean result; boolean result;
synchronized (connectLock) { synchronized (connectLock) {
if (partitionRequestClient != null) { if (partitionRequestClient != null) {
result = partitionRequestClient.disposeIfNotUsed(); result = partitionRequestClient.disposeIfNotUsed();
} else { }
else {
disposeRequestClient = true; disposeRequestClient = true;
result = true; result = true;
} }
...@@ -155,16 +160,21 @@ class PartitionRequestClientFactory { ...@@ -155,16 +160,21 @@ class PartitionRequestClientFactory {
private void handInChannel(Channel channel) { private void handInChannel(Channel channel) {
synchronized (connectLock) { synchronized (connectLock) {
PartitionRequestClientHandler requestHandler = try {
(PartitionRequestClientHandler) channel.pipeline().get(PartitionRequestProtocol.CLIENT_REQUEST_HANDLER_NAME); PartitionRequestClientHandler requestHandler =
(PartitionRequestClientHandler) channel.pipeline().get(PartitionRequestProtocol.CLIENT_REQUEST_HANDLER_NAME);
partitionRequestClient = new PartitionRequestClient(channel, requestHandler, remoteAddress, clientFactory); partitionRequestClient = new PartitionRequestClient(channel, requestHandler, remoteAddress, clientFactory);
if (disposeRequestClient) { if (disposeRequestClient) {
partitionRequestClient.disposeIfNotUsed(); partitionRequestClient.disposeIfNotUsed();
} }
connectLock.notifyAll(); connectLock.notifyAll();
}
catch (Throwable t) {
notifyOfError(t);
}
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册