DefaultRSocketRequester.java 10.0 KB
Newer Older
1 2 3 4 5 6 7
/*
 * Copyright 2002-2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16

17 18 19 20
package org.springframework.messaging.rsocket;

import java.util.Collections;
import java.util.Map;
21
import java.util.function.Consumer;
22 23 24 25 26 27 28 29 30 31 32 33 34

import io.rsocket.Payload;
import io.rsocket.RSocket;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.ReactiveAdapter;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.Encoder;
import org.springframework.core.io.buffer.DataBuffer;
35 36
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
37 38 39 40 41
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;

/**
 * Default implementation of {@link RSocketRequester}.
 *
 * @author Rossen Stoyanchev
 * @since 5.2
 */
final class DefaultRSocketRequester implements RSocketRequester {

	/** Shared, immutable empty hints map passed to every encoder and decoder call. */
	private static final Map<String, Object> EMPTY_HINTS = Collections.emptyMap();


	/** The RSocket that all requests created by this requester are sent through. */
	private final RSocket rsocket;

	/** MIME type used to select encoders and decoders for payload data. */
	private final MimeType dataMimeType;

56 57
	private final MimeType metadataMimeType;

58 59
	private final RSocketStrategies strategies;

60
	private final DataBuffer emptyDataBuffer;
61 62


63 64 65 66
	DefaultRSocketRequester(
			RSocket rsocket, MimeType dataMimeType, MimeType metadataMimeType,
			RSocketStrategies strategies) {

67
		Assert.notNull(rsocket, "RSocket is required");
68 69
		Assert.notNull(dataMimeType, "'dataMimeType' is required");
		Assert.notNull(metadataMimeType, "'metadataMimeType' is required");
70
		Assert.notNull(strategies, "RSocketStrategies is required");
71

72 73
		this.rsocket = rsocket;
		this.dataMimeType = dataMimeType;
74
		this.metadataMimeType = metadataMimeType;
75 76 77 78 79 80 81 82 83 84
		this.strategies = strategies;
		this.emptyDataBuffer = this.strategies.dataBufferFactory().wrap(new byte[0]);
	}


	/**
	 * Return the {@link RSocket} this requester was constructed with.
	 */
	@Override
	public RSocket rsocket() {
		return this.rsocket;
	}

85 86 87 88 89 90 91 92 93 94
	/**
	 * Return the data MIME type this requester was constructed with.
	 */
	@Override
	public MimeType dataMimeType() {
		return this.dataMimeType;
	}

	/**
	 * Return the metadata MIME type this requester was constructed with.
	 */
	@Override
	public MimeType metadataMimeType() {
		return this.metadataMimeType;
	}

95
	@Override
96
	public RequestSpec route(String route, Object... vars) {
97
		return new DefaultRequestSpec(route, vars);
R
Polish  
Rossen Stoyanchev 已提交
98 99
	}

100 101 102
	@Override
	public RequestSpec metadata(Object metadata, @Nullable MimeType mimeType) {
		return new DefaultRequestSpec(metadata, mimeType);
103 104 105 106
	}


	private static boolean isVoid(ResolvableType elementType) {
107
		return (Void.class.equals(elementType.resolve()) || void.class.equals(elementType.resolve()));
108 109
	}

110 111 112 113
	/**
	 * Shortcut to the {@link DataBufferFactory} of the configured strategies.
	 */
	private DataBufferFactory bufferFactory() {
		return this.strategies.dataBufferFactory();
	}

114 115 116

	private class DefaultRequestSpec implements RequestSpec {

117
		private final MetadataEncoder metadataEncoder = new MetadataEncoder(metadataMimeType(), strategies);
118

119
		@Nullable
120
		private Mono<Payload> payloadMono;
121 122

		@Nullable
123
		private Flux<Payload> payloadFlux;
124

125

126 127
		/**
		 * Create a spec and register the given route with the metadata encoder.
		 * @param route the route
		 * @param vars the variables accompanying the route
		 */
		public DefaultRequestSpec(String route, Object... vars) {
			this.metadataEncoder.route(route, vars);
		}

130 131 132 133 134
		/**
		 * Create a spec and register the given metadata value with the metadata encoder.
		 * @param metadata the metadata value
		 * @param mimeType the MIME type of the metadata value, may be {@code null}
		 */
		public DefaultRequestSpec(Object metadata, @Nullable MimeType mimeType) {
			this.metadataEncoder.metadata(metadata, mimeType);
		}


135 136
		@Override
		public RequestSpec metadata(Object metadata, MimeType mimeType) {
137
			this.metadataEncoder.metadata(metadata, mimeType);
138
			return this;
139 140
		}

141
		@Override
142
		public RequestSpec metadata(Consumer<MetadataSpec<?>> configurer) {
143 144 145 146
			configurer.accept(this);
			return this;
		}

147
		@Override
148
		public RequestSpec data(Object data) {
149
			Assert.notNull(data, "'data' must not be null");
150 151
			createPayload(data, ResolvableType.NONE);
			return this;
152 153 154
		}

