提交 287669ec 编写于 作者: J jialinsun

添加puppet并重构,cat-agent 重构,metric 页面添加单位,网络流量单位换成MB/秒

上级 e2200f24
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
<dependency> <dependency>
<groupId>org.unidal.framework</groupId> <groupId>org.unidal.framework</groupId>
<artifactId>foundation-service</artifactId> <artifactId>foundation-service</artifactId>
<version>2.0.5</version> <version>2.0.8</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.dianping.cat</groupId> <groupId>com.dianping.cat</groupId>
......
...@@ -6,14 +6,15 @@ import java.util.List; ...@@ -6,14 +6,15 @@ import java.util.List;
import org.unidal.lookup.configuration.AbstractResourceConfigurator; import org.unidal.lookup.configuration.AbstractResourceConfigurator;
import org.unidal.lookup.configuration.Component; import org.unidal.lookup.configuration.Component;
import com.dianping.cat.agent.monitor.DataSender;
import com.dianping.cat.agent.monitor.EnvironmentConfig; import com.dianping.cat.agent.monitor.EnvironmentConfig;
import com.dianping.cat.agent.monitor.Executor; import com.dianping.cat.agent.monitor.executors.DataSender;
import com.dianping.cat.agent.monitor.TaskExecutors; import com.dianping.cat.agent.monitor.executors.Executor;
import com.dianping.cat.agent.monitor.jvm.JVMMemoryExecutor; import com.dianping.cat.agent.monitor.executors.TaskExecutors;
import com.dianping.cat.agent.monitor.jvm.JVMStateExecutor; import com.dianping.cat.agent.monitor.executors.jvm.JVMMemoryExecutor;
import com.dianping.cat.agent.monitor.system.SystemPerformanceExecutor; import com.dianping.cat.agent.monitor.executors.jvm.JVMStateExecutor;
import com.dianping.cat.agent.monitor.system.SystemStateExecutor; import com.dianping.cat.agent.monitor.executors.system.SystemPerformanceExecutor;
import com.dianping.cat.agent.monitor.executors.system.SystemStateExecutor;
import com.dianping.cat.agent.monitor.puppet.PuppetTask;
public class ComponentsConfigurator extends AbstractResourceConfigurator { public class ComponentsConfigurator extends AbstractResourceConfigurator {
@Override @Override
...@@ -27,11 +28,11 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator { ...@@ -27,11 +28,11 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(Executor.class, SystemPerformanceExecutor.ID, SystemPerformanceExecutor.class).req( all.add(C(Executor.class, SystemPerformanceExecutor.ID, SystemPerformanceExecutor.class).req(
EnvironmentConfig.class)); EnvironmentConfig.class));
all.add(C(Executor.class, SystemStateExecutor.ID, SystemStateExecutor.class).req(EnvironmentConfig.class)); all.add(C(Executor.class, SystemStateExecutor.ID, SystemStateExecutor.class).req(EnvironmentConfig.class));
all.add(C(DataSender.class).req(EnvironmentConfig.class)); all.add(C(DataSender.class).req(EnvironmentConfig.class));
all.add(C(TaskExecutors.class).req(DataSender.class)); all.add(C(TaskExecutors.class).req(DataSender.class));
all.add(C(PuppetTask.class).req(EnvironmentConfig.class));
// Please keep it as last // Please keep it as last
all.addAll(new WebComponentConfigurator().defineComponents()); all.addAll(new WebComponentConfigurator().defineComponents());
return all; return all;
......
...@@ -11,8 +11,9 @@ import org.unidal.web.mvc.annotation.OutboundActionMeta; ...@@ -11,8 +11,9 @@ import org.unidal.web.mvc.annotation.OutboundActionMeta;
import org.unidal.web.mvc.annotation.PayloadMeta; import org.unidal.web.mvc.annotation.PayloadMeta;
import com.dianping.cat.agent.core.CorePage; import com.dianping.cat.agent.core.CorePage;
import com.dianping.cat.agent.monitor.DataSender; import com.dianping.cat.agent.monitor.executors.DataSender;
import com.dianping.cat.agent.monitor.TaskExecutors; import com.dianping.cat.agent.monitor.executors.TaskExecutors;
import com.dianping.cat.agent.monitor.puppet.PuppetTask;
public class Handler implements PageHandler<Context> { public class Handler implements PageHandler<Context> {
@Inject @Inject
...@@ -24,6 +25,9 @@ public class Handler implements PageHandler<Context> { ...@@ -24,6 +25,9 @@ public class Handler implements PageHandler<Context> {
@Inject @Inject
private TaskExecutors m_taskExecutors; private TaskExecutors m_taskExecutors;
@Inject
private PuppetTask m_puppetTask;
@Override @Override
@PayloadMeta(Payload.class) @PayloadMeta(Payload.class)
@InboundActionMeta(name = "index") @InboundActionMeta(name = "index")
...@@ -37,7 +41,7 @@ public class Handler implements PageHandler<Context> { ...@@ -37,7 +41,7 @@ public class Handler implements PageHandler<Context> {
model.setAction(Action.VIEW); model.setAction(Action.VIEW);
model.setPage(CorePage.INDEX); model.setPage(CorePage.INDEX);
if (!ctx.isProcessStopped()) { if (!ctx.isProcessStopped()) {
m_jspViewer.view(ctx, model); m_jspViewer.view(ctx, model);
} }
......
...@@ -15,15 +15,19 @@ public class EnvironmentConfig implements Initializable { ...@@ -15,15 +15,19 @@ public class EnvironmentConfig implements Initializable {
private static final String CONFIG_FILE = "/data/webapps/server.properties"; private static final String CONFIG_FILE = "/data/webapps/server.properties";
private static final String URL_FORMAT = "http://%1$s/cat/r/monitor?op=batch&timestamp=%2$s&group=%3$s&domain=%4$s"; private static final String SYSTEM_URL = "http://%1$s/cat/r/monitor?op=batch&timestamp=%2$s&group=%3$s&domain=%4$s";
private static final List<String> m_servers = Arrays.asList("localhost:2281", "10.1.110.57:8080", private static final String ALTERATION_URL = "http://%1$s/cat/r/alteration";
private static final List<String> CAT_SERVERS = Arrays.asList("10.128.120.60:2281", "10.1.110.57:8080",
"10.1.110.23:8080", "10.1.110.21:8080"); "10.1.110.23:8080", "10.1.110.21:8080");
private String m_ip; private String m_ip;
private String m_domain; private String m_domain;
private String m_hostName;
// host.name 配置规则: // host.name 配置规则:
// [${domain}01.nh0] [${domain}01.beta] [${domain}-ppe01.hm] [${domain}-sl-**] [${domain}-gp-**] // [${domain}01.nh0] [${domain}01.beta] [${domain}-ppe01.hm] [${domain}-sl-**] [${domain}-gp-**]
private String buildDomain(String hostName) { private String buildDomain(String hostName) {
...@@ -43,11 +47,15 @@ public class EnvironmentConfig implements Initializable { ...@@ -43,11 +47,15 @@ public class EnvironmentConfig implements Initializable {
return domain; return domain;
} }
public String buildUrl(String server) { public String buildSystemUrl(String server) {
String group = getGroup(); String group = getGroup();
long current = System.currentTimeMillis(); long current = System.currentTimeMillis();
return String.format(URL_FORMAT, server, current, group, m_domain); return String.format(SYSTEM_URL, server, current, group, m_domain);
}
public String buildAlterationUrl(String server) {
return String.format(ALTERATION_URL, server);
} }
public String getDomain() { public String getDomain() {
...@@ -63,7 +71,11 @@ public class EnvironmentConfig implements Initializable { ...@@ -63,7 +71,11 @@ public class EnvironmentConfig implements Initializable {
} }
public List<String> getServers() { public List<String> getServers() {
return m_servers; return CAT_SERVERS;
}
public String getHostName() {
return m_hostName;
} }
@Override @Override
...@@ -73,13 +85,13 @@ public class EnvironmentConfig implements Initializable { ...@@ -73,13 +85,13 @@ public class EnvironmentConfig implements Initializable {
InputStream in = new BufferedInputStream(new FileInputStream(CONFIG_FILE)); InputStream in = new BufferedInputStream(new FileInputStream(CONFIG_FILE));
properties.load(in); properties.load(in);
String hostName = properties.getProperty("host.name"); m_hostName = properties.getProperty("host.name");
if (hostName == null) { if (m_hostName == null) {
hostName = NetworkInterfaceManager.INSTANCE.getLocalHostName(); m_hostName = NetworkInterfaceManager.INSTANCE.getLocalHostName();
} }
m_domain = buildDomain(hostName); m_domain = buildDomain(m_hostName);
m_ip = properties.getProperty("host.ip"); m_ip = properties.getProperty("host.ip");
if (m_ip == null) { if (m_ip == null) {
......
package com.dianping.cat.agent.monitor; package com.dianping.cat.agent.monitor.executors;
import org.unidal.lookup.annotation.Inject; import org.unidal.lookup.annotation.Inject;
import com.dianping.cat.agent.monitor.EnvironmentConfig;
public abstract class AbstractExecutor implements Executor { public abstract class AbstractExecutor implements Executor {
@Inject @Inject
......
package com.dianping.cat.agent.monitor; package com.dianping.cat.agent.monitor.executors;
public class DataEntity { public class DataEntity {
......
package com.dianping.cat.agent.monitor; package com.dianping.cat.agent.monitor.executors;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStreamWriter; import java.io.OutputStreamWriter;
...@@ -18,6 +18,7 @@ import org.unidal.helper.Threads.Task; ...@@ -18,6 +18,7 @@ import org.unidal.helper.Threads.Task;
import org.unidal.lookup.annotation.Inject; import org.unidal.lookup.annotation.Inject;
import com.dianping.cat.Cat; import com.dianping.cat.Cat;
import com.dianping.cat.agent.monitor.EnvironmentConfig;
import com.dianping.cat.message.Event; import com.dianping.cat.message.Event;
import com.dianping.cat.message.Transaction; import com.dianping.cat.message.Transaction;
...@@ -26,7 +27,7 @@ public class DataSender implements Task, Initializable { ...@@ -26,7 +27,7 @@ public class DataSender implements Task, Initializable {
@Inject @Inject
private EnvironmentConfig m_environmentConfig; private EnvironmentConfig m_environmentConfig;
private BlockingQueue<DataEntity> m_entities = new ArrayBlockingQueue<DataEntity>(5000); private static BlockingQueue<DataEntity> m_entities = new ArrayBlockingQueue<DataEntity>(5000);
private List<DataEntity> m_dataEntities = new ArrayList<DataEntity>(); private List<DataEntity> m_dataEntities = new ArrayList<DataEntity>();
...@@ -37,14 +38,19 @@ public class DataSender implements Task, Initializable { ...@@ -37,14 +38,19 @@ public class DataSender implements Task, Initializable {
public boolean put(List<DataEntity> entities) { public boolean put(List<DataEntity> entities) {
boolean result = true; boolean result = true;
for (DataEntity entity : entities) { try {
boolean temp = m_entities.offer(entity); for (DataEntity entity : entities) {
boolean temp = m_entities.offer(entity, 5, TimeUnit.MILLISECONDS);
if (!temp) { if (!temp) {
result = temp; result = temp;
}
} }
return result;
} catch (Exception e) {
Cat.logError(e);
} }
return result; return false;
} }
private String buildBatchEntities(List<DataEntity> entities) { private String buildBatchEntities(List<DataEntity> entities) {
...@@ -85,7 +91,7 @@ public class DataSender implements Task, Initializable { ...@@ -85,7 +91,7 @@ public class DataSender implements Task, Initializable {
List<String> servers = m_environmentConfig.getServers(); List<String> servers = m_environmentConfig.getServers();
for (String server : servers) { for (String server : servers) {
String url = m_environmentConfig.buildUrl(server); String url = m_environmentConfig.buildSystemUrl(server);
String entityContent = buildBatchEntities(m_dataEntities); String entityContent = buildBatchEntities(m_dataEntities);
String content = "&batch=" + entityContent; String content = "&batch=" + entityContent;
...@@ -96,17 +102,19 @@ public class DataSender implements Task, Initializable { ...@@ -96,17 +102,19 @@ public class DataSender implements Task, Initializable {
@Override @Override
public void run() { public void run() {
while (true) { boolean active = true;
while (active) {
Transaction t = Cat.newTransaction("Data", "Send"); Transaction t = Cat.newTransaction("Data", "Send");
long current = System.currentTimeMillis();
try { try {
long current = System.currentTimeMillis();
int maxSize = MAX_ENTITIES; int maxSize = MAX_ENTITIES;
try { try {
while (m_entities.size() > 0 && maxSize > 0) { while (m_entities.size() > 0 && maxSize > 0) {
DataEntity entity = m_entities.poll(5, TimeUnit.MILLISECONDS); DataEntity entity = m_entities.poll(5, TimeUnit.MILLISECONDS);
m_dataEntities.add(entity); m_dataEntities.add(entity);
maxSize--; maxSize--;
} }
...@@ -122,14 +130,13 @@ public class DataSender implements Task, Initializable { ...@@ -122,14 +130,13 @@ public class DataSender implements Task, Initializable {
Cat.logError(e); Cat.logError(e);
} }
long duration = System.currentTimeMillis() - current; long duration = System.currentTimeMillis() - current;
long sleeptime = DURATION - duration;
if (sleeptime > 0) { try {
try { if (duration < DURATION) {
Thread.sleep(sleeptime); Thread.sleep(DURATION - duration);
} catch (InterruptedException e) {
break;
} }
} catch (InterruptedException e) {
active = false;
} }
t.setStatus(Transaction.SUCCESS); t.setStatus(Transaction.SUCCESS);
} catch (Exception e) { } catch (Exception e) {
......
package com.dianping.cat.agent.monitor; package com.dianping.cat.agent.monitor.executors;
import java.util.List; import java.util.List;
......
package com.dianping.cat.agent.monitor; package com.dianping.cat.agent.monitor.executors;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
...@@ -38,7 +38,9 @@ public class TaskExecutors extends ContainerHolder implements Task, Initializabl ...@@ -38,7 +38,9 @@ public class TaskExecutors extends ContainerHolder implements Task, Initializabl
@Override @Override
public void run() { public void run() {
while (true) { boolean active = true;
while (active) {
Transaction t = Cat.newTransaction("Data", "Fetch"); Transaction t = Cat.newTransaction("Data", "Fetch");
try { try {
...@@ -48,7 +50,7 @@ public class TaskExecutors extends ContainerHolder implements Task, Initializabl ...@@ -48,7 +50,7 @@ public class TaskExecutors extends ContainerHolder implements Task, Initializabl
Transaction t2 = Cat.newTransaction("Executor", executor.getId()); Transaction t2 = Cat.newTransaction("Executor", executor.getId());
try { try {
List<DataEntity> entities = executor.execute(); List<DataEntity> entities = executor.execute();
m_sender.put(entities); m_sender.put(entities);
t2.setStatus(Transaction.SUCCESS); t2.setStatus(Transaction.SUCCESS);
} catch (Exception e) { } catch (Exception e) {
...@@ -59,14 +61,13 @@ public class TaskExecutors extends ContainerHolder implements Task, Initializabl ...@@ -59,14 +61,13 @@ public class TaskExecutors extends ContainerHolder implements Task, Initializabl
} }
} }
long duration = System.currentTimeMillis() - current; long duration = System.currentTimeMillis() - current;
long sleeptime = DURATION - duration;
if (sleeptime > 0) { try {
try { if (duration < DURATION) {
Thread.sleep(sleeptime); Thread.sleep(DURATION - duration);
} catch (InterruptedException e) {
break;
} }
} catch (InterruptedException e) {
active = false;
} }
t.setStatus(Transaction.SUCCESS); t.setStatus(Transaction.SUCCESS);
} catch (Exception e) { } catch (Exception e) {
......
package com.dianping.cat.agent.monitor.jvm; package com.dianping.cat.agent.monitor.executors.jvm;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.InputStreamReader; import java.io.InputStreamReader;
...@@ -6,8 +6,8 @@ import java.util.ArrayList; ...@@ -6,8 +6,8 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import com.dianping.cat.Cat; import com.dianping.cat.Cat;
import com.dianping.cat.agent.monitor.AbstractExecutor; import com.dianping.cat.agent.monitor.executors.AbstractExecutor;
import com.dianping.cat.agent.monitor.DataEntity; import com.dianping.cat.agent.monitor.executors.DataEntity;
public class JVMMemoryExecutor extends AbstractExecutor { public class JVMMemoryExecutor extends AbstractExecutor {
......
package com.dianping.cat.agent.monitor.jvm; package com.dianping.cat.agent.monitor.executors.jvm;
import java.io.File; import java.io.File;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import com.dianping.cat.agent.monitor.AbstractExecutor; import com.dianping.cat.agent.monitor.executors.AbstractExecutor;
import com.dianping.cat.agent.monitor.DataEntity; import com.dianping.cat.agent.monitor.executors.DataEntity;
public class JVMStateExecutor extends AbstractExecutor { public class JVMStateExecutor extends AbstractExecutor {
......
package com.dianping.cat.agent.monitor.system; package com.dianping.cat.agent.monitor.executors.system;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
...@@ -13,8 +13,8 @@ import org.hyperic.sigar.SigarException; ...@@ -13,8 +13,8 @@ import org.hyperic.sigar.SigarException;
import org.hyperic.sigar.Swap; import org.hyperic.sigar.Swap;
import com.dianping.cat.Cat; import com.dianping.cat.Cat;
import com.dianping.cat.agent.monitor.AbstractExecutor; import com.dianping.cat.agent.monitor.executors.AbstractExecutor;
import com.dianping.cat.agent.monitor.DataEntity; import com.dianping.cat.agent.monitor.executors.DataEntity;
public class SystemPerformanceExecutor extends AbstractExecutor { public class SystemPerformanceExecutor extends AbstractExecutor {
......
package com.dianping.cat.agent.monitor.system; package com.dianping.cat.agent.monitor.executors.system;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.FileReader; import java.io.FileReader;
...@@ -15,8 +15,8 @@ import org.hyperic.sigar.SigarException; ...@@ -15,8 +15,8 @@ import org.hyperic.sigar.SigarException;
import org.hyperic.sigar.Uptime; import org.hyperic.sigar.Uptime;
import com.dianping.cat.Cat; import com.dianping.cat.Cat;
import com.dianping.cat.agent.monitor.AbstractExecutor; import com.dianping.cat.agent.monitor.executors.AbstractExecutor;
import com.dianping.cat.agent.monitor.DataEntity; import com.dianping.cat.agent.monitor.executors.DataEntity;
public class SystemStateExecutor extends AbstractExecutor implements Initializable { public class SystemStateExecutor extends AbstractExecutor implements Initializable {
......
package com.dianping.cat.agent.monitor.puppet; package com.dianping.cat.agent.monitor.puppet;
public class Alertation { public class Alteration {
@Override
public String toString() {
return "Alteration [type=" + type + ", ip=" + ip + ", hostname=" + hostname + ", user=" + user + ", url=" + url
+ ", op=" + op + ", date=" + date + ", title=" + title + ", content=" + content + ", domain=" + domain
+ ", group=" + group + "]";
}
private String type; private String type;
private String ip; private String ip;
......
package com.dianping.cat.agent.monitor.puppet;
import java.io.File;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Locale;
import java.util.regex.Pattern;
import com.dianping.cat.Cat;
import com.dianping.cat.agent.monitor.EnvironmentConfig;
public class AlterationParser {
private EnvironmentConfig m_environmentConfig;
public AlterationParser(EnvironmentConfig config) {
m_environmentConfig = config;
}
public Alteration parse(String line) {
String title_add = "";
String group = "puppet";
String type = "puppet";
String user = "puppet";
String url = "";
String op = "insert";
String date = "";
String content = "";
String host = m_environmentConfig.getHostName();
String IP = m_environmentConfig.getIp();
String domain = m_environmentConfig.getDomain();
String title = "puppet";
String regEx = ".*puppet-agent.*\\(\\/Stage";
String regEx_Filebucketed = ".*Filebucketed.*";
String tmp = "";
Alteration alertation = new Alteration();
// 'Mar 25 10:56:27 localhost puppet-agent[24773]: (/Stage[main]/Zabbix::Agentd/Exec[restart_zabbix_agentd]) Triggered
// 'refresh' from 1 events'
if (Pattern.compile(regEx).matcher(line).find()) {
String[] tmp_list = line.split(" +");
Calendar c = Calendar.getInstance();
int year = c.get(Calendar.YEAR);
if (tmp_list.length >= 4) {
date = tmp_list[0] + " " + tmp_list[1] + " " + tmp_list[2] + " " + Integer.toString(year);
}
String all_content = line.split("\\(")[1]; // "/Stage[main]/Zabbix::Agentd/Exec[restart_zabbix_agentd]) Triggered 'refresh' from 1 events"
String[] tmp_string = all_content.split("\\)"); // '/Stage[main]/Zabbix::Agentd/Exec[restart_zabbix_agentd]', ''
String[] tmp_string_main = tmp_string[0].split("\\[main\\]\\/");
if (tmp_string_main.length >= 2) {
title = tmp_string_main[1].split("\\[")[1].split("\\]")[0]; // restart_zabbix_agentd
if (title == "") {
title = "puppet";
}
}
if (tmp_string.length >= 2) {
content = all_content.split("\\)")[1]; // " Triggered 'refresh' from 1 events"
String[] tmpContent = content.split(" ");
if (tmpContent.length >= 3) {
title_add = tmpContent[1] + " " + tmpContent[2];
title = title + " " + title_add; // restart_zabbix_agentd Triggered 'refresh'
}
}
if (Pattern.compile(regEx_Filebucketed).matcher(content).find()) {
String[] tmpContent = content.split(" ");
String new_file = tmpContent[2];// "/usr/local/nginx/conf/nginx_app.conf"
// " Filebucketed /usr/local/nginx/conf/nginx_app.conf to puppet with sum 99c3e5f79645493fdcf4340dd457cbe4"
if (tmpContent.length >= 8) {
String old_file_index = tmpContent[7]; // 99c3e5f79645493fdcf4340dd457cbe4
String old_file_dir = Utils.runSysCmd("find /var/lib/puppet/clientbucket -name " + old_file_index)
.toString().split("\n")[0];
String old_file = old_file_dir + "/contents";
if (new File(new_file).exists() && new File(old_file).exists()) {
tmp = Utils.runSysCmd("diff " + old_file + " " + new_file).toString();
}
if (tmp.trim() != "") {
content = tmp;
}
}
}
SimpleDateFormat sdf_mmm = new SimpleDateFormat("MMMM dd HH:mm:ss yyyy", Locale.US);
SimpleDateFormat sdf_normal = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
try {
date = sdf_normal.format(sdf_mmm.parse(date));
} catch (ParseException e1) {
Cat.logError(e1);
sdf_mmm = new SimpleDateFormat("MMMM dd HH:mm:ss yyyy", Locale.US);
try {
date = sdf_normal.format(sdf_mmm.parse(date));
} catch (ParseException e) {
Cat.logError(e);
}
return null;
}
alertation.setHostname(host);
alertation.setIp(IP);
alertation.setDomain(domain);
alertation.setTitle(title);
alertation.setContent(content);
alertation.setOp(op);
alertation.setUrl(url);
alertation.setUser(user);
alertation.setType(type);
alertation.setDate(date);
alertation.setGroup(group);
} else {
alertation = null;
}
return alertation;
}
}
package com.dianping.cat.agent.monitor.puppet.util; package com.dianping.cat.agent.monitor.puppet;
import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.URL; import java.net.URL;
import org.apache.log4j.Logger;
public class HttpPostUtils { import org.unidal.helper.Files;
private static Logger puppetLogger = Logger.getLogger("myLogger");
private String urlAddress; import com.dianping.cat.Cat;
import com.dianping.cat.agent.monitor.EnvironmentConfig;
import com.dianping.cat.message.Event;
public String getUrlAddress() { public class DataSender {
return urlAddress;
private EnvironmentConfig m_environmentConfig;
public DataSender(EnvironmentConfig config) {
m_environmentConfig = config;
} }
public void setUrlAddress(String urlAddress) { public void send(Alteration alertation) {
this.urlAddress = urlAddress; String[] pars = new String[11];
pars[0] = "type=" + alertation.getType();
pars[1] = "title=" + alertation.getTitle();
pars[2] = "domain=" + alertation.getDomain();
pars[3] = "ip=" + alertation.getIp();
pars[4] = "user=" + alertation.getUser();
pars[5] = "content=" + alertation.getContent();
pars[6] = "url=" + alertation.getUrl();
pars[7] = "op=" + alertation.getOp();
pars[8] = "alterationDate=" + alertation.getDate();
pars[9] = "hostname=" + alertation.getHostname();
pars[10] = "group=" + alertation.getGroup();
for (String server : m_environmentConfig.getServers()) {
String url = m_environmentConfig.buildAlterationUrl(server);
if (postData(pars, url)) {
break;
}
}
} }
public boolean httpPost(String[] params) { public boolean postData(String[] params, String urlStr) {
boolean flag = false;
URL url = null; URL url = null;
HttpURLConnection con = null; HttpURLConnection con = null;
BufferedReader in = null; String result = null;
StringBuffer result = new StringBuffer();
String paramsTemp = ""; String paramsTemp = "";
try { try {
url = new URL(this.urlAddress); url = new URL(urlStr);
con = (HttpURLConnection) url.openConnection(); con = (HttpURLConnection) url.openConnection();
con.setUseCaches(false); con.setUseCaches(false);
con.setDoOutput(true); con.setDoOutput(true);
con.setRequestMethod("POST"); con.setRequestMethod("POST");
for (String param : params) { for (String param : params) {
if (param != null && !"".equals(param.trim())) { if (param != null && !"".equals(param.trim())) {
paramsTemp += "&" + param; paramsTemp += "&" + param;
} }
} }
// puppetLogger.info("POST para:"+paramsTemp);
byte[] b = paramsTemp.getBytes(); byte[] b = paramsTemp.getBytes();
con.getOutputStream().write(b); // to be tested con.getOutputStream().write(b); // to be tested
con.getOutputStream().flush(); con.getOutputStream().flush();
con.getOutputStream().close(); con.getOutputStream().close();
in = new BufferedReader(new InputStreamReader(con.getInputStream()));
while (true) { result = Files.forIO().readFrom(con.getInputStream(), "utf-8");
String line = in.readLine();
if (line == null) {
break;
} else {
result.append(line);
}
}
} catch (IOException e) { } catch (IOException e) {
// e.printStackTrace(); Cat.logError(e);
// puppetLogger.debug(e);
puppetLogger.error(e.getMessage(), e);
} finally { } finally {
try { if (con != null) {
if (in != null) { con.disconnect();
in.close();
}
if (con != null) {
con.disconnect();
}
} catch (IOException e) {
puppetLogger.error(e.getMessage(), e);
} }
} }
String rs = result.toString(); boolean flag = false;
if (rs.contains("200")) {
if (result.contains("200")) {
flag = true; flag = true;
puppetLogger.info("POST Succ");
} else { } else {
flag = false; Cat.logEvent("Puppet", "Failed in posting data", Event.SUCCESS, result);
puppetLogger.error("POST Fail:" + paramsTemp);
} }
return flag; return flag;
} }
......
...@@ -3,43 +3,46 @@ package com.dianping.cat.agent.monitor.puppet; ...@@ -3,43 +3,46 @@ package com.dianping.cat.agent.monitor.puppet;
import java.io.IOException; import java.io.IOException;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable; import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException; import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import org.unidal.helper.Threads;
import org.unidal.helper.Threads.Task; import org.unidal.helper.Threads.Task;
import org.unidal.lookup.annotation.Inject;
import com.dianping.cat.agent.monitor.puppet.util.CreatDir; import com.dianping.cat.Cat;
import com.dianping.cat.agent.monitor.puppet.util.GetReaderPostion; import com.dianping.cat.agent.monitor.EnvironmentConfig;
import com.dianping.cat.agent.monitor.puppet.util.Parse; import com.dianping.cat.message.Transaction;
import com.dianping.cat.agent.monitor.puppet.util.RunSysCmd;
import com.dianping.cat.agent.monitor.puppet.util.SendHttp;
import com.dianping.cat.agent.monitor.puppet.util.SetReaderPostion;
public class PuppetTask implements Task, Initializable { public class PuppetTask implements Task, Initializable {
private static String m_logFile; @Inject
private EnvironmentConfig m_environmentConfig;
private static String m_lineFile; private DataSender m_dataSender;
private static Logger puppetLogger = Logger.getLogger("myLogger"); private AlterationParser m_alterationParser;
private static final String LOG_FILE = "/var/log/messages";
private static final int DURATION = 60 * 1000;
private ReaderManager m_readerManager = new ReaderManager();
@Override @Override
public void run() { public void run() {
SendHttp sendhttp = new SendHttp();
Parse parse = new Parse();
GetReaderPostion getreaderpostion = new GetReaderPostion();
SetReaderPostion setreaderpostion = new SetReaderPostion();
boolean active = true; boolean active = true;
Long end_position = 0L; Long end_position = 0L;
RunSysCmd runsyscmd = new RunSysCmd(); Transaction t = Cat.newTransaction("Puppet", "Task");
while (active) { while (active) {
Alertation alertation = null; long current = System.currentTimeMillis();
Long position = getreaderpostion.getReaderPostion(m_lineFile); Alteration alertation = null;
Long position = m_readerManager.queryPointer();
RandomAccessFile reader = null; RandomAccessFile reader = null;
try { try {
reader = new RandomAccessFile(m_logFile, "r"); reader = new RandomAccessFile(LOG_FILE, "r");
reader.seek(position); reader.seek(position);
// 判断日志是否切割了,一定要放在while((line=reader.readLine())!=null)之前,否则回导致反复读取 // 判断日志是否切割了,一定要放在while((line=reader.readLine())!=null)之前,否则回导致反复读取
if (position >= 2) { if (position >= 2) {
...@@ -48,54 +51,53 @@ public class PuppetTask implements Task, Initializable { ...@@ -48,54 +51,53 @@ public class PuppetTask implements Task, Initializable {
reader.readChar(); reader.readChar();
reader.seek(position); reader.seek(position);
} catch (IOException e) { } catch (IOException e) {
setreaderpostion.setReaderPostion(m_lineFile, 0L); m_readerManager.updatePointer(0L);
reader.seek(0L); reader.seek(0L);
puppetLogger.error(e.getMessage(), e); Cat.logError(e);
} }
} }
String line = null; String line = null;
while ((line = reader.readLine()) != null) { while ((line = reader.readLine()) != null) {
alertation = parse.parse(line); alertation = m_alterationParser.parse(line);
if (alertation != null) { if (alertation != null) {
sendhttp.sendHttp(alertation); m_dataSender.send(alertation);
} else { } else {
continue; continue;
} }
} }
end_position = reader.getFilePointer(); end_position = reader.getFilePointer();
} catch (IOException e) {
puppetLogger.error("读文件异常:" + m_logFile);
puppetLogger.error(e.getMessage(), e);
} finally {
if (end_position > position) { if (end_position > position) {
setreaderpostion.setReaderPostion(m_lineFile, end_position); m_readerManager.updatePointer(end_position);
} }
t.setStatus(Transaction.SUCCESS);
} catch (IOException e) {
Cat.logError("读文件异常:" + LOG_FILE, e);
} finally {
try { try {
reader.close(); if (reader != null) {
reader.close();
}
} catch (IOException e) { } catch (IOException e) {
puppetLogger.error(e.getMessage(), e); Cat.logError(e);
} }
puppetLogger.info("本次读取的开始偏移量:" + position + " 末尾偏移量:" + end_position);
}
try {
Thread.sleep(1000 * 5);
} catch (InterruptedException e) {
puppetLogger.error(e.getMessage(), e);
}
}
}
@Override long duration = System.currentTimeMillis() - current;
public void initialize() throws InitializationException {
m_logFile = "/var/log/messages"; try {
m_lineFile = "/var/log/line_random.log"; if (duration < DURATION) {
Thread.sleep(DURATION - duration);
}
} catch (InterruptedException e) {
active = false;
}
CreatDir creatdir = new CreatDir(); t.complete();
creatdir.creatDir("/data/applogs/monitor"); }
PropertyConfigurator.configure("log4j.properties"); }
// Threads.forGroup("Cat").start(this);
} }
@Override @Override
...@@ -107,4 +109,11 @@ public class PuppetTask implements Task, Initializable { ...@@ -107,4 +109,11 @@ public class PuppetTask implements Task, Initializable {
public void shutdown() { public void shutdown() {
} }
@Override
public void initialize() throws InitializationException {
m_dataSender = new DataSender(m_environmentConfig);
m_alterationParser = new AlterationParser(m_environmentConfig);
Threads.forGroup("Cat").start(this);
}
} }
package com.dianping.cat.agent.monitor.puppet.util; package com.dianping.cat.agent.monitor.puppet;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.FileReader; import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import org.apache.log4j.Logger; import com.dianping.cat.Cat;
public class ReaderManager {
public class GetReaderPostion { private static final String POINTER_FILE = "/var/log/currentPointer";
private Logger puppetLogger = Logger.getLogger("myLogger");
public long queryPointer() {
/**
*
* @param line_file,记录文件读取位置的文件
* @return 记录的数据,否则返回0
* 读取文件失败的时候是否创建文件line_file
*
*/
public long getReaderPostion(String line_file) {
BufferedReader reader = null; BufferedReader reader = null;
try { try {
reader = new BufferedReader(new FileReader(line_file)); reader = new BufferedReader(new FileReader(POINTER_FILE));
String str = reader.readLine(); String str = reader.readLine();
if (str != null) { if (str != null) {
return Long.parseLong(str); return Long.parseLong(str);
}else{ } else {
return 0L; return 0L;
} }
}catch(FileNotFoundException e1){ } catch (FileNotFoundException e1) {
File filename = new File(line_file); File filename = new File(POINTER_FILE);
try { try {
filename.createNewFile(); filename.createNewFile();
} catch (IOException e2) { } catch (IOException e2) {
puppetLogger.error("创建文件失败:" + line_file); Cat.logError("创建文件失败:" + POINTER_FILE, e2);
puppetLogger.error(e2.getMessage(),e2); }
}
} catch (Exception e3) { } catch (Exception e3) {
puppetLogger.error(e3.getMessage(),e3); Cat.logError(e3);
} finally { } finally {
if (reader != null) { if (reader != null) {
try { try {
reader.close(); reader.close();
} catch (IOException e) { } catch (IOException e) {
puppetLogger.error(e.getMessage(),e); Cat.logError(e);
} }
} }
} }
return 0L; return 0L;
} }
public void updatePointer(long end_position) {
BufferedWriter output = null;
try {
output = new BufferedWriter(new FileWriter(POINTER_FILE));
output.write(Long.toString(end_position));
output.close();
} catch (IOException e) {
Cat.logError("写入文件异:" + POINTER_FILE, e);
}
}
} }
package com.dianping.cat.agent.monitor.puppet;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.regex.Pattern;
import com.dianping.cat.Cat;
import com.dianping.cat.message.Event;
public class Utils {
public static StringBuffer runSysCmd(String cmd) {
String regEx = "^chown.*|^diff.*|^find.*";
StringBuffer result = new StringBuffer();
Runtime run = Runtime.getRuntime();
Process p = null;
if (!Pattern.compile(regEx).matcher(cmd).find()) {
Cat.logEvent("Puppet", "runSysCms failed", Event.SUCCESS, "不支持命令:" + cmd);
return null;
}
try {
p = run.exec(cmd);
BufferedInputStream in = new BufferedInputStream(p.getInputStream());
BufferedReader inBr = new BufferedReader(new InputStreamReader(in));
String lineStr;
while ((lineStr = inBr.readLine()) != null) {
result.append(lineStr + "\n");
}
if (p.waitFor() != 0) {
if (p.exitValue() != 0)// p.exitValue()==0表示正常结束,1:非正常结束
Cat.logEvent("Puppet", "命令执行失败?: " + cmd);
}
inBr.close();
in.close();
} catch (Exception e) {
Cat.logError(e);
} finally {
if (p != null) {
try {
p.getOutputStream().close();
p.getInputStream().close();
p.getErrorStream().close();
} catch (IOException e) {
Cat.logError(e);
}
}
}
return result;
}
}
log4j.rootLogger=info,file
log4j.logger.myLogger=info
log4j.logger.myLogger.mySonLogger=,file
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.SimpleLayout
log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
log4j.appender.file.File=/data/applogs/monitor/puppetmonitor.log
log4j.appender.file.DatePattern='.'yyyy-MM-dd
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%p %d{yyyy-MM-dd HH:mm:ss,SSS} %t %m%n
\ No newline at end of file
package com.dianping.cat.agent.monitor.puppet.util;
import java.io.File;
import org.apache.log4j.Logger;
public class CreatDir {
private Logger puppetLogger = Logger.getLogger("myLogger");
public boolean creatDir(String dir){
File file =new File(dir);
if(!file .exists()){
file.mkdirs();
return true;
}else{
return false;
}
}
}
package com.dianping.cat.agent.monitor.puppet.util;
import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Locale;
import java.util.regex.Pattern;
import org.apache.log4j.Logger;
import com.dianping.cat.agent.monitor.puppet.Alertation;
public class Parse {
private Logger puppetLogger = Logger.getLogger("myLogger");
public Alertation parse(String line) {
RunSysCmd runsyscmd=new RunSysCmd();
String title_add="";
String group="puppet";
String type="puppet";
String user="puppet";
String url="";
String op="insert";
String host="";
String IP="";
String date="";
String domain="";
String content="";
String title="puppet";
String regEx = ".*puppet-agent.*\\(\\/Stage";
String regEx_time = ".*[0-9]{2}:[0-9]{2}:[0-9]{2}.*";
String regEx_Filebucketed=".*Filebucketed.*";
String tmp="";
Alertation alertation =new Alertation();
if (Pattern.compile(regEx).matcher(line).find()) {
String[] tmp_list = line.split(" ");
Calendar c = Calendar.getInstance();
int year = c.get(Calendar.YEAR);
InetAddress ia=null;
if(tmp_list.length>=4){
if(Pattern.compile(regEx_time).matcher(tmp_list[3]).find()){
date = tmp_list[0] +" " + tmp_list[2] + " "+tmp_list[3] + " " +Integer.toString(year);
}else{
date = tmp_list[0] +" " + tmp_list[1]+ " " + tmp_list[2] + " " +Integer.toString(year);
}
}
String all_content=line.split("\\(")[1];
String[] tmp_string=all_content.split("\\)");
String[] tmp_string_main=tmp_string[0].split("\\[main\\]\\/");
if (tmp_string_main.length >=2){
title=tmp_string_main[1].split("\\[")[1].split("\\]")[0];
if(title==""){
title="puppet";
}
}
if(tmp_string.length>=2){
content=all_content.split("\\)")[1];
if(content.split(" ").length>=3){
title_add=content.split(" ")[1]+" "+content.split(" ")[2];
title=title+" "+title_add;
}
}
if(Pattern.compile(regEx_Filebucketed).matcher(content).find()){
String new_file=content.split(" ")[2];
if(content.split(" ").length >=8){
String old_file_index=content.split(" ")[7];
//==========test======
// String old_file_dir=runsyscmd.runSysCmd("/usr/bin/find /var/lib/puppet/clientbucket -name "+old_file_index).toString().split("\n")[0];
String old_file_dir=runsyscmd.runSysCmd("find /var/lib/puppet/clientbucket -name "+old_file_index).toString().split("\n")[0];
String old_file=old_file_dir+"/contents";
if(new File(new_file).exists() && new File(old_file).exists() ){
puppetLogger.info("diff content ing "+" "+old_file+" "+new_file );
tmp=runsyscmd.runSysCmd("diff "+old_file+" "+ new_file).toString();
}
if(tmp.trim()!=""){
content=tmp;
}else{
puppetLogger.warn("diff return content is empty");
}
}
}
SimpleDateFormat sdf_mmm = new SimpleDateFormat("MMMM dd HH:mm:ss yyyy",Locale.US);
SimpleDateFormat sdf_normal = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss",Locale.US);
try {
date=sdf_normal.format(sdf_mmm.parse(date));
ia = InetAddress.getLocalHost();
} catch (ParseException e1) {
puppetLogger.error(e1.getMessage(),e1);
sdf_mmm = new SimpleDateFormat("MMMM dd HH:mm:ss yyyy",Locale.US);
try {
date=sdf_normal.format(sdf_mmm.parse(date));
} catch (ParseException e) {
e.printStackTrace();
}
return null;
}catch(UnknownHostException e){
alertation.setHostname("Unknow_hostname");
alertation.setDomain("Unknow_domain");
e.printStackTrace();
}
if(ia!=null){
host = ia.getHostName();
IP= ia.getHostAddress();
domain=host.split("[0-9]")[0].split("-sl-|-gp-|-ppe")[0];
}
alertation.setHostname(host);
alertation.setIp(IP);
alertation.setDomain(domain);
alertation.setTitle(title);
alertation.setContent(content);
alertation.setOp(op);
alertation.setUrl(url);
alertation.setUser(user);
alertation.setType(type);
alertation.setDate(date);
alertation.setGroup(group);
}
else{
alertation=null;
}
return alertation;
}
}
package com.dianping.cat.agent.monitor.puppet.util;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.regex.Pattern;
import org.apache.log4j.Logger;
public class RunSysCmd {
private Logger puppetLogger = Logger.getLogger("myLogger");
public StringBuffer runSysCmd(String cmd){
// puppetLogger.info("执行命令ing:"+cmd);
String regEx = "^chown.*|^diff.*|^find.*";
StringBuffer result = new StringBuffer();
Runtime run = Runtime.getRuntime();
Process p=null;
if(!Pattern.compile(regEx).matcher(cmd).find()){
System.out.println("不支持改命令"+cmd);
return null;
}
try {
p = run.exec(cmd);
BufferedInputStream in = new BufferedInputStream(p.getInputStream());
BufferedReader inBr = new BufferedReader(new InputStreamReader(in));
String lineStr;
while ((lineStr = inBr.readLine()) != null){
result.append(lineStr+"\n");
}
if (p.waitFor() != 0) {
if (p.exitValue() != 0)//p.exitValue()==0表示正常结束,1:非正常结束
puppetLogger.warn("命令执行失败?: "+cmd);
}
inBr.close();
in.close();
} catch (Exception e) {
puppetLogger.error(e.getMessage(),e);
}finally
{
if ( p != null)
{
try {
p.getOutputStream().close();
p.getInputStream().close();
p.getErrorStream().close();
} catch (IOException e) {
puppetLogger.error(e.getMessage(),e);
}
}
}
return result;
}
}
package com.dianping.cat.agent.monitor.puppet.util;
import com.dianping.cat.agent.monitor.puppet.Alertation;
//import org.apache.log4j.Logger;
public class SendHttp {
// private Logger puppetLogger = Logger.getLogger("myLogger");
public void sendHttp(Alertation alertation) {
HttpPostUtils httppost=new HttpPostUtils();
// StringBuffer pars_str=new StringBuffer();
String[] catip={"10.1.110.23","10.1.6.102","10.1.6.128 "};
String[] pars= new String[11];
pars[0]="type="+alertation.getType();
pars[1]="title="+alertation.getTitle();
pars[2]="domain="+alertation.getDomain();
pars[3]="ip="+alertation.getIp();
pars[4]="user="+alertation.getUser();
pars[5]="content="+alertation.getContent();
pars[6]="url="+alertation.getUrl();
pars[7]="op="+alertation.getOp();
pars[8]="alterationDate="+alertation.getDate();
pars[9]="hostname="+alertation.getHostname();
pars[10]="group="+alertation.getGroup();
// for(int i=0;i<pars.length;i++){
// pars_str.append(pars[i]);
// pars_str.append(";");
// }
// puppetLogger.info(pars_str);
for(String ip:catip){
String url="http://"+ip+":8080/cat/r/alteration";
httppost.setUrlAddress(url);
if(httppost.httpPost(pars)){
break;
}else{
continue;
}
}
}
}
package com.dianping.cat.agent.monitor.puppet.util;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import org.apache.log4j.Logger;
public class SetReaderPostion {
private Logger puppetLogger = Logger.getLogger("myLogger");
public void setReaderPostion(String line_file,Long end_position) {
BufferedWriter output = null ;
try{
output = new BufferedWriter(new FileWriter(line_file));
output.write(Long.toString(end_position));
puppetLogger.info("写入Succ:"+line_file+" position:"+end_position);
output.close();
}catch(IOException e){
puppetLogger.error("写入文件异:"+line_file);
puppetLogger.error(e.getMessage(),e);
}
}
}
...@@ -5,9 +5,9 @@ ...@@ -5,9 +5,9 @@
<implementation>com.dianping.cat.agent.monitor.EnvironmentConfig</implementation> <implementation>com.dianping.cat.agent.monitor.EnvironmentConfig</implementation>
</component> </component>
<component> <component>
<role>com.dianping.cat.agent.monitor.Executor</role> <role>com.dianping.cat.agent.monitor.executors.Executor</role>
<role-hint>JVMMemoryExecutor</role-hint> <role-hint>JVMMemoryExecutor</role-hint>
<implementation>com.dianping.cat.agent.monitor.jvm.JVMMemoryExecutor</implementation> <implementation>com.dianping.cat.agent.monitor.executors.jvm.JVMMemoryExecutor</implementation>
<requirements> <requirements>
<requirement> <requirement>
<role>com.dianping.cat.agent.monitor.EnvironmentConfig</role> <role>com.dianping.cat.agent.monitor.EnvironmentConfig</role>
...@@ -15,9 +15,9 @@ ...@@ -15,9 +15,9 @@
</requirements> </requirements>
</component> </component>
<component> <component>
<role>com.dianping.cat.agent.monitor.Executor</role> <role>com.dianping.cat.agent.monitor.executors.Executor</role>
<role-hint>JVMStateExecutor</role-hint> <role-hint>JVMStateExecutor</role-hint>
<implementation>com.dianping.cat.agent.monitor.jvm.JVMStateExecutor</implementation> <implementation>com.dianping.cat.agent.monitor.executors.jvm.JVMStateExecutor</implementation>
<requirements> <requirements>
<requirement> <requirement>
<role>com.dianping.cat.agent.monitor.EnvironmentConfig</role> <role>com.dianping.cat.agent.monitor.EnvironmentConfig</role>
...@@ -25,9 +25,9 @@ ...@@ -25,9 +25,9 @@
</requirements> </requirements>
</component> </component>
<component> <component>
<role>com.dianping.cat.agent.monitor.Executor</role> <role>com.dianping.cat.agent.monitor.executors.Executor</role>
<role-hint>PerformanceExecutor</role-hint> <role-hint>PerformanceExecutor</role-hint>
<implementation>com.dianping.cat.agent.monitor.system.SystemPerformanceExecutor</implementation> <implementation>com.dianping.cat.agent.monitor.executors.system.SystemPerformanceExecutor</implementation>
<requirements> <requirements>
<requirement> <requirement>
<role>com.dianping.cat.agent.monitor.EnvironmentConfig</role> <role>com.dianping.cat.agent.monitor.EnvironmentConfig</role>
...@@ -35,9 +35,9 @@ ...@@ -35,9 +35,9 @@
</requirements> </requirements>
</component> </component>
<component> <component>
<role>com.dianping.cat.agent.monitor.Executor</role> <role>com.dianping.cat.agent.monitor.executors.Executor</role>
<role-hint>SystemStateExecutor</role-hint> <role-hint>SystemStateExecutor</role-hint>
<implementation>com.dianping.cat.agent.monitor.system.SystemStateExecutor</implementation> <implementation>com.dianping.cat.agent.monitor.executors.system.SystemStateExecutor</implementation>
<requirements> <requirements>
<requirement> <requirement>
<role>com.dianping.cat.agent.monitor.EnvironmentConfig</role> <role>com.dianping.cat.agent.monitor.EnvironmentConfig</role>
...@@ -45,8 +45,8 @@ ...@@ -45,8 +45,8 @@
</requirements> </requirements>
</component> </component>
<component> <component>
<role>com.dianping.cat.agent.monitor.DataSender</role> <role>com.dianping.cat.agent.monitor.executors.DataSender</role>
<implementation>com.dianping.cat.agent.monitor.DataSender</implementation> <implementation>com.dianping.cat.agent.monitor.executors.DataSender</implementation>
<requirements> <requirements>
<requirement> <requirement>
<role>com.dianping.cat.agent.monitor.EnvironmentConfig</role> <role>com.dianping.cat.agent.monitor.EnvironmentConfig</role>
...@@ -54,11 +54,20 @@ ...@@ -54,11 +54,20 @@
</requirements> </requirements>
</component> </component>
<component> <component>
<role>com.dianping.cat.agent.monitor.TaskExecutors</role> <role>com.dianping.cat.agent.monitor.executors.TaskExecutors</role>
<implementation>com.dianping.cat.agent.monitor.TaskExecutors</implementation> <implementation>com.dianping.cat.agent.monitor.executors.TaskExecutors</implementation>
<requirements> <requirements>
<requirement> <requirement>
<role>com.dianping.cat.agent.monitor.DataSender</role> <role>com.dianping.cat.agent.monitor.executors.DataSender</role>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.agent.monitor.puppet.PuppetTask</role>
<implementation>com.dianping.cat.agent.monitor.puppet.PuppetTask</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.agent.monitor.EnvironmentConfig</role>
</requirement> </requirement>
</requirements> </requirements>
</component> </component>
...@@ -83,10 +92,13 @@ ...@@ -83,10 +92,13 @@
<role>com.dianping.cat.agent.core.page.index.JspViewer</role> <role>com.dianping.cat.agent.core.page.index.JspViewer</role>
</requirement> </requirement>
<requirement> <requirement>
<role>com.dianping.cat.agent.monitor.DataSender</role> <role>com.dianping.cat.agent.monitor.executors.DataSender</role>
</requirement>
<requirement>
<role>com.dianping.cat.agent.monitor.executors.TaskExecutors</role>
</requirement> </requirement>
<requirement> <requirement>
<role>com.dianping.cat.agent.monitor.TaskExecutors</role> <role>com.dianping.cat.agent.monitor.puppet.PuppetTask</role>
</requirement> </requirement>
</requirements> </requirements>
</component> </component>
...@@ -100,8 +112,8 @@ ...@@ -100,8 +112,8 @@
</requirements> </requirements>
</component> </component>
<component> <component>
<role>com.dianping.cat.agent.monitor.DataSender</role> <role>com.dianping.cat.agent.monitor.executors.DataSender</role>
<implementation>com.dianping.cat.agent.monitor.DataSender</implementation> <implementation>com.dianping.cat.agent.monitor.executors.DataSender</implementation>
<requirements> <requirements>
<requirement> <requirement>
<role>com.dianping.cat.agent.monitor.EnvironmentConfig</role> <role>com.dianping.cat.agent.monitor.EnvironmentConfig</role>
...@@ -113,11 +125,20 @@ ...@@ -113,11 +125,20 @@
<implementation>com.dianping.cat.agent.monitor.EnvironmentConfig</implementation> <implementation>com.dianping.cat.agent.monitor.EnvironmentConfig</implementation>
</component> </component>
<component> <component>
<role>com.dianping.cat.agent.monitor.TaskExecutors</role> <role>com.dianping.cat.agent.monitor.executors.TaskExecutors</role>
<implementation>com.dianping.cat.agent.monitor.TaskExecutors</implementation> <implementation>com.dianping.cat.agent.monitor.executors.TaskExecutors</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.agent.monitor.executors.DataSender</role>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.agent.monitor.puppet.PuppetTask</role>
<implementation>com.dianping.cat.agent.monitor.puppet.PuppetTask</implementation>
<requirements> <requirements>
<requirement> <requirement>
<role>com.dianping.cat.agent.monitor.DataSender</role> <role>com.dianping.cat.agent.monitor.EnvironmentConfig</role>
</requirement> </requirement>
</requirements> </requirements>
</component> </component>
......
...@@ -12,142 +12,140 @@ import java.io.BufferedInputStream; ...@@ -12,142 +12,140 @@ import java.io.BufferedInputStream;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.util.regex.*; import java.util.regex.*;
public class PuppetMonitorByLineNo { public class PuppetMonitorByLineNo {
public static void main(String[] args) { public static void main(String[] args) {
String log_file="/Users/River/messages"; String log_file = "/var/log/messages";
String line_file="/var/log/line.log"; String line_file = "/var/log/currentPointer";
File line_f = new File(line_file); File line_f = new File(line_file);
while(true){ while (true) {
int line_no_start=get_startline_no(line_file,log_file); int line_no_start = get_startline_no(line_file, log_file);
int line_reading_count=0; int line_reading_count = 0;
String regEx=".*puppet-agent.*\\(\\/Stage"; String regEx = ".*puppet-agent.*\\(\\/Stage";
System.out.println("line_no_start="+Integer.toString(line_no_start)); System.out.println("line_no_start=" + Integer.toString(line_no_start));
try { try {
FileReader reader = new FileReader(log_file); FileReader reader = new FileReader(log_file);
BufferedReader br = new BufferedReader(reader); BufferedReader br = new BufferedReader(reader);
String str = null; String str = null;
while((str = br.readLine()) != null) { while ((str = br.readLine()) != null) {
line_reading_count++; line_reading_count++;
if(line_reading_count<line_no_start){ if (line_reading_count < line_no_start) {
continue; continue;
}else{ } else {
Pattern pattern1 = Pattern.compile(".*?\\[(.*?)\\].*?"); Pattern pattern1 = Pattern.compile(".*?\\[(.*?)\\].*?");
if(Pattern.compile(regEx).matcher(str).find()){ if (Pattern.compile(regEx).matcher(str).find()) {
// System.out.println(str+"/n"); // System.out.println(str+"/n");
String[] tmp_list=str.split(" "); String[] tmp_list = str.split(" ");
String action_time=tmp_list[0]+tmp_list[1]+" "+tmp_list[2]+" "+tmp_list[3]; String action_time = tmp_list[0] + tmp_list[1] + " " + tmp_list[2] + " " + tmp_list[3];
String hostname=tmp_list[4]; String hostname = tmp_list[4];
String change=str.split("\\(")[1]; String change = str.split("\\(")[1];
String file=change.split("\\)")[0].split("Stage\\[main\\]")[1]; String file = change.split("\\)")[0].split("Stage\\[main\\]")[1];
Matcher matcher1 = pattern1.matcher(file); Matcher matcher1 = pattern1.matcher(file);
if (matcher1.matches()) { if (matcher1.matches()) {
System.out.println(matcher1.group(1)); System.out.println(matcher1.group(1));
} }
System.out.println(action_time +" "+hostname+" "+change+"/n"); System.out.println(action_time + " " + hostname + " " + change + "/n");
} }
else }
continue; }
br.close();
} reader.close();
// System.out.println(str+"/n"); System.out.println("本次读取到文件行数:" + line_reading_count);
} // 将上次读取的行,写入文件
br.close();
reader.close();
// System.out.println(countFileLine("/Users/River/messages"));
System.out.println("本次读取到文件行数:"+line_reading_count);
//将上次读取的行,写入文件
BufferedWriter output = new BufferedWriter(new FileWriter(line_f)); BufferedWriter output = new BufferedWriter(new FileWriter(line_f));
output.write(Integer.toString(line_reading_count)); output.write(Integer.toString(line_reading_count));
output.close(); output.close();
}catch(FileNotFoundException e) { } catch (FileNotFoundException e) {
e.printStackTrace(); e.printStackTrace();
}catch(IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} }
//slee 5s // slee 5s
try { try {
Thread.sleep(5000); Thread.sleep(5000);
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
} }
/** /**
* *
* @param line_file 记录上次读取文件的行数 * @param line_file
* @param log_file 读取文件,用于分析内容,此处用来统计文件总行数 * 记录上次读取文件的行数
* @param log_file
* 读取文件,用于分析内容,此处用来统计文件总行数
* @return 返回上次读取的文件的行数 * @return 返回上次读取的文件的行数
*/ */
public static int get_startline_no(String line_file,String log_file){ public static int get_startline_no(String line_file, String log_file) {
int line_no_start = 0; int line_no_start = 0;
String str=""; String str = "";
try{ try {
FileReader line_reader = new FileReader(line_file); FileReader line_reader = new FileReader(line_file);
BufferedReader line_br = new BufferedReader(line_reader); BufferedReader line_br = new BufferedReader(line_reader);
try { try {
if( (str=line_br.readLine())!= null){ if ((str = line_br.readLine()) != null) {
line_no_start=Integer.parseInt(str); line_no_start = Integer.parseInt(str);
line_br.close(); line_br.close();
line_reader.close(); line_reader.close();
}else{//file is empty } else {// file is empty
line_no_start=countFileLine(log_file)/2; line_no_start = countFileLine(log_file) / 2;
} }
} catch (NumberFormatException e) { } catch (NumberFormatException e) {
e.printStackTrace(); e.printStackTrace();
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} }
}catch(FileNotFoundException e){ } catch (FileNotFoundException e) {
line_no_start=countFileLine(log_file)/2; line_no_start = countFileLine(log_file) / 2;
File f = new File(line_file); File f = new File(line_file);
try { try {
if(f.createNewFile()){ if (f.createNewFile()) {
BufferedWriter output = new BufferedWriter(new FileWriter(f)); BufferedWriter output = new BufferedWriter(new FileWriter(f));
output.write(Integer.toString(line_no_start)+"\n"); output.write(Integer.toString(line_no_start) + "\n");
output.close(); output.close();
}else{ } else {
System.out.println("fail to create file : "+line_file); System.out.println("fail to create file : " + line_file);
} }
} catch (IOException e1) { } catch (IOException e1) {
e1.printStackTrace(); e1.printStackTrace();
} }
} }
System.out.println("上次读取的文件行数"+Integer.toString(line_no_start)); System.out.println("上次读取的文件行数" + Integer.toString(line_no_start));
return line_no_start; return line_no_start;
} }
/** /**
* *
* @param file 文件 * @param file
* 文件
* @return 文件的总行数 * @return 文件的总行数
*/ */
public static int countFileLine(String file){ public static int countFileLine(String file) {
int count=0; int count = 0;
try{ try {
InputStream is = new BufferedInputStream(new FileInputStream(file)); InputStream is = new BufferedInputStream(new FileInputStream(file));
byte[] c = new byte[65536]; byte[] c = new byte[65536];
int readChars = 0; int readChars = 0;
while ((readChars = is.read(c)) != -1) { while ((readChars = is.read(c)) != -1) {
for (int i = 0; i < readChars; ++i) { for (int i = 0; i < readChars; ++i) {
if (c[i] == '\n') if (c[i] == '\n')
++count; ++count;
} }
} }
is.close(); is.close();
}catch(IOException e){ } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} }
System.out.println("文件总行数"+Integer.toString(count)); System.out.println("文件总行数" + Integer.toString(count));
return count; return count;
} }
......
...@@ -35,6 +35,7 @@ public class MetricGraphCreator extends AbstractGraphCreator { ...@@ -35,6 +35,7 @@ public class MetricGraphCreator extends AbstractGraphCreator {
LineChart lineChart = new LineChart(); LineChart lineChart = new LineChart();
buildLineChartTitle(alertKeys, lineChart, key); buildLineChartTitle(alertKeys, lineChart, key);
lineChart.setUnit("Value/分钟");
lineChart.setStart(startDate); lineChart.setStart(startDate);
lineChart.setSize(value.length); lineChart.setSize(value.length);
lineChart.setStep(step * TimeUtil.ONE_MINUTE); lineChart.setStep(step * TimeUtil.ONE_MINUTE);
......
...@@ -55,7 +55,7 @@ public class NetworkGraphCreator extends AbstractGraphCreator { ...@@ -55,7 +55,7 @@ public class NetworkGraphCreator extends AbstractGraphCreator {
Map<Long, Double> convertedData = new LinkedHashMap<Long, Double>(); Map<Long, Double> convertedData = new LinkedHashMap<Long, Double>();
for (Entry<Long, Double> currentEntry : current.entrySet()) { for (Entry<Long, Double> currentEntry : current.entrySet()) {
double result = currentEntry.getValue() / 1000.0; double result = currentEntry.getValue() / 1000000.0 / 60;
convertedData.put(currentEntry.getKey(), result); convertedData.put(currentEntry.getKey(), result);
} }
...@@ -184,7 +184,7 @@ public class NetworkGraphCreator extends AbstractGraphCreator { ...@@ -184,7 +184,7 @@ public class NetworkGraphCreator extends AbstractGraphCreator {
private String buildUnit(String chartTitle) { private String buildUnit(String chartTitle) {
if (isFlowMetric(chartTitle)) { if (isFlowMetric(chartTitle)) {
return "流量(MB/分钟)"; return "流量(MB/)";
} else { } else {
return "value/分钟"; return "value/分钟";
} }
......
...@@ -88,6 +88,7 @@ public class SystemGraphCreator extends AbstractGraphCreator { ...@@ -88,6 +88,7 @@ public class SystemGraphCreator extends AbstractGraphCreator {
lineChart.setHtmlTitle(chartTitle); lineChart.setHtmlTitle(chartTitle);
lineChart.setId(chartTitle); lineChart.setId(chartTitle);
lineChart.setStart(startDate); lineChart.setStart(startDate);
lineChart.setUnit("value/分钟");
lineChart.setStep(step * TimeUtil.ONE_MINUTE); lineChart.setStep(step * TimeUtil.ONE_MINUTE);
if (keyMapEntry.getValue().entrySet().isEmpty()) { if (keyMapEntry.getValue().entrySet().isEmpty()) {
...@@ -135,8 +136,8 @@ public class SystemGraphCreator extends AbstractGraphCreator { ...@@ -135,8 +136,8 @@ public class SystemGraphCreator extends AbstractGraphCreator {
"/var-usage:avg", "eth0-in-flow:sum", "eth0-out-flow:sum", "swap:avg", "load:avg", "uptime:avg", "/var-usage:avg", "eth0-in-flow:sum", "eth0-out-flow:sum", "swap:avg", "load:avg", "uptime:avg",
"md5Change:avg", "hostNameChange:avg", "hostIpChange:avg")); "md5Change:avg", "hostNameChange:avg", "hostIpChange:avg"));
List<String> jvmKeys = new ArrayList<String>(Arrays.asList("edenUsage:avg", "oldUsage:avg", List<String> jvmKeys = new ArrayList<String>(Arrays.asList("edenUsage:avg", "oldUsage:avg", "permUsage:avg",
"permUsage:avg", "tomcatLive:avg", "catalinaLogSize:sum")); "tomcatLive:avg", "catalinaLogSize:sum"));
List<String> nginxKeys = new ArrayList<String>(); List<String> nginxKeys = new ArrayList<String>();
...@@ -147,7 +148,7 @@ public class SystemGraphCreator extends AbstractGraphCreator { ...@@ -147,7 +148,7 @@ public class SystemGraphCreator extends AbstractGraphCreator {
} else if (NGINX_TYPE.equalsIgnoreCase(type)) { } else if (NGINX_TYPE.equalsIgnoreCase(type)) {
return nginxKeys; return nginxKeys;
} else { } else {
return null; return new ArrayList<String>();
} }
} }
...@@ -155,23 +156,21 @@ public class SystemGraphCreator extends AbstractGraphCreator { ...@@ -155,23 +156,21 @@ public class SystemGraphCreator extends AbstractGraphCreator {
List<String> systemKeys = fetchSystemKeys(type); List<String> systemKeys = fetchSystemKeys(type);
Map<String, Map<String, String>> aggregationKeys = new LinkedHashMap<String, Map<String, String>>(); Map<String, Map<String, String>> aggregationKeys = new LinkedHashMap<String, Map<String, String>>();
if (systemKeys != null) { for (String key : systemKeys) {
for (String key : systemKeys) { int typeIndex = key.lastIndexOf(":");
int typeIndex = key.lastIndexOf(":"); String realKey = key.substring(0, typeIndex);
String realKey = key.substring(0, typeIndex); String metricType = key.substring(typeIndex + 1);
String metricType = key.substring(typeIndex + 1); String des = queryMetricItemDes(metricType.toUpperCase());
String des = queryMetricItemDes(metricType.toUpperCase()); String chartKey = realKey + des;
String chartKey = realKey + des; Map<String, String> ipMap = aggregationKeys.get(chartKey);
Map<String, String> ipMap = aggregationKeys.get(chartKey);
if (ipMap == null) { if (ipMap == null) {
ipMap = new HashMap<String, String>(); ipMap = new HashMap<String, String>();
aggregationKeys.put(chartKey, ipMap); aggregationKeys.put(chartKey, ipMap);
} }
for (String ip : ipAddrs) { for (String ip : ipAddrs) {
ipMap.put(ip, realKey + "_" + ip + ":" + metricType.toUpperCase()); ipMap.put(ip, realKey + "_" + ip + ":" + metricType.toUpperCase());
}
} }
} }
return aggregationKeys; return aggregationKeys;
......
...@@ -210,6 +210,7 @@ public class ExceptionAlert implements Task, LogEnabled { ...@@ -210,6 +210,7 @@ public class ExceptionAlert implements Task, LogEnabled {
} }
boolean active = true; boolean active = true;
while (active) { while (active) {
long current = System.currentTimeMillis();
int minute = Calendar.getInstance().get(Calendar.MINUTE); int minute = Calendar.getInstance().get(Calendar.MINUTE);
String minuteStr = String.valueOf(minute); String minuteStr = String.valueOf(minute);
...@@ -217,12 +218,15 @@ public class ExceptionAlert implements Task, LogEnabled { ...@@ -217,12 +218,15 @@ public class ExceptionAlert implements Task, LogEnabled {
minuteStr = '0' + minuteStr; minuteStr = '0' + minuteStr;
} }
Transaction t = Cat.newTransaction("ExceptionAlert", "M" + minuteStr); Transaction t = Cat.newTransaction("ExceptionAlert", "M" + minuteStr);
long current = System.currentTimeMillis();
try { try {
TopMetric topMetric = buildTopMetric(new Date(current - TimeUtil.ONE_MINUTE * 2)); TopMetric topMetric = buildTopMetric(new Date(current - TimeUtil.ONE_MINUTE * 2));
Collection<List<Item>> items = topMetric.getError().getResult().values(); Collection<List<Item>> items = topMetric.getError().getResult().values();
List<Item> item = items.iterator().next(); List<Item> item = new ArrayList<Item>();
if (!items.isEmpty()) {
item = items.iterator().next();
}
Map<String, List<AlertException>> alertExceptions = buildAlertExceptions(item); Map<String, List<AlertException>> alertExceptions = buildAlertExceptions(item);
for (Entry<String, List<AlertException>> entry : alertExceptions.entrySet()) { for (Entry<String, List<AlertException>> entry : alertExceptions.entrySet()) {
...@@ -242,7 +246,7 @@ public class ExceptionAlert implements Task, LogEnabled { ...@@ -242,7 +246,7 @@ public class ExceptionAlert implements Task, LogEnabled {
try { try {
if (duration < DURATION) { if (duration < DURATION) {
Thread.sleep(TimeUtil.ONE_MINUTE); Thread.sleep(DURATION - duration);
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
active = false; active = false;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册