提交 4e39450d 编写于 作者: R Rossen Stoyanchev

Merge branch '5.1.x'

/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -283,19 +283,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
publisher.subscriber = subscriber;
subscriber.onSubscribe(subscription);
publisher.changeState(SUBSCRIBING, NO_DEMAND);
// Now safe to check "beforeDemand" flags, they won't change once in NO_DEMAND
String logPrefix = publisher.getLogPrefix();
if (publisher.completionBeforeDemand) {
rsReadLogger.trace(logPrefix + "Completed before demand");
publisher.state.get().onAllDataRead(publisher);
}
Throwable ex = publisher.errorBeforeDemand;
if (ex != null) {
if (rsReadLogger.isTraceEnabled()) {
rsReadLogger.trace(logPrefix + "Completed with error before demand: " + ex);
}
publisher.state.get().onError(publisher, ex);
}
handleCompletionOrErrorBeforeDemand(publisher);
}
else {
throw new IllegalStateException("Failed to transition to SUBSCRIBING, " +
......@@ -306,11 +294,30 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
@Override
<T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) {
publisher.completionBeforeDemand = true;
handleCompletionOrErrorBeforeDemand(publisher);
}
@Override
<T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable ex) {
publisher.errorBeforeDemand = ex;
handleCompletionOrErrorBeforeDemand(publisher);
}
private <T> void handleCompletionOrErrorBeforeDemand(AbstractListenerReadPublisher<T> publisher) {
if (publisher.state.get().equals(NO_DEMAND)) {
if (publisher.completionBeforeDemand) {
rsReadLogger.trace(publisher.getLogPrefix() + "Completed before demand");
publisher.state.get().onAllDataRead(publisher);
}
Throwable ex = publisher.errorBeforeDemand;
if (ex != null) {
if (rsReadLogger.isTraceEnabled()) {
String prefix = publisher.getLogPrefix();
rsReadLogger.trace(prefix + "Completed with error before demand: " + ex);
}
publisher.state.get().onError(publisher, ex);
}
}
}
},
......
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -282,17 +282,7 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
}
if (processor.changeState(this, REQUESTED)) {
if (processor.subscriberCompleted) {
if (processor.isFlushPending()) {
// Ensure the final flush
processor.changeState(REQUESTED, FLUSHING);
processor.flushIfPossible();
}
else if (processor.changeState(REQUESTED, COMPLETED)) {
processor.resultPublisher.publishComplete();
}
else {
processor.state.get().onComplete(processor);
}
handleSubscriberCompleted(processor);
}
else {
Assert.state(processor.subscription != null, "No subscription");
......@@ -303,6 +293,24 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
@Override
public <T> void onComplete(AbstractListenerWriteFlushProcessor<T> processor) {
processor.subscriberCompleted = true;
// A competing write might have completed very quickly
if (processor.state.get().equals(State.REQUESTED)) {
handleSubscriberCompleted(processor);
}
}
private <T> void handleSubscriberCompleted(AbstractListenerWriteFlushProcessor<T> processor) {
if (processor.isFlushPending()) {
// Ensure the final flush
processor.changeState(State.REQUESTED, State.FLUSHING);
processor.flushIfPossible();
}
else if (processor.changeState(State.REQUESTED, State.COMPLETED)) {
processor.resultPublisher.publishComplete();
}
else {
processor.state.get().onComplete(processor);
}
}
},
......
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -376,6 +376,10 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
@Override
public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
processor.subscriberCompleted = true;
// A competing write might have completed very quickly
if (processor.state.get().equals(State.REQUESTED)) {
processor.changeStateToComplete(State.REQUESTED);
}
}
},
......@@ -383,6 +387,10 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
@Override
public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
processor.subscriberCompleted = true;
// A competing write might have completed very quickly
if (processor.state.get().equals(State.REQUESTED)) {
processor.changeStateToComplete(State.REQUESTED);
}
}
},
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册