提交 5f5015bd 编写于 作者: H hrsakai 提交者: Matteo Merli

Added reader endpoint description to WebSocket.md (#729)

* Added reader endpoint description to WebSocket.md

* Fixed sample code in WebSocket.md

* Use on('message',listener) in node.js sample code
上级 dec05d8f
......@@ -50,7 +50,7 @@ $ bin/pulsar-daemon start websocket
## API Reference
Pulsar's WebSocket API offers two endpoints, one for [producing](#producer-endpoint) messages and one for [consuming](#consumer-endpoint) messages.
Pulsar's WebSocket API offers three endpoints for [producing](#producer-endpoint) messages, [consuming](#consumer-endpoint) messages and [reading](#reader-endpoint) messages.
All exchanges via the WebSocket API use JSON.
......@@ -150,6 +150,50 @@ Key | Type | Required? | Explanation
`messageId`| string | yes | Message ID of the processed message
### Reader endpoint
The reader endpoint requires you to specify a {% popover property %}, {% popover cluster %}, {% popover namespace %}, and {% popover topic %} in the URL:
{% endpoint ws://broker-service-url:8080/ws/reader/persistent/:property/:cluster/:namespace/:topic %}
##### Receiving messages
Server will push messages on the WebSocket session:
```json
{
"messageId": "CAAQAw==",
"payload": "SGVsbG8gV29ybGQ=",
"properties": {"key1": "value1", "key2": "value2"},
"publishTime": "2016-08-30 16:45:57.785"
}
```
Key | Type | Required? | Explanation
:---|:-----|:----------|:-----------
`messageId` | string | yes | Message ID
`payload` | string | yes | Base-64 encoded payload
`publishTime` | string | yes | Publish timestamp
`properties` | key-value pairs | no | Application-defined properties
`key` | string | no | Original routing key set by producer
#### Acknowledging the message
**In WebSocket**, Reader needs to acknowledge the successful processing of the message to
have the Pulsar WebSocket service update the number of pending messages.
If you don't send acknowledgements, Pulsar WebSocket service will stop sending messages after reaching the pendingMessages limit.
```json
{
"messageId": "CAAQAw=="
}
```
Key | Type | Required? | Explanation
:---|:-----|:----------|:-----------
`messageId`| string | yes | Message ID of the processed message
### Error codes
In case of error the server will close the WebSocket session using the
......@@ -218,7 +262,30 @@ Here's an example Python {% popover consumer %} that listens on a Pulsar {% popo
```python
import websocket, base64, json
TOPIC = 'ws://localhost:8080/ws/producer/persistent/sample/standalone/ns1/my-topic'
TOPIC = 'ws://localhost:8080/ws/consumer/persistent/sample/standalone/ns1/my-topic/my-sub'
ws = websocket.create_connection(TOPIC)
while True:
msg = json.loads(ws.recv())
if not msg: break
print "Received: {} - payload: {}".format(msg, base64.b64decode(msg['payload'])
# Acknowledge successful processing
ws.send(json.dumps({'messageId' : msg['messageId']}))
ws.close()
```
#### Python reader
Here's an example Python reader that listens on a Pulsar {% popover topic %} and prints the message ID whenever a message arrives:
```python
import websocket, base64, json
TOPIC = 'ws://localhost:8080/ws/reader/persistent/sample/standalone/ns1/my-topic'
ws = websocket.create_connection(TOPIC)
......@@ -276,12 +343,27 @@ Here's an example Node.js {% popover consumer %} that listens on the same topic
```javascript
var WebSocket = require('ws'),
topic = "ws://localhost:8080/ws/producer/persistent/my-property/us-west/my-ns/my-topic1",
topic = "ws://localhost:8080/ws/consumer/persistent/my-property/us-west/my-ns/my-topic1/my-sub",
ws = new WebSocket(topic);
ws.onmessage = function(packet) {
var receiveMsg = JSON.parse(packet.data);
var ackMsg = {"messageId" : receiveMsg.messageId};
ws.send(JSON.stringify(ackMsg));
};
```
\ No newline at end of file
ws.on('message', function(message) {
var receiveMsg = JSON.parse(message);
console.log('Received: %s - payload: %s', message, new Buffer(receiveMsg.payload, 'base64').toString());
var ackMsg = {"messageId" : receiveMsg.messageId};
ws.send(JSON.stringify(ackMsg));
});
```
#### NodeJS reader
```javascript
var WebSocket = require('ws'),
topic = "ws://localhost:8080/ws/reader/persistent/my-property/us-west/my-ns/my-topic1",
ws = new WebSocket(topic);
ws.on('message', function(message) {
var receiveMsg = JSON.parse(message);
console.log('Received: %s - payload: %s', message, new Buffer(receiveMsg.payload, 'base64').toString());
var ackMsg = {"messageId" : receiveMsg.messageId};
ws.send(JSON.stringify(ackMsg));
});
```
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册