提交 7f595a7b 编写于 作者: Y yong.you

modify the task package and transaction domains

上级 c5647d3f
...@@ -33,7 +33,6 @@ import com.dianping.cat.core.config.ConfigDao; ...@@ -33,7 +33,6 @@ import com.dianping.cat.core.config.ConfigDao;
import com.dianping.cat.core.dal.HostinfoDao; import com.dianping.cat.core.dal.HostinfoDao;
import com.dianping.cat.core.dal.HourlyReportDao; import com.dianping.cat.core.dal.HourlyReportDao;
import com.dianping.cat.core.dal.ProjectDao; import com.dianping.cat.core.dal.ProjectDao;
import com.dianping.cat.core.dal.TaskDao;
import com.dianping.cat.message.spi.core.MessageConsumer; import com.dianping.cat.message.spi.core.MessageConsumer;
import com.dianping.cat.service.DefaultReportManager; import com.dianping.cat.service.DefaultReportManager;
import com.dianping.cat.service.ReportDelegate; import com.dianping.cat.service.ReportDelegate;
...@@ -108,7 +107,7 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator { ...@@ -108,7 +107,7 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
.req(ReportDelegate.class, ID) // .req(ReportDelegate.class, ID) //
.req(BucketManager.class, HourlyReportDao.class) // .req(BucketManager.class, HourlyReportDao.class) //
.config(E("name").value(ID))); .config(E("name").value(ID)));
all.add(C(ReportDelegate.class, ID, TransactionDelegate.class).req(TaskDao.class)); all.add(C(ReportDelegate.class, ID, TransactionDelegate.class).req(TaskManager.class));
return all; return all;
} }
......
...@@ -52,6 +52,7 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo ...@@ -52,6 +52,7 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
public TransactionReport getReport(String domain) { public TransactionReport getReport(String domain) {
TransactionReport report = m_reportManager.getHourlyReport(getStartTime(), domain, false); TransactionReport report = m_reportManager.getHourlyReport(getStartTime(), domain, false);
report.getDomainNames().addAll(m_reportManager.getDomains(getStartTime()));
report.accept(new TransactionStatisticsComputer()); report.accept(new TransactionStatisticsComputer());
return report; return report;
} }
......
...@@ -9,18 +9,17 @@ import java.util.Set; ...@@ -9,18 +9,17 @@ import java.util.Set;
import org.unidal.lookup.annotation.Inject; import org.unidal.lookup.annotation.Inject;
import com.dianping.cat.Cat; import com.dianping.cat.Cat;
import com.dianping.cat.configuration.NetworkInterfaceManager;
import com.dianping.cat.consumer.transaction.model.entity.TransactionReport; import com.dianping.cat.consumer.transaction.model.entity.TransactionReport;
import com.dianping.cat.consumer.transaction.model.transform.DefaultSaxParser; import com.dianping.cat.consumer.transaction.model.transform.DefaultSaxParser;
import com.dianping.cat.core.dal.Task;
import com.dianping.cat.core.dal.TaskDao;
import com.dianping.cat.service.ReportConstants; import com.dianping.cat.service.ReportConstants;
import com.dianping.cat.service.ReportDelegate; import com.dianping.cat.service.ReportDelegate;
import com.dianping.cat.task.TaskManager;
import com.dianping.cat.task.TaskManager.TaskProlicy;
public class TransactionDelegate implements ReportDelegate<TransactionReport> { public class TransactionDelegate implements ReportDelegate<TransactionReport> {
@Inject @Inject
private TaskDao m_taskDao; private TaskManager m_taskManager;
@Override @Override
public void afterLoad(Map<String, TransactionReport> reports) { public void afterLoad(Map<String, TransactionReport> reports) {
...@@ -102,19 +101,6 @@ public class TransactionDelegate implements ReportDelegate<TransactionReport> { ...@@ -102,19 +101,6 @@ public class TransactionDelegate implements ReportDelegate<TransactionReport> {
@Override @Override
public boolean createHourlyTask(TransactionReport report) { public boolean createHourlyTask(TransactionReport report) {
try { return m_taskManager.createTask(report.getStartTime(), report.getDomain(), "transaction", TaskProlicy.ALL);
Task task = m_taskDao.createLocal();
task.setCreationDate(new Date());
task.setProducer(NetworkInterfaceManager.INSTANCE.getLocalHostAddress());
task.setReportDomain(report.getDomain());
task.setReportName("transaction");
task.setReportPeriod(report.getStartTime());
task.setStatus(1);
m_taskDao.insert(task);
} catch (Exception e) {
Cat.logError(e);
}
return false;
} }
} }
...@@ -75,7 +75,7 @@ ...@@ -75,7 +75,7 @@
<implementation>com.dianping.cat.consumer.transaction.TransactionDelegate</implementation> <implementation>com.dianping.cat.consumer.transaction.TransactionDelegate</implementation>
<requirements> <requirements>
<requirement> <requirement>
<role>com.dianping.cat.core.dal.TaskDao</role> <role>com.dianping.cat.task.TaskManager</role>
</requirement> </requirement>
</requirements> </requirements>
</component> </component>
......
...@@ -6,8 +6,10 @@ import java.util.ArrayList; ...@@ -6,8 +6,10 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import org.codehaus.plexus.logging.LogEnabled; import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger; import org.codehaus.plexus.logging.Logger;
...@@ -60,6 +62,17 @@ public class DefaultReportManager<T> implements ReportManager<T>, LogEnabled { ...@@ -60,6 +62,17 @@ public class DefaultReportManager<T> implements ReportManager<T>, LogEnabled {
m_logger = logger; m_logger = logger;
} }
@Override
public Set<String> getDomains(long startTime) {
Map<String, T> reports = m_map.get(startTime);
if (reports == null) {
return new HashSet<String>();
} else {
return reports.keySet();
}
}
@Override @Override
public T getHourlyReport(long startTime, String domain, boolean createIfNotExist) { public T getHourlyReport(long startTime, String domain, boolean createIfNotExist) {
Map<String, T> reports = m_map.get(startTime); Map<String, T> reports = m_map.get(startTime);
...@@ -197,7 +210,6 @@ public class DefaultReportManager<T> implements ReportManager<T>, LogEnabled { ...@@ -197,7 +210,6 @@ public class DefaultReportManager<T> implements ReportManager<T>, LogEnabled {
r.setContent(xml); r.setContent(xml);
m_reportDao.insert(r); m_reportDao.insert(r);
m_reportDelegate.createHourlyTask(report); m_reportDelegate.createHourlyTask(report);
} catch (Throwable e) { } catch (Throwable e) {
t.setStatus(e); t.setStatus(e);
......
package com.dianping.cat.service; package com.dianping.cat.service;
import java.util.Map; import java.util.Map;
import java.util.Set;
import com.dianping.cat.service.DefaultReportManager.StoragePolicy; import com.dianping.cat.service.DefaultReportManager.StoragePolicy;
...@@ -9,6 +10,8 @@ public interface ReportManager<T> { ...@@ -9,6 +10,8 @@ public interface ReportManager<T> {
public void initialize(); public void initialize();
public Set<String> getDomains(long startTime);
public T getHourlyReport(long startTime, String domain, boolean createIfNotExist); public T getHourlyReport(long startTime, String domain, boolean createIfNotExist);
public Map<String, T> getHourlyReports(long startTime); public Map<String, T> getHourlyReports(long startTime);
......
...@@ -12,7 +12,7 @@ import com.dianping.cat.consumer.CatConsumerModule; ...@@ -12,7 +12,7 @@ import com.dianping.cat.consumer.CatConsumerModule;
import com.dianping.cat.consumer.core.aggregation.AggregationConfigManager; import com.dianping.cat.consumer.core.aggregation.AggregationConfigManager;
import com.dianping.cat.message.spi.core.MessageConsumer; import com.dianping.cat.message.spi.core.MessageConsumer;
import com.dianping.cat.message.spi.core.TcpSocketReceiver; import com.dianping.cat.message.spi.core.TcpSocketReceiver;
import com.dianping.cat.report.task.thread.DefaultTaskConsumer; import com.dianping.cat.report.task.DefaultTaskConsumer;
import com.dianping.cat.report.view.DomainNavManager; import com.dianping.cat.report.view.DomainNavManager;
import com.dianping.cat.system.alarm.AlarmRuleCreator; import com.dianping.cat.system.alarm.AlarmRuleCreator;
import com.dianping.cat.system.alarm.AlarmTask; import com.dianping.cat.system.alarm.AlarmTask;
......
...@@ -21,6 +21,7 @@ import com.dianping.cat.report.baseline.impl.DefaultBaselineCreator; ...@@ -21,6 +21,7 @@ import com.dianping.cat.report.baseline.impl.DefaultBaselineCreator;
import com.dianping.cat.report.baseline.impl.DefaultBaselineService; import com.dianping.cat.report.baseline.impl.DefaultBaselineService;
import com.dianping.cat.report.page.dependency.graph.TopologyGraphBuilder; import com.dianping.cat.report.page.dependency.graph.TopologyGraphBuilder;
import com.dianping.cat.report.service.ReportService; import com.dianping.cat.report.service.ReportService;
import com.dianping.cat.report.task.DefaultTaskConsumer;
import com.dianping.cat.report.task.cross.CrossReportBuilder; import com.dianping.cat.report.task.cross.CrossReportBuilder;
import com.dianping.cat.report.task.dependency.DependencyReportBuilder; import com.dianping.cat.report.task.dependency.DependencyReportBuilder;
import com.dianping.cat.report.task.event.EventGraphCreator; import com.dianping.cat.report.task.event.EventGraphCreator;
...@@ -38,7 +39,6 @@ import com.dianping.cat.report.task.spi.ReportFacade; ...@@ -38,7 +39,6 @@ import com.dianping.cat.report.task.spi.ReportFacade;
import com.dianping.cat.report.task.sql.SqlMerger; import com.dianping.cat.report.task.sql.SqlMerger;
import com.dianping.cat.report.task.sql.SqlReportBuilder; import com.dianping.cat.report.task.sql.SqlReportBuilder;
import com.dianping.cat.report.task.state.StateReportBuilder; import com.dianping.cat.report.task.state.StateReportBuilder;
import com.dianping.cat.report.task.thread.DefaultTaskConsumer;
import com.dianping.cat.report.task.transaction.TransactionGraphCreator; import com.dianping.cat.report.task.transaction.TransactionGraphCreator;
import com.dianping.cat.report.task.transaction.TransactionMerger; import com.dianping.cat.report.task.transaction.TransactionMerger;
import com.dianping.cat.report.task.transaction.TransactionReportBuilder; import com.dianping.cat.report.task.transaction.TransactionReportBuilder;
......
/** /**
* *
*/ */
package com.dianping.cat.report.task.thread; package com.dianping.cat.report.task;
import java.util.Date; import java.util.Date;
import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.LockSupport;
......
package com.dianping.cat.report.task.thread; package com.dianping.cat.report.task;
import com.dianping.cat.Cat; import com.dianping.cat.Cat;
import com.dianping.cat.configuration.NetworkInterfaceManager; import com.dianping.cat.configuration.NetworkInterfaceManager;
......
package com.dianping.cat.report.task.thread;
import java.util.Calendar;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import org.unidal.dal.jdbc.DalNotFoundException;
import org.unidal.lookup.annotation.Inject;
import com.dianping.cat.Cat;
import com.dianping.cat.configuration.NetworkInterfaceManager;
import com.dianping.cat.consumer.advanced.MetricConfigManager;
import com.dianping.cat.core.dal.Task;
import com.dianping.cat.core.dal.TaskDao;
import com.dianping.cat.core.dal.TaskEntity;
import com.dianping.cat.helper.CatString;
import com.dianping.cat.helper.TimeUtil;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.report.service.ReportService;
import com.dianping.cat.report.task.TaskHelper;
import com.dianping.cat.report.task.spi.ReportFacade;
public class TaskProducer implements org.unidal.helper.Threads.Task, Initializable {
@Inject
private ReportService m_reportService;
@Inject
private TaskDao m_taskDao;
@Inject
private MetricConfigManager m_configManager;
private Set<String> m_dailyReportNameSet = new HashSet<String>();
private Set<String> m_graphReportNameSet = new HashSet<String>();
private static final String STATE = "state";
private long m_currentDay;
private void createDailyReportTasks(Date date) {
generateReportTasks(date, new Date(date.getTime() + TimeUtil.ONE_DAY), ReportFacade.TYPE_DAILY);
}
private void createMonthReportTasks(Date date) {
Calendar cal = Calendar.getInstance();
cal.setTime(date);
int dayOfMonth = cal.get(Calendar.DAY_OF_MONTH);
if (dayOfMonth == 1) {
Calendar monthEnd = Calendar.getInstance();
monthEnd.setTime(date);
monthEnd.add(Calendar.MONTH, 1);
generateReportTasks(date, monthEnd.getTime(), ReportFacade.TYPE_WEEK);
}
}
private void generateDailyMetricBaselineTasks(Date date) {
try {
Set<String> groups = m_configManager.getMetricConfig().getMetricItemConfigs().keySet();
for (String group : groups) {
try {
m_taskDao.findByDomainNameTypePeriod("metricBaseline", group, ReportFacade.TYPE_DAILY, date,
TaskEntity.READSET_FULL);
} catch (DalNotFoundException e) {
insertTask(group, "metricBaseline", ReportFacade.TYPE_DAILY, date);
}
}
} catch (Exception e) {
Cat.logError(e);
}
}
private void createWeeklyReportTasks(Date date) {
Calendar cal = Calendar.getInstance();
cal.setTime(date);
int dayOfWeek = cal.get(Calendar.DAY_OF_WEEK);
if (dayOfWeek == 7) {
generateReportTasks(date, new Date(date.getTime() + TimeUtil.ONE_DAY * 7), ReportFacade.TYPE_WEEK);
}
}
private void creatReportTask(Date date) {
generateDailyMetricBaselineTasks(date);
createDailyReportTasks(date);
createWeeklyReportTasks(date);
createMonthReportTasks(date);
}
private void generateReportTasks(Date start, Date end, int reportType) {
Set<String> domainSet = queryDomainSet(start, end);
for (String domain : domainSet) {
for (String name : m_dailyReportNameSet) {
insertTask(domain, name, reportType, start);
}
}
insertTask(CatString.CAT, STATE, reportType, start);
}
@Override
public String getName() {
return "Task-Producer";
}
@Override
public void initialize() throws InitializationException {
m_dailyReportNameSet.add("event");
m_dailyReportNameSet.add("transaction");
m_dailyReportNameSet.add("problem");
m_dailyReportNameSet.add("matrix");
m_dailyReportNameSet.add("cross");
m_dailyReportNameSet.add("sql");
m_dailyReportNameSet.add("health");
m_graphReportNameSet.add("transaction");
m_graphReportNameSet.add("event");
m_graphReportNameSet.add("problem");
m_graphReportNameSet.add("heartbeat");
}
private void insertTask(String domain, String reportName, int taskType, Date taskPeriod) {
try {
m_taskDao.findByDomainNameTypePeriod(reportName, domain, taskType, taskPeriod, TaskEntity.READSET_FULL);
} catch (DalNotFoundException e) {
Task task = m_taskDao.createLocal();
task.setCreationDate(new Date());
task.setProducer(NetworkInterfaceManager.INSTANCE.getLocalHostAddress());
task.setReportDomain(domain);
task.setReportName(reportName);
task.setReportPeriod(taskPeriod);
task.setStatus(1);
task.setTaskType(taskType);
try {
m_taskDao.insert(task);
} catch (Exception ex) {
Cat.logError(ex);
}
} catch (Exception e) {
Cat.logError(e);
}
}
private Set<String> queryDomainSet(Date start, Date end) {
return m_reportService.queryAllDomainNames(start, end, "transaction");
}
@Override
public void run() {
boolean active = true;
while (active) {
Date currentDay = TimeUtil.getCurrentDay();
if (currentDay.getTime() > m_currentDay) {
Calendar cal = Calendar.getInstance();
int minute = cal.get(Calendar.MINUTE);
Date yesterday = TaskHelper.yesterdayZero(new Date());
Transaction t = Cat.newTransaction("System", "CreateTask");
try {
// Daily report should created after last day reports all insert to database
if (minute > 6) {
creatReportTask(yesterday);
} else {
try {
Thread.sleep((7 - minute) * TimeUtil.ONE_MINUTE);
} catch (InterruptedException e) {
active = false;
}
creatReportTask(yesterday);
}
t.setStatus(Transaction.SUCCESS);
} catch (Exception e) {
t.setStatus(e);
Cat.logError(e);
} finally {
t.complete();
}
m_currentDay = currentDay.getTime();
}
try {
Thread.sleep(5 * TimeUtil.ONE_MINUTE);
} catch (InterruptedException e) {
active = false;
}
}
}
@Override
public void shutdown() {
}
}
...@@ -9,7 +9,6 @@ import junit.framework.Assert; ...@@ -9,7 +9,6 @@ import junit.framework.Assert;
import org.junit.Test; import org.junit.Test;
import com.dianping.cat.core.dal.Task; import com.dianping.cat.core.dal.Task;
import com.dianping.cat.report.task.thread.TaskConsumer;
public class TaskConsumerTest { public class TaskConsumerTest {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册