提交 839aeb80 编写于 作者: W Woonduk Kang

[#7659] Cleanup BulkOperation

上级 b28a2149
...@@ -20,6 +20,7 @@ import com.navercorp.pinpoint.collector.dao.MapResponseTimeDao; ...@@ -20,6 +20,7 @@ import com.navercorp.pinpoint.collector.dao.MapResponseTimeDao;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.BulkWriter; import com.navercorp.pinpoint.collector.dao.hbase.statistics.BulkWriter;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.CallRowKey; import com.navercorp.pinpoint.collector.dao.hbase.statistics.CallRowKey;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.ColumnName; import com.navercorp.pinpoint.collector.dao.hbase.statistics.ColumnName;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.MapLinkConfiguration;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.ResponseColumnName; import com.navercorp.pinpoint.collector.dao.hbase.statistics.ResponseColumnName;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.RowKey; import com.navercorp.pinpoint.collector.dao.hbase.statistics.RowKey;
import com.navercorp.pinpoint.common.profiler.util.ApplicationMapStatisticsUtils; import com.navercorp.pinpoint.common.profiler.util.ApplicationMapStatisticsUtils;
...@@ -52,10 +53,13 @@ public class HbaseMapResponseTimeDao implements MapResponseTimeDao { ...@@ -52,10 +53,13 @@ public class HbaseMapResponseTimeDao implements MapResponseTimeDao {
private final TimeSlot timeSlot; private final TimeSlot timeSlot;
private final BulkWriter bulkWriter; private final BulkWriter bulkWriter;
private final MapLinkConfiguration mapLinkConfiguration;
@Autowired @Autowired
public HbaseMapResponseTimeDao(AcceptedTimeService acceptedTimeService, TimeSlot timeSlot, public HbaseMapResponseTimeDao(MapLinkConfiguration mapLinkConfiguration,
AcceptedTimeService acceptedTimeService, TimeSlot timeSlot,
@Qualifier("selfBulkWriter") BulkWriter bulkWriter) { @Qualifier("selfBulkWriter") BulkWriter bulkWriter) {
this.mapLinkConfiguration = Objects.requireNonNull(mapLinkConfiguration, "mapLinkConfiguration");
this.acceptedTimeService = Objects.requireNonNull(acceptedTimeService, "acceptedTimeService"); this.acceptedTimeService = Objects.requireNonNull(acceptedTimeService, "acceptedTimeService");
this.timeSlot = Objects.requireNonNull(timeSlot, "timeSlot"); this.timeSlot = Objects.requireNonNull(timeSlot, "timeSlot");
this.bulkWriter = Objects.requireNonNull(bulkWriter, "bulkWrtier"); this.bulkWriter = Objects.requireNonNull(bulkWriter, "bulkWrtier");
...@@ -80,12 +84,17 @@ public class HbaseMapResponseTimeDao implements MapResponseTimeDao { ...@@ -80,12 +84,17 @@ public class HbaseMapResponseTimeDao implements MapResponseTimeDao {
final ColumnName selfColumnName = new ResponseColumnName(agentId, slotNumber); final ColumnName selfColumnName = new ResponseColumnName(agentId, slotNumber);
HistogramSchema histogramSchema = applicationServiceType.getHistogramSchema(); HistogramSchema histogramSchema = applicationServiceType.getHistogramSchema();
final ColumnName sumColumnName = new ResponseColumnName(agentId, histogramSchema.getSumStatSlot().getSlotTime());
final ColumnName maxColumnName = new ResponseColumnName(agentId, histogramSchema.getMaxStatSlot().getSlotTime());
final ColumnName maxColumnName = new ResponseColumnName(agentId, histogramSchema.getMaxStatSlot().getSlotTime());
this.bulkWriter.increment(selfRowKey, selfColumnName); this.bulkWriter.increment(selfRowKey, selfColumnName);
this.bulkWriter.increment(selfRowKey, sumColumnName, elapsed);
this.bulkWriter.updateMax(selfRowKey, maxColumnName, elapsed); if (mapLinkConfiguration.isEnableAvg()) {
final ColumnName sumColumnName = new ResponseColumnName(agentId, histogramSchema.getSumStatSlot().getSlotTime());
this.bulkWriter.increment(selfRowKey, sumColumnName, elapsed);
}
if (mapLinkConfiguration.isEnableMax()) {
this.bulkWriter.updateMax(selfRowKey, maxColumnName, elapsed);
}
} }
......
...@@ -21,6 +21,7 @@ import com.navercorp.pinpoint.collector.dao.hbase.statistics.BulkWriter; ...@@ -21,6 +21,7 @@ import com.navercorp.pinpoint.collector.dao.hbase.statistics.BulkWriter;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.CallRowKey; import com.navercorp.pinpoint.collector.dao.hbase.statistics.CallRowKey;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.CallerColumnName; import com.navercorp.pinpoint.collector.dao.hbase.statistics.CallerColumnName;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.ColumnName; import com.navercorp.pinpoint.collector.dao.hbase.statistics.ColumnName;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.MapLinkConfiguration;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.RowKey; import com.navercorp.pinpoint.collector.dao.hbase.statistics.RowKey;
import com.navercorp.pinpoint.common.profiler.util.ApplicationMapStatisticsUtils; import com.navercorp.pinpoint.common.profiler.util.ApplicationMapStatisticsUtils;
import com.navercorp.pinpoint.common.server.util.AcceptedTimeService; import com.navercorp.pinpoint.common.server.util.AcceptedTimeService;
...@@ -54,12 +55,15 @@ public class HbaseMapStatisticsCalleeDao implements MapStatisticsCalleeDao { ...@@ -54,12 +55,15 @@ public class HbaseMapStatisticsCalleeDao implements MapStatisticsCalleeDao {
private final IgnoreStatFilter ignoreStatFilter; private final IgnoreStatFilter ignoreStatFilter;
private final BulkWriter bulkWriter; private final BulkWriter bulkWriter;
private final MapLinkConfiguration mapLinkConfiguration;
@Autowired @Autowired
public HbaseMapStatisticsCalleeDao(IgnoreStatFilter ignoreStatFilter, public HbaseMapStatisticsCalleeDao(MapLinkConfiguration mapLinkConfiguration,
IgnoreStatFilter ignoreStatFilter,
AcceptedTimeService acceptedTimeService, TimeSlot timeSlot, AcceptedTimeService acceptedTimeService, TimeSlot timeSlot,
@Qualifier("calleeBulkWriter") BulkWriter bulkWriter) { @Qualifier("calleeBulkWriter") BulkWriter bulkWriter) {
this.mapLinkConfiguration = Objects.requireNonNull(mapLinkConfiguration, "mapLinkConfiguration");
this.ignoreStatFilter = Objects.requireNonNull(ignoreStatFilter, "ignoreStatFilter"); this.ignoreStatFilter = Objects.requireNonNull(ignoreStatFilter, "ignoreStatFilter");
this.acceptedTimeService = Objects.requireNonNull(acceptedTimeService, "acceptedTimeService"); this.acceptedTimeService = Objects.requireNonNull(acceptedTimeService, "acceptedTimeService");
this.timeSlot = Objects.requireNonNull(timeSlot, "timeSlot"); this.timeSlot = Objects.requireNonNull(timeSlot, "timeSlot");
...@@ -94,16 +98,20 @@ public class HbaseMapStatisticsCalleeDao implements MapStatisticsCalleeDao { ...@@ -94,16 +98,20 @@ public class HbaseMapStatisticsCalleeDao implements MapStatisticsCalleeDao {
final RowKey calleeRowKey = new CallRowKey(calleeApplicationName, calleeServiceType.getCode(), rowTimeSlot); final RowKey calleeRowKey = new CallRowKey(calleeApplicationName, calleeServiceType.getCode(), rowTimeSlot);
final short callerSlotNumber = ApplicationMapStatisticsUtils.getSlotNumber(calleeServiceType, elapsed, isError); final short callerSlotNumber = ApplicationMapStatisticsUtils.getSlotNumber(calleeServiceType, elapsed, isError);
final ColumnName callerColumnName = new CallerColumnName(callerServiceType.getCode(), callerApplicationName, callerHost, callerSlotNumber);
HistogramSchema histogramSchema = calleeServiceType.getHistogramSchema(); HistogramSchema histogramSchema = calleeServiceType.getHistogramSchema();
final ColumnName sumColumnName = new CallerColumnName(callerServiceType.getCode(), callerApplicationName, callerHost, histogramSchema.getSumStatSlot().getSlotTime());
final ColumnName maxColumnName = new CallerColumnName(callerServiceType.getCode(), callerApplicationName, callerHost, histogramSchema.getMaxStatSlot().getSlotTime());
final ColumnName callerColumnName = new CallerColumnName(callerServiceType.getCode(), callerApplicationName, callerHost, callerSlotNumber);
this.bulkWriter.increment(calleeRowKey, callerColumnName); this.bulkWriter.increment(calleeRowKey, callerColumnName);
this.bulkWriter.increment(calleeRowKey, sumColumnName, elapsed);
this.bulkWriter.updateMax(calleeRowKey, maxColumnName, elapsed);
if (mapLinkConfiguration.isEnableAvg()) {
final ColumnName sumColumnName = new CallerColumnName(callerServiceType.getCode(), callerApplicationName, callerHost, histogramSchema.getSumStatSlot().getSlotTime());
this.bulkWriter.increment(calleeRowKey, sumColumnName, elapsed);
}
if (mapLinkConfiguration.isEnableMax()) {
final ColumnName maxColumnName = new CallerColumnName(callerServiceType.getCode(), callerApplicationName, callerHost, histogramSchema.getMaxStatSlot().getSlotTime());
this.bulkWriter.updateMax(calleeRowKey, maxColumnName, elapsed);
}
} }
...@@ -117,6 +125,4 @@ public class HbaseMapStatisticsCalleeDao implements MapStatisticsCalleeDao { ...@@ -117,6 +125,4 @@ public class HbaseMapStatisticsCalleeDao implements MapStatisticsCalleeDao {
this.bulkWriter.flushAvgMax(); this.bulkWriter.flushAvgMax();
} }
} }
...@@ -21,6 +21,7 @@ import com.navercorp.pinpoint.collector.dao.hbase.statistics.BulkWriter; ...@@ -21,6 +21,7 @@ import com.navercorp.pinpoint.collector.dao.hbase.statistics.BulkWriter;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.CallRowKey; import com.navercorp.pinpoint.collector.dao.hbase.statistics.CallRowKey;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.CalleeColumnName; import com.navercorp.pinpoint.collector.dao.hbase.statistics.CalleeColumnName;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.ColumnName; import com.navercorp.pinpoint.collector.dao.hbase.statistics.ColumnName;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.MapLinkConfiguration;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.RowKey; import com.navercorp.pinpoint.collector.dao.hbase.statistics.RowKey;
import com.navercorp.pinpoint.common.profiler.util.ApplicationMapStatisticsUtils; import com.navercorp.pinpoint.common.profiler.util.ApplicationMapStatisticsUtils;
import com.navercorp.pinpoint.common.server.util.AcceptedTimeService; import com.navercorp.pinpoint.common.server.util.AcceptedTimeService;
...@@ -52,12 +53,15 @@ public class HbaseMapStatisticsCallerDao implements MapStatisticsCallerDao { ...@@ -52,12 +53,15 @@ public class HbaseMapStatisticsCallerDao implements MapStatisticsCallerDao {
private final TimeSlot timeSlot; private final TimeSlot timeSlot;
private final BulkWriter bulkWriter; private final BulkWriter bulkWriter;
private final MapLinkConfiguration mapLinkConfiguration;
@Autowired @Autowired
public HbaseMapStatisticsCallerDao(AcceptedTimeService acceptedTimeService, public HbaseMapStatisticsCallerDao(MapLinkConfiguration mapLinkConfiguration,
AcceptedTimeService acceptedTimeService,
TimeSlot timeSlot, TimeSlot timeSlot,
@Qualifier("callerBulkWriter") BulkWriter bulkWriter) { @Qualifier("callerBulkWriter") BulkWriter bulkWriter) {
this.mapLinkConfiguration = Objects.requireNonNull(mapLinkConfiguration, "mapLinkConfiguration");
this.acceptedTimeService = Objects.requireNonNull(acceptedTimeService, "acceptedTimeService"); this.acceptedTimeService = Objects.requireNonNull(acceptedTimeService, "acceptedTimeService");
this.timeSlot = Objects.requireNonNull(timeSlot, "timeSlot"); this.timeSlot = Objects.requireNonNull(timeSlot, "timeSlot");
...@@ -85,15 +89,20 @@ public class HbaseMapStatisticsCallerDao implements MapStatisticsCallerDao { ...@@ -85,15 +89,20 @@ public class HbaseMapStatisticsCallerDao implements MapStatisticsCallerDao {
final RowKey callerRowKey = new CallRowKey(callerApplicationName, callerServiceType.getCode(), rowTimeSlot); final RowKey callerRowKey = new CallRowKey(callerApplicationName, callerServiceType.getCode(), rowTimeSlot);
final short calleeSlotNumber = ApplicationMapStatisticsUtils.getSlotNumber(calleeServiceType, elapsed, isError); final short calleeSlotNumber = ApplicationMapStatisticsUtils.getSlotNumber(calleeServiceType, elapsed, isError);
final ColumnName calleeColumnName = new CalleeColumnName(callerAgentid, calleeServiceType.getCode(), calleeApplicationName, calleeHost, calleeSlotNumber);
HistogramSchema histogramSchema = callerServiceType.getHistogramSchema(); HistogramSchema histogramSchema = callerServiceType.getHistogramSchema();
final ColumnName sumColumnName = new CalleeColumnName(callerAgentid, calleeServiceType.getCode(), calleeApplicationName, calleeHost, histogramSchema.getSumStatSlot().getSlotTime());
final ColumnName maxColumnName = new CalleeColumnName(callerAgentid, calleeServiceType.getCode(), calleeApplicationName, calleeHost, histogramSchema.getMaxStatSlot().getSlotTime());
final ColumnName calleeColumnName = new CalleeColumnName(callerAgentid, calleeServiceType.getCode(), calleeApplicationName, calleeHost, calleeSlotNumber);
this.bulkWriter.increment(callerRowKey, calleeColumnName); this.bulkWriter.increment(callerRowKey, calleeColumnName);
this.bulkWriter.increment(callerRowKey, sumColumnName, elapsed);
this.bulkWriter.updateMax(callerRowKey, maxColumnName, elapsed); if (mapLinkConfiguration.isEnableAvg()) {
final ColumnName sumColumnName = new CalleeColumnName(callerAgentid, calleeServiceType.getCode(), calleeApplicationName, calleeHost, histogramSchema.getSumStatSlot().getSlotTime());
this.bulkWriter.increment(callerRowKey, sumColumnName, elapsed);
}
if (mapLinkConfiguration.isEnableMax()) {
final ColumnName maxColumnName = new CalleeColumnName(callerAgentid, calleeServiceType.getCode(), calleeApplicationName, calleeHost, histogramSchema.getMaxStatSlot().getSlotTime());
this.bulkWriter.updateMax(callerRowKey, maxColumnName, elapsed);
}
} }
......
...@@ -36,18 +36,16 @@ public class BulkConfiguration { ...@@ -36,18 +36,16 @@ public class BulkConfiguration {
private final int selfLimitSize; private final int selfLimitSize;
private final boolean enableBulk; private final boolean enableBulk;
private final boolean enableAvgMax;
public BulkConfiguration(@Value("${collector.cachedStatDao.caller.limit:-1}") int callerLimitSize, public BulkConfiguration(@Value("${collector.cachedStatDao.caller.limit:-1}") int callerLimitSize,
@Value("${collector.cachedStatDao.callee.limit:-1}") int calleeLimitSize, @Value("${collector.cachedStatDao.callee.limit:-1}") int calleeLimitSize,
@Value("${collector.cachedStatDao.self.limit:-1}") int selfLimitSize, @Value("${collector.cachedStatDao.self.limit:-1}") int selfLimitSize,
@Value("${collector.cachedStatDao.bulk.enable:true}") boolean enableBulk, @Value("${collector.cachedStatDao.bulk.enable:true}") boolean enableBulk) {
@Value("${collector.cachedStatDao.avg-max.enable:true}") boolean enableAvgMax) {
this.callerLimitSize = callerLimitSize; this.callerLimitSize = callerLimitSize;
this.calleeLimitSize = calleeLimitSize; this.calleeLimitSize = calleeLimitSize;
this.selfLimitSize = selfLimitSize; this.selfLimitSize = selfLimitSize;
this.enableBulk = enableBulk; this.enableBulk = enableBulk;
this.enableAvgMax = enableAvgMax;
} }
public int getCallerLimitSize() { public int getCallerLimitSize() {
...@@ -66,10 +64,6 @@ public class BulkConfiguration { ...@@ -66,10 +64,6 @@ public class BulkConfiguration {
return enableBulk; return enableBulk;
} }
public boolean enableAvgMax() {
return enableAvgMax;
}
@PostConstruct @PostConstruct
public void log() { public void log() {
logger.info("{}", this); logger.info("{}", this);
...@@ -82,7 +76,6 @@ public class BulkConfiguration { ...@@ -82,7 +76,6 @@ public class BulkConfiguration {
", calleeLimitSize=" + calleeLimitSize + ", calleeLimitSize=" + calleeLimitSize +
", selfLimitSize=" + selfLimitSize + ", selfLimitSize=" + selfLimitSize +
", enableBulk=" + enableBulk + ", enableBulk=" + enableBulk +
", enableAvgMax=" + enableAvgMax +
'}'; '}';
} }
} }
package com.navercorp.pinpoint.collector.dao.hbase.statistics;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class MapLinkConfiguration {
private final boolean enableAvg;
private final boolean enableMax;
public MapLinkConfiguration(@Value("${collector.map-link.avg.enable:true}") boolean enableAvg,
@Value("${collector.map-link.max.enable:true}") boolean enableMax) {
this.enableAvg = enableAvg;
this.enableMax = enableMax;
}
public boolean isEnableAvg() {
return enableAvg;
}
public boolean isEnableMax() {
return enableMax;
}
}
...@@ -135,7 +135,8 @@ collector.cachedStatDao.caller.limit=-1 ...@@ -135,7 +135,8 @@ collector.cachedStatDao.caller.limit=-1
collector.cachedStatDao.callee.limit=-1 collector.cachedStatDao.callee.limit=-1
collector.cachedStatDao.self.limit=-1 collector.cachedStatDao.self.limit=-1
collector.cachedStatDao.bulk.enable=true collector.cachedStatDao.bulk.enable=true
collector.cachedStatDao.avg-max.enable=true collector.map-link.avg.enable=true
collector.map-link.max.enable=true
# Flink configuration # Flink configuration
flink.cluster.enable=false flink.cluster.enable=false
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册