提交 277b53f9 编写于 作者: L leon.li

resolve the confict

......@@ -43,9 +43,9 @@ com.dianping.cat.report.page.externalError.Handler.class,
com.dianping.cat.report.page.statistics.Handler.class,
com.dianping.cat.report.page.systemMonitor.Handler.class,
com.dianping.cat.report.page.alteration.Handler.class,
com.dianping.cat.report.page.alteration.Handler.class
com.dianping.cat.report.page.monitor.Handler.class
})
public class ReportModule extends AbstractModule {
......
......@@ -41,7 +41,7 @@ public enum ReportPage implements Page {
STATISTICS("statistics", "statistics", "Statistics", "Statistics", true),
SYSTEMMONITOR("systemMonitor", "systemMonitor", "SystemMonitor", "SystemMonitor", true),
MONITOR("monitor", "monitor", "Monitor", "Monitor", true),
ALTERATION("alteration", "alteration", "Alteration", "Alteration", true);
......
package com.dianping.cat.report.page.systemMonitor;
package com.dianping.cat.report.page.monitor;
public enum Action implements org.unidal.web.mvc.Action {
SYSTEM_API("systemApi");
COUNT_API("count"),//default action
AVG_API("avg"),
SUM_API("sum");
private String m_name;
......
package com.dianping.cat.report.page.systemMonitor;
package com.dianping.cat.report.page.monitor;
import com.dianping.cat.report.ReportContext;
......
package com.dianping.cat.report.page.systemMonitor;
package com.dianping.cat.report.page.monitor;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import javax.servlet.ServletException;
import org.unidal.lookup.annotation.Inject;
import org.unidal.web.mvc.PageHandler;
import org.unidal.web.mvc.annotation.InboundActionMeta;
import org.unidal.web.mvc.annotation.OutboundActionMeta;
import org.unidal.web.mvc.annotation.PayloadMeta;
import com.dianping.cat.Cat;
import com.dianping.cat.helper.TimeUtil;
import com.dianping.cat.message.Message;
......@@ -13,89 +21,111 @@ import com.dianping.cat.message.internal.DefaultMetric;
import com.dianping.cat.message.internal.DefaultTransaction;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
import com.dianping.cat.report.ReportPage;
import org.unidal.lookup.annotation.Inject;
import org.unidal.web.mvc.PageHandler;
import org.unidal.web.mvc.annotation.InboundActionMeta;
import org.unidal.web.mvc.annotation.OutboundActionMeta;
import org.unidal.web.mvc.annotation.PayloadMeta;
import com.google.gson.FieldNamingPolicy;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.site.lookup.util.StringUtils;
public class Handler implements PageHandler<Context> {
@Inject
private JspViewer m_jspViewer;
private Gson m_gson = new GsonBuilder().setFieldNamingPolicy(FieldNamingPolicy.UPPER_CAMEL_CASE).create();
@Override
@PayloadMeta(Payload.class)
@InboundActionMeta(name = "systemMonitor")
@InboundActionMeta(name = "monitor")
public void handleInbound(Context ctx) throws ServletException, IOException {
// display only, no action here
}
@Override
@OutboundActionMeta(name = "systemMonitor")
@OutboundActionMeta(name = "monitor")
public void handleOutbound(Context ctx) throws ServletException, IOException {
Model model = new Model(ctx);
Payload payload = ctx.getPayload();
Action action = payload.getAction();
HttpStatus status = checkPars(payload);
System.out.println(System.currentTimeMillis());
switch (action) {
case SYSTEM_API:
model.setPage(ReportPage.MONITOR);
model.setStatus(m_gson.toJson(status));
if (status.getStatusCode().equals(String.valueOf(HttpStatus.SUCCESS))) {
String domain = payload.getDomain();
String group = payload.getGroup();
String type = payload.getType();
String key = payload.getKey();
long time = payload.getTimestamp();
int count = payload.getCount();
boolean invalid = time < TimeUtil.getCurrentHour().getTime();
if (invalid) {
Cat.logError(new RuntimeException("Error timestamp in metric api, time"
+ new SimpleDateFormat("yyyy-MM-dd HH:ss").format(new Date(time)) + payload.toString()));
if (time < TimeUtil.getCurrentHour().getTime()) {
time = System.currentTimeMillis();
}
DefaultMetric defaultMetric = null;
int count = payload.getCount();
Transaction t = Cat.newTransaction("test", "test");
if (count == 0) {
count = 1;
Metric metric = Cat.getProducer().newMetric(group, key);
DefaultMetric defaultMetric = (DefaultMetric) metric;
if (defaultMetric != null) {
defaultMetric.setTimestamp(time);
}
if ("count".equalsIgnoreCase(type)) {
Metric metric = Cat.getProducer().newMetric(group, key);
defaultMetric = (DefaultMetric) metric;
switch (action) {
case COUNT_API:
defaultMetric.setStatus("C");
defaultMetric.addData(String.valueOf(count));
} else if ("avg".equalsIgnoreCase(type)) {
Metric metric = Cat.getProducer().newMetric(group, key);
defaultMetric = (DefaultMetric) metric;
break;
case AVG_API:
defaultMetric.setStatus("T");
defaultMetric.addData(String.format("%.2f", payload.getAvg()));
} else if ("sum".equalsIgnoreCase(type)) {
Metric metric = Cat.getProducer().newMetric(group, key);
defaultMetric = (DefaultMetric) metric;
break;
case SUM_API:
defaultMetric.setStatus("S,C");
defaultMetric.addData(String.format("%.2f,%s", payload.getSum(), count));
}
if (defaultMetric != null) {
defaultMetric.setTimestamp(time);
defaultMetric.addData(String.format("%s,%.2f", count, payload.getSum()));
break;
default:
throw new RuntimeException("Unknown action: " + action);
}
t.complete();
DefaultMessageTree tree = (DefaultMessageTree) Cat.getManager().getThreadLocalMessageTree();
tree.setDomain(domain);
Message message = tree.getMessage();
Message message = tree.getMessage();
if (message instanceof Transaction) {
((DefaultTransaction) message).setTimestamp(time);
}
System.out.println(tree);
break;
}
model.setAction(action);
model.setPage(ReportPage.SYSTEMMONITOR);
if (!ctx.isProcessStopped()) {
m_jspViewer.view(ctx, model);
model.setPage(ReportPage.MONITOR);
m_jspViewer.view(ctx, model);
}
public HttpStatus checkPars(Payload payload) {
StringBuilder sb = new StringBuilder();
String domain = payload.getDomain();
String group = payload.getGroup();
String key = payload.getKey();
HttpStatus httpStatus = new HttpStatus();
boolean error = false;
if (StringUtils.isEmpty(domain)) {
sb.append("domain ");
error = true;
}
if (StringUtils.isEmpty(group)) {
sb.append("group ");
error = true;
}
if (StringUtils.isEmpty(key)) {
sb.append("key ");
error = true;
}
if (error) {
httpStatus.setErrorMsg("invalid field: " + sb.toString());
httpStatus.setStatusCode(String.valueOf(HttpStatus.FAIL));
} else {
httpStatus.setStatusCode(String.valueOf(HttpStatus.SUCCESS));
}
return httpStatus;
}
}
package com.dianping.cat.report.page.monitor;
import com.google.gson.annotations.SerializedName;
public class HttpStatus {
public static final int SUCCESS = 0;
public static final int FAIL = -1;
@SerializedName("statusCode")
private String m_statusCode;
@SerializedName("errorMsg")
private String m_errorMsg;
public String getStatusCode() {
return m_statusCode;
}
public void setStatusCode(String statusCode) {
m_statusCode = statusCode;
}
public String getErrorMsg() {
return m_errorMsg;
}
public void setErrorMsg(String errorMsg) {
m_errorMsg = errorMsg;
}
}
package com.dianping.cat.report.page.systemMonitor;
package com.dianping.cat.report.page.monitor;
public enum JspFile {
VIEW("/jsp/report/systemMonitor.jsp"),
VIEW("/jsp/report/monitor.jsp"),
;
......
package com.dianping.cat.report.page.systemMonitor;
package com.dianping.cat.report.page.monitor;
import com.dianping.cat.report.ReportPage;
import org.unidal.web.mvc.view.BaseJspViewer;
......@@ -9,8 +9,12 @@ public class JspViewer extends BaseJspViewer<ReportPage, Action, Context, Model>
Action action = model.getAction();
switch (action) {
case SYSTEM_API:
case COUNT_API:
case AVG_API:
case SUM_API:
return JspFile.VIEW.getPath();
default:
break;
}
throw new RuntimeException("Unknown action: " + action);
......
package com.dianping.cat.report.page.systemMonitor;
package com.dianping.cat.report.page.monitor;
import com.dianping.cat.report.ReportPage;
import org.unidal.web.mvc.ViewModel;
public class Model extends ViewModel<ReportPage, Action, Context> {
private String m_status;
public String getStatus() {
return m_status;
}
public void setStatus(String status) {
m_status = status;
}
public Model(Context ctx) {
super(ctx);
}
@Override
public Action getDefaultAction() {
return Action.SYSTEM_API;
return Action.COUNT_API;
}
}
package com.dianping.cat.report.page.systemMonitor;
package com.dianping.cat.report.page.monitor;
import com.dianping.cat.report.ReportPage;
import com.dianping.cat.report.page.AbstractReportPayload;
import org.unidal.web.mvc.ActionContext;
import org.unidal.web.mvc.ActionPayload;
import org.unidal.web.mvc.payload.annotation.FieldMeta;
public class Payload implements ActionPayload<ReportPage, Action> {
private ReportPage m_page;
@FieldMeta("op")
private Action m_action;
@FieldMeta("group")
private String m_group;
@FieldMeta("key")
private String m_key;
@FieldMeta("type")
private String m_type;
@FieldMeta("domain")
private String m_domain;
@FieldMeta("timestamp")
private long m_timestamp;
@FieldMeta("count")
private int m_count;
@FieldMeta("avg")
private double m_avg;
@FieldMeta("sum")
private double m_sum;
public String getDomain() {
return m_domain;
}
public class Payload extends AbstractReportPayload<Action> {
@FieldMeta("op")
private Action m_action;
@FieldMeta("group")
private String m_group;
@FieldMeta("key")
private String m_key;
@FieldMeta("type")
private String m_type;
@FieldMeta("domain")
private String m_domain;
@FieldMeta("timestamp")
private long m_timestamp;
@FieldMeta("count")
private int m_count = 1;
@FieldMeta("avg")
private double m_avg;
@FieldMeta("sum")
private double m_sum;
public Payload() {
super(ReportPage.MONITOR);
}
public String getDomain() {
return m_domain;
}
public void setDomain(String domain) {
m_domain = domain;
}
m_domain = domain;
}
public String getGroup() {
return m_group;
}
return m_group;
}
public void setGroup(String group) {
m_group = group;
}
m_group = group;
}
public int getCount() {
return m_count;
}
return m_count;
}
public void setCount(int count) {
m_count = count;
}
m_count = count;
}
public double getAvg() {
return m_avg;
}
return m_avg;
}
public void setAvg(double avg) {
m_avg = avg;
}
m_avg = avg;
}
public double getSum() {
return m_sum;
}
return m_sum;
}
public void setSum(double sum) {
m_sum = sum;
}
m_sum = sum;
}
public String getKey() {
return m_key;
}
return m_key;
}
public void setKey(String key) {
m_key = key;
}
m_key = key;
}
public String getType() {
return m_type;
}
return m_type;
}
public void setType(String type) {
m_type = type;
}
m_type = type;
}
public void setAction(String action) {
m_action = Action.getByName(action, Action.SYSTEM_API);
}
@Override
public Action getAction() {
return m_action;
}
m_action = Action.getByName(action, Action.COUNT_API);
}
@Override
public ReportPage getPage() {
return m_page;
}
@Override
public Action getAction() {
return m_action;
}
@Override
public void setPage(String page) {
m_page = ReportPage.getByName(page, ReportPage.SYSTEMMONITOR);
}
public long getTimestamp() {
return m_timestamp;
}
public long getTimestamp() {
return m_timestamp;
}
public void setTimestamp(long timestamp) {
m_timestamp = timestamp;
}
m_timestamp = timestamp;
}
@Override
public void validate(ActionContext<?> ctx) {
if (m_action == null) {
m_action = Action.COUNT_API;
}
}
@Override
public void validate(ActionContext<?> ctx) {
if (m_action == null) {
m_action = Action.SYSTEM_API;
}
public String toString() {
return "Payload [m_group=" + m_group + ", m_key=" + m_key + ", m_type=" + m_type + ", m_domain=" + m_domain
+ ", m_timestamp=" + m_timestamp + "]";
}
}
......@@ -2873,17 +2873,17 @@
</requirements>
</component>
<component>
<role>com.dianping.cat.report.page.systemMonitor.Handler</role>
<implementation>com.dianping.cat.report.page.systemMonitor.Handler</implementation>
<role>com.dianping.cat.report.page.monitor.Handler</role>
<implementation>com.dianping.cat.report.page.monitor.Handler</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.report.page.systemMonitor.JspViewer</role>
<role>com.dianping.cat.report.page.monitor.JspViewer</role>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.report.page.systemMonitor.JspViewer</role>
<implementation>com.dianping.cat.report.page.systemMonitor.JspViewer</implementation>
<role>com.dianping.cat.report.page.monitor.JspViewer</role>
<implementation>com.dianping.cat.report.page.monitor.JspViewer</implementation>
<requirements>
<requirement>
<role>org.unidal.web.mvc.view.model.ModelHandler</role>
......
......@@ -56,12 +56,12 @@
<page name="statistics" title="Statistics" path="statistics" view="/jsp/report/bug/bug.jsp" template="default">
<description>Statistics</description>
</page>
<page name="systemMonitor" title="SystemMonitor" path="systemMonitor" template="default">
<description>SystemMonitor</description>
</page>
<page name="alteration" title="Alteration" path="alteration" template="default">
<description>Alteration</description>
</page>
<page name="monitor" title="Monitor" path="monitor" template="default">
<description>Monitor</description>
</page>
</module>
<module name="system" path="s" default="false">
<page name="alarm" title="Alarm" default="true" path="alarm" view="/jsp/system/alarm/alarm.jsp">
......
......@@ -15,13 +15,48 @@
<h4 class="text-success">第二步:业务代码埋点</h4>
<h5 class='text-error'> Metric一共有三个API,分别用来记录次数、平均、总和,统一粒度为一分钟</h5>
<h5>1.Java API调用方式</h5>
<p> 1).logMetricForCount用于记录一个指标值出现的次数</p>
<p> 2).logMetricForDuration用于记录一个指标出现的平均值</p>
<p> 3).logMetricForSum用于记录一个指标出现的总和</p>
<p class='text-error'> 4).PayCount记录次数选用logMetricForCount这个API,PayAmount记录总和选用logMetricForSum这个API</p>
<p> 5).集成代码可能是如下所示</p>
<img class="img-polaroid" width='60%' src="${model.webapp}/images/business04.png"/>
<h5>2.HTTP API调用方式</h5>
<p>接口调用请求说明</p>
<pre>
http请求方式: GET(请使用http协议)
http://cat.dianpingoa.com/cat/r/systemMonitor?
</pre>
<p>参数说明</p>
<table style="width:50%" class="table table-striped table-bordered table-condensed">
<tr><th width="30%">参数</th><th>说明</th></tr>
<tr><td>group</td><td>监控组唯一ID名称,<span class="text-error">必需</span></td></tr>
<tr><td>domain</td><td>应用唯一ID名称,<span class="text-error">必需</span></td></tr>
<tr><td>key</td><td>监控业务唯一ID名称,<span class="text-error">必需</span></td></tr>
<tr><td>op</td><td>sum,avg,count[<span class="text-error">默认</span>]</td></tr>
<tr><td>count</td><td>op=count时所需,<span class="text-error">默认为1</span></td></tr>
<tr><td>sum</td><td>op=sum时所需,<span class="text-error">默认为0</span></td></tr>
<tr><td>avg</td><td>op=avg时所需,<span class="text-error">默认为0</span></td></tr>
</table>
<p> 1).op = count时,用于记录一个指标值出现的次数</p>
<pre>
http://cat.dianpingoa.com/cat/r/monitor?group=myGroup&domain=myApp&key=myKey&op=count
</pre>
<p> 2).op = avg时,用于记录一个指标出现的平均值</p>
<pre>
http://cat.dianpingoa.com/cat/r/monitor?group=myGroup&domain=myApp&key=myKey&op=avg&avg=500
</pre>
<p> 3).op = sum时,用于记录一个指标出现的总和</p>
<pre>
http://cat.dianpingoa.com/cat/r/monitor?group=myGroup&domain=myApp&key=myKey&op=sum&sum=500
</pre>
<p>返回说明</p>
<pre>
<span class="text-error">{"statusCode":"-1","errorMsg":"Unknown [ domain,group,key ] name!"} ——> 失败 [必需参数缺失]</span>
<span class="text-success">{"statusCode":"0"} ——> 成功</span>
</pre>
</br>
</br>
<h4 class="text-success">第三步:产品线配置</h4>
......
<%@ page contentType="text/html; charset=utf-8" %>
<jsp:useBean id="ctx" type="com.dianping.cat.report.page.monitor.Context" scope="request"/>
<jsp:useBean id="payload" type="com.dianping.cat.report.page.monitor.Payload" scope="request"/>
<jsp:useBean id="model" type="com.dianping.cat.report.page.monitor.Model" scope="request"/>
${model.status}
\ No newline at end of file
<%@ page contentType="text/html; charset=utf-8" %>
<jsp:useBean id="ctx" type="com.dianping.cat.report.page.systemMonitor.Context" scope="request"/>
<jsp:useBean id="payload" type="com.dianping.cat.report.page.systemMonitor.Payload" scope="request"/>
<jsp:useBean id="model" type="com.dianping.cat.report.page.systemMonitor.Model" scope="request"/>
View of systemMonitor page under report
\ No newline at end of file
#include <stdlib.h>
#include <stdio.h>
#include <pcap.h>
#include <time.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
#include <string.h>
#include <netinet/ip.h>
#include <netinet/if_ether.h>
#include <netinet/tcp.h>
#include <pthread.h>
#include <mysql.h>
//1MB
#define QUEUESIZE 1048576
#define DEALTHREADCOUNT 2
#define CITYIDMAX 500
#define CITYNAMEMAXLENGTH 100
#define IPINFOCOUNT 131865
#define MYSQLSERVER "127.0.0.1"
#define MYSQLSERVERPORT 3306
#define MYSQLUSER "root"
#define MYSQLPASS "12qwaszx"
#define MYSQLDB "DP"
#define INNERNETIPSEGSTART "10.1.0.0"
#define INNERNETIPSEGEND "10.8.0.0"
struct Conf
{
char servIP[16];
int servPort;
} g_conf;
struct Queue
{
int r;
int w;
int count;
u_char buf[QUEUESIZE];
pthread_mutex_t mutex;
};
struct ThreadArg
{
struct Queue queue;
} g_threadarg[DEALTHREADCOUNT];
pcap_t *g_phandle;
pthread_mutex_t g_sniffmutex;
pthread_cond_t g_sniffcond;
int g_sniffenable;
unsigned int push_pkt_error;
struct Stat
{
struct City
{
char name[CITYNAMEMAXLENGTH];
long count;
long traffic;
} city[CITYIDMAX];
long all_in_count;
long all_in_traffic;
long all_out_count;
long all_out_traffic;
pthread_mutex_t mutex;
} g_stat;
struct IPINFO
{
struct _IPINFO
{
unsigned int start;
unsigned int end;
short cityid;
} info[IPINFOCOUNT];
int count;
} g_ipinfo;
int init()
{
MYSQL mysql;
MYSQL_RES *res;
MYSQL_ROW row;
mysql_init(&mysql);
if(NULL == mysql_real_connect(&mysql, MYSQLSERVER, MYSQLUSER,
MYSQLPASS,MYSQLDB,MYSQLSERVERPORT,NULL,0))
{
fprintf(stderr,"Couldn't connect to engine!\n%s\n", mysql_error(&mysql));
exit(1);
}
if(0 != mysql_set_character_set(&mysql, "utf8"))
{
fprintf(stderr,"mysql set character set error\n");
exit(1);
}
char *sql = "select DP_CityIP.`StartIPHashValue`, \
DP_CityIP.`EndIPHashValue`, DP_CityIP.`CityID`, \
DP_CityIP.`Location` from DP_CityIP;";
if(0 != mysql_query(&mysql, sql))
{
fprintf(stderr,"Query failed (%s)\n",mysql_error(&mysql));
exit(1);
}
if(NULL == (res=mysql_use_result(&mysql)))
{
fprintf(stderr,"Couldn't get result from \n");
exit(1);
}
while(row = mysql_fetch_row(res))
{
if(g_ipinfo.count == IPINFOCOUNT)
break;
g_ipinfo.info[g_ipinfo.count].start = atoi(row[0]);
g_ipinfo.info[g_ipinfo.count].end = atoi(row[1]);
g_ipinfo.info[g_ipinfo.count].cityid = atoi(row[2]);
if(g_stat.city[g_ipinfo.info[g_ipinfo.count].cityid].name[0] == 0)
{
strcpy(g_stat.city[g_ipinfo.info[g_ipinfo.count].cityid].name, row[3]);
}
g_ipinfo.count++;
}
mysql_free_result(res);
mysql_close(&mysql);
return 0;
}
int find_ipseg(unsigned int ip)
{
int low, high, mid;
low = 0;
high = g_ipinfo.count-1;
while(low <= high)
{
mid = (low+high) / 2;
if(ip >= g_ipinfo.info[mid].start && ip <= g_ipinfo.info[mid].end)
return mid;
else if(ip < g_ipinfo.info[mid].start)
high = mid - 1;
else
low = mid + 1;
}
return -1;
}
int get_pkt(struct Queue* q, u_char* pkt, int* pkt_len)
{
int count;
int r;
int first;
pthread_mutex_lock(&q->mutex);
count = q->count;
r = q->r;
if(count == 0)
{
pthread_mutex_unlock(&q->mutex);
return -1;
}
//count packet length
if(r + 4 <= QUEUESIZE)
{
*pkt_len = *((int*)(q->buf+r));
}
else
{
first = QUEUESIZE - r;
memcpy(pkt_len,q->buf+r,first);
memcpy(((u_char*)(pkt_len))+first,q->buf,4-first);
}
q->count -= *pkt_len + 4;
q->r = (q->r + 4 + *pkt_len) % QUEUESIZE;
pthread_mutex_unlock(&q->mutex);
r = (r + 4) % QUEUESIZE;
if(r + *pkt_len <= QUEUESIZE)
{
memcpy(pkt,q->buf+r,*pkt_len);
}
else
{
first = QUEUESIZE - r;
memcpy(pkt,q->buf+r,first);
memcpy(pkt+first,q->buf,*pkt_len-first);
}
return 0;
}
int put_pkt(struct Queue* q, const u_char* pkt, int pkt_len)
{
int count, w, first;
pthread_mutex_lock(&q->mutex);
count = q->count;
w = q->w;
//+4: 包前面会放一个4字节的整数标识包的长度
if(count + pkt_len + 4 > QUEUESIZE)
{
pthread_mutex_unlock(&q->mutex);
return -1;
}
//修改w
q->w = (q->w + 4 + pkt_len) % QUEUESIZE;
q->count += pkt_len + 4;
if(w + 4 <= QUEUESIZE)
{
memcpy(q->buf+w,&pkt_len,4);
}
else
{
first = QUEUESIZE - w;
memcpy(q->buf+w,&pkt_len,first);
memcpy(q->buf,((u_char*)(&pkt_len))+first,4-first);
}
w = (w + 4) % QUEUESIZE;
if(w + pkt_len <= QUEUESIZE)
{
memcpy(q->buf+w,pkt,pkt_len);
}
else
{
first = QUEUESIZE - w;
memcpy(q->buf+w,pkt,first);
memcpy(q->buf,pkt+first,pkt_len-first);
}
pthread_mutex_unlock(&q->mutex);
return 0;
}
void push_packet(u_char* user,const struct pcap_pkthdr* pkthdr,const u_char* packet)
{
struct iphdr* ip;
struct tcphdr* tcp;
ip = (struct iphdr *)(packet + sizeof(struct ethhdr));
tcp = (struct tcphdr *)(packet + sizeof(struct ethhdr) + ip->ihl * 4);
int porthash = ntohs(tcp->source) % DEALTHREADCOUNT;
if(-1 == put_pkt(&g_threadarg[porthash].queue, packet+sizeof(struct ethhdr),
pkthdr->len-sizeof(struct ethhdr)))
{
push_pkt_error++;
printf("丢包数%d\n", push_pkt_error);
usleep(200);
}
}
void* deal_packet(void* arg)
{
int ip_len;
struct iphdr* ip;
struct tcphdr* tcp;
u_char tcp_flag;
u_char buffer[1600];
memset(buffer, 0, 1600);
int sockfd;
struct ThreadArg* ta = (struct ThreadArg*)arg;
int data_len;
char *find;
unsigned int ip1,ip2,ip3,ip4;
unsigned int xffip;
int pos;
in_addr_t inner_net_ip_start = htonl(inet_addr(INNERNETIPSEGSTART));
in_addr_t inner_net_ip_end = htonl(inet_addr(INNERNETIPSEGEND));
in_addr_t src, dst;
while(1)
{
if(-1 == get_pkt(&ta->queue, buffer, &ip_len))
{
usleep(200);
continue;
}
ip = (struct iphdr *)buffer;
tcp = (struct tcphdr *)(buffer + ip->ihl * 4);
ip_len = ntohs(ip->tot_len); //maybe shorter then pkt len, eth data shortest len is 46
data_len = ip_len - ip->ihl * 4 - tcp->doff * 4;
src = ntohl(ip->saddr);
dst = ntohl(ip->daddr);
if(dst >= inner_net_ip_start && dst <= inner_net_ip_end &&
(src < inner_net_ip_start || src > inner_net_ip_end)) //in
{
pthread_mutex_lock(&g_stat.mutex);
g_stat.all_in_count++;
g_stat.all_in_traffic += ip_len;
pthread_mutex_unlock(&g_stat.mutex);
}
else if(src >= inner_net_ip_start && src <= inner_net_ip_end &&
(dst < inner_net_ip_start || dst > inner_net_ip_end)) //out
{
pthread_mutex_lock(&g_stat.mutex);
g_stat.all_out_count++;
g_stat.all_out_traffic += ip_len;
pthread_mutex_unlock(&g_stat.mutex);
}
else
{
fprintf(stderr, "%s\n", inet_ntoa(addr));
}
}
}
//抓包
void* sniff(void* arg)
{
while(1)
{
pcap_loop(g_phandle,-1,push_packet,NULL);
pthread_mutex_lock(&g_sniffmutex);
if(0 != pthread_cond_wait(&g_sniffcond, &g_sniffmutex))
{
fprintf(stderr, "sniff: cond wait error!\n");
}
pthread_mutex_unlock(&g_sniffmutex);
}
}
void send_stat()
{
long all_in_count, all_in_traffic;
long all_out_count, all_out_traffic;
int sndSock;
struct sockaddr_in serverAddr;
char buf[1024];
int buflen = sizeof(buf) / sizeof(buf[0]);
char content[100];
int contentlen = sizeof(content) / sizeof(content[0]);
int sndlen;
char http_arg[100];
memset(&serverAddr, 0, sizeof(serverAddr));
serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(g_conf.servPort);
serverAddr.sin_addr.s_addr = inet_addr(g_conf.servIP);
memset(http_arg, 0, sizeof(http_arg));
sprintf(http_arg, "Host: %s:%d\n"
"Connection: keep-alive\n"
"Cache-Control: max-age=0\n"
"Content-Type: application/x-www-form-urlencoded\n\n",
g_conf.servIP, g_conf.servPort);
while(1)
{
sleep(2);
pthread_mutex_lock(&g_stat.mutex);
all_in_count = g_stat.all_in_count;
all_in_traffic = g_stat.all_in_traffic;
all_out_count = g_stat.all_out_count;
all_out_traffic = g_stat.all_out_traffic;
g_stat.all_in_count = 0;
g_stat.all_in_traffic = 0;
g_stat.all_out_count = 0;
g_stat.all_out_traffic = 0;
pthread_mutex_unlock(&g_stat.mutex);
printf("%u,%u\n", all_in_count, all_in_traffic);
printf("%u,%u\n", all_out_count, all_out_traffic);
printf("\n");
if((sndSock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
fprintf(stderr, "create send socket failed!\n");
continue;
};
if(connect(sndSock, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0)
{
fprintf(stderr, "connect to server failed!\n");
close(sndSock);
continue;
}
memset(buf, 0, buflen);
memset(content, 0, contentlen);
sprintf(content, "group=TestGroup&domain=net&key=traffic&op=sum&sum=%d", all_in_count);
sprintf(buf, "GET /cat/r/systemMonitor?%s HTTP/1.1\n%s", content, http_arg);
if((sndlen = send(sndSock, buf, strlen(buf), 0)) == -1 )
{
fprintf(stderr, "send data to server failed!\n");
close(sndSock);
continue;
}
close(sndSock);
}
}
int main(int argc, char **argv)
{
char *device = "eth5";
char errbuf[1024];
pthread_t tid[DEALTHREADCOUNT];
pthread_t snifftid;
int i;
bpf_u_int32 ipaddress,ipmask;
struct bpf_program fcode;
printf("initing....\n");
memset(&g_stat, 0, sizeof(g_stat));
memset(&g_ipinfo, 0, sizeof(g_ipinfo));
if(-1 == init())
{
fprintf(stderr, "init cityip error\n");
exit(1);
}
printf("init completed.\n");
memset(&g_threadarg, 0, sizeof(struct ThreadArg)*DEALTHREADCOUNT);
for(i=0; i<DEALTHREADCOUNT; i++)
{
if(pthread_create(&tid[i], NULL, deal_packet, &g_threadarg[i]) != 0)
{
fprintf(stderr, "create packet deal thread %d error\n", i+1);
exit(1);
}
}
g_phandle=pcap_open_live(device,1600,1,100,errbuf);
if(g_phandle==NULL){
perror(errbuf);
exit(1);
}
memset(&g_sniffcond, 0, sizeof(pthread_cond_t));
memset(&g_sniffmutex, 0, sizeof(pthread_mutex_t));
g_sniffenable = 1;
if(pthread_create(&snifftid, NULL, sniff, NULL) != 0)
{
fprintf(stderr, "*** create sniff thread error\n");
exit(1);
}
memset(&g_conf, 0, sizeof(g_conf));
strcpy(g_conf.servIP, "127.0.0.1");
g_conf.servPort = 8888;
send_stat();
return 0;
}
#!/usr/bin/env python
#encoding=utf-8
import subprocess
import time
import urllib2
import logging
from config import *
def init(switchs):
logging.basicConfig(filename = LOGFILE, level = logging.INFO, \
format = '%(asctime)s - %(levelname)s: %(message)s')
i = 0
while i < len(switchs):
if not switchs[i].get('type') or not switchs[i].get('public') or \
not switchs[i].get('ip') or not switchs[i].get('port'):
logging.warning('init: switch %s info is not enough.' % str(switchs[i]))
del switchs[i]
continue
if switchs[i]['type'] not in OID.keys():
logging.warning('init: the switch %s \'s type %s is not support.' % \
(str(switchs[i]), type))
del switchs[i]
continue
if not switchs[i].get('version'):
switchs[i]['version'] = '2c'
if not switchs[i].get('group'):
switchs[i]['group'] = 'switch'
if not switchs[i].get('name'):
switchs[i]['name'] = get_name(switchs[i]['ip'], switchs[i]['type'], \
switchs[i]['public'], switchs[i]['version'])
switchs[i]['portname']= get_port_name(switchs[i]['ip'], switchs[i]['port'], \
switchs[i]['type'], switchs[i]['public'], switchs[i]['version'])
i += 1
logging.info('init: succesed')
def _get_data(server, oid_array, public, version, cmd='snmpget'):
cmd = cmd + ' -c %s -v %s %s' % (public, version, server)
for oid in oid_array:
cmd += ' ' + oid
try:
p = subprocess.Popen(cmd,stdout=subprocess.PIPE,shell=True)
p.wait()
data = [line.split(': ')[1] for line in p.stdout.read().split('\n') if line]
return data
except Exception, e:
logging.warning('_get_data exception: %s' % str(e))
return None
def get_name(server, type, public, version):
oid_array = [OID[type]['name']]
data = _get_data(server, oid_array, public, version, 'snmpwalk')
if not data:
logging.warning('get_name error: cannot get server %s \'s name.' % server)
return server
return data[0]
def get_port_name(server, ports, type, public, version):
oid_array = []
for p in ports:
oid_array.append('%s%d' % (OID[type]['port_name'], p))
data = _get_data(server, oid_array, public, version)
if not data:
logging.warning('get_port_stat: cannot get server %s \'s port statistics.' % server)
return [str(p) for p in ports]
return data
def get_port_stat(server, ports, type, public, version):
oid_array = []
for p in ports:
oid_array.append('%s%d' % (OID[type]['in_traffic'], p))
oid_array.append('%s%d' % (OID[type]['out_traffic'], p))
oid_array.append('%s%d' % (OID[type]['in_pkts'], p))
oid_array.append('%s%d' % (OID[type]['out_pkts'], p))
data = _get_data(server, oid_array, public, version)
if not data:
logging.warning('get_port_stat: cannot get server %s \'s port statistics.' % server)
return None
data = [int(d) for d in data]
return data[::4], data[1::4], data[2::4], data[3::4]
def send_data(group, domain, key, data):
url = 'http://%s/cat/r/systemMonitor?group=%s&domain=%s&key=%s&op=sum&sum=%d' % \
(DATA_RECEIVER, group, domain, key, data)
try:
urllib2.urlopen(url, timeout=0)
except Exception, e:
logging.warning('send_data: %s %s %s %d' % (group, domain, key, data))
return False
return True
def netcat(switchs):
while True:
for sw in switchs:
data = get_port_stat(sw['ip'], sw['port'], sw['type'], sw['public'], sw['version'])
if not data:
logging.warning('netcat: can not catch switch %s \'s data' % str(sw))
continue
last = sw.get('last')
sw['last'] = data
if not last:
continue
indiff = [d + 2**32 if d < 0 else d for d in map(lambda x: x[0] - x[1], zip(data[0], last[0]))]
outdiff = [d + 2**32 if d < 0 else d for d in map(lambda x: x[0] - x[1], zip(data[1], last[1]))]
i = 0
while i < len(sw['port']):
send_data(sw['group'], sw['name'], '-'.join([sw['portname'][i],'in']), indiff[i])
send_data(sw['group'], sw['name'], '-'.join([sw['portname'][i],'out']), outdiff[i])
i += 1
time.sleep(INTERVAL_TIME)
if __name__ == '__main__':
init(switchs)
netcat(switchs)
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册