提交 164940f4 编写于 作者: J Jake Wharton

WebSocket WIP

上级 f83564aa
......@@ -18,8 +18,10 @@ package retrofit2;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.Type;
import javax.annotation.Nullable;
import okhttp3.RequestBody;
import okhttp3.ResponseBody;
import okio.ByteString;
import retrofit2.http.Streaming;
final class BuiltInConverters extends Converter.Factory {
......@@ -46,6 +48,30 @@ final class BuiltInConverters extends Converter.Factory {
return null;
}
@Nullable @Override
public Converter<?, RequestBody> outgoingMessageConverter(Type type, Annotation[] annotations,
Retrofit retrofit) {
if (type == String.class) {
return StringOutgoingMessageConverter.INSTANCE;
}
if (type == ByteString.class) {
return ByteStringOutgoingMessageConverter.INSTANCE;
}
return null;
}
@Nullable @Override
public Converter<ResponseBody, ?> incomingMessageConverter(Type type, Annotation[] annotations,
Retrofit retrofit) {
if (type == String.class) {
return StringIncomingMessageConverter.INSTANCE;
}
if (type == ByteString.class) {
return ByteStringIncomingMessageConverter.INSTANCE;
}
return null;
}
static final class VoidResponseBodyConverter implements Converter<ResponseBody, Void> {
static final VoidResponseBodyConverter INSTANCE = new VoidResponseBodyConverter();
......@@ -93,4 +119,40 @@ final class BuiltInConverters extends Converter.Factory {
return value.toString();
}
}
static final class StringOutgoingMessageConverter implements Converter<String, RequestBody> {
static final StringOutgoingMessageConverter INSTANCE = new StringOutgoingMessageConverter();
@Override public RequestBody convert(String value) {
return RequestBody.create(STRING_MESSAGE, value);
}
}
static final class StringIncomingMessageConverter implements Converter<ResponseBody, String> {
static final StringIncomingMessageConverter INSTANCE = new StringIncomingMessageConverter();
@Override public String convert(ResponseBody value) throws IOException {
return value.string();
}
}
static final class ByteStringOutgoingMessageConverter
implements Converter<ByteString, RequestBody> {
static final ByteStringOutgoingMessageConverter INSTANCE =
new ByteStringOutgoingMessageConverter();
@Override public RequestBody convert(ByteString value) {
return RequestBody.create(BYTESTRING_MESSAGE, value);
}
}
static final class ByteStringIncomingMessageConverter
implements Converter<ResponseBody, ByteString> {
static final ByteStringIncomingMessageConverter INSTANCE =
new ByteStringIncomingMessageConverter();
@Override public ByteString convert(ResponseBody value) throws IOException {
return value.source().readByteString();
}
}
}
......@@ -20,6 +20,7 @@ import java.lang.annotation.Annotation;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import javax.annotation.Nullable;
import okhttp3.MediaType;
import okhttp3.RequestBody;
import okhttp3.ResponseBody;
import retrofit2.http.Body;
......@@ -39,6 +40,9 @@ import retrofit2.http.QueryMap;
* into the {@link Retrofit} instance.
*/
public interface Converter<F, T> {
MediaType STRING_MESSAGE = MediaType.parse("application/vnd+retrofit.ws+string");
MediaType BYTESTRING_MESSAGE = MediaType.parse("application/vnd+retrofit.ws+bytestring");
T convert(F value) throws IOException;
/** Creates {@link Converter} instances based on a type and target usage. */
......@@ -77,6 +81,16 @@ public interface Converter<F, T> {
return null;
}
public @Nullable Converter<?, RequestBody> outgoingMessageConverter(Type type,
Annotation[] annotations, Retrofit retrofit) {
return null;
}
public @Nullable Converter<ResponseBody, ?> incomingMessageConverter(Type type,
Annotation[] annotations, Retrofit retrofit) {
return null;
}
/**
* Extract the upper bound of the generic parameter at {@code index} from {@code type}. For
* example, index 1 of {@code Map<String, ? extends Runnable>} returns {@code Runnable}.
......
/*
* Copyright (C) 2018 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package retrofit2;
import java.io.IOException;
import javax.annotation.Nullable;
import okhttp3.MediaType;
import okhttp3.Request;
import okhttp3.RequestBody;
import okio.Buffer;
import retrofit2.internal.WebSocket;
import static retrofit2.Converter.BYTESTRING_MESSAGE;
import static retrofit2.Converter.STRING_MESSAGE;
final class OkHttpWebSocket<OutT> implements WebSocket<OutT> {
private final okhttp3.WebSocket rawWebSocket;
private final Converter<OutT, RequestBody> outConverter;
OkHttpWebSocket(okhttp3.WebSocket rawWebSocket, Converter<OutT, RequestBody> outConverter) {
this.rawWebSocket = rawWebSocket;
this.outConverter = outConverter;
}
@Override public Request request() {
return rawWebSocket.request();
}
@Override public long queueSize() {
return rawWebSocket.queueSize();
}
@Override public boolean send(OutT item) {
Buffer buffer = new Buffer();
MediaType contentType;
try {
RequestBody body = outConverter.convert(item);
body.writeTo(buffer);
contentType = body.contentType();
} catch (IOException e) {
rawWebSocket.cancel();
throw new RuntimeException("Failed to convert: " + item, e);
}
if (STRING_MESSAGE.equals(contentType)) {
return rawWebSocket.send(buffer.readUtf8());
} else if (BYTESTRING_MESSAGE.equals(contentType)) {
return rawWebSocket.send(buffer.readByteString());
}
throw new IllegalStateException("Outgoing message converter "
+ outConverter
+ " returned RequestBody with invalid content type "
+ contentType
+ ". Converter.STRING_MESSAGE or Converter.BYTESTRING_MESSAGE are the only valid values.");
}
@Override public boolean close(int code, @Nullable String reason) {
return rawWebSocket.close(code, reason);
}
@Override public void cancel() {
rawWebSocket.cancel();
}
}
/*
* Copyright (C) 2018 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package retrofit2;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okio.ByteString;
import retrofit2.internal.WebSocket;
import retrofit2.internal.WebSocketCall;
import retrofit2.internal.WebSocketListener;
import static retrofit2.Converter.BYTESTRING_MESSAGE;
import static retrofit2.Converter.STRING_MESSAGE;
final class OkHttpWebSocketCall<InT, OutT> implements WebSocketCall<InT, OutT> {
private final okhttp3.WebSocket.Factory rawWebSocketFactory;
private final RequestFactory requestFactory;
private final @Nullable Object[] args;
private final Converter<ResponseBody, InT> inConverter;
private final Converter<OutT, RequestBody> outConverter;
OkHttpWebSocketCall(okhttp3.WebSocket.Factory rawWebSocketFactory, RequestFactory requestFactory,
@Nullable Object[] args, Converter<ResponseBody, InT> inConverter,
Converter<OutT, RequestBody> outConverter) {
this.rawWebSocketFactory = rawWebSocketFactory;
this.requestFactory = requestFactory;
this.args = args;
this.inConverter = inConverter;
this.outConverter = outConverter;
}
@Override
public WebSocket<OutT> connect(final WebSocketListener<InT, OutT> listener) {
Request request;
try {
request = requestFactory.create(args);
} catch (IOException e) {
// TODO call on failure? but we don't have a WebSocket instance to use!
return null;
}
final AtomicReference<WebSocket<OutT>> webSocketRef = new AtomicReference<>();
okhttp3.WebSocket rawWebSocket =
rawWebSocketFactory.newWebSocket(request, new okhttp3.WebSocketListener() {
@Override public void onOpen(okhttp3.WebSocket webSocket, Response response) {
listener.onOpen(webSocketRef.get(), response);
}
@Override public void onMessage(okhttp3.WebSocket webSocket, String text) {
ResponseBody body = ResponseBody.create(STRING_MESSAGE, text);
InT message;
try {
message = inConverter.convert(body);
} catch (IOException e) {
// TODO call on failure?
return;
}
listener.onMessage(webSocketRef.get(), message);
}
@Override public void onMessage(okhttp3.WebSocket webSocket, ByteString bytes) {
// TODO drop toByteArray with OkHttp 3.11: https://github.com/square/okhttp/pull/4115
ResponseBody body = ResponseBody.create(BYTESTRING_MESSAGE, bytes.toByteArray());
InT message;
try {
message = inConverter.convert(body);
} catch (IOException e) {
// TODO call on failure?
return;
}
listener.onMessage(webSocketRef.get(), message);
}
@Override public void onClosing(okhttp3.WebSocket webSocket, int code, String reason) {
listener.onClosing(webSocketRef.get(), code, reason);
}
@Override public void onClosed(okhttp3.WebSocket webSocket, int code, String reason) {
listener.onClosed(webSocketRef.get(), code, reason);
}
@Override public void onFailure(okhttp3.WebSocket webSocket, Throwable t,
@Nullable Response response) {
listener.onFailure(webSocketRef.get(), t, response);
}
});
WebSocket<OutT> webSocket = new OkHttpWebSocket<>(rawWebSocket, outConverter);
webSocketRef.set(webSocket);
return webSocket;
}
}
......@@ -61,16 +61,19 @@ public final class Retrofit {
private final Map<Method, ServiceMethod<?>> serviceMethodCache = new ConcurrentHashMap<>();
final okhttp3.Call.Factory callFactory;
final okhttp3.WebSocket.Factory webSocketFactory;
final HttpUrl baseUrl;
final List<Converter.Factory> converterFactories;
final List<CallAdapter.Factory> callAdapterFactories;
final @Nullable Executor callbackExecutor;
final boolean validateEagerly;
Retrofit(okhttp3.Call.Factory callFactory, HttpUrl baseUrl,
List<Converter.Factory> converterFactories, List<CallAdapter.Factory> callAdapterFactories,
@Nullable Executor callbackExecutor, boolean validateEagerly) {
Retrofit(okhttp3.Call.Factory callFactory, okhttp3.WebSocket.Factory webSocketFactory,
HttpUrl baseUrl, List<Converter.Factory> converterFactories,
List<CallAdapter.Factory> callAdapterFactories, @Nullable Executor callbackExecutor,
boolean validateEagerly) {
this.callFactory = callFactory;
this.webSocketFactory = webSocketFactory;
this.baseUrl = baseUrl;
this.converterFactories = converterFactories; // Copy+unmodifiable at call site.
this.callAdapterFactories = callAdapterFactories; // Copy+unmodifiable at call site.
......@@ -370,6 +373,52 @@ public final class Retrofit {
return (Converter<T, String>) BuiltInConverters.ToStringConverter.INSTANCE;
}
public <T> Converter<T, RequestBody> outgoingMessageConverter(Type type,
Annotation[] annotations) {
checkNotNull(type, "type == null");
checkNotNull(annotations, "annotations == null");
for (int i = 0, count = converterFactories.size(); i < count; i++) {
Converter<?, RequestBody> converter =
converterFactories.get(i).outgoingMessageConverter(type, annotations, this);
if (converter != null) {
//noinspection unchecked
return (Converter<T, RequestBody>) converter;
}
}
StringBuilder builder = new StringBuilder("Could not locate outgoing message converter for ")
.append(type)
.append(".\n Tried:");
for (Converter.Factory converterFactory : converterFactories) {
builder.append("\n * ").append(converterFactory.getClass().getName());
}
throw new IllegalArgumentException(builder.toString());
}
public <T> Converter<ResponseBody, T> incomingMessageConverter(Type type,
Annotation[] annotations) {
checkNotNull(type, "type == null");
checkNotNull(annotations, "annotations == null");
for (int i = 0, count = converterFactories.size(); i < count; i++) {
Converter<ResponseBody, ?> converter =
converterFactories.get(i).incomingMessageConverter(type, annotations, this);
if (converter != null) {
//noinspection unchecked
return (Converter<ResponseBody, T>) converter;
}
}
StringBuilder builder = new StringBuilder("Could not locate incoming message converter for ")
.append(type)
.append(".\n Tried:");
for (Converter.Factory converterFactory : converterFactories) {
builder.append("\n * ").append(converterFactory.getClass().getName());
}
throw new IllegalArgumentException(builder.toString());
}
/**
* The executor used for {@link Callback} methods on a {@link Call}. This may be {@code null},
* in which case callbacks should be made synchronously on the background thread.
......@@ -391,6 +440,7 @@ public final class Retrofit {
public static final class Builder {
private final Platform platform;
private @Nullable okhttp3.Call.Factory callFactory;
private @Nullable okhttp3.WebSocket.Factory webSocketFactory;
private HttpUrl baseUrl;
private final List<Converter.Factory> converterFactories = new ArrayList<>();
private final List<CallAdapter.Factory> callAdapterFactories = new ArrayList<>();
......@@ -408,6 +458,7 @@ public final class Retrofit {
Builder(Retrofit retrofit) {
platform = Platform.get();
callFactory = retrofit.callFactory;
webSocketFactory = retrofit.webSocketFactory;
baseUrl = retrofit.baseUrl;
converterFactories.addAll(retrofit.converterFactories);
......@@ -425,10 +476,11 @@ public final class Retrofit {
/**
* The HTTP client used for requests.
* <p>
* This is a convenience method for calling {@link #callFactory}.
* This is a convenience method for calling {@link #callFactory} and {@link #webSocketFactory}.
*/
public Builder client(OkHttpClient client) {
return callFactory(checkNotNull(client, "client == null"));
checkNotNull(client, "client == null");
return callFactory(client).webSocketFactory(client);
}
/**
......@@ -441,6 +493,16 @@ public final class Retrofit {
return this;
}
/**
* Specify a custom call factory for creating {@link Call} instances.
* <p>
* Note: Calling {@link #client} automatically sets this value.
*/
public Builder webSocketFactory(okhttp3.WebSocket.Factory factory) {
this.webSocketFactory = checkNotNull(factory, "factory == null");
return this;
}
/**
* Set the API base URL.
*
......@@ -573,8 +635,15 @@ public final class Retrofit {
}
okhttp3.Call.Factory callFactory = this.callFactory;
if (callFactory == null) {
callFactory = new OkHttpClient();
okhttp3.WebSocket.Factory webSocketFactory = this.webSocketFactory;
if (callFactory == null || webSocketFactory == null) {
OkHttpClient client = new OkHttpClient();
if (callFactory == null) {
callFactory = client;
}
if (webSocketFactory == null) {
webSocketFactory = client;
}
}
Executor callbackExecutor = this.callbackExecutor;
......@@ -595,8 +664,9 @@ public final class Retrofit {
converterFactories.add(new BuiltInConverters());
converterFactories.addAll(this.converterFactories);
return new Retrofit(callFactory, baseUrl, unmodifiableList(converterFactories),
unmodifiableList(callAdapterFactories), callbackExecutor, validateEagerly);
return new Retrofit(callFactory, webSocketFactory, baseUrl,
unmodifiableList(converterFactories), unmodifiableList(callAdapterFactories),
callbackExecutor, validateEagerly);
}
}
}
......@@ -18,6 +18,7 @@ package retrofit2;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import javax.annotation.Nullable;
import retrofit2.internal.WebSocket;
import static retrofit2.Utils.methodError;
......@@ -32,6 +33,11 @@ abstract class ServiceMethod<T> {
throw methodError(method, "Service methods cannot return void.");
}
if (Utils.getRawType(returnType) == WebSocket.class) {
//noinspection unchecked Return type checked by enclosing conditional.
return (ServiceMethod<T>) new WebSocketServiceMethod.Builder(retrofit, method).build();
}
return new HttpServiceMethod.Builder<Object, T>(retrofit, method).build();
}
......
/*
* Copyright (C) 2018 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package retrofit2;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import javax.annotation.Nullable;
import okhttp3.RequestBody;
import okhttp3.ResponseBody;
import okhttp3.WebSocket;
import retrofit2.internal.WebSocketCall;
final class WebSocketServiceMethod extends ServiceMethod<WebSocketCall<?, ?>> {
private final okhttp3.WebSocket.Factory rawWebSocketFactory;
private final RequestFactory requestFactory;
private final Converter<ResponseBody, Object> inConverter;
private final Converter<Object, RequestBody> outConverter;
WebSocketServiceMethod(WebSocket.Factory rawWebSocketFactory, RequestFactory requestFactory,
Converter<ResponseBody, Object> inConverter, Converter<Object, RequestBody> outConverter) {
this.rawWebSocketFactory = rawWebSocketFactory;
this.requestFactory = requestFactory;
this.inConverter = inConverter;
this.outConverter = outConverter;
}
@Override WebSocketCall<?, ?> invoke(@Nullable Object[] args) {
return new OkHttpWebSocketCall<>(rawWebSocketFactory, requestFactory, args, inConverter,
outConverter);
}
static final class Builder {
private final Retrofit retrofit;
private final Method method;
Builder(Retrofit retrofit, Method method) {
this.retrofit = retrofit;
this.method = method;
}
ServiceMethod<WebSocketCall<?, ?>> build() {
Type returnType = method.getGenericReturnType();
if (!(returnType instanceof ParameterizedType)) {
throw Utils.methodError(method, ""); // TODO error message
}
ParameterizedType parameterizedReturnType = (ParameterizedType) returnType;
Type inType = Utils.getParameterUpperBound(0, parameterizedReturnType);
Type outType = Utils.getParameterUpperBound(1, parameterizedReturnType);
Annotation[] methodAnnotations = method.getAnnotations();
Converter<ResponseBody, Object> inConverter =
retrofit.incomingMessageConverter(inType, methodAnnotations);
Converter<Object, RequestBody> outConverter =
retrofit.outgoingMessageConverter(outType, methodAnnotations);
RequestFactory requestFactory = RequestFactory.parseAnnotations(retrofit, method);
return new WebSocketServiceMethod(retrofit.webSocketFactory, requestFactory, inConverter,
outConverter);
}
}
}
/*
* Copyright (C) 2018 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package retrofit2.internal;
import javax.annotation.Nullable;
import okhttp3.Request;
public interface WebSocket<OutT> {
Request request();
long queueSize();
boolean send(OutT item);
boolean close(int code, @Nullable String reason);
void cancel();
}
/*
* Copyright (C) 2018 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package retrofit2.internal;
public interface WebSocketCall<InT, OutT> {
WebSocket<OutT> connect(WebSocketListener<InT, OutT> listener);
}
/*
* Copyright (C) 2018 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package retrofit2.internal;
import javax.annotation.Nullable;
import okhttp3.Response;
public abstract class WebSocketListener<InT, OutT> {
public void onOpen(WebSocket<OutT> webSocket, Response response) {
}
public void onMessage(WebSocket<OutT> webSocket, InT item) {
}
public void onClosing(WebSocket<OutT> webSocket, int code, String reason) {
}
public void onClosed(WebSocket<OutT> webSocket, int code, String reason) {
}
public void onFailure(WebSocket<OutT> webSocket, Throwable t, @Nullable Response response) {
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册