StateAnalyzer.java 6.8 KB
Newer Older
Y
yong.you 已提交
1
package com.dianping.cat.consumer.state;
Y
youyong 已提交
2 3 4 5 6 7

import java.util.Date;
import java.util.Map;

import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
Y
youyong 已提交
8
import org.unidal.lookup.annotation.Inject;
Y
youyong 已提交
9

S
sunryuan 已提交
10
import com.dianping.cat.DomainManager;
11
import com.dianping.cat.analysis.AbstractMessageAnalyzer;
F
Frankie Wu 已提交
12
import com.dianping.cat.configuration.NetworkInterfaceManager;
Y
youyong 已提交
13
import com.dianping.cat.consumer.state.model.entity.Machine;
S
sunryuan 已提交
14
import com.dianping.cat.consumer.state.model.entity.ProcessDomain;
Y
youyong 已提交
15
import com.dianping.cat.consumer.state.model.entity.StateReport;
16
import com.dianping.cat.core.dal.Hostinfo;
Y
youyong 已提交
17
import com.dianping.cat.message.spi.MessageTree;
Y
yong.you 已提交
18 19 20
import com.dianping.cat.service.DefaultReportManager.StoragePolicy;
import com.dianping.cat.service.ReportConstants;
import com.dianping.cat.service.ReportManager;
21
import com.dianping.cat.statistic.ServerStatistic.Statistic;
Y
yong.you 已提交
22
import com.dianping.cat.statistic.ServerStatisticManager;
Y
youyong 已提交
23 24

public class StateAnalyzer extends AbstractMessageAnalyzer<StateReport> implements LogEnabled {
25
	public static final String ID = "state";
Y
youyong 已提交
26

Y
yong.you 已提交
27 28
	@Inject(ID)
	private ReportManager<StateReport> m_reportManager;
Y
youyong 已提交
29 30

	@Inject
Y
yong.you 已提交
31
	private ServerStatisticManager m_serverStateManager;
Y
youyong 已提交
32 33

	@Inject
Y
yong.you 已提交
34
	private DomainManager m_domainManager;
Y
youyong 已提交
35

Y
youyong 已提交
36 37 38 39 40 41 42 43 44 45 46 47
	private void buildStateInfo(Machine machine) {
		long minute = 1000 * 60;
		long start = m_startTime;
		long end = m_startTime + minute * 60;

		if (end > System.currentTimeMillis()) {
			end = System.currentTimeMillis();
		}
		int size = 0;
		double maxTps = 0;
		for (; start < end; start += minute) {
			size++;
48
			Statistic state = m_serverStateManager.findState(start);
Y
youyong 已提交
49 50

			com.dianping.cat.consumer.state.model.entity.Message temp = machine.findOrCreateMessage(start);
S
sunryuan 已提交
51
			Map<String, Long> totals = state.getMessageTotals();
Y
youyong 已提交
52 53
			long messageTotal = state.getMessageTotal();
			temp.setTotal(messageTotal);
S
sunryuan 已提交
54
			
S
sunryuan 已提交
55 56
			Map<String,Long> totalLosses = state.getMessageTotalLosses();
			long messageTotalLoss = state.getMessageTotalLoss();
S
sunryuan 已提交
57 58 59
			temp.setTotalLoss(messageTotalLoss);
			
			Map<String,Double> sizes = state.getMessageSizes();
S
sunryuan 已提交
60
			double messageSize = state.getMessageSize();
S
sunryuan 已提交
61 62
			temp.setSize(messageSize);
			
S
sunryuan 已提交
63 64 65 66
			machine.setTotal(messageTotal + machine.getTotal());
			machine.setTotalLoss(messageTotalLoss + machine.getTotalLoss());
			machine.setSize(messageSize + machine.getSize());
		
S
sunryuan 已提交
67
			for (String key : totals.keySet()) {
S
sunryuan 已提交
68 69
				ProcessDomain domain = machine.findOrCreateProcessDomain(key);
				if(totals.containsKey(key)){
S
sunryuan 已提交
70
					domain.setTotal(totals.get(key) + domain.getTotal());
S
sunryuan 已提交
71 72
				}
				if(totalLosses.containsKey(key)){
S
sunryuan 已提交
73
					domain.setTotalLoss(totalLosses.get(key) + domain.getTotalLoss());
S
sunryuan 已提交
74 75
				}
				if(sizes.containsKey(key)){
S
sunryuan 已提交
76 77 78 79
					domain.setSize(sizes.get(key) + domain.getSize());
				}
			}
			
Y
youyong 已提交
80 81 82 83
			if (messageTotal > maxTps) {
				maxTps = messageTotal;
			}

Y
youyong 已提交
84 85 86
			long blockTotal = state.getBlockTotal();
			temp.setBlockTotal(blockTotal);
			machine.setBlockTotal(machine.getBlockTotal() + blockTotal);
Y
youyong 已提交
87

Y
youyong 已提交
88 89 90
			long blockLoss = state.getBlockLoss();
			temp.setBlockLoss(blockLoss);
			machine.setBlockLoss(machine.getBlockLoss() + blockLoss);
Y
youyong 已提交
91 92 93 94

			long blockTime = state.getBlockTime();
			temp.setBlockTime(blockTime);
			machine.setBlockTime(machine.getBlockTime() + blockTime);
Y
youyong 已提交
95

Y
youyong 已提交
96 97 98
			long pigeonTimeError = state.getPigeonTimeError();
			temp.setPigeonTimeError(pigeonTimeError);
			machine.setPigeonTimeError(machine.getPigeonTimeError() + pigeonTimeError);
Y
youyong 已提交
99

Y
youyong 已提交
100 101 102
			long networkTimeError = state.getNetworkTimeError();
			temp.setNetworkTimeError(networkTimeError);
			machine.setNetworkTimeError(machine.getNetworkTimeError() + networkTimeError);
Y
youyong 已提交
103

Y
youyong 已提交
104 105 106 107 108 109 110 111
			long messageDump = state.getMessageDump();
			temp.setDump(messageDump);
			machine.setDump(machine.getDump() + messageDump);

			long messageDumpLoss = state.getMessageDumpLoss();
			temp.setDumpLoss(messageDumpLoss);
			machine.setDumpLoss(machine.getDumpLoss() + messageDumpLoss);

S
sunryuan 已提交
112

Y
youyong 已提交
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133

			int processDelayCount = state.getProcessDelayCount();
			temp.setDelayCount(processDelayCount);
			machine.setDelayCount(machine.getDelayCount() + processDelayCount);

			double processDelaySum = state.getProcessDelaySum();
			temp.setDelaySum(processDelaySum);
			machine.setDelaySum(machine.getDelaySum() + processDelaySum);

			double sum = machine.getDelaySum();
			long count = machine.getDelayCount();
			double avg = 0;
			if (count > 0) {
				avg = sum / count;
				machine.setDelayAvg(avg);
			}
			temp.setTime(new Date(start));
		}

		double avgTps = 0;
		if (size > 0) {
Y
youyong 已提交
134
			avgTps = machine.getTotal() / (double) size;
Y
youyong 已提交
135 136 137 138 139
		}
		machine.setAvgTps(avgTps);
		machine.setMaxTps(maxTps);
	}

Y
youyong 已提交
140 141
	@Override
	public void doCheckpoint(boolean atEnd) {
Y
yong.you 已提交
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
		StateReport stateReport = getReport(ReportConstants.CAT);
		Map<String, StateReport> reports = m_reportManager.getHourlyReports(getStartTime());

		reports.put(ReportConstants.CAT, stateReport);
		long startTime = getStartTime();
		if (atEnd && !isLocalMode()) {
			m_reportManager.storeHourlyReports(startTime, StoragePolicy.FILE_AND_DB);
		} else {
			m_reportManager.storeHourlyReports(startTime, StoragePolicy.FILE);
		}
		if (atEnd) {
			long minute = 1000 * 60;
			long start = m_startTime - minute * 60 * 2;
			long end = m_startTime - minute * 60;

			for (; start < end; start += minute) {
				m_serverStateManager.removeState(start);
			}
		}
Y
youyong 已提交
161 162 163 164 165 166 167 168 169
	}