		@Override
155
		public RequestSpec data(Object producer, Class<?> elementClass) {
156
			Assert.notNull(producer, "'producer' must not be null");
157
			Assert.notNull(elementClass, "'elementClass' must not be null");
R
Polish  
Rossen Stoyanchev 已提交
158
			ReactiveAdapter adapter = getAdapter(producer.getClass());
159
			Assert.notNull(adapter, "'producer' type is unknown to ReactiveAdapterRegistry");
160 161
			createPayload(adapter.toPublisher(producer), ResolvableType.forClass(elementClass));
			return this;
162 163
		}

R
Polish  
Rossen Stoyanchev 已提交
164 165 166 167 168
		/**
		 * Look up the {@link ReactiveAdapter} for the given class in the adapter
		 * registry of the configured strategies.
		 * @return the result of the registry lookup, possibly {@code null}
		 */
		@Nullable
		private ReactiveAdapter getAdapter(Class<?> aClass) {
			return strategies.reactiveAdapterRegistry().getAdapter(aClass);
		}

169
		@Override
170
		public RequestSpec data(Object producer, ParameterizedTypeReference<?> elementTypeRef) {
171
			Assert.notNull(producer, "'producer' must not be null");
172
			Assert.notNull(elementTypeRef, "'elementTypeRef' must not be null");
R
Polish  
Rossen Stoyanchev 已提交
173
			ReactiveAdapter adapter = getAdapter(producer.getClass());
174
			Assert.notNull(adapter, "'producer' type is unknown to ReactiveAdapterRegistry");
175 176
			createPayload(adapter.toPublisher(producer), ResolvableType.forType(elementTypeRef));
			return this;
177 178
		}

179
		private void createPayload(Object input, ResolvableType elementType) {
R
Polish  
Rossen Stoyanchev 已提交
180
			ReactiveAdapter adapter = getAdapter(input.getClass());
181 182 183 184 185 186 187 188
			Publisher<?> publisher;
			if (input instanceof Publisher) {
				publisher = (Publisher<?>) input;
			}
			else if (adapter != null) {
				publisher = adapter.toPublisher(input);
			}
			else {
189 190
				ResolvableType type = ResolvableType.forInstance(input);
				this.payloadMono = firstPayload(Mono.fromCallable(() -> encodeData(input, type, null)));
191 192
				this.payloadFlux = null;
				return;
193 194
			}

195
			if (isVoid(elementType) || (adapter != null && adapter.isNoValue())) {
196
				this.payloadMono = firstPayload(Mono.when(publisher).then(Mono.just(emptyDataBuffer)));
197 198
				this.payloadFlux = null;
				return;
199 200
			}

201 202
			Encoder<?> encoder = elementType != ResolvableType.NONE && !Object.class.equals(elementType.resolve()) ?
					strategies.encoder(elementType, dataMimeType) : null;
203 204

			if (adapter != null && !adapter.isMultiValue()) {
205
				Mono<DataBuffer> data = Mono.from(publisher)
206
						.map(value -> encodeData(value, elementType, encoder))
207 208
						.defaultIfEmpty(emptyDataBuffer);
				this.payloadMono = firstPayload(data);
209 210
				this.payloadFlux = null;
				return;
211 212
			}

213 214
			this.payloadMono = null;
			this.payloadFlux = Flux.from(publisher)
215
					.map(value -> encodeData(value, elementType, encoder))
216
					.defaultIfEmpty(emptyDataBuffer)
217 218
					.switchOnFirst((signal, inner) -> {
						DataBuffer data = signal.get();
219
						if (data != null) {
220
							return firstPayload(Mono.fromCallable(() -> data))
221
									.concatWith(inner.skip(1).map(PayloadUtils::createPayload));
222 223 224 225
						}
						else {
							return inner.map(PayloadUtils::createPayload);
						}
226
					})
227
					.doOnDiscard(Payload.class, Payload::release);
228 229 230
		}

