提交 2418ff0a 编写于 作者: A Arjen Poutsma 提交者: GitHub

Merge pull request #126 from violetagg/onwritepossible-state

Make AbstractResponseBodySubscriber.onWritePossible thread-safe
......@@ -152,10 +152,13 @@ abstract class AbstractResponseBodySubscriber implements Subscriber<DataBuffer>
* UNSUBSCRIBED
* |
* v
* REQUESTED <---> RECEIVED
* | |
* v v
* COMPLETED
* REQUESTED -------------------> RECEIVED
* ^ ^
* | |
* --------- WRITING <-----
* |
* v
* COMPLETED
* </pre>
* Refer to the individual states for more information.
*/
......@@ -206,36 +209,41 @@ abstract class AbstractResponseBodySubscriber implements Subscriber<DataBuffer>
/**
* State that gets entered after a buffer has been
* {@linkplain Subscriber#onNext(Object) received}. Responds to
* {@code onWritePossible} by writing the current buffer, and if it can be
* written completely, changes state to either {@link #REQUESTED} if the
* subscription has not been completed; or {@link #COMPLETED} if it has.
* {@code onWritePossible} by writing the current buffer and changes
* the state to {@link #WRITING}. If it can be written completely,
* changes the state to either {@link #REQUESTED} if the subscription
* has not been completed; or {@link #COMPLETED} if it has. If it cannot
* be written completely the state will be changed to {@link #RECEIVED}.
*/
RECEIVED {
@Override
void onWritePossible(AbstractResponseBodySubscriber subscriber) {
DataBuffer dataBuffer = subscriber.currentBuffer;
try {
boolean writeCompleted = subscriber.write(dataBuffer);
if (writeCompleted) {
if (dataBuffer instanceof FlushingDataBuffer) {
subscriber.flush();
}
subscriber.releaseBuffer();
boolean subscriptionCompleted = subscriber.subscriptionCompleted;
if (!subscriptionCompleted) {
if (subscriber.changeState(this, REQUESTED)) {
if (subscriber.changeState(this, WRITING)) {
DataBuffer dataBuffer = subscriber.currentBuffer;
try {
boolean writeCompleted = subscriber.write(dataBuffer);
if (writeCompleted) {
if (dataBuffer instanceof FlushingDataBuffer) {
subscriber.flush();
}
subscriber.releaseBuffer();
boolean subscriptionCompleted = subscriber.subscriptionCompleted;
if (!subscriptionCompleted) {
subscriber.changeState(WRITING, REQUESTED);
subscriber.subscription.request(1);
}
}
else {
if (subscriber.changeState(this, COMPLETED)) {
else {
subscriber.changeState(WRITING, COMPLETED);
subscriber.close();
}
}
else {
subscriber.changeState(WRITING, RECEIVED);
}
}
catch (IOException ex) {
subscriber.onError(ex);
}
}
catch (IOException ex) {
subscriber.onError(ex);
}
}
......@@ -244,6 +252,16 @@ abstract class AbstractResponseBodySubscriber implements Subscriber<DataBuffer>
subscriber.subscriptionCompleted = true;
}
},
/**
* State that gets entered after a writing of the current buffer has been
* {@code onWritePossible started}.
*/
WRITING {
@Override
void onComplete(AbstractResponseBodySubscriber subscriber) {
subscriber.subscriptionCompleted = true;
}
},
/**
* The terminal completed state. Does not respond to any events.
*/
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册