未验证 提交 4be7a2ef 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Polish codes - provide default implementations for IConsumer. (#10087)

上级 d8fce128
......@@ -22,7 +22,6 @@ import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
......@@ -131,11 +130,6 @@ public class GRPCMetricsExporter extends MetricFormatter implements MetricValues
}
}
@Override
public void init(final Properties properties) {
}
@Override
public void consume(List<ExportData> data) {
GRPCStreamStatus status = new GRPCStreamStatus();
......@@ -239,11 +233,6 @@ public class GRPCMetricsExporter extends MetricFormatter implements MetricValues
log.error(t.getMessage(), t);
}
@Override
public void onExit() {
}
private boolean eventTypeMatch(ExportEvent.EventType eventType,
org.apache.skywalking.oap.server.exporter.grpc.EventType subscriptionType) {
return (ExportEvent.EventType.INCREMENT.equals(eventType) && EventType.INCREMENT.equals(subscriptionType))
......
......@@ -20,7 +20,6 @@ package org.apache.skywalking.oap.server.exporter.provider.kafka.log;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.List;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.utils.Bytes;
......@@ -94,11 +93,6 @@ public class KafkaLogExporter extends KafkaExportProducer implements LogExportSe
return setting.isEnableKafkaLog();
}
@Override
public void init(final Properties properties) {
}
@Override
public void consume(final List<LogRecord> data) {
for (LogRecord logRecord : data) {
......@@ -131,11 +125,6 @@ public class KafkaLogExporter extends KafkaExportProducer implements LogExportSe
}
@Override
public void onExit() {
}
private LogData transLogData(LogRecord logRecord) throws InvalidProtocolBufferException {
LogData.Builder builder = LogData.newBuilder();
LogDataBody.Builder bodyBuilder = LogDataBody.newBuilder();
......
......@@ -20,7 +20,6 @@ package org.apache.skywalking.oap.server.exporter.provider.kafka.trace;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.List;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.utils.Bytes;
......@@ -85,11 +84,6 @@ public class KafkaTraceExporter extends KafkaExportProducer implements TraceExpo
return setting.isEnableKafkaTrace();
}
@Override
public void init(final Properties properties) {
}
@Override
public void consume(final List<SegmentRecord> data) {
for (SegmentRecord segmentRecord : data) {
......@@ -122,9 +116,4 @@ public class KafkaTraceExporter extends KafkaExportProducer implements TraceExpo
public void onError(final List<SegmentRecord> data, final Throwable t) {
}
@Override
public void onExit() {
}
}
......@@ -19,16 +19,15 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.List;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.ConsumerPoolFactory;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.MergableBufferedData;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.ConsumerPoolFactory;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
......@@ -130,10 +129,6 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
}
private class AggregatorConsumer implements IConsumer<Metrics> {
@Override
public void init(final Properties properties) {
}
@Override
public void consume(List<Metrics> data) {
MetricsAggregateWorker.this.onWork(data);
......@@ -144,10 +139,6 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
log.error(t.getMessage(), t);
}
@Override
public void onExit() {
}
@Override
public void nothingToConsume() {
flush();
......
......@@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.CoreModule;
......@@ -401,11 +400,6 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
* ID is declared through {@link Object#hashCode()} and {@link Object#equals(Object)} as usual.
*/
private class PersistentConsumer implements IConsumer<Metrics> {
@Override
public void init(final Properties properties) {
}
@Override
public void consume(List<Metrics> data) {
MetricsPersistentWorker.this.onWork(data);
......@@ -415,9 +409,5 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
public void onError(List<Metrics> data, Throwable t) {
log.error(t.getMessage(), t);
}
@Override
public void onExit() {
}
}
}
......@@ -21,16 +21,15 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.analysis.data.LimitedSizeBufferedData;
import org.apache.skywalking.oap.server.core.analysis.data.ReadWriteSafeCache;
import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/**
......@@ -97,10 +96,6 @@ public class TopNWorker extends PersistenceWorker<TopN> {
}
private class TopNConsumer implements IConsumer<TopN> {
@Override
public void init(final Properties properties) {
}
@Override
public void consume(List<TopN> data) {
TopNWorker.this.onWork(data);
......@@ -110,10 +105,5 @@ public class TopNWorker extends PersistenceWorker<TopN> {
public void onError(List<TopN> data, Throwable t) {
log.error(t.getMessage(), t);
}
@Override
public void onExit() {
}
}
}
......@@ -24,17 +24,16 @@ import io.grpc.stub.StreamObserver;
import io.netty.handler.ssl.SslContext;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.Empty;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteMessage;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteServiceGrpc;
import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient;
import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
......@@ -154,10 +153,6 @@ public class GRPCRemoteClient implements RemoteClient {
}
class RemoteMessageConsumer implements IConsumer<RemoteMessage> {
@Override
public void init(final Properties properties) {
}
@Override
public void consume(List<RemoteMessage> remoteMessages) {
try {
......@@ -177,10 +172,6 @@ public class GRPCRemoteClient implements RemoteClient {
public void onError(List<RemoteMessage> remoteMessages, Throwable t) {
log.error(t.getMessage(), t);
}
@Override
public void onExit() {
}
}
/**
......
......@@ -22,19 +22,20 @@ import java.util.List;
import java.util.Properties;
public interface IConsumer<T> {
void init(final Properties properties);
default void init(final Properties properties) {
}
void consume(List<T> data);
void onError(List<T> data, Throwable t);
void onExit();
default void onExit() {
}
/**
* Notify the implementation, if there is nothing fetched from the queue. This could be used as a timer to trigger
* reaction if the queue has no element.
*/
default void nothingToConsume() {
return;
}
}
......@@ -20,14 +20,12 @@ package org.apache.skywalking.oap.server.library.datacarrier;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.skywalking.oap.server.library.datacarrier.buffer.BufferStrategy;
import org.apache.skywalking.oap.server.library.datacarrier.buffer.Channels;
import org.apache.skywalking.oap.server.library.datacarrier.buffer.QueueBuffer;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.library.datacarrier.partition.ProducerThreadPartitioner;
import org.apache.skywalking.oap.server.library.datacarrier.partition.SimpleRollingPartitioner;
import org.junit.Assert;
import org.junit.Test;
import org.powermock.api.support.membermodification.MemberModifier;
......@@ -119,13 +117,6 @@ public class DataCarrierTest {
e.printStackTrace();
}
IConsumer<SampleData> consumer = new IConsumer<SampleData>() {
int i = 0;
@Override
public void init(final Properties properties) {
}
@Override
public void consume(List<SampleData> data) {
......@@ -135,11 +126,6 @@ public class DataCarrierTest {
public void onError(List<SampleData> data, Throwable t) {
}
@Override
public void onExit() {
}
};
carrier.consume(consumer, 1);
}).start();
......
......@@ -21,7 +21,6 @@ package org.apache.skywalking.oap.server.library.datacarrier.consumer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
import org.apache.skywalking.oap.server.library.datacarrier.SampleData;
......@@ -104,11 +103,6 @@ public class ConsumerTest {
public boolean onError = false;
@Override
public void init(final Properties properties) {
}
@Override
public void consume(List<SampleData> data) {
if (onError) {
......@@ -120,11 +114,6 @@ public class ConsumerTest {
public void onError(List<SampleData> data, Throwable t) {
IS_OCCUR_ERROR = true;
}
@Override
public void onExit() {
}
}
private IConsumer getConsumer(DataCarrier<SampleData> carrier) throws IllegalAccessException {
......
......@@ -19,17 +19,11 @@
package org.apache.skywalking.oap.server.library.datacarrier.consumer;
import java.util.List;
import java.util.Properties;
import org.apache.skywalking.oap.server.library.datacarrier.SampleData;
public class SampleConsumer implements IConsumer<SampleData> {
public int i = 1;
@Override
public void init(final Properties properties) {
}
@Override
public void consume(List<SampleData> data) {
for (SampleData one : data) {
......@@ -42,9 +36,4 @@ public class SampleConsumer implements IConsumer<SampleData> {
public void onError(List<SampleData> data, Throwable t) {
}
@Override
public void onExit() {
}
}
......@@ -22,19 +22,18 @@ import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.BatchSQLExecutor;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
......@@ -110,11 +109,6 @@ public class JDBCBatchDAO implements IBatchDAO {
this.h2BatchDAO = h2BatchDAO;
}
@Override
public void init(final Properties properties) {
}
@Override
public void consume(List<PrepareRequest> prepareRequests) {
h2BatchDAO.flush(prepareRequests);
......@@ -124,9 +118,5 @@ public class JDBCBatchDAO implements IBatchDAO {
public void onError(List<PrepareRequest> prepareRequests, Throwable t) {
log.error(t.getMessage(), t);
}
@Override
public void onExit() {
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册