/*******************************************************************************
* Copyright (c) 2009, 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Dave Locke - initial API and implementation and/or initial documentation
*/
package org.eclipse.paho.client.mqttv3.internal.wire;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketTimeoutException;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.internal.ClientState;
import org.eclipse.paho.client.mqttv3.internal.ExceptionHelper;
import org.eclipse.paho.client.mqttv3.logging.Logger;
import org.eclipse.paho.client.mqttv3.logging.LoggerFactory;
/**
* An MqttInputStream
lets applications read instances of
* MqttWireMessage
.
*/
public class MqttInputStream extends InputStream {
private final String CLASS_NAME = MqttInputStream.class.getName();
private final Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT, CLASS_NAME);
private ClientState clientState = null;
private DataInputStream in;
private ByteArrayOutputStream bais;
private int remLen;
private int packetLen;
private byte[] packet;
public MqttInputStream(ClientState clientState, InputStream in) {
this.clientState = clientState;
this.in = new DataInputStream(in);
this.bais = new ByteArrayOutputStream();
this.remLen = -1;
}
public int read() throws IOException {
return in.read();
}
public int available() throws IOException {
return in.available();
}
public void close() throws IOException {
in.close();
}
/**
* Reads an MqttWireMessage
from the stream.
* If the message cannot be fully read within the socket read timeout,
* a null message is returned and the method can be called again until
* the message is fully read.
* @return The {@link MqttWireMessage}
* @throws IOException if an exception is thrown when reading from the stream
* @throws MqttException if the message is invalid
*/
public MqttWireMessage readMqttWireMessage() throws IOException, MqttException {
final String methodName ="readMqttWireMessage";
MqttWireMessage message = null;
try {
// read header
if (remLen < 0) {
// Assume we can read the whole header at once.
// The header is very small so it's likely we
// are able to read it fully or not at all.
// This keeps the parser lean since we don't
// need to cope with a partial header.
// Should we lose synch with the stream,
// the keepalive mechanism would kick in
// closing the connection.
bais.reset();
byte first = in.readByte();
clientState.notifyReceivedBytes(1);
byte type = (byte) ((first >>> 4) & 0x0F);
if ((type < MqttWireMessage.MESSAGE_TYPE_CONNECT) ||
(type > MqttWireMessage.MESSAGE_TYPE_DISCONNECT)) {
// Invalid MQTT message type...
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_INVALID_MESSAGE);
}
remLen = MqttWireMessage.readMBI(in).getValue();
bais.write(first);
// bit silly, we decode it then encode it
bais.write(MqttWireMessage.encodeMBI(remLen));
packet = new byte[(int)(bais.size()+remLen)];
packetLen = 0;
}
// read remaining packet
if (remLen >= 0) {
// the remaining packet can be read with timeouts
readFully();
// reset packet parsing state
remLen = -1;
byte[] header = bais.toByteArray();
System.arraycopy(header,0,packet,0, header.length);
message = MqttWireMessage.createWireMessage(packet);
// @TRACE 301= received {0}
log.fine(CLASS_NAME, methodName, "301",new Object[] {message});
}
} catch (SocketTimeoutException e) {
// ignore socket read timeout
}
return message;
}
private void readFully() throws IOException {
int off = bais.size() + (int) packetLen;
int len = (int) (remLen - packetLen);
if (len < 0)
throw new IndexOutOfBoundsException();
int n = 0;
while (n < len) {
int count = -1;
try {
count = in.read(packet, off + n, len - n);
} catch (SocketTimeoutException e) {
// remember the packet read so far
packetLen += n;
throw e;
}
if (count < 0) {
throw new EOFException();
}
clientState.notifyReceivedBytes(count);
n += count;
}
}
}