提交 9ad18b7a 编写于 作者: W Woonduk Kang

[#7737] Add stream idle timeout

上级 b188d867
......@@ -69,6 +69,8 @@ public class GrpcSpanReceiverConfiguration {
private int grpcStreamSchedulerPeriodMillis;
@Value("${collector.receiver.grpc.span.stream.scheduler.recovery.message.count:10}")
private int grpcStreamSchedulerRecoveryMessageCount;
@Value("${collector.receiver.grpc.span.stream.idletimeout:-1}")
private long grpcStreamIdleTimeout;
private ServerOption grpcServerOption;
......@@ -154,6 +156,10 @@ public class GrpcSpanReceiverConfiguration {
return grpcStreamSchedulerRecoveryMessageCount;
}
public long getGrpcStreamIdleTimeout() {
return grpcStreamIdleTimeout;
}
public ServerOption getGrpcServerOption() {
return grpcServerOption;
}
......@@ -174,6 +180,7 @@ public class GrpcSpanReceiverConfiguration {
sb.append(", grpcStreamCallInitRequestCount=").append(grpcStreamCallInitRequestCount);
sb.append(", grpcStreamSchedulerPeriodMillis=").append(grpcStreamSchedulerPeriodMillis);
sb.append(", grpcStreamSchedulerRecoveryMessageCount=").append(grpcStreamSchedulerRecoveryMessageCount);
sb.append(", grpcStreamIdleTimeout=").append(grpcStreamIdleTimeout);
sb.append(", grpcServerOption=").append(grpcServerOption);
sb.append('}');
return sb.toString();
......
......@@ -66,6 +66,9 @@ public class GrpcStatReceiverConfiguration {
@Value("${collector.receiver.grpc.stat.stream.scheduler.recovery.message.count:10}")
private int grpcStreamSchedulerRecoveryMessageCount;
@Value("${collector.receiver.grpc.stat.stream.idletimeout:-1}")
private long grpcStreamIdleTimeout;
private ServerOption grpcServerOption;
......@@ -147,10 +150,14 @@ public class GrpcStatReceiverConfiguration {
return grpcStreamSchedulerPeriodMillis;
}
public int getGrpcStreamSchedulerRecoveryMessageCount() {
public long getGrpcStreamSchedulerRecoveryMessageCount() {
return grpcStreamSchedulerRecoveryMessageCount;
}
public long getGrpcStreamIdleTimeout() {
return grpcStreamIdleTimeout;
}
public ServerOption getGrpcServerOption() {
return grpcServerOption;
}
......
......@@ -35,18 +35,26 @@ public class StreamExecutorServerInterceptorFactory implements FactoryBean<Serve
private final ScheduledExecutorService scheduledExecutorService;
private final int periodMillis;
private final int recoveryMessagesCount;
private final long idleTimeout;
public StreamExecutorServerInterceptorFactory(Executor executor, int initRequestCount, ScheduledExecutorService scheduledExecutorService, int periodMillis, int recoveryMessagesCount) {
public StreamExecutorServerInterceptorFactory(Executor executor,
int initRequestCount,
ScheduledExecutorService scheduledExecutorService,
int periodMillis,
int recoveryMessagesCount,
long idleTimeout) {
this.executor = Objects.requireNonNull(executor, "executor");
this.initRequestCount = initRequestCount;
this.scheduledExecutorService = Objects.requireNonNull(scheduledExecutorService, "scheduledExecutorService");
this.periodMillis = periodMillis;
this.recoveryMessagesCount = recoveryMessagesCount;
this.idleTimeout = idleTimeout;
}
@Override
public ServerInterceptor getObject() throws Exception {
return new StreamExecutorServerInterceptor(this.beanName, this.executor, initRequestCount, this.scheduledExecutorService, this.periodMillis, recoveryMessagesCount);
return new StreamExecutorServerInterceptor(this.beanName, this.executor, initRequestCount,
this.scheduledExecutorService, this.periodMillis, recoveryMessagesCount, this.idleTimeout);
}
@Override
......
......@@ -163,6 +163,7 @@
<constructor-arg index="2" ref="grpcSpanStreamScheduler"/>
<constructor-arg index="3" value="#{grpcSpanReceiverConfig.grpcStreamSchedulerPeriodMillis}"/>
<constructor-arg index="4" value="#{grpcSpanReceiverConfig.grpcStreamSchedulerRecoveryMessageCount}"/>
<constructor-arg index="5" value="#{grpcSpanReceiverConfig.grpcStreamIdleTimeout}"/>
</bean>
<bean id="spanServiceFactory" class="com.navercorp.pinpoint.collector.receiver.grpc.service.SpanServiceFactory">
<property name="dispatchHandler" ref="grpcSpanDispatchHandlerFactoryBean"/>
......@@ -217,6 +218,7 @@
<constructor-arg index="2" ref="grpcStatStreamScheduler"/>
<constructor-arg index="3" value="#{grpcStatReceiverConfig.grpcStreamSchedulerPeriodMillis}"/>
<constructor-arg index="4" value="#{grpcStatReceiverConfig.grpcStreamSchedulerRecoveryMessageCount}"/>
<constructor-arg index="5" value="#{grpcStatReceiverConfig.grpcStreamIdleTimeout}"/>
</bean>
<bean id="statServiceFactory" class="com.navercorp.pinpoint.collector.receiver.grpc.service.StatServiceFactory">
<property name="dispatchHandler" ref="grpcStatDispatchHandlerFactoryBean"/>
......
......@@ -50,4 +50,5 @@ collector.receiver.grpc.span.stream.scheduler.thread.size=1
collector.receiver.grpc.span.stream.scheduler.period.millis=1000
collector.receiver.grpc.span.stream.call.init.request.count=100
collector.receiver.grpc.span.stream.scheduler.recovery.message.count=100
collector.receiver.grpc.span.stream.idletimeout=-1
......@@ -57,5 +57,6 @@ collector.receiver.grpc.span.stream.scheduler.thread.size=1
collector.receiver.grpc.span.stream.scheduler.period.millis=1000
collector.receiver.grpc.span.stream.call.init.request.count=100
collector.receiver.grpc.span.stream.scheduler.recovery.message.count=100
collector.receiver.grpc.span.stream.idletimeout=-1
......@@ -96,7 +96,8 @@ public class SpanServerTestMain {
}
private ServerServiceDefinition newSpanBindableService(Executor executor) throws Exception {
FactoryBean<ServerInterceptor> interceptorFactory = new StreamExecutorServerInterceptorFactory(executor, 100, Executors.newSingleThreadScheduledExecutor(), 1000, 100);
FactoryBean<ServerInterceptor> interceptorFactory = new StreamExecutorServerInterceptorFactory(executor, 100,
Executors.newSingleThreadScheduledExecutor(), 1000, 100, -1);
((StreamExecutorServerInterceptorFactory) interceptorFactory).setBeanName("SpanService");
ServerInterceptor interceptor = interceptorFactory.getObject();
......
......@@ -62,7 +62,10 @@ public class StatServerTestMain {
}
private ServerServiceDefinition newStatBindableService(Executor executor) throws Exception {
FactoryBean<ServerInterceptor> interceptorFactory = new StreamExecutorServerInterceptorFactory(executor, 100, Executors.newSingleThreadScheduledExecutor(), 1000, 10);
FactoryBean<ServerInterceptor> interceptorFactory = new StreamExecutorServerInterceptorFactory(executor,
100, Executors.newSingleThreadScheduledExecutor(),
1000, 10,
-1);
ServerInterceptor interceptor = interceptorFactory.getObject();
StatService statService = new StatService(new MockDispatchHandler(), new DefaultServerRequestFactory());
return ServerInterceptors.intercept(statService, interceptor);
......
......@@ -23,6 +23,7 @@ import com.navercorp.pinpoint.collector.util.ObjectPool;
import com.navercorp.pinpoint.collector.util.ObjectPoolFactory;
import com.navercorp.pinpoint.common.util.IOUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
......@@ -33,7 +34,6 @@ import java.io.Closeable;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.util.concurrent.CountDownLatch;
......@@ -52,24 +52,29 @@ public class UDPReceiverTest {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private static final String ADDRESS = "127.0.0.1";
private static final int PORT = SocketUtils.findAvailableUdpPort(10999);
private int port;
private final PacketHandler loggingPacketHandler = new PacketHandler() {
private final PacketHandler<DatagramPacket> loggingPacketHandler = new PacketHandler<DatagramPacket>() {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public void receive(DatagramSocket localSocket, Object packet) {
public void receive(DatagramSocket localSocket, DatagramPacket packet) {
logger.info("receive localSocket:{} packet:{}", localSocket, packet);
}
};
@Before
public void setUp() throws Exception {
this.port = SocketUtils.findAvailableUdpPort(10999);
}
@Test
public void startStop() {
UDPReceiver receiver = null;
InetSocketAddress bindAddress = new InetSocketAddress(ADDRESS, PORT);
InetSocketAddress bindAddress = new InetSocketAddress(ADDRESS, port);
Executor executor = MoreExecutors.directExecutor();
PacketHandlerFactory packetHandlerFactory = mock(PacketHandlerFactory.class);
PacketHandlerFactory<DatagramPacket> packetHandlerFactory = mock(PacketHandlerFactory.class);
when(packetHandlerFactory.createPacketHandler()).thenReturn(loggingPacketHandler);
try {
ObjectPoolFactory<DatagramPacket> packetFactory = new DatagramPacketFactory();
......@@ -86,12 +91,6 @@ public class UDPReceiverTest {
}
}
@Test
public void hostNullCheck() {
InetSocketAddress address = new InetSocketAddress((InetAddress) null, PORT);
logger.debug(address.toString());
}
@Test
public void socketBufferSize() throws SocketException {
DatagramSocket datagramSocket = new DatagramSocket();
......@@ -109,52 +108,53 @@ public class UDPReceiverTest {
DatagramPacket datagramPacket = new DatagramPacket(new byte[0], 0, 0);
DatagramSocket datagramSocket = new DatagramSocket();
datagramSocket.connect(new InetSocketAddress(ADDRESS, PORT));
datagramSocket.connect(new InetSocketAddress(ADDRESS, port));
datagramSocket.send(datagramPacket);
datagramSocket.close();
}
private final AtomicInteger zeroPacketCounter = new AtomicInteger();
void interceptValidatePacket(DatagramPacket packet) {
if (packet.getLength() == 0) {
zeroPacketCounter.incrementAndGet();
}
}
@Test
public void datagramPacket_length_zero() {
UDPReceiver receiver = null;
DatagramSocket datagramSocket = null;
CountDownLatch zeroLengthLatch = new CountDownLatch(1);
CountDownLatch latch = new CountDownLatch(1);
Executor mockExecutor = mockDispatchWorker(latch);
PacketHandlerFactory packetHandlerFactory = mock(PacketHandlerFactory.class);
PacketHandlerFactory<DatagramPacket> packetHandlerFactory = mock(PacketHandlerFactory.class);
when(packetHandlerFactory.createPacketHandler()).thenReturn(loggingPacketHandler);
final AtomicInteger zeroPacketCounter = new AtomicInteger();
try {
InetSocketAddress bindAddress = new InetSocketAddress(ADDRESS, PORT);
InetSocketAddress bindAddress = new InetSocketAddress(ADDRESS, port);
ObjectPoolFactory<DatagramPacket> packetFactory = new DatagramPacketFactory();
ObjectPool<DatagramPacket> pool = new DefaultObjectPool<>(packetFactory, 10);
ReusePortSocketOptionApplier socketOptionApplier = ReusePortSocketOptionApplier.create(false, 1);
receiver = new UDPReceiver("test", packetHandlerFactory, mockExecutor, 8, bindAddress, socketOptionApplier, pool) {
@Override
boolean validatePacket(DatagramPacket packet) {
interceptValidatePacket(packet);
if (packet.getLength() == 0) {
zeroLengthLatch.countDown();
zeroPacketCounter.incrementAndGet();
}
return super.validatePacket(packet);
}
};
receiver.start();
datagramSocket = new DatagramSocket();
datagramSocket.connect(new InetSocketAddress(ADDRESS, PORT));
InetSocketAddress addr = new InetSocketAddress(ADDRESS, port);
datagramSocket.send(new DatagramPacket(new byte[0], 0));
datagramSocket.send(new DatagramPacket(new byte[1], 1));
datagramSocket.send(new DatagramPacket(new byte[0], 0, addr));
awaitLatch(zeroLengthLatch);
Assert.assertTrue(latch.await(30000, TimeUnit.MILLISECONDS));
Assert.assertEquals(zeroPacketCounter.get(), 1);
datagramSocket.send(new DatagramPacket(new byte[1], 1, addr));
Assert.assertTrue(awaitLatch(latch));
Assert.assertEquals(1, zeroPacketCounter.get());
Mockito.verify(mockExecutor).execute(any(Runnable.class));
} catch (Exception e) {
logger.debug(e.getMessage(), e);
......@@ -167,6 +167,14 @@ public class UDPReceiverTest {
}
}
private boolean awaitLatch(CountDownLatch zeroLengthLatch) {
try {
return zeroLengthLatch.await(3000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private Executor mockDispatchWorker(CountDownLatch latch) {
Executor mockWorker = new Executor() {
......
package com.navercorp.pinpoint.grpc.server.flowcontrol;
import com.navercorp.pinpoint.common.util.Assert;
import com.navercorp.pinpoint.common.util.Clock;
import com.navercorp.pinpoint.common.util.SystemClock;
import java.util.Objects;
public class DefaultIdleTimeout implements IdleTimeout {
private final long idleTimeout;
private volatile long lastExecutionTime = Long.MAX_VALUE;
private volatile boolean expired = false;
private final Clock clock;
public DefaultIdleTimeout(long idleTimeout) {
this(idleTimeout, SystemClock.INSTANCE);
}
public DefaultIdleTimeout(long idleTimeout, Clock clock) {
Assert.isTrue(idleTimeout >= 0, "negative idleTimeout");
this.idleTimeout = idleTimeout;
this.clock = Objects.requireNonNull(clock, "clock");
update();
}
@Override
public void update() {
this.lastExecutionTime = clock.getTime();
}
@Override
public boolean isExpired() {
if (this.expired) {
return true;
}
final long elapsedTime = this.clock.getTime() - lastExecutionTime;
final boolean result = elapsedTime >= idleTimeout;
if (result) {
this.expired = true;
}
return result;
}
}
package com.navercorp.pinpoint.grpc.server.flowcontrol;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.Status;
import java.util.Objects;
public class DefaultServerCallWrapper<ReqT, RespT> implements ServerCallWrapper {
private final ServerCall<ReqT, RespT> serverCall;
private final String agentId;
private final String applicationName;
public DefaultServerCallWrapper(ServerCall<ReqT, RespT> serverCall, String applicationName, String agentId) {
this.serverCall = Objects.requireNonNull(serverCall, "serverCall");
this.applicationName = Objects.requireNonNull(applicationName, "applicationName");
this.agentId = Objects.requireNonNull(agentId, "agentId");
}
@Override
public void request(int numMessages) {
this.serverCall.request(numMessages);
}
@Override
public String getAgentId() {
return agentId;
}
@Override
public String getApplicationName() {
return applicationName;
}
@Override
public void cancel(Status status, Metadata trailers) {
this.serverCall.close(status, new Metadata());
}
@Override
public String toString() {
return "DefaultServerCallWrapper{" +
"serverCall=" + serverCall +
", agentId='" + agentId + '\'' +
", applicationName='" + applicationName + '\'' +
'}';
}
}
package com.navercorp.pinpoint.grpc.server.flowcontrol;
public class DisableIdleTimeout implements IdleTimeout {
public static final long DISABLE_TIME = -1;
@Override
public void update() {
}
@Override
public boolean isExpired() {
return false;
}
}
package com.navercorp.pinpoint.grpc.server.flowcontrol;
import io.grpc.Metadata;
import io.grpc.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
public class ControlFlowRejectExecutionListener implements RejectedExecutionListener {
public class FlowControlRejectExecutionListener implements RejectedExecutionListener {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private static final Status STREAM_IDLE_TIMEOUT = Status.DEADLINE_EXCEEDED.withDescription("Stream idle timeout");
private final AtomicLong rejectedExecutionCounter = new AtomicLong(0);
private final StreamExecutorRejectedExecutionRequestScheduler.ServerCallWrapper serverCall;
private final ServerCallWrapper serverCall;
private final long recoveryMessagesCount;
public ControlFlowRejectExecutionListener(StreamExecutorRejectedExecutionRequestScheduler.ServerCallWrapper serverCall, long recoveryMessagesCount) {
private final IdleTimeout idleTimeout;
public FlowControlRejectExecutionListener(ServerCallWrapper serverCall, long recoveryMessagesCount, IdleTimeout idleTimeout) {
this.serverCall = Objects.requireNonNull(serverCall, "serverCall");
this.recoveryMessagesCount = recoveryMessagesCount;
this.idleTimeout = Objects.requireNonNull(idleTimeout, "idleTimeout");
}
@Override
......@@ -34,8 +46,19 @@ public class ControlFlowRejectExecutionListener implements RejectedExecutionList
}
@Override
public void onExecute() {
public void onMessage() {
this.idleTimeout.update();
}
@Override
public boolean idleTimeExpired() {
return this.idleTimeout.isExpired();
}
@Override
public void idleTimeout() {
logger.info("stream idle timeout applicationName:{} agentId:{}", serverCall.getApplicationName(), serverCall.getAgentId());
serverCall.cancel(STREAM_IDLE_TIMEOUT, new Metadata());
}
@Override
......
package com.navercorp.pinpoint.grpc.server.flowcontrol;
public interface IdleTimeout {
void update();
boolean isExpired();
}
......@@ -7,5 +7,9 @@ public interface RejectedExecutionListener {
long getRejectedExecutionCount();
void onExecute();
void onMessage();
void idleTimeout();
boolean idleTimeExpired();
}
package com.navercorp.pinpoint.grpc.server.flowcontrol;
public class RejectedExecutionListenerFactory {
private static final int REQUEST_IMMEDIATELY = -1;
private final long recoveryMessagesCount;
private final long idleTimeout;
public RejectedExecutionListenerFactory(long recoveryMessagesCount, long idleTimeout) {
this.recoveryMessagesCount = recoveryMessagesCount;
this.idleTimeout = idleTimeout;
}
public RejectedExecutionListener newListener(ServerCallWrapper serverCall) {
IdleTimeout idleTimeout = newIdleTimeout();
if (recoveryMessagesCount == REQUEST_IMMEDIATELY) {
return new SimpleRejectedExecutionListener(serverCall, idleTimeout);
} else {
return new FlowControlRejectExecutionListener(serverCall, recoveryMessagesCount, idleTimeout);
}
}
private IdleTimeout newIdleTimeout() {
if (this.idleTimeout == DisableIdleTimeout.DISABLE_TIME) {
return new DisableIdleTimeout();
} else {
return new DefaultIdleTimeout(this.idleTimeout);
}
}
}
package com.navercorp.pinpoint.grpc.server.flowcontrol;
import io.grpc.Metadata;
import io.grpc.Status;
public interface ServerCallWrapper {
String getAgentId();
String getApplicationName();
void request(int numMessages);
void cancel(Status status, Metadata trailers);
}
package com.navercorp.pinpoint.grpc.server.flowcontrol;
import io.grpc.Metadata;
import io.grpc.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Objects;
public class SimpleRejectedExecutionListener implements RejectedExecutionListener {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private static final Status STREAM_IDLE_TIMEOUT = Status.DEADLINE_EXCEEDED.withDescription("Stream idle timeout");
private final StreamExecutorRejectedExecutionRequestScheduler.ServerCallWrapper serverCall;
private final ServerCallWrapper serverCall;
private final IdleTimeout idleTimeout;
public SimpleRejectedExecutionListener(StreamExecutorRejectedExecutionRequestScheduler.ServerCallWrapper serverCall) {
public SimpleRejectedExecutionListener(ServerCallWrapper serverCall, IdleTimeout idleTimeout) {
this.serverCall = Objects.requireNonNull(serverCall, "serverCall");
this.idleTimeout = Objects.requireNonNull(idleTimeout, "idleTimeout");
}
@Override
......@@ -27,8 +37,19 @@ public class SimpleRejectedExecutionListener implements RejectedExecutionListene
}
@Override
public void onExecute() {
// empty
public void onMessage() {
this.idleTimeout.update();
}
@Override
public boolean idleTimeExpired() {
return this.idleTimeout.isExpired();
}
@Override
public void idleTimeout() {
logger.info("stream idle timeout applicationName:{} agentId:{}", serverCall.getApplicationName(), serverCall.getAgentId());
serverCall.cancel(STREAM_IDLE_TIMEOUT, new Metadata());
}
@Override
......
......@@ -17,7 +17,8 @@
package com.navercorp.pinpoint.grpc.server.flowcontrol;
import com.navercorp.pinpoint.common.annotations.VisibleForTesting;
import io.grpc.ServerCall;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
......@@ -28,34 +29,31 @@ import java.util.concurrent.TimeUnit;
* @author jaehong.kim
*/
public class StreamExecutorRejectedExecutionRequestScheduler {
private static final int REQUEST_IMMEDIATELY = -1;
private final int periodMillis;
// private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final ScheduledExecutorService scheduledExecutorService;
private final long recoveryMessagesCount;
private final RejectedExecutionListenerFactory rejectedExecutionListenerFactory;
private final int periodMillis;
public StreamExecutorRejectedExecutionRequestScheduler(final ScheduledExecutorService scheduledExecutorService, final int periodMillis,
final RejectedExecutionListenerFactory rejectedExecutionListenerFactory) {
public StreamExecutorRejectedExecutionRequestScheduler(final ScheduledExecutorService scheduledExecutorService, final int periodMillis, int recoveryMessagesCount) {
this.scheduledExecutorService = Objects.requireNonNull(scheduledExecutorService, "scheduledExecutorService");
this.periodMillis = periodMillis;
// cast long
this.recoveryMessagesCount = recoveryMessagesCount;
}
public <ReqT, RespT> Listener schedule(final ServerCall<ReqT, RespT> call) {
final ServerCallWrapper serverCall = new DefaultServerCallWrapper<>(call);
this.rejectedExecutionListenerFactory = Objects.requireNonNull(rejectedExecutionListenerFactory, "rejectedExecutionListenerFactory");
final RejectedExecutionListener rejectedExecutionListener = newRejectedExecutionListener(serverCall);
final RequestScheduleJob command = new RequestScheduleJob(rejectedExecutionListener);
final ScheduledFuture<?> requestScheduledFuture = scheduledExecutorService.scheduleAtFixedRate(command, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
final Listener listener = new Listener(rejectedExecutionListener, requestScheduledFuture);
return listener;
}
private RejectedExecutionListener newRejectedExecutionListener(ServerCallWrapper serverCall) {
if (recoveryMessagesCount == REQUEST_IMMEDIATELY) {
return new SimpleRejectedExecutionListener(serverCall);
} else {
return new ControlFlowRejectExecutionListener(serverCall, recoveryMessagesCount);
}
public Listener schedule(final ServerCallWrapper serverCall) {
final RejectedExecutionListener rejectedExecutionListener = rejectedExecutionListenerFactory.newListener(serverCall);
final RequestScheduleJob command = new RequestScheduleJob(rejectedExecutionListener);
final ScheduledFuture<?> future = scheduledExecutorService.scheduleAtFixedRate(command, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
command.setFuture(future);
final Listener listener = new Listener(rejectedExecutionListener, future);
return listener;
}
@Override
......@@ -70,6 +68,7 @@ public class StreamExecutorRejectedExecutionRequestScheduler {
@VisibleForTesting
static class RequestScheduleJob implements Runnable {
private final RejectedExecutionListener listener;
private volatile ScheduledFuture<?> future;
public RequestScheduleJob(final RejectedExecutionListener listener) {
this.listener = Objects.requireNonNull(listener, "listener");
......@@ -77,7 +76,30 @@ public class StreamExecutorRejectedExecutionRequestScheduler {
@Override
public void run() {
listener.onSchedule();
if (!expireIdleTimeout()) {
listener.onSchedule();
}
}
private boolean expireIdleTimeout() {
if (listener.idleTimeExpired()) {
if (cancel(this.future)) {
listener.idleTimeout();
return true;
}
}
return false;
}
private boolean cancel(ScheduledFuture<?> future) {
if (future == null) {
return false;
}
return future.cancel(false);
}
public void setFuture(ScheduledFuture<?> future) {
this.future = Objects.requireNonNull(future, "future");
}
}
......@@ -106,8 +128,8 @@ public class StreamExecutorRejectedExecutionRequestScheduler {
return this.requestScheduledFuture.isCancelled();
}
public void onExecute() {
this.rejectedExecutionListener.onExecute();
public void onMessage() {
this.rejectedExecutionListener.onMessage();
}
@Override
......@@ -120,28 +142,4 @@ public class StreamExecutorRejectedExecutionRequestScheduler {
}
}
public interface ServerCallWrapper {
void request(int numMessages);
}
private static class DefaultServerCallWrapper<ReqT, RespT> implements ServerCallWrapper {
private final ServerCall<ReqT, RespT> serverCall;
public DefaultServerCallWrapper(ServerCall<ReqT, RespT> serverCall) {
this.serverCall = Objects.requireNonNull(serverCall, "serverCall");
}
@Override
public void request(int numMessages) {
serverCall.request(numMessages);
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("DefaultServerCallWrapper{");
sb.append("serverCall=").append(serverCall);
sb.append('}');
return sb.toString();
}
}
}
\ No newline at end of file
......@@ -17,6 +17,7 @@
package com.navercorp.pinpoint.grpc.server.flowcontrol;
import com.navercorp.pinpoint.common.util.Assert;
import com.navercorp.pinpoint.grpc.Header;
import io.grpc.Context;
import io.grpc.ForwardingServerCallListener;
import io.grpc.Metadata;
......@@ -42,7 +43,7 @@ public class StreamExecutorServerInterceptor implements ServerInterceptor {
private final StreamExecutorRejectedExecutionRequestScheduler scheduler;
public StreamExecutorServerInterceptor(String name, final Executor executor, final int initNumMessages, final ScheduledExecutorService scheduledExecutorService,
final int periodMillis, int recoveryMessagesCount) {
final int periodMillis, int recoveryMessagesCount, long idleTimeout) {
this.name = Objects.requireNonNull(name, "name");
Objects.requireNonNull(executor, "executor");
......@@ -52,12 +53,17 @@ public class StreamExecutorServerInterceptor implements ServerInterceptor {
this.initNumMessages = initNumMessages;
Objects.requireNonNull(scheduledExecutorService, "scheduledExecutorService");
Assert.isTrue(periodMillis > 0, "periodMillis must be positive");
this.scheduler = new StreamExecutorRejectedExecutionRequestScheduler(scheduledExecutorService, periodMillis, recoveryMessagesCount);
RejectedExecutionListenerFactory listenerFactory = new RejectedExecutionListenerFactory(recoveryMessagesCount, idleTimeout);
this.scheduler = new StreamExecutorRejectedExecutionRequestScheduler(scheduledExecutorService, periodMillis, listenerFactory);
}
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(final ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
final StreamExecutorRejectedExecutionRequestScheduler.Listener scheduleListener = this.scheduler.schedule(call);
final ServerCallWrapper serverCall = newServerCallWrapper(call, headers);
final StreamExecutorRejectedExecutionRequestScheduler.Listener scheduleListener = this.scheduler.schedule(serverCall);
if (logger.isInfoEnabled()) {
logger.info("Initialize schedule listener. {} {}, headers={}, initNumMessages={}, scheduler={}, listener={}",
this.name, call.getMethodDescriptor().getFullMethodName(), headers, initNumMessages, scheduler, scheduleListener);
......@@ -74,9 +80,11 @@ public class StreamExecutorServerInterceptor implements ServerInterceptor {
executor.execute(new Runnable() {
@Override
public void run() {
scheduleListener.onMessage();
delegate().onMessage(message);
}
});
// scheduleListener.onMessage();
} catch (RejectedExecutionException ree) {
// Defense code, need log ?
scheduleListener.onRejectedExecution();
......@@ -97,4 +105,10 @@ public class StreamExecutorServerInterceptor implements ServerInterceptor {
}
};
}
private <ReqT, RespT> ServerCallWrapper newServerCallWrapper(ServerCall<ReqT, RespT> call, Metadata headers) {
final String agentId = headers.get(Header.AGENT_ID_KEY);
final String applicationName = headers.get(Header.APPLICATION_NAME_KEY);
return new DefaultServerCallWrapper<>(call, applicationName, agentId);
}
}
\ No newline at end of file
package com.navercorp.pinpoint.grpc.server.flowcontrol;
import com.navercorp.pinpoint.common.util.Clock;
import org.junit.Assert;
import org.junit.Test;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class DefaultIdleTimeoutTest {
@Test(expected = IllegalArgumentException.class)
public void isExpired_invalid_parameter() {
new DefaultIdleTimeout(-1);
}
@Test
public void isExpired_init_state() {
IdleTimeout idleTimeout = new DefaultIdleTimeout(5000);
Assert.assertFalse(idleTimeout.isExpired());
Assert.assertFalse(idleTimeout.isExpired());
}
@Test
public void isExpired_expired() {
Clock clock = mock(Clock.class);
when(clock.getTime()).thenReturn(1L);
IdleTimeout idleTimeout = new DefaultIdleTimeout(0, clock);
Assert.assertTrue(idleTimeout.isExpired());
Assert.assertTrue(idleTimeout.isExpired());
}
@Test
public void isExpired_update() {
final Clock clock = mock(Clock.class);
when(clock.getTime()).thenReturn(0L);
IdleTimeout idleTimeout = new DefaultIdleTimeout(2, clock);
Assert.assertFalse(idleTimeout.isExpired());
when(clock.getTime()).thenReturn(5L);
Assert.assertTrue(idleTimeout.isExpired());
when(clock.getTime()).thenReturn(0L);
Assert.assertTrue(idleTimeout.isExpired());
}
}
\ No newline at end of file
......@@ -14,9 +14,8 @@
* limitations under the License.
*/
package com.navercorp.pinpoint.grpc.server;
package com.navercorp.pinpoint.grpc.server.flowcontrol;
import com.navercorp.pinpoint.grpc.server.flowcontrol.StreamExecutorRejectedExecutionRequestScheduler;
import io.grpc.internal.NoopServerCall;
import org.junit.After;
import org.junit.Before;
......@@ -24,7 +23,6 @@ import org.junit.Test;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
......@@ -48,9 +46,10 @@ public class StreamExecutorRejectedExecutionRequestSchedulerTest {
@Test
public void schedule() {
StreamExecutorRejectedExecutionRequestScheduler scheduler = new StreamExecutorRejectedExecutionRequestScheduler(scheduledExecutorService, 1000, 10);
StreamExecutorRejectedExecutionRequestScheduler.Listener listener = scheduler.schedule(new NoopServerCall());
RejectedExecutionListenerFactory listenerFactory = new RejectedExecutionListenerFactory(10, 5000);
StreamExecutorRejectedExecutionRequestScheduler scheduler = new StreamExecutorRejectedExecutionRequestScheduler(scheduledExecutorService, 1000, listenerFactory);
ServerCallWrapper serverCallWrapper = new DefaultServerCallWrapper<>(new NoopServerCall<>(), "app", "agent");
StreamExecutorRejectedExecutionRequestScheduler.Listener listener = scheduler.schedule(serverCallWrapper);
assertEquals(0, listener.getRejectedExecutionCount());
listener.onRejectedExecution();
assertEquals(1, listener.getRejectedExecutionCount());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册