提交 d6f90ddc 编写于 作者: Y Yong Zhang 提交者: Sijie Guo

Fixes #5814: add SentConnectFrame state check when running `handleError` (#5913)



Fixes #5841

*Motivation*

when enabling authentication and authorization, if a user using
the wrong key to send to the server, the server will return an
`Error` message. There is no `Connected` message return to the
client so the client is staying in `SentConnectFrame` and it can
receive the server `Error` message.
We need to check the `SentConnectFrame` state when receiving `Error` message.

The client will throw error:

```
java.lang.IllegalArgumentException: null
	at com.google.common.base.Preconditions.checkArgument(Preconditions.java:108) ~[guava-21.0.jar:?]
	at org.apache.pulsar.client.impl.ClientCnx.handleError(ClientCnx.java:588) ~[pulsar-client-original-2.4.1.jar:2.4.1]
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:154) ~[pulsar-common-2.4.1.jar:2.4.1]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323) [netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297) [netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434) [netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965) [netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799) [netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:433) [netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:330) [netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909) [netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-all-4.1.32.Final.jar:4.1.32.Final]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_201]
```

*Modifications*

- Add `SentConnectFrame` check
上级 babce5e0
......@@ -593,7 +593,7 @@ public class ClientCnx extends PulsarHandler {
@Override
protected void handleError(CommandError error) {
checkArgument(state == State.Ready);
checkArgument(state == State.SentConnectFrame || state == State.Ready);
log.warn("{} Received error from server: {}", ctx.channel(), error.getMessage());
long requestId = error.getRequestId();
......
......@@ -22,11 +22,15 @@ import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.lang.reflect.Field;
import java.util.concurrent.ThreadFactory;
import io.netty.channel.Channel;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.testng.annotations.Test;
......@@ -60,4 +64,35 @@ public class ClientCnxTest {
}
}
@Test
public void testReceiveErrorAtSendConnectFrameState() throws Exception {
ThreadFactory threadFactory = new DefaultThreadFactory("testReceiveErrorAtSendConnectFrameState");
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, threadFactory);
ClientConfigurationData conf = new ClientConfigurationData();
conf.setOperationTimeoutMs(10);
ClientCnx cnx = new ClientCnx(conf, eventLoop);
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
Channel channel = mock(Channel.class);
when(ctx.channel()).thenReturn(channel);
Field ctxField = PulsarHandler.class.getDeclaredField("ctx");
ctxField.setAccessible(true);
ctxField.set(cnx, ctx);
// set connection as SentConnectFrame
Field cnxField = ClientCnx.class.getDeclaredField("state");
cnxField.setAccessible(true);
cnxField.set(cnx, ClientCnx.State.SentConnectFrame);
// receive error
PulsarApi.CommandError commandError = PulsarApi.CommandError.newBuilder()
.setRequestId(-1).setError(PulsarApi.ServerError.AuthenticationError).setMessage("authentication was failed").build();
try {
cnx.handleError(commandError);
} catch (Exception e) {
fail("should not throw any error");
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册