StateAnalyzer.java 7.8 KB
Newer Older
Y
yong.you 已提交
1
package com.dianping.cat.consumer.state;
Y
youyong 已提交
2 3 4

import java.util.Date;
import java.util.Map;
S
sunryuan 已提交
5
import java.util.Map.Entry;
S
sunryuan 已提交
6
import java.util.concurrent.atomic.AtomicLong;
Y
youyong 已提交
7

Y
youyong 已提交
8
import org.unidal.lookup.annotation.Inject;
F
Frankie Wu 已提交
9
import org.unidal.lookup.annotation.Named;
10 11
import org.unidal.lookup.logging.LogEnabled;
import org.unidal.lookup.logging.Logger;
Y
youyong 已提交
12

Y
yong.you 已提交
13
import com.dianping.cat.Constants;
14
import com.dianping.cat.analysis.AbstractMessageAnalyzer;
F
Frankie Wu 已提交
15
import com.dianping.cat.analysis.MessageAnalyzer;
16
import com.dianping.cat.config.server.ServerFilterConfigManager;
F
Frankie Wu 已提交
17
import com.dianping.cat.configuration.NetworkInterfaceManager;
S
bug sif  
sunryuan 已提交
18
import com.dianping.cat.consumer.state.model.entity.Detail;
Y
youyong 已提交
19
import com.dianping.cat.consumer.state.model.entity.Machine;
S
sunryuan 已提交
20
import com.dianping.cat.consumer.state.model.entity.Message;
S
sunryuan 已提交
21
import com.dianping.cat.consumer.state.model.entity.ProcessDomain;
Y
youyong 已提交
22 23
import com.dianping.cat.consumer.state.model.entity.StateReport;
import com.dianping.cat.message.spi.MessageTree;
Y
youyong205 已提交
24
import com.dianping.cat.report.DefaultReportManager.StoragePolicy;
F
Frankie Wu 已提交
25
import com.dianping.cat.report.ReportManager;
26
import com.dianping.cat.statistic.ServerStatistic.Statistic;
Y
yong.you 已提交
27
import com.dianping.cat.statistic.ServerStatisticManager;
Y
youyong 已提交
28

F
Frankie Wu 已提交
29
@Named(type = MessageAnalyzer.class, value = StateAnalyzer.ID, instantiationStrategy = Named.PER_LOOKUP)
Y
youyong 已提交
30
public class StateAnalyzer extends AbstractMessageAnalyzer<StateReport> implements LogEnabled {
31
	public static final String ID = "state";
Y
youyong 已提交
32

Y
yong.you 已提交
33 34
	@Inject(ID)
	private ReportManager<StateReport> m_reportManager;
Y
youyong 已提交
35 36