	@Override
	public void enableLogging(Logger logger) {
		m_logger = logger;
	}

	@Override
	public StateReport getReport(String domain) {
Y
youyong 已提交
170
		StateReport report = new StateReport(domain);
Y
yong.you 已提交
171
		report = new StateReport(ReportConstants.CAT);
Y
youyong 已提交
172 173 174
		report.setStartTime(new Date(m_startTime));
		report.setEndTime(new Date(m_startTime + MINUTE * 60 - 1));
		report.getMachines().clear();
Y
yong.you 已提交
175

Y
youyong 已提交
176 177
		String ip = NetworkInterfaceManager.INSTANCE.getLocalHostAddress();
		Machine machine = report.findOrCreateMachine(ip);
Y
youyong 已提交
178 179

		buildStateInfo(machine);
Y
yong.you 已提交
180
		StateReport startReport = m_reportManager.getHourlyReport(getStartTime(), ReportConstants.CAT, true);
S
sunryuan 已提交
181 182 183 184
		for(String key:machine.getProcessDomains().keySet()){
				machine.getProcessDomains().get(key).getIps().addAll(startReport.findOrCreateMachine(ip).getProcessDomains().get(key).getIps());
		}
//		machine.getProcessDomains().putAll(startReport.findOrCreateMachine(ip).getProcessDomains());
Y
youyong 已提交
185 186 187 188 189
		return report;
	}

	@Override
	protected void process(MessageTree tree) {
Y
yong.you 已提交
190
		StateReport report = m_reportManager.getHourlyReport(getStartTime(), ReportConstants.CAT, true);
Y
youyong 已提交
191 192 193 194
		String domain = tree.getDomain();
		String ip = tree.getIpAddress();
		Machine machine = report.findOrCreateMachine(NetworkInterfaceManager.INSTANCE.getLocalHostAddress());

Y
yong.you 已提交
195 196 197
		machine.findOrCreateProcessDomain(domain).addIp(ip);
		if (validate(domain)) {
			if (!m_domainManager.containsDomainInCat(domain)) {
198
				m_domainManager.insertDomain(domain);
Y
yong.you 已提交
199
			}
Y
yong.you 已提交
200
			Hostinfo ipInfo = m_domainManager.queryHostInfoByIp(ip);
201

Y
yong.you 已提交
202
			if (ipInfo == null) {
Y
yong.you 已提交
203
				m_domainManager.insert(domain, ip);
Y
youyong 已提交
204 205 206 207 208
			}
		}
	}

}