		@SuppressWarnings("unchecked")
231
		private <T> DataBuffer encodeData(T value, ResolvableType elementType, @Nullable Encoder<?> encoder) {
232
			if (encoder == null) {
233 234
				elementType = ResolvableType.forInstance(value);
				encoder = strategies.encoder(elementType, dataMimeType);
235
			}
236
			return ((Encoder<T>) encoder).encodeValue(
237
					value, bufferFactory(), elementType, dataMimeType, EMPTY_HINTS);
238 239
		}

240 241 242 243 244 245 246 247 248
		/**
		 * Create the 1st request payload with encoded data and metadata.
		 * @param encodedData the encoded payload data; expected to not be empty!
		 */
		private Mono<Payload> firstPayload(Mono<DataBuffer> encodedData) {
			return Mono.zip(encodedData, this.metadataEncoder.encode())
					.map(tuple -> PayloadUtils.createPayload(tuple.getT1(), tuple.getT2()))
					.doOnDiscard(DataBuffer.class, DataBufferUtils::release)
					.doOnDiscard(Payload.class, Payload::release);
249 250 251 252
		}

		@Override
		public Mono<Void> send() {
253 254 255 256 257 258
			return getPayloadMonoRequired().flatMap(rsocket::fireAndForget);
		}

		private Mono<Payload> getPayloadMonoRequired() {
			Assert.state(this.payloadFlux == null, "No RSocket interaction model for Flux request to Mono response.");
			return this.payloadMono != null ? this.payloadMono : firstPayload(Mono.just(emptyDataBuffer));
259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282
		}

		/**
		 * Perform the request and decode the single response as the given class.
		 * @param dataType the expected type of the response data
		 */
		@Override
		public <T> Mono<T> retrieveMono(Class<T> dataType) {
			ResolvableType elementType = ResolvableType.forClass(dataType);
			return retrieveMono(elementType);
		}

		/**
		 * Perform the request and decode the single response as the given generic type.
		 * @param dataTypeRef the expected type of the response data
		 */
		@Override
		public <T> Mono<T> retrieveMono(ParameterizedTypeReference<T> dataTypeRef) {
			ResolvableType elementType = ResolvableType.forType(dataTypeRef);
			return retrieveMono(elementType);
		}

		/**
		 * Perform the request and decode each response element as the given class.
		 * @param dataType the expected type of the response elements
		 */
		@Override
		public <T> Flux<T> retrieveFlux(Class<T> dataType) {
			ResolvableType elementType = ResolvableType.forClass(dataType);
			return retrieveFlux(elementType);
		}

		/**
		 * Perform the request and decode each response element as the given generic type.
		 * @param dataTypeRef the expected type of the response elements
		 */
		@Override
		public <T> Flux<T> retrieveFlux(ParameterizedTypeReference<T> dataTypeRef) {
			ResolvableType elementType = ResolvableType.forType(dataTypeRef);
			return retrieveFlux(elementType);
		}

		@SuppressWarnings("unchecked")
		private <T> Mono<T> retrieveMono(ResolvableType elementType) {
283
			Mono<Payload> payloadMono = getPayloadMonoRequired().flatMap(rsocket::requestResponse);
284 285 286 287 288 289

			if (isVoid(elementType)) {
				return (Mono<T>) payloadMono.then();
			}

			Decoder<?> decoder = strategies.decoder(elementType, dataMimeType);
290 291
			return (Mono<T>) payloadMono.map(this::retainDataAndReleasePayload)
					.map(dataBuffer -> decoder.decode(dataBuffer, elementType, dataMimeType, EMPTY_HINTS));
292 293 294 295 296 297 298 299 300 301 302 303 304
		}

		@SuppressWarnings("unchecked")
		private <T> Flux<T> retrieveFlux(ResolvableType elementType) {
			Flux<Payload> payloadFlux = this.payloadMono != null ?
					this.payloadMono.flatMapMany(rsocket::requestStream) :
					rsocket.requestChannel(this.payloadFlux);

			if (isVoid(elementType)) {
				return payloadFlux.thenMany(Flux.empty());
			}

			Decoder<?> decoder = strategies.decoder(elementType, dataMimeType);
305 306
			return payloadFlux.map(this::retainDataAndReleasePayload).map(dataBuffer ->
					(T) decoder.decode(dataBuffer, elementType, dataMimeType, EMPTY_HINTS));
307 308
		}

309
		private DataBuffer retainDataAndReleasePayload(Payload payload) {
310
			return PayloadUtils.retainDataAndReleasePayload(payload, bufferFactory());
311 312 313
		}
	}
}