提交 1f24d3a3 编写于 作者: A avalon566 提交者: Zhang Yonglun

Fix Postgres execute error sql no response (#3129) (#4039)

上级 e36890c7
......@@ -60,6 +60,8 @@ public final class MySQLComStmtExecuteExecutor implements QueryCommandExecutor {
private volatile boolean isQuery;
private volatile boolean isErrorResponse;
private int currentSequenceId;
public MySQLComStmtExecuteExecutor(final MySQLComStmtExecutePacket comStmtExecutePacket, final BackendConnection backendConnection) {
......@@ -74,6 +76,7 @@ public final class MySQLComStmtExecuteExecutor implements QueryCommandExecutor {
}
BackendResponse backendResponse = databaseCommunicationEngine.execute();
if (backendResponse instanceof ErrorResponse) {
isErrorResponse = true;
return Collections.<DatabasePacket>singletonList(createErrorPacket(((ErrorResponse) backendResponse).getCause()));
}
if (backendResponse instanceof UpdateResponse) {
......@@ -108,6 +111,11 @@ public final class MySQLComStmtExecuteExecutor implements QueryCommandExecutor {
return isQuery;
}
@Override
public boolean isErrorResponse() {
return isErrorResponse;
}
@Override
public boolean next() throws SQLException {
return databaseCommunicationEngine.next();
......
......@@ -59,6 +59,8 @@ public final class MySQLComQueryPacketExecutor implements QueryCommandExecutor {
private volatile boolean isQuery;
private volatile boolean isErrorResponse;
private int currentSequenceId;
public MySQLComQueryPacketExecutor(final MySQLComQueryPacket comQueryPacket, final BackendConnection backendConnection) {
......@@ -72,6 +74,7 @@ public final class MySQLComQueryPacketExecutor implements QueryCommandExecutor {
}
BackendResponse backendResponse = textProtocolBackendHandler.execute();
if (backendResponse instanceof ErrorResponse) {
isErrorResponse = true;
return Collections.<DatabasePacket>singletonList(createErrorPacket(((ErrorResponse) backendResponse).getCause()));
}
if (backendResponse instanceof UpdateResponse) {
......@@ -123,6 +126,11 @@ public final class MySQLComQueryPacketExecutor implements QueryCommandExecutor {
return isQuery;
}
@Override
public boolean isErrorResponse() {
return isErrorResponse;
}
@Override
public boolean next() throws SQLException {
return textProtocolBackendHandler.next();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.shardingproxy.frontend.mysql.command.query.binary.execute;
import lombok.SneakyThrows;
import org.apache.shardingsphere.shardingproxy.backend.communication.DatabaseCommunicationEngine;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.shardingproxy.backend.response.error.ErrorResponse;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command.query.binary.execute.MySQLComStmtExecutePacket;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.internal.util.reflection.FieldSetter;
import org.mockito.junit.MockitoJUnitRunner;
import java.sql.SQLException;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class MySQLComStmtExecuteExecutorTest {
@Mock
private SQLException sqlException;
@Mock
private DatabaseCommunicationEngine databaseCommunicationEngine;
@Test
@SneakyThrows
public void assertIsErrorResponse() {
MySQLComStmtExecuteExecutor mySQLComStmtExecuteExecutor = new MySQLComStmtExecuteExecutor(mock(MySQLComStmtExecutePacket.class), mock(BackendConnection.class));
FieldSetter.setField(mySQLComStmtExecuteExecutor, MySQLComStmtExecuteExecutor.class.getDeclaredField("databaseCommunicationEngine"), databaseCommunicationEngine);
when(sqlException.getCause()).thenReturn(new Exception());
when(databaseCommunicationEngine.execute()).thenReturn(new ErrorResponse(sqlException));
mySQLComStmtExecuteExecutor.execute();
Assert.assertThat(mySQLComStmtExecuteExecutor.isErrorResponse(), Matchers.is(true));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.shardingproxy.frontend.mysql.command.query.text.query;
import lombok.SneakyThrows;
import org.apache.shardingsphere.shardingproxy.backend.response.error.ErrorResponse;
import org.apache.shardingsphere.shardingproxy.backend.text.TextProtocolBackendHandler;
import org.apache.shardingsphere.shardingproxy.transport.mysql.packet.command.query.text.query.MySQLComQueryPacket;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.internal.util.reflection.FieldSetter;
import org.mockito.junit.MockitoJUnitRunner;
import java.sql.SQLException;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class MySQLComQueryPacketExecutorTest {
@Mock
private SQLException sqlException;
@Mock
private TextProtocolBackendHandler textProtocolBackendHandler;
@Test
@SneakyThrows
public void assertIsErrorResponse() {
MySQLComQueryPacketExecutor mySQLComQueryPacketExecutor = new MySQLComQueryPacketExecutor(mock(MySQLComQueryPacket.class), null);
FieldSetter.setField(mySQLComQueryPacketExecutor, MySQLComQueryPacketExecutor.class.getDeclaredField("textProtocolBackendHandler"), textProtocolBackendHandler);
when(sqlException.getCause()).thenReturn(new Exception());
when(textProtocolBackendHandler.execute()).thenReturn(new ErrorResponse(sqlException));
mySQLComQueryPacketExecutor.execute();
Assert.assertThat(mySQLComQueryPacketExecutor.isErrorResponse(), Matchers.is(true));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.shardingproxy.frontend.postgresql;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.generic.PostgreSQLErrorResponsePacket;
import org.postgresql.util.PSQLException;
import org.postgresql.util.ServerErrorMessage;
/**
* ERR packet factory for PostgreSQL.
*
* @author avalon566
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class PostgreSQLErrPacketFactory {
/**
* New instance of PostgreSQL ERR packet.
*
* @param cause cause
* @return instance of PostgreSQL ERR packet
*/
public static PostgreSQLErrorResponsePacket newInstance(final Exception cause) {
if (cause instanceof PSQLException) {
ServerErrorMessage serverErrorMessage = ((PSQLException) cause).getServerErrorMessage();
PostgreSQLErrorResponsePacket errorResponsePacket = new PostgreSQLErrorResponsePacket();
errorResponsePacket.addField(PostgreSQLErrorResponsePacket.FIELD_TYPE_SEVERITY, serverErrorMessage.getSeverity());
errorResponsePacket.addField(PostgreSQLErrorResponsePacket.FIELD_TYPE_CODE, serverErrorMessage.getSQLState());
errorResponsePacket.addField(PostgreSQLErrorResponsePacket.FIELD_TYPE_MESSAGE, serverErrorMessage.getMessage());
errorResponsePacket.addField(PostgreSQLErrorResponsePacket.FIELD_TYPE_POSITION, Integer.toString(serverErrorMessage.getPosition()));
return errorResponsePacket;
}
return new PostgreSQLErrorResponsePacket();
}
}
......@@ -74,6 +74,10 @@ public final class PostgreSQLCommandExecuteEngine implements CommandExecuteEngin
context.write(new PostgreSQLReadyForQueryPacket());
return;
}
if (queryCommandExecutor.isErrorResponse()) {
context.write(new PostgreSQLReadyForQueryPacket());
return;
}
int count = 0;
int proxyFrontendFlushThreshold = ShardingProxyContext.getInstance().getProperties().<Integer>getValue(PropertiesConstant.PROXY_FRONTEND_FLUSH_THRESHOLD);
while (queryCommandExecutor.next()) {
......
......@@ -29,6 +29,7 @@ import org.apache.shardingsphere.shardingproxy.backend.response.query.QueryRespo
import org.apache.shardingsphere.shardingproxy.backend.response.update.UpdateResponse;
import org.apache.shardingsphere.shardingproxy.context.ShardingProxyContext;
import org.apache.shardingsphere.shardingproxy.frontend.api.QueryCommandExecutor;
import org.apache.shardingsphere.shardingproxy.frontend.postgresql.PostgreSQLErrPacketFactory;
import org.apache.shardingsphere.shardingproxy.transport.packet.DatabasePacket;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.constant.PostgreSQLColumnType;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.PostgreSQLPacket;
......@@ -62,6 +63,8 @@ public final class PostgreSQLComBindExecutor implements QueryCommandExecutor {
private volatile boolean isQuery;
private volatile boolean isErrorResponse;
public PostgreSQLComBindExecutor(final PostgreSQLComBindPacket packet, final BackendConnection backendConnection) {
this.packet = packet;
databaseCommunicationEngine = null == packet.getSql()
......@@ -80,6 +83,7 @@ public final class PostgreSQLComBindExecutor implements QueryCommandExecutor {
}
BackendResponse backendResponse = databaseCommunicationEngine.execute();
if (backendResponse instanceof ErrorResponse) {
isErrorResponse = true;
result.add(createErrorPacket((ErrorResponse) backendResponse));
}
if (backendResponse instanceof UpdateResponse) {
......@@ -95,7 +99,7 @@ public final class PostgreSQLComBindExecutor implements QueryCommandExecutor {
}
private PostgreSQLErrorResponsePacket createErrorPacket(final ErrorResponse errorResponse) {
return new PostgreSQLErrorResponsePacket();
return PostgreSQLErrPacketFactory.newInstance(errorResponse.getCause());
}
private PostgreSQLCommandCompletePacket createUpdatePacket(final UpdateResponse updateResponse) {
......@@ -125,6 +129,11 @@ public final class PostgreSQLComBindExecutor implements QueryCommandExecutor {
return isQuery;
}
@Override
public boolean isErrorResponse() {
return isErrorResponse;
}
@Override
public boolean next() throws SQLException {
return null != databaseCommunicationEngine && databaseCommunicationEngine.next();
......
......@@ -18,6 +18,7 @@
package org.apache.shardingsphere.shardingproxy.frontend.postgresql.command.query.text;
import com.google.common.base.Optional;
import org.apache.shardingsphere.shardingproxy.frontend.postgresql.PostgreSQLErrPacketFactory;
import org.apache.shardingsphere.underlying.common.database.type.DatabaseTypes;
import org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.shardingproxy.backend.response.BackendResponse;
......@@ -56,6 +57,8 @@ public final class PostgreSQLComQueryExecutor implements QueryCommandExecutor {
private volatile boolean isQuery;
private volatile boolean isErrorResponse;
public PostgreSQLComQueryExecutor(final PostgreSQLComQueryPacket comQueryPacket, final BackendConnection backendConnection) {
textProtocolBackendHandler = TextProtocolBackendHandlerFactory.newInstance(DatabaseTypes.getActualDatabaseType("PostgreSQL"), comQueryPacket.getSql(), backendConnection);
}
......@@ -67,6 +70,7 @@ public final class PostgreSQLComQueryExecutor implements QueryCommandExecutor {
}
BackendResponse backendResponse = textProtocolBackendHandler.execute();
if (backendResponse instanceof ErrorResponse) {
isErrorResponse = true;
return Collections.<DatabasePacket>singletonList(createErrorPacket((ErrorResponse) backendResponse));
}
if (backendResponse instanceof UpdateResponse) {
......@@ -77,7 +81,7 @@ public final class PostgreSQLComQueryExecutor implements QueryCommandExecutor {
}
private PostgreSQLErrorResponsePacket createErrorPacket(final ErrorResponse errorResponse) {
return new PostgreSQLErrorResponsePacket();
return PostgreSQLErrPacketFactory.newInstance(errorResponse.getCause());
}
private PostgreSQLCommandCompletePacket createUpdatePacket(final UpdateResponse updateResponse) {
......@@ -107,6 +111,11 @@ public final class PostgreSQLComQueryExecutor implements QueryCommandExecutor {
return isQuery;
}
@Override
public boolean isErrorResponse() {
return isErrorResponse;
}
@Override
public boolean next() throws SQLException {
return textProtocolBackendHandler.next();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.shardingproxy.frontend.postgresql.command;
import io.netty.channel.ChannelHandlerContext;
import lombok.SneakyThrows;
import org.apache.shardingsphere.shardingproxy.frontend.api.QueryCommandExecutor;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.generic.PostgreSQLReadyForQueryPacket;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class PostgreSQLCommandExecuteEngineTest {
@Mock
private ChannelHandlerContext channelHandlerContext;
@Mock
private QueryCommandExecutor queryCommandExecutor;
@Test
@SneakyThrows
public void assertWriteQueryDataWithError() {
PostgreSQLCommandExecuteEngine postgreSQLCommandExecuteEngine = new PostgreSQLCommandExecuteEngine();
when(queryCommandExecutor.isQuery()).thenReturn(false);
when(queryCommandExecutor.isErrorResponse()).thenReturn(true);
postgreSQLCommandExecuteEngine.writeQueryData(channelHandlerContext, null, queryCommandExecutor, 0);
verify(channelHandlerContext, times(1)).write(isA(PostgreSQLReadyForQueryPacket.class));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.shardingproxy.frontend.postgresql.command.query.binary.bind;
import lombok.SneakyThrows;
import org.apache.shardingsphere.shardingproxy.backend.communication.DatabaseCommunicationEngine;
import org.apache.shardingsphere.shardingproxy.backend.response.error.ErrorResponse;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.query.binary.bind.PostgreSQLComBindPacket;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.generic.PostgreSQLErrorResponsePacket;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.internal.util.reflection.FieldSetter;
import org.mockito.junit.MockitoJUnitRunner;
import org.postgresql.util.PSQLException;
import org.postgresql.util.ServerErrorMessage;
import java.util.LinkedList;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class PostgreSQLComBindExecutorTest {
@Mock
private DatabaseCommunicationEngine databaseCommunicationEngine;
@Test
@SneakyThrows
public void assertExecuteHasError() {
PostgreSQLComBindExecutor postgreSQLComBindExecutor = new PostgreSQLComBindExecutor(mock(PostgreSQLComBindPacket.class), null);
FieldSetter.setField(postgreSQLComBindExecutor, PostgreSQLComBindExecutor.class.getDeclaredField("databaseCommunicationEngine"), databaseCommunicationEngine);
ErrorResponse errorResponse = new ErrorResponse(new PSQLException(mock(ServerErrorMessage.class)));
when(databaseCommunicationEngine.execute()).thenReturn(errorResponse);
Assert.assertThat(((LinkedList) postgreSQLComBindExecutor.execute()).get(1), Matchers.instanceOf(PostgreSQLErrorResponsePacket.class));
Assert.assertThat(postgreSQLComBindExecutor.isErrorResponse(), Matchers.is(true));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.shardingproxy.frontend.postgresql.command.query.text;
import lombok.SneakyThrows;
import org.apache.shardingsphere.shardingproxy.backend.response.error.ErrorResponse;
import org.apache.shardingsphere.shardingproxy.backend.text.TextProtocolBackendHandler;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.command.query.text.PostgreSQLComQueryPacket;
import org.apache.shardingsphere.shardingproxy.transport.postgresql.packet.generic.PostgreSQLErrorResponsePacket;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.internal.util.reflection.FieldSetter;
import org.mockito.junit.MockitoJUnitRunner;
import org.postgresql.util.PSQLException;
import org.postgresql.util.ServerErrorMessage;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class PostgreSQLComQueryExecutorTest {
@Mock
private TextProtocolBackendHandler textProtocolBackendHandler;
@Test
@SneakyThrows
public void assertExecuteReturnErrorResponsePacket() {
PostgreSQLComQueryExecutor postgreSQLComQueryExecutor = new PostgreSQLComQueryExecutor(mock(PostgreSQLComQueryPacket.class), null);
FieldSetter.setField(postgreSQLComQueryExecutor, PostgreSQLComQueryExecutor.class.getDeclaredField("textProtocolBackendHandler"), textProtocolBackendHandler);
ErrorResponse errorResponse = new ErrorResponse(new PSQLException(mock(ServerErrorMessage.class)));
when(textProtocolBackendHandler.execute()).thenReturn(errorResponse);
Assert.assertThat(postgreSQLComQueryExecutor.execute().iterator().next(), Matchers.instanceOf(PostgreSQLErrorResponsePacket.class));
Assert.assertThat(postgreSQLComQueryExecutor.isErrorResponse(), Matchers.is(true));
}
}
......@@ -28,6 +28,13 @@ import java.sql.SQLException;
*/
public interface QueryCommandExecutor extends CommandExecutor {
/**
* Judge is error response.
*
* @return is error response or not
*/
boolean isErrorResponse();
/**
* Judge is query SQL or not.
*
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册