diff --git a/docs/WebSocket.md b/docs/WebSocket.md index 041800420dca884d95fc15413114a21451f6c855..c465b139bb6ce6fccd85912a0f95fee4901b5e94 100644 --- a/docs/WebSocket.md +++ b/docs/WebSocket.md @@ -95,6 +95,7 @@ http://{serviceUrl}:8080/ws/producer/persistent/{property}/{cluster}/{namespace} ##### Acknowledgement from server +###### Success response ```json { "result": "ok", @@ -102,6 +103,14 @@ http://{serviceUrl}:8080/ws/producer/persistent/{property}/{cluster}/{namespace} "context": "1" } ``` +###### Failure response +```json + { + "result": "send-error:3", + "errorMsg": "Failed to de-serialize from JSON", + "context": "1" + } +``` | Key | Type | Requirement | Explanation | |:------------|:------:|:-----------:|:--------------------------------------------:| @@ -172,6 +181,8 @@ following error codes: | 4 | Failed to serialize to JSON | | 5 | Failed to authenticate client | | 6 | Client is not authorized | +| 7 | Invalid payload encoding | +| 8 | Unknown error | Application is responsible to re-establish a new WebSocket session after a backoff period. diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/AdminResource.java index 3da51f107f3b39f1c91ba28df8f819f3f110e4d1..e7a982d2cf3da95c9fb81c0b6d69bb0fff331d76 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/AdminResource.java @@ -237,7 +237,7 @@ public abstract class AdminResource extends PulsarWebResource { } } - public ObjectMapper jsonMapper() { + public static ObjectMapper jsonMapper() { return ObjectMapperFactory.getThreadLocal(); } diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyAuthenticationTest.java index 62a35698cc809efcf6814aa8eb16e57ffede316c..6c970a970b304b4537edcf99065989d65c8e38a5 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyAuthenticationTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyAuthenticationTest.java @@ -20,50 +20,60 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; import java.net.URI; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.test.PortManager; +import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; import org.eclipse.jetty.websocket.client.WebSocketClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; +import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import com.google.common.collect.Sets; import com.yahoo.pulsar.broker.ServiceConfiguration; -import com.yahoo.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import com.yahoo.pulsar.client.api.ProducerConsumerBase; import com.yahoo.pulsar.websocket.WebSocketService; +import com.yahoo.pulsar.websocket.service.ProxyServer; +import com.yahoo.pulsar.websocket.service.WebSocketServiceStarter; -public class ProxyAuthenticationTest extends MockedPulsarServiceBaseTest { +public class ProxyAuthenticationTest extends ProducerConsumerBase { protected String methodName; - private static final int TEST_PORT = PortManager.nextFreePort(); + private static final int TEST_PORT = 6080; private static final String CONSUME_URI = "ws://localhost:" + TEST_PORT - + "/consume/persistent/my-property/cluster1/my-ns/my-topic/my-sub"; + + "/ws/consumer/persistent/my-property/use/my-ns/my-topic/my-sub"; private static final String PRODUCE_URI = "ws://localhost:" + TEST_PORT - + "/produce/persistent/my-property/cluster1/my-ns/my-topic/"; + + "/ws/producer/persistent/my-property/use/my-ns/my-topic/"; + private ProxyServer proxyServer; private WebSocketService service; @BeforeClass public void setup() throws Exception { super.internalSetup(); + super.producerBaseSetup(); ServiceConfiguration config = new ServiceConfiguration(); config.setWebServicePort(TEST_PORT); + config.setClusterName("use"); config.setAuthenticationEnabled(true); config.setAuthenticationProviders( Sets.newHashSet("com.yahoo.pulsar.websocket.proxy.MockAuthenticationProvider")); service = spy(new WebSocketService(config)); doReturn(mockZooKeeperClientFactory).when(service).getZooKeeperClientFactory(); - service.start(); + proxyServer = new ProxyServer(config); + WebSocketServiceStarter.start(proxyServer, service); log.info("Proxy Server Started"); } - @Override + @AfterClass protected void cleanup() throws Exception { super.internalCleanup(); service.close(); + proxyServer.stop(); log.info("Finished Cleaning Up Test setup"); } @@ -81,16 +91,20 @@ public class ProxyAuthenticationTest extends MockedPulsarServiceBaseTest { try { consumeClient.start(); ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest(); - consumeClient.connect(consumeSocket, consumeUri, consumeRequest); - log.info("Connecting to : %s%n", consumeUri); + Future consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest); + log.info("Connecting to : {}", consumeUri); ClientUpgradeRequest produceRequest = new ClientUpgradeRequest(); produceClient.start(); - produceClient.connect(produceSocket, produceUri, produceRequest); + Future producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest); + // let it connect + Thread.sleep(1000); + Assert.assertTrue(consumerFuture.get().isOpen()); + Assert.assertTrue(producerFuture.get().isOpen()); consumeSocket.awaitClose(1, TimeUnit.SECONDS); produceSocket.awaitClose(1, TimeUnit.SECONDS); - + Assert.assertTrue(produceSocket.getBuffer().size() > 0); Assert.assertEquals(produceSocket.getBuffer(), consumeSocket.getBuffer()); } catch (Throwable t) { log.error(t.getMessage()); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeTest.java index 68e6906e163ae61ada567fac5eb39b819bf73674..f1dccbab58e6330e4453ec46682170d18b850cfa 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeTest.java @@ -19,49 +19,58 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; import java.net.URI; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; import org.eclipse.jetty.websocket.client.WebSocketClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; +import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import com.yahoo.pulsar.broker.ServiceConfiguration; -import com.yahoo.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import com.yahoo.pulsar.client.api.ProducerConsumerBase; import com.yahoo.pulsar.websocket.WebSocketService; +import com.yahoo.pulsar.websocket.service.ProxyServer; +import com.yahoo.pulsar.websocket.service.WebSocketServiceStarter; -public class ProxyPublishConsumeTest extends MockedPulsarServiceBaseTest { +public class ProxyPublishConsumeTest extends ProducerConsumerBase { protected String methodName; - private static final String CONSUME_URI = "ws://localhost:6080/ws/consumer/persistent/my-property/cluster1/my-ns/my-topic/my-sub"; - private static final String PRODUCE_URI = "ws://localhost:6080/ws/producer/persistent/my-property/cluster1/my-ns/my-topic/"; + private static final String CONSUME_URI = "ws://localhost:6080/ws/consumer/persistent/my-property/use/my-ns/my-topic/my-sub"; + private static final String PRODUCE_URI = "ws://localhost:6080/ws/producer/persistent/my-property/use/my-ns/my-topic/"; private static final int TEST_PORT = 6080; + private ProxyServer proxyServer; private WebSocketService service; @BeforeClass public void setup() throws Exception { super.internalSetup(); + super.producerBaseSetup(); ServiceConfiguration config = new ServiceConfiguration(); config.setWebServicePort(TEST_PORT); + config.setClusterName("use"); service = spy(new WebSocketService(config)); doReturn(mockZooKeeperClientFactory).when(service).getZooKeeperClientFactory(); - service.start(); + proxyServer = new ProxyServer(config); + WebSocketServiceStarter.start(proxyServer, service); log.info("Proxy Server Started"); } - @Override + @AfterClass protected void cleanup() throws Exception { super.internalCleanup(); service.close(); + proxyServer.stop(); log.info("Finished Cleaning Up Test setup"); - } @Test - public void socketTest() throws InterruptedException { + public void socketTest() throws Exception { URI consumeUri = URI.create(CONSUME_URI); URI produceUri = URI.create(PRODUCE_URI); @@ -73,19 +82,21 @@ public class ProxyPublishConsumeTest extends MockedPulsarServiceBaseTest { try { consumeClient.start(); ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest(); - consumeClient.connect(consumeSocket, consumeUri, consumeRequest); - log.info("Connecting to : %s%n", consumeUri); + Future consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest); + log.info("Connecting to : {}", consumeUri); ClientUpgradeRequest produceRequest = new ClientUpgradeRequest(); produceClient.start(); - produceClient.connect(produceSocket, produceUri, produceRequest); - + Future producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest); + // let it connect + Thread.sleep(1000); + Assert.assertTrue(consumerFuture.get().isOpen()); + Assert.assertTrue(producerFuture.get().isOpen()); + consumeSocket.awaitClose(1, TimeUnit.SECONDS); produceSocket.awaitClose(1, TimeUnit.SECONDS); - + Assert.assertTrue(produceSocket.getBuffer().size() > 0); Assert.assertEquals(produceSocket.getBuffer(), consumeSocket.getBuffer()); - } catch (Throwable t) { - log.error(t.getMessage()); } finally { try { consumeClient.stop(); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeTls.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeTls.java index ae39360b9ad430560aa2b116f89e2a12b94b7e0e..7c4d4c701f1fd5f4171a979bcc7554c3a6fcb4cd 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeTls.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeTls.java @@ -22,6 +22,7 @@ import java.net.URI; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import javax.net.ssl.KeyManager; @@ -29,34 +30,39 @@ import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; import org.eclipse.jetty.websocket.client.WebSocketClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; +import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import com.yahoo.pulsar.broker.ServiceConfiguration; -import com.yahoo.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import com.yahoo.pulsar.client.api.ProducerConsumerBase; import com.yahoo.pulsar.websocket.WebSocketService; +import com.yahoo.pulsar.websocket.service.ProxyServer; +import com.yahoo.pulsar.websocket.service.WebSocketServiceStarter; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; -public class ProxyPublishConsumeTls extends MockedPulsarServiceBaseTest { +public class ProxyPublishConsumeTls extends ProducerConsumerBase { protected String methodName; - private static final String CONSUME_URI = "wss://localhost:6090/ws/consumer/persistent/my-property/cluster1/my-ns/my-topic/my-sub"; - private static final String PRODUCE_URI = "wss://localhost:6090/ws/producer/persistent/my-property/cluster1/my-ns/my-topic/"; + private static final String CONSUME_URI = "wss://localhost:6090/ws/consumer/persistent/my-property/use/my-ns/my-topic/my-sub"; + private static final String PRODUCE_URI = "wss://localhost:6090/ws/producer/persistent/my-property/use/my-ns/my-topic/"; private static final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/certificate/server.crt"; private static final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/certificate/server.key"; private static final int TEST_PORT = 6080; private static final int TLS_TEST_PORT = 6090; - + private ProxyServer proxyServer; private WebSocketService service; @BeforeClass public void setup() throws Exception { super.internalSetup(); + super.producerBaseSetup(); ServiceConfiguration config = new ServiceConfiguration(); config.setWebServicePort(TEST_PORT); @@ -64,18 +70,19 @@ public class ProxyPublishConsumeTls extends MockedPulsarServiceBaseTest { config.setTlsEnabled(true); config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); - + config.setClusterName("use"); service = spy(new WebSocketService(config)); doReturn(mockZooKeeperClientFactory).when(service).getZooKeeperClientFactory(); - service.start(); - + proxyServer = new ProxyServer(config); + WebSocketServiceStarter.start(proxyServer, service); log.info("Proxy Server Started"); } - @Override + @AfterClass protected void cleanup() throws Exception { super.internalCleanup(); service.close(); + proxyServer.stop(); log.info("Finished Cleaning Up Test setup"); } @@ -101,16 +108,20 @@ public class ProxyPublishConsumeTls extends MockedPulsarServiceBaseTest { try { consumeClient.start(); ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest(); - consumeClient.connect(consumeSocket, consumeUri, consumeRequest); - log.info("Connecting to : %s%n", consumeUri); + Future consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest); + log.info("Connecting to : {}", consumeUri); ClientUpgradeRequest produceRequest = new ClientUpgradeRequest(); produceClient.start(); - produceClient.connect(produceSocket, produceUri, produceRequest); + Future producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest); + // let it connect + Thread.sleep(1000); + Assert.assertTrue(consumerFuture.get().isOpen()); + Assert.assertTrue(producerFuture.get().isOpen()); consumeSocket.awaitClose(1, TimeUnit.SECONDS); produceSocket.awaitClose(1, TimeUnit.SECONDS); - + Assert.assertTrue(produceSocket.getBuffer().size() > 0); Assert.assertEquals(produceSocket.getBuffer(), consumeSocket.getBuffer()); } catch (Throwable t) { log.error(t.getMessage()); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/SimpleConsumerSocket.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/SimpleConsumerSocket.java index f9e6bf8b05d4ea76484a97c2d27fcbf93f65a375..0e04c7a09058b6ca87ac651d3b7869be0cffe77f 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/SimpleConsumerSocket.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/SimpleConsumerSocket.java @@ -50,14 +50,14 @@ public class SimpleConsumerSocket { @OnWebSocketClose public void onClose(int statusCode, String reason) { - log.info("Connection closed: %d - %s%n", statusCode, reason); + log.info("Connection closed: {} - {}", statusCode, reason); this.session = null; this.closeLatch.countDown(); } @OnWebSocketConnect public void onConnect(Session session) throws InterruptedException { - log.info("Got connect: %s%n", session); + log.info("Got connect: {}", session); this.session = session; log.debug("Got connected: {}", session); } diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/SimpleProducerSocket.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/SimpleProducerSocket.java index 5973fb33cf8d59f1a459bd8c37776b4f7daa860f..2a7a865122bd917a0191fa58225d1b9a132a72b6 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/SimpleProducerSocket.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/SimpleProducerSocket.java @@ -17,11 +17,10 @@ package com.yahoo.pulsar.websocket.proxy; import java.io.IOException; import java.util.ArrayList; +import java.util.Base64; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import static java.util.Base64.getEncoder; - import org.eclipse.jetty.websocket.api.RemoteEndpoint; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; @@ -33,12 +32,13 @@ import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.core.JsonProcessingException; +import static com.yahoo.pulsar.broker.admin.AdminResource.jsonMapper; +import com.yahoo.pulsar.websocket.data.ProducerMessage; + @WebSocket(maxTextMessageSize = 64 * 1024) public class SimpleProducerSocket { - private final static String message = getEncoder().encodeToString("test".getBytes()); - private final static String testJsonString = "{\"content\": \"" + message - + "\", \"properties\" : {\"test\" :\"test\"}}"; private final CountDownLatch closeLatch; private Session session; private ArrayList producerBuffer; @@ -48,24 +48,30 @@ public class SimpleProducerSocket { producerBuffer = new ArrayList(); } + private static String getTestJsonPayload(int index) throws JsonProcessingException { + ProducerMessage msg = new ProducerMessage(); + msg.payload = Base64.getEncoder().encodeToString(("test" + index).getBytes()); + msg.key = Integer.toString(index); + return jsonMapper().writeValueAsString(msg); + } + public boolean awaitClose(int duration, TimeUnit unit) throws InterruptedException { return this.closeLatch.await(duration, unit); } @OnWebSocketClose public void onClose(int statusCode, String reason) { - log.info("Connection closed: %d - %s%n", statusCode, reason); + log.info("Connection closed: {} - {}", statusCode, reason); this.session = null; this.closeLatch.countDown(); } @OnWebSocketConnect public void onConnect(Session session) throws InterruptedException, IOException, JSONException { - log.info("Got connect: %s%n", session); + log.info("Got connect: {}", session); this.session = session; - String sampleMsg = new JSONObject(testJsonString).toString(); for (int i = 0; i < 10; i++) { - this.session.getRemote().sendString(sampleMsg); + this.session.getRemote().sendString(getTestJsonPayload(i)); } } diff --git a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ProducerHandler.java b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ProducerHandler.java index ef90274e691dafab9a0fcd7bbca7150b3a525d5c..960dc367edeef82fdffe54c5827900c3b850cb3c 100644 --- a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ProducerHandler.java +++ b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ProducerHandler.java @@ -25,6 +25,7 @@ import org.eclipse.jetty.websocket.api.Session; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.core.JsonProcessingException; import com.yahoo.pulsar.client.api.CompressionType; import com.yahoo.pulsar.client.api.Message; import com.yahoo.pulsar.client.api.MessageBuilder; @@ -35,6 +36,9 @@ import com.yahoo.pulsar.common.naming.DestinationName; import com.yahoo.pulsar.common.util.ObjectMapperFactory; import com.yahoo.pulsar.websocket.data.ProducerAck; import com.yahoo.pulsar.websocket.data.ProducerMessage; +import static java.lang.String.format; +import static com.yahoo.pulsar.websocket.WebSocketError.*; + /** * Websocket end-point url handler to handle incoming message coming from client. Websocket end-point url handler to @@ -68,27 +72,36 @@ public class ProducerHandler extends AbstractWebSocketHandler { ProducerConfiguration conf = getProducerConfiguration(); producer = service.getPulsarClient().createProducer(topic, conf); } catch (Exception e) { - close(WebSocketError.FailedToCreateProducer, e.getMessage()); + close(FailedToCreateProducer, e.getMessage()); } } @Override public void onWebSocketText(String message) { ProducerMessage sendRequest; + byte[] rawPayload = null; + String requestContext = null; try { sendRequest = ObjectMapperFactory.getThreadLocal().readValue(message, ProducerMessage.class); - } catch (IOException e1) { - close(WebSocketError.FailedToSerializeToJSON, e1.getMessage()); + requestContext = sendRequest.context; + rawPayload = Base64.getDecoder().decode(sendRequest.payload); + } catch (IOException e) { + sendAckResponse(new ProducerAck(FailedToDeserializeFromJSON, e.getMessage(), null, null)); + return; + } catch (IllegalArgumentException e) { + String msg = format("Invalid Base64 message-payload error=%s", e.getMessage()); + sendAckResponse(new ProducerAck(PayloadEncodingError, msg, null, requestContext)); return; } - byte[] rawPayload = Base64.getDecoder().decode(sendRequest.payload); + MessageBuilder builder = MessageBuilder.create().setContent(rawPayload); - MessageBuilder builder = MessageBuilder.create().setContent(rawPayload).setProperties(sendRequest.properties); + if (sendRequest.properties != null) { + builder.setProperties(sendRequest.properties); + } if (sendRequest.key != null) { builder.setKey(sendRequest.key); } - if (sendRequest.replicationClusters != null) { builder.setReplicationClusters(sendRequest.replicationClusters); } @@ -97,23 +110,11 @@ public class ProducerHandler extends AbstractWebSocketHandler { producer.sendAsync(msg).thenAccept(msgId -> { if (isConnected()) { String messageId = Base64.getEncoder().encodeToString(msgId.toByteArray()); - ProducerAck response = new ProducerAck("ok", messageId, sendRequest.context); - String jsonResponse; - try { - jsonResponse = ObjectMapperFactory.getThreadLocal().writeValueAsString(response); - getSession().getRemote().sendString(jsonResponse); - } catch (Exception e) { - log.warn("Error send ack response: ", e); - } + sendAckResponse(new ProducerAck(messageId, sendRequest.context)); } }).exceptionally(exception -> { - ProducerAck response = new ProducerAck("send-error: " + exception.getMessage(), null, sendRequest.context); - try { - String jsonResponse = ObjectMapperFactory.getThreadLocal().writeValueAsString(response); - getSession().getRemote().sendString(jsonResponse); - } catch (Exception e) { - log.warn("Error send ack response: ", e); - } + sendAckResponse( + new ProducerAck(UnknownError, exception.getMessage(), null, sendRequest.context)); return null; }); } @@ -122,6 +123,17 @@ public class ProducerHandler extends AbstractWebSocketHandler { return service.getAuthorizationManager().canProduce(DestinationName.get(topic), authRole); } + private void sendAckResponse(ProducerAck response) { + try { + String msg = ObjectMapperFactory.getThreadLocal().writeValueAsString(response); + getSession().getRemote().sendString(msg); + } catch (JsonProcessingException e) { + log.warn("[{}] Failed to generate ack json-response {}", producer.getTopic(), e.getMessage(), e); + } catch (Exception e) { + log.warn("[{}] Failed to send ack {}", producer.getTopic(), e.getMessage(), e); + } + } + private ProducerConfiguration getProducerConfiguration() { ProducerConfiguration conf = new ProducerConfiguration(); diff --git a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/WebSocketError.java b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/WebSocketError.java index 5699dd77b17cd7319948ffcfef110c6bce2c481f..9a29a00f8f9c070f616a45edd2fbd3256df79438 100644 --- a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/WebSocketError.java +++ b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/WebSocketError.java @@ -28,7 +28,9 @@ public enum WebSocketError { FailedToDeserializeFromJSON(3, "Failed to de-serialize from JSON"), // FailedToSerializeToJSON(4, "Failed to serialize to JSON"), // AuthenticationError(5, "Failed to authenticate client"), // - NotAuthorizedError(6, "Client is not authorized"); // + NotAuthorizedError(6, "Client is not authorized"), // + PayloadEncodingError(7, "Invalid payload encoding"), // + UnknownError(8, "Unknown error"); // private final int code; private final String description; diff --git a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/data/ProducerAck.java b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/data/ProducerAck.java index b055d7e1bf3d4fa28853ecb9cfde8a09dd708172..bd563b8bd02ce164f3927609c2847f439dc9a236 100644 --- a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/data/ProducerAck.java +++ b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/data/ProducerAck.java @@ -17,16 +17,27 @@ package com.yahoo.pulsar.websocket.data; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude.Include; +import static com.google.common.base.Joiner.on; +import com.yahoo.pulsar.websocket.WebSocketError; @JsonInclude(Include.NON_NULL) public class ProducerAck { public String result; + public String errorMsg; public String messageId; public String context; - public ProducerAck(String result, String messageId, String context) { - this.result = result; + public ProducerAck(String messageId, String context) { + this.result = "ok"; this.messageId = messageId; this.context = context; } + + public ProducerAck(WebSocketError error, String errorMsg, String messageId, String context) { + this.result = on(':').join("send-error", error.getCode()); + this.errorMsg = errorMsg; + this.messageId = messageId; + this.context = context; + } + } diff --git a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/service/WebSocketServiceStarter.java b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/service/WebSocketServiceStarter.java index d7bfe313baae15a7bc74aa56b52c568c58ab691e..eeae4f8c6e80a2c24d2212a03f17513bfd388a70 100644 --- a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/service/WebSocketServiceStarter.java +++ b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/service/WebSocketServiceStarter.java @@ -27,32 +27,30 @@ import com.yahoo.pulsar.websocket.WebSocketProducerServlet; import com.yahoo.pulsar.websocket.WebSocketService; public class WebSocketServiceStarter { + public static void main(String args[]) throws Exception { checkArgument(args.length == 1, "Need to specify a configuration file"); - try { // load config file and start proxy service String configFile = args[0]; log.info("Loading configuration from {}", configFile); ServiceConfiguration config = ServiceConfigurationLoader.create(configFile); ProxyServer proxyServer = new ProxyServer(config); - WebSocketService service = new WebSocketService(config); - - proxyServer.addWebSocketServlet(WebSocketProducerServlet.SERVLET_PATH, - new WebSocketProducerServlet(service)); - proxyServer.addWebSocketServlet(WebSocketConsumerServlet.SERVLET_PATH, - new WebSocketConsumerServlet(service)); - - proxyServer.start(); - - service.start(); + start(proxyServer, service); } catch (Exception e) { log.error("Failed to start WebSocket service", e); Runtime.getRuntime().halt(1); } } + public static void start(ProxyServer proxyServer, WebSocketService service) throws Exception { + proxyServer.addWebSocketServlet(WebSocketProducerServlet.SERVLET_PATH, new WebSocketProducerServlet(service)); + proxyServer.addWebSocketServlet(WebSocketConsumerServlet.SERVLET_PATH, new WebSocketConsumerServlet(service)); + proxyServer.start(); + service.start(); + } + private static final Logger log = LoggerFactory.getLogger(WebSocketServiceStarter.class); }