提交 1f053681 编写于 作者: R Rajan 提交者: Matteo Merli

Fix: broken websocket test and proxy-producer error handling (#99)

上级 5799ffaa
......@@ -95,6 +95,7 @@ http://{serviceUrl}:8080/ws/producer/persistent/{property}/{cluster}/{namespace}
##### Acknowledgement from server
###### Success response
```json
{
"result": "ok",
......@@ -102,6 +103,14 @@ http://{serviceUrl}:8080/ws/producer/persistent/{property}/{cluster}/{namespace}
"context": "1"
}
```
###### Failure response
```json
{
"result": "send-error:3",
"errorMsg": "Failed to de-serialize from JSON",
"context": "1"
}
```
| Key | Type | Requirement | Explanation |
|:------------|:------:|:-----------:|:--------------------------------------------:|
......@@ -172,6 +181,8 @@ following error codes:
| 4 | Failed to serialize to JSON |
| 5 | Failed to authenticate client |
| 6 | Client is not authorized |
| 7 | Invalid payload encoding |
| 8 | Unknown error |
Application is responsible to re-establish a new WebSocket session after
a backoff period.
......
......@@ -237,7 +237,7 @@ public abstract class AdminResource extends PulsarWebResource {
}
}
public ObjectMapper jsonMapper() {
public static ObjectMapper jsonMapper() {
return ObjectMapperFactory.getThreadLocal();
}
......
......@@ -20,50 +20,60 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import java.net.URI;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.test.PortManager;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.google.common.collect.Sets;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import com.yahoo.pulsar.client.api.ProducerConsumerBase;
import com.yahoo.pulsar.websocket.WebSocketService;
import com.yahoo.pulsar.websocket.service.ProxyServer;
import com.yahoo.pulsar.websocket.service.WebSocketServiceStarter;
public class ProxyAuthenticationTest extends MockedPulsarServiceBaseTest {
public class ProxyAuthenticationTest extends ProducerConsumerBase {
protected String methodName;
private static final int TEST_PORT = PortManager.nextFreePort();
private static final int TEST_PORT = 6080;
private static final String CONSUME_URI = "ws://localhost:" + TEST_PORT
+ "/consume/persistent/my-property/cluster1/my-ns/my-topic/my-sub";
+ "/ws/consumer/persistent/my-property/use/my-ns/my-topic/my-sub";
private static final String PRODUCE_URI = "ws://localhost:" + TEST_PORT
+ "/produce/persistent/my-property/cluster1/my-ns/my-topic/";
+ "/ws/producer/persistent/my-property/use/my-ns/my-topic/";
private ProxyServer proxyServer;
private WebSocketService service;
@BeforeClass
public void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
ServiceConfiguration config = new ServiceConfiguration();
config.setWebServicePort(TEST_PORT);
config.setClusterName("use");
config.setAuthenticationEnabled(true);
config.setAuthenticationProviders(
Sets.newHashSet("com.yahoo.pulsar.websocket.proxy.MockAuthenticationProvider"));
service = spy(new WebSocketService(config));
doReturn(mockZooKeeperClientFactory).when(service).getZooKeeperClientFactory();
service.start();
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
}
@Override
@AfterClass
protected void cleanup() throws Exception {
super.internalCleanup();
service.close();
proxyServer.stop();
log.info("Finished Cleaning Up Test setup");
}
......@@ -81,16 +91,20 @@ public class ProxyAuthenticationTest extends MockedPulsarServiceBaseTest {
try {
consumeClient.start();
ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
log.info("Connecting to : %s%n", consumeUri);
Future<Session> consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
log.info("Connecting to : {}", consumeUri);
ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
produceClient.start();
produceClient.connect(produceSocket, produceUri, produceRequest);
Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
// let it connect
Thread.sleep(1000);
Assert.assertTrue(consumerFuture.get().isOpen());
Assert.assertTrue(producerFuture.get().isOpen());
consumeSocket.awaitClose(1, TimeUnit.SECONDS);
produceSocket.awaitClose(1, TimeUnit.SECONDS);
Assert.assertTrue(produceSocket.getBuffer().size() > 0);
Assert.assertEquals(produceSocket.getBuffer(), consumeSocket.getBuffer());
} catch (Throwable t) {
log.error(t.getMessage());
......
......@@ -19,49 +19,58 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import java.net.URI;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import com.yahoo.pulsar.client.api.ProducerConsumerBase;
import com.yahoo.pulsar.websocket.WebSocketService;
import com.yahoo.pulsar.websocket.service.ProxyServer;
import com.yahoo.pulsar.websocket.service.WebSocketServiceStarter;
public class ProxyPublishConsumeTest extends MockedPulsarServiceBaseTest {
public class ProxyPublishConsumeTest extends ProducerConsumerBase {
protected String methodName;
private static final String CONSUME_URI = "ws://localhost:6080/ws/consumer/persistent/my-property/cluster1/my-ns/my-topic/my-sub";
private static final String PRODUCE_URI = "ws://localhost:6080/ws/producer/persistent/my-property/cluster1/my-ns/my-topic/";
private static final String CONSUME_URI = "ws://localhost:6080/ws/consumer/persistent/my-property/use/my-ns/my-topic/my-sub";
private static final String PRODUCE_URI = "ws://localhost:6080/ws/producer/persistent/my-property/use/my-ns/my-topic/";
private static final int TEST_PORT = 6080;
private ProxyServer proxyServer;
private WebSocketService service;
@BeforeClass
public void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
ServiceConfiguration config = new ServiceConfiguration();
config.setWebServicePort(TEST_PORT);
config.setClusterName("use");
service = spy(new WebSocketService(config));
doReturn(mockZooKeeperClientFactory).when(service).getZooKeeperClientFactory();
service.start();
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
}
@Override
@AfterClass
protected void cleanup() throws Exception {
super.internalCleanup();
service.close();
proxyServer.stop();
log.info("Finished Cleaning Up Test setup");
}
@Test
public void socketTest() throws InterruptedException {
public void socketTest() throws Exception {
URI consumeUri = URI.create(CONSUME_URI);
URI produceUri = URI.create(PRODUCE_URI);
......@@ -73,19 +82,21 @@ public class ProxyPublishConsumeTest extends MockedPulsarServiceBaseTest {
try {
consumeClient.start();
ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
log.info("Connecting to : %s%n", consumeUri);
Future<Session> consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
log.info("Connecting to : {}", consumeUri);
ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
produceClient.start();
produceClient.connect(produceSocket, produceUri, produceRequest);
Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
// let it connect
Thread.sleep(1000);
Assert.assertTrue(consumerFuture.get().isOpen());
Assert.assertTrue(producerFuture.get().isOpen());
consumeSocket.awaitClose(1, TimeUnit.SECONDS);
produceSocket.awaitClose(1, TimeUnit.SECONDS);
Assert.assertTrue(produceSocket.getBuffer().size() > 0);
Assert.assertEquals(produceSocket.getBuffer(), consumeSocket.getBuffer());
} catch (Throwable t) {
log.error(t.getMessage());
} finally {
try {
consumeClient.stop();
......
......@@ -22,6 +22,7 @@ import java.net.URI;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.KeyManager;
......@@ -29,34 +30,39 @@ import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import com.yahoo.pulsar.client.api.ProducerConsumerBase;
import com.yahoo.pulsar.websocket.WebSocketService;
import com.yahoo.pulsar.websocket.service.ProxyServer;
import com.yahoo.pulsar.websocket.service.WebSocketServiceStarter;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
public class ProxyPublishConsumeTls extends MockedPulsarServiceBaseTest {
public class ProxyPublishConsumeTls extends ProducerConsumerBase {
protected String methodName;
private static final String CONSUME_URI = "wss://localhost:6090/ws/consumer/persistent/my-property/cluster1/my-ns/my-topic/my-sub";
private static final String PRODUCE_URI = "wss://localhost:6090/ws/producer/persistent/my-property/cluster1/my-ns/my-topic/";
private static final String CONSUME_URI = "wss://localhost:6090/ws/consumer/persistent/my-property/use/my-ns/my-topic/my-sub";
private static final String PRODUCE_URI = "wss://localhost:6090/ws/producer/persistent/my-property/use/my-ns/my-topic/";
private static final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/certificate/server.crt";
private static final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/certificate/server.key";
private static final int TEST_PORT = 6080;
private static final int TLS_TEST_PORT = 6090;
private ProxyServer proxyServer;
private WebSocketService service;
@BeforeClass
public void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
ServiceConfiguration config = new ServiceConfiguration();
config.setWebServicePort(TEST_PORT);
......@@ -64,18 +70,19 @@ public class ProxyPublishConsumeTls extends MockedPulsarServiceBaseTest {
config.setTlsEnabled(true);
config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config.setClusterName("use");
service = spy(new WebSocketService(config));
doReturn(mockZooKeeperClientFactory).when(service).getZooKeeperClientFactory();
service.start();
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
}
@Override
@AfterClass
protected void cleanup() throws Exception {
super.internalCleanup();
service.close();
proxyServer.stop();
log.info("Finished Cleaning Up Test setup");
}
......@@ -101,16 +108,20 @@ public class ProxyPublishConsumeTls extends MockedPulsarServiceBaseTest {
try {
consumeClient.start();
ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
log.info("Connecting to : %s%n", consumeUri);
Future<Session> consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
log.info("Connecting to : {}", consumeUri);
ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
produceClient.start();
produceClient.connect(produceSocket, produceUri, produceRequest);
Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
// let it connect
Thread.sleep(1000);
Assert.assertTrue(consumerFuture.get().isOpen());
Assert.assertTrue(producerFuture.get().isOpen());
consumeSocket.awaitClose(1, TimeUnit.SECONDS);
produceSocket.awaitClose(1, TimeUnit.SECONDS);
Assert.assertTrue(produceSocket.getBuffer().size() > 0);
Assert.assertEquals(produceSocket.getBuffer(), consumeSocket.getBuffer());
} catch (Throwable t) {
log.error(t.getMessage());
......
......@@ -50,14 +50,14 @@ public class SimpleConsumerSocket {
@OnWebSocketClose
public void onClose(int statusCode, String reason) {
log.info("Connection closed: %d - %s%n", statusCode, reason);
log.info("Connection closed: {} - {}", statusCode, reason);
this.session = null;
this.closeLatch.countDown();
}
@OnWebSocketConnect
public void onConnect(Session session) throws InterruptedException {
log.info("Got connect: %s%n", session);
log.info("Got connect: {}", session);
this.session = session;
log.debug("Got connected: {}", session);
}
......
......@@ -17,11 +17,10 @@ package com.yahoo.pulsar.websocket.proxy;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static java.util.Base64.getEncoder;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
......@@ -33,12 +32,13 @@ import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import static com.yahoo.pulsar.broker.admin.AdminResource.jsonMapper;
import com.yahoo.pulsar.websocket.data.ProducerMessage;
@WebSocket(maxTextMessageSize = 64 * 1024)
public class SimpleProducerSocket {
private final static String message = getEncoder().encodeToString("test".getBytes());
private final static String testJsonString = "{\"content\": \"" + message
+ "\", \"properties\" : {\"test\" :\"test\"}}";
private final CountDownLatch closeLatch;
private Session session;
private ArrayList<String> producerBuffer;
......@@ -48,24 +48,30 @@ public class SimpleProducerSocket {
producerBuffer = new ArrayList<String>();
}
private static String getTestJsonPayload(int index) throws JsonProcessingException {
ProducerMessage msg = new ProducerMessage();
msg.payload = Base64.getEncoder().encodeToString(("test" + index).getBytes());
msg.key = Integer.toString(index);
return jsonMapper().writeValueAsString(msg);
}
public boolean awaitClose(int duration, TimeUnit unit) throws InterruptedException {
return this.closeLatch.await(duration, unit);
}
@OnWebSocketClose
public void onClose(int statusCode, String reason) {
log.info("Connection closed: %d - %s%n", statusCode, reason);
log.info("Connection closed: {} - {}", statusCode, reason);
this.session = null;
this.closeLatch.countDown();
}
@OnWebSocketConnect
public void onConnect(Session session) throws InterruptedException, IOException, JSONException {
log.info("Got connect: %s%n", session);
log.info("Got connect: {}", session);
this.session = session;
String sampleMsg = new JSONObject(testJsonString).toString();
for (int i = 0; i < 10; i++) {
this.session.getRemote().sendString(sampleMsg);
this.session.getRemote().sendString(getTestJsonPayload(i));
}
}
......
......@@ -25,6 +25,7 @@ import org.eclipse.jetty.websocket.api.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.yahoo.pulsar.client.api.CompressionType;
import com.yahoo.pulsar.client.api.Message;
import com.yahoo.pulsar.client.api.MessageBuilder;
......@@ -35,6 +36,9 @@ import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.websocket.data.ProducerAck;
import com.yahoo.pulsar.websocket.data.ProducerMessage;
import static java.lang.String.format;
import static com.yahoo.pulsar.websocket.WebSocketError.*;
/**
* Websocket end-point url handler to handle incoming message coming from client. Websocket end-point url handler to
......@@ -68,27 +72,36 @@ public class ProducerHandler extends AbstractWebSocketHandler {
ProducerConfiguration conf = getProducerConfiguration();
producer = service.getPulsarClient().createProducer(topic, conf);
} catch (Exception e) {
close(WebSocketError.FailedToCreateProducer, e.getMessage());
close(FailedToCreateProducer, e.getMessage());
}
}
@Override
public void onWebSocketText(String message) {
ProducerMessage sendRequest;
byte[] rawPayload = null;
String requestContext = null;
try {
sendRequest = ObjectMapperFactory.getThreadLocal().readValue(message, ProducerMessage.class);
} catch (IOException e1) {
close(WebSocketError.FailedToSerializeToJSON, e1.getMessage());
requestContext = sendRequest.context;
rawPayload = Base64.getDecoder().decode(sendRequest.payload);
} catch (IOException e) {
sendAckResponse(new ProducerAck(FailedToDeserializeFromJSON, e.getMessage(), null, null));
return;
} catch (IllegalArgumentException e) {
String msg = format("Invalid Base64 message-payload error=%s", e.getMessage());
sendAckResponse(new ProducerAck(PayloadEncodingError, msg, null, requestContext));
return;
}
byte[] rawPayload = Base64.getDecoder().decode(sendRequest.payload);
MessageBuilder builder = MessageBuilder.create().setContent(rawPayload);
MessageBuilder builder = MessageBuilder.create().setContent(rawPayload).setProperties(sendRequest.properties);
if (sendRequest.properties != null) {
builder.setProperties(sendRequest.properties);
}
if (sendRequest.key != null) {
builder.setKey(sendRequest.key);
}
if (sendRequest.replicationClusters != null) {
builder.setReplicationClusters(sendRequest.replicationClusters);
}
......@@ -97,23 +110,11 @@ public class ProducerHandler extends AbstractWebSocketHandler {
producer.sendAsync(msg).thenAccept(msgId -> {
if (isConnected()) {
String messageId = Base64.getEncoder().encodeToString(msgId.toByteArray());
ProducerAck response = new ProducerAck("ok", messageId, sendRequest.context);
String jsonResponse;
try {
jsonResponse = ObjectMapperFactory.getThreadLocal().writeValueAsString(response);
getSession().getRemote().sendString(jsonResponse);
} catch (Exception e) {
log.warn("Error send ack response: ", e);
}
sendAckResponse(new ProducerAck(messageId, sendRequest.context));
}
}).exceptionally(exception -> {
ProducerAck response = new ProducerAck("send-error: " + exception.getMessage(), null, sendRequest.context);
try {
String jsonResponse = ObjectMapperFactory.getThreadLocal().writeValueAsString(response);
getSession().getRemote().sendString(jsonResponse);
} catch (Exception e) {
log.warn("Error send ack response: ", e);
}
sendAckResponse(
new ProducerAck(UnknownError, exception.getMessage(), null, sendRequest.context));
return null;
});
}
......@@ -122,6 +123,17 @@ public class ProducerHandler extends AbstractWebSocketHandler {
return service.getAuthorizationManager().canProduce(DestinationName.get(topic), authRole);
}
private void sendAckResponse(ProducerAck response) {
try {
String msg = ObjectMapperFactory.getThreadLocal().writeValueAsString(response);
getSession().getRemote().sendString(msg);
} catch (JsonProcessingException e) {
log.warn("[{}] Failed to generate ack json-response {}", producer.getTopic(), e.getMessage(), e);
} catch (Exception e) {
log.warn("[{}] Failed to send ack {}", producer.getTopic(), e.getMessage(), e);
}
}
private ProducerConfiguration getProducerConfiguration() {
ProducerConfiguration conf = new ProducerConfiguration();
......
......@@ -28,7 +28,9 @@ public enum WebSocketError {
FailedToDeserializeFromJSON(3, "Failed to de-serialize from JSON"), //
FailedToSerializeToJSON(4, "Failed to serialize to JSON"), //
AuthenticationError(5, "Failed to authenticate client"), //
NotAuthorizedError(6, "Client is not authorized"); //
NotAuthorizedError(6, "Client is not authorized"), //
PayloadEncodingError(7, "Invalid payload encoding"), //
UnknownError(8, "Unknown error"); //
private final int code;
private final String description;
......
......@@ -17,16 +17,27 @@ package com.yahoo.pulsar.websocket.data;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import static com.google.common.base.Joiner.on;
import com.yahoo.pulsar.websocket.WebSocketError;
@JsonInclude(Include.NON_NULL)
public class ProducerAck {
public String result;
public String errorMsg;
public String messageId;
public String context;
public ProducerAck(String result, String messageId, String context) {
this.result = result;
public ProducerAck(String messageId, String context) {
this.result = "ok";
this.messageId = messageId;
this.context = context;
}
public ProducerAck(WebSocketError error, String errorMsg, String messageId, String context) {
this.result = on(':').join("send-error", error.getCode());
this.errorMsg = errorMsg;
this.messageId = messageId;
this.context = context;
}
}
......@@ -27,32 +27,30 @@ import com.yahoo.pulsar.websocket.WebSocketProducerServlet;
import com.yahoo.pulsar.websocket.WebSocketService;
public class WebSocketServiceStarter {
public static void main(String args[]) throws Exception {
checkArgument(args.length == 1, "Need to specify a configuration file");
try {
// load config file and start proxy service
String configFile = args[0];
log.info("Loading configuration from {}", configFile);
ServiceConfiguration config = ServiceConfigurationLoader.create(configFile);
ProxyServer proxyServer = new ProxyServer(config);
WebSocketService service = new WebSocketService(config);
proxyServer.addWebSocketServlet(WebSocketProducerServlet.SERVLET_PATH,
new WebSocketProducerServlet(service));
proxyServer.addWebSocketServlet(WebSocketConsumerServlet.SERVLET_PATH,
new WebSocketConsumerServlet(service));
proxyServer.start();
service.start();
start(proxyServer, service);
} catch (Exception e) {
log.error("Failed to start WebSocket service", e);
Runtime.getRuntime().halt(1);
}
}
public static void start(ProxyServer proxyServer, WebSocketService service) throws Exception {
proxyServer.addWebSocketServlet(WebSocketProducerServlet.SERVLET_PATH, new WebSocketProducerServlet(service));
proxyServer.addWebSocketServlet(WebSocketConsumerServlet.SERVLET_PATH, new WebSocketConsumerServlet(service));
proxyServer.start();
service.start();
}
private static final Logger log = LoggerFactory.getLogger(WebSocketServiceStarter.class);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册