	@Inject
Y
yong.you 已提交
37
	private ServerStatisticManager m_serverStateManager;
Y
youyong 已提交
38

39 40 41
	@Inject
	private ServerFilterConfigManager m_serverFilterConfigManager;

A
ainilife 已提交
42
	private String m_ip = NetworkInterfaceManager.INSTANCE.getLocalHostAddress();
Y
youyong 已提交
43

Y
youyong205 已提交
44
	private Machine buildStateInfo(Machine machine) {
Y
youyong 已提交
45 46 47
		long minute = 1000 * 60;
		long start = m_startTime;
		long end = m_startTime + minute * 60;
Y
yong.you 已提交
48
		double maxTps = 0;
Y
youyong205 已提交
49
		long current = System.currentTimeMillis();
Y
youyong205 已提交
50 51
		int size = 0;

Y
youyong205 已提交
52 53
		if (end > current) {
			end = current;
Y
youyong 已提交
54 55
		}
		for (; start < end; start += minute) {
Y
youyong205 已提交
56
			Statistic state = m_serverStateManager.findOrCreateState(start);
S
sunryuan 已提交
57
			Message temp = machine.findOrCreateMessage(start);
Y
youyong205 已提交
58 59 60 61
			Map<String, AtomicLong> totals = state.getMessageTotals();
			Map<String, AtomicLong> totalLosses = state.getMessageTotalLosses();
			Map<String, AtomicLong> sizes = state.getMessageSizes();

S
sunryuan 已提交
62
			for (Entry<String, AtomicLong> entry : totals.entrySet()) {
Y
youyong205 已提交
63
				String domain = entry.getKey();
S
sunryuan 已提交
64
				long value = entry.getValue().get();
Y
youyong205 已提交
65 66
				ProcessDomain processDomain = machine.findOrCreateProcessDomain(domain);
				Detail detail = processDomain.findOrCreateDetail(start);
Y
yong.you 已提交
67

Y
youyong205 已提交
68 69
				processDomain.setTotal(value + processDomain.getTotal());
				detail.setTotal(value + detail.getTotal());
S
sunryuan 已提交
70
			}
Y
youyong205 已提交
71 72 73 74 75
			for (Entry<String, AtomicLong> entry : totalLosses.entrySet()) {
				String domain = entry.getKey();
				long value = entry.getValue().get();
				ProcessDomain processDomain = machine.findOrCreateProcessDomain(domain);
				Detail detail = processDomain.findOrCreateDetail(start);
Y
yong.you 已提交
76

Y
youyong205 已提交
77 78
				processDomain.setTotalLoss(value + processDomain.getTotalLoss());
				detail.setTotalLoss(value + detail.getTotalLoss());
Y
youyong 已提交
79
			}
Y
youyong205 已提交
80 81 82 83 84
			for (Entry<String, AtomicLong> entry : sizes.entrySet()) {
				String domain = entry.getKey();
				long value = entry.getValue().get();
				ProcessDomain processDomain = machine.findOrCreateProcessDomain(domain);
				Detail detail = processDomain.findOrCreateDetail(start);
Y
youyong 已提交
85

Y
youyong205 已提交
86 87
				processDomain.setSize(value + processDomain.getSize());
				detail.setSize(value + detail.getSize());
Y
youyong205 已提交
88
			}
Y
youyong 已提交
89

Y
youyong205 已提交
90 91 92 93
			long messageTotal = state.getMessageTotal();
			long messageTotalLoss = state.getMessageTotalLoss();
			long messageSize = state.getMessageSize();
			long blockTotal = state.getBlockTotal();
Y
youyong 已提交
94
			long blockLoss = state.getBlockLoss();
Y
youyong 已提交
95
			long blockTime = state.getBlockTime();
Y
youyong 已提交
96 97
			long pigeonTimeError = state.getPigeonTimeError();
			long networkTimeError = state.getNetworkTimeError();
Y
youyong 已提交
98 99 100 101 102
			long messageDump = state.getMessageDump();
			long messageDumpLoss = state.getMessageDumpLoss();
			int processDelayCount = state.getProcessDelayCount();
			double processDelaySum = state.getProcessDelaySum();

Y
youyong205 已提交
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
			temp.setTotal(messageTotal).setTotalLoss(messageTotalLoss).setSize(messageSize);
			temp.setBlockTotal(blockTotal).setBlockLoss(blockLoss).setBlockTime(blockTime);
			temp.setPigeonTimeError(pigeonTimeError).setNetworkTimeError(networkTimeError).setDump(messageDump);
			temp.setDumpLoss(messageDumpLoss).setDelayCount(processDelayCount).setDelaySum(processDelaySum);

			machine.setTotal(messageTotal + machine.getTotal()).setTotalLoss(messageTotalLoss + machine.getTotalLoss())
			      .setSize(messageSize + machine.getSize());
			machine.setBlockTotal(machine.getBlockTotal() + blockTotal).setBlockLoss(machine.getBlockLoss() + blockLoss)
			      .setBlockTime(machine.getBlockTime() + blockTime);
			machine.setPigeonTimeError(machine.getPigeonTimeError() + pigeonTimeError)
			      .setNetworkTimeError(machine.getNetworkTimeError() + networkTimeError)
			      .setDump(machine.getDump() + messageDump);
			machine.setDumpLoss(machine.getDumpLoss() + messageDumpLoss)
			      .setDelayCount(machine.getDelayCount() + processDelayCount)
			      .setDelaySum(machine.getDelaySum() + processDelaySum);

Y
youyong 已提交
119
			double avg = 0;
Y
youyong205 已提交
120
			long count = machine.getDelayCount();
S
sunryuan 已提交
121

Y
youyong 已提交
122
			if (count > 0) {
Y
youyong205 已提交
123
				avg = machine.getDelaySum() / count;
Y
youyong 已提交
124 125
				machine.setDelayAvg(avg);
			}
Y
youyong205 已提交
126 127 128
			if (messageTotal > maxTps) {
				maxTps = messageTotal;
			}
Y
youyong 已提交
129
			temp.setTime(new Date(start));
S
sunryuan 已提交
130
			size++;
Y
youyong 已提交
131 132 133 134
		}

		double avgTps = 0;
		if (size > 0) {
Y
youyong 已提交
135
			avgTps = machine.getTotal() / (double) size;
Y
youyong 已提交
136 137 138
		}
		machine.setAvgTps(avgTps);
		machine.setMaxTps(maxTps);
Y
youyong205 已提交
139

Y
youyong205 已提交
140
		return machine;
Y
youyong 已提交
141 142
	}

Y
youyong 已提交
143
	@Override
Y
youyong205 已提交
144
	public synchronized void doCheckpoint(boolean atEnd) {
Y
youyong205 已提交
145
		long startTime = getStartTime();
Y
yong.you 已提交
146
		StateReport stateReport = getReport(Constants.CAT);
Y
youyong205 已提交
147
		Map<String, StateReport> reports = m_reportManager.getHourlyReports(startTime);
Y
yong.you 已提交
148

Y
yong.you 已提交
149
		reports.put(Constants.CAT, stateReport);
Y
yong.you 已提交
150
		if (atEnd && !isLocalMode()) {
Y
youyong205 已提交
151
			m_reportManager.storeHourlyReports(startTime, StoragePolicy.FILE_AND_DB, m_index);
Y
yong.you 已提交
152
		} else {
Y
youyong205 已提交
153
			m_reportManager.storeHourlyReports(startTime, StoragePolicy.FILE, m_index);
Y
yong.you 已提交
154 155 156 157 158 159 160 161 162 163
		}
		if (atEnd) {
			long minute = 1000 * 60;
			long start = m_startTime - minute * 60 * 2;
			long end = m_startTime - minute * 60;

			for (; start < end; start += minute) {
				m_serverStateManager.removeState(start);
			}
		}
Y
youyong 已提交
164 165 166 167 168 169 170 171 172
	}

	/**
	 * Stores the supplied logger in {@code m_logger}.
	 *
	 * @param logger
	 *           the logger to keep
	 */
	@Override
	public void enableLogging(Logger logger) {
		this.m_logger = logger;
	}

	@Override
	public StateReport getReport(String domain) {
Y
yong.you 已提交
173
		StateReport report = new StateReport(Constants.CAT);
Y
yong.you 已提交
174

Y
youyong 已提交
175 176
		report.setStartTime(new Date(m_startTime));
		report.setEndTime(new Date(m_startTime + MINUTE * 60 - 1));
Y
youyong 已提交
177

Y
youyong205 已提交
178
		Machine machine = buildStateInfo(report.findOrCreateMachine(m_ip));
Y
yong.you 已提交
179
		StateReport stateReport = m_reportManager.getHourlyReport(getStartTime(), Constants.CAT, true);
A
ainilife 已提交
180
		Map<String, ProcessDomain> processDomains = stateReport.findOrCreateMachine(m_ip).getProcessDomains();
Y
youyong205 已提交
181

Y
yong.you 已提交
182 183 184 185 186 187
		for (Map.Entry<String, ProcessDomain> entry : machine.getProcessDomains().entrySet()) {
			ProcessDomain processDomain = processDomains.get(entry.getKey());

			if (processDomain != null) {
				entry.getValue().getIps().addAll(processDomain.getIps());
			}
S
sunryuan 已提交
188
		}
Y
youyong 已提交
189 190 191
		return report;
	}

Y
youyong205 已提交
192 193 194 195 196
	/**
	 * Returns the report manager injected into this analyzer.
	 *
	 * @return the value of {@code m_reportManager}
	 */
	@Override
	public ReportManager<StateReport> getReportManager() {
		return this.m_reportManager;
	}

Y
youyong205 已提交
197
	@Override
198 199 200
	protected void loadReports() {
		// do nothing
	}
Y
youyong205 已提交
201
	
Y
youyong 已提交
202 203 204 205
	@Override
	protected void process(MessageTree tree) {
		String domain = tree.getDomain();

206
		if (m_serverFilterConfigManager.validateDomain(domain)) {
Y
youyong205 已提交
207 208 209 210 211
			StateReport report = m_reportManager.getHourlyReport(getStartTime(), Constants.CAT, true);
			String ip = tree.getIpAddress();
			Machine machine = report.findOrCreateMachine(NetworkInterfaceManager.INSTANCE.getLocalHostAddress());

			machine.findOrCreateProcessDomain(domain).addIp(ip);
Y
youyong 已提交
212 213
		}
	}
F
Frankie Wu 已提交
214 215 216 217
	
	/**
	 * Replaces the IP stored in {@code m_ip}, which {@link #getReport(String)} uses as the machine key.
	 * The field is initialised with the local host address.
	 *
	 * @param ip
	 *           the IP to use from now on
	 */
	public void setIp(String ip) {
		this.m_ip = ip;
	}
Y
youyong 已提交
218
}