提交 334e6525 编写于 作者: F Frankie Wu

refactor model service implementation

上级 3f198c67
......@@ -153,6 +153,12 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
}
public ProblemReport getReport(String domain) {
if (domain == null) {
List<String> domains = getDomains();
domain = domains.isEmpty() ? null : domains.get(0);
}
return m_reports.get(domain);
}
......
......@@ -144,6 +144,12 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
}
public TransactionReport getReport(String domain) {
if (domain == null) {
List<String> domains = getDomains();
domain = domains.isEmpty() ? null : domains.get(0);
}
return m_reports.get(domain);
}
......
......@@ -33,6 +33,10 @@ public abstract class AbstractMessageAnalyzer<R> implements MessageAnalyzer {
// override it
}
public R getReport(String domain) {
throw new UnsupportedOperationException("Not implemented yet!");
}
protected abstract List<R> generate();
protected abstract boolean isTimeout();
......
......@@ -8,14 +8,14 @@ import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.report.page.model.ip.CompositeIpService;
import com.dianping.cat.report.page.model.ip.LocalIpService;
import com.dianping.cat.report.page.model.logview.CompositeLogViewService;
import com.dianping.cat.report.page.model.logview.HdfsLogViewService;
import com.dianping.cat.report.page.model.logview.HistoricalLogViewService;
import com.dianping.cat.report.page.model.logview.LocalLogViewService;
import com.dianping.cat.report.page.model.problem.CompositeProblemService;
import com.dianping.cat.report.page.model.problem.HdfsProblemService;
import com.dianping.cat.report.page.model.problem.HistoricalProblemService;
import com.dianping.cat.report.page.model.problem.LocalProblemService;
import com.dianping.cat.report.page.model.spi.ModelService;
import com.dianping.cat.report.page.model.transaction.CompositeTransactionService;
import com.dianping.cat.report.page.model.transaction.HdfsTransactionService;
import com.dianping.cat.report.page.model.transaction.HistoricalTransactionService;
import com.dianping.cat.report.page.model.transaction.LocalTransactionService;
import com.dianping.cat.storage.BucketManager;
import com.site.lookup.configuration.AbstractResourceConfigurator;
......@@ -25,36 +25,40 @@ class ServiceComponentConfigurator extends AbstractResourceConfigurator {
@Override
public List<Component> defineComponents() {
List<Component> all = new ArrayList<Component>();
String remoteServers = property("remote-servers", ""); // no remote server
all.add(C(ModelService.class, "transaction-local", LocalTransactionService.class) //
.req(MessageConsumer.class, "realtime"));
all.add(C(ModelService.class, "transaction-hdfs", HdfsTransactionService.class) //
all.add(C(ModelService.class, "transaction-historical", HistoricalTransactionService.class) //
.req(BucketManager.class));
all.add(C(ModelService.class, "transaction", CompositeTransactionService.class) //
.req(ModelService.class, new String[] { "transaction-local", "transaction-hdfs" }, "m_services"));
.req(ModelService.class, new String[] { "transaction-local", "transaction-historical" }, "m_services") //
.config(E("remoteServers").value(remoteServers)));
all.add(C(ModelService.class, "problem-local", LocalProblemService.class) //
.req(MessageConsumer.class, "realtime"));
all.add(C(ModelService.class, "problem-hdfs", HdfsProblemService.class) //
all.add(C(ModelService.class, "problem-historical", HistoricalProblemService.class) //
.req(BucketManager.class));
all.add(C(ModelService.class, "problem", CompositeProblemService.class) //
.req(ModelService.class, new String[] { "problem-local", "problem-hdfs" }, "m_services"));
.req(ModelService.class, new String[] { "problem-local", "problem-historical" }, "m_services") //
.config(E("remoteServers").value(remoteServers)));
all.add(C(ModelService.class, "ip-local", LocalIpService.class) //
.req(MessageConsumer.class, "realtime"));
.req(MessageConsumer.class, "realtime"));
all.add(C(ModelService.class, "ip", CompositeIpService.class) //
.req(ModelService.class, new String[] { "ip-local" }, "m_services"));
.req(ModelService.class, new String[] { "ip-local" }, "m_services") //
.config(E("remoteServers").value(remoteServers)));
all.add(C(ModelService.class, "logview-local", LocalLogViewService.class) //
.req(MessageConsumer.class, "realtime") //
.req(BucketManager.class) //
.req(MessageCodec.class, "html"));
all.add(C(ModelService.class, "logview-hdfs", HdfsLogViewService.class) //
all.add(C(ModelService.class, "logview-historical", HistoricalLogViewService.class) //
.req(BucketManager.class) //
.req(MessageCodec.class, "html", "m_htmlCodec") //
.req(MessageCodec.class, "plain-text", "m_plainCodec"));
.req(MessageCodec.class, "html"));
all.add(C(ModelService.class, "logview", CompositeLogViewService.class) //
.req(ModelService.class, new String[] { "logview-local", "logview-hdfs" }, "m_services"));
.req(ModelService.class, new String[] { "logview-local", "logview-historical" }, "m_services") //
.config(E("remoteServers").value(remoteServers)));
return all;
}
......
......@@ -91,4 +91,8 @@ public class CompositeIpService implements ModelService<IpReport>, Initializable
public void setSerivces(ModelService<IpReport>... services) {
m_services = Arrays.asList(services);
}
public void setRemoteServers(String servers) {
}
}
package com.dianping.cat.report.page.model.logview;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import com.dianping.cat.report.page.model.spi.ModelRequest;
import com.dianping.cat.report.page.model.spi.ModelResponse;
import com.dianping.cat.report.page.model.spi.ModelService;
import com.site.helper.Splitters;
import com.site.lookup.annotation.Inject;
public class CompositeLogViewService implements ModelService<String>, Initializable {
@Inject
private List<ModelService<String>> m_services = new ArrayList<ModelService<String>>();
import com.dianping.cat.report.page.model.spi.internal.BaseCompositeModelService;
import com.dianping.cat.report.page.model.spi.internal.BaseRemoteModelService;
private ExecutorService m_threadPool;
@Override
public void initialize() throws InitializationException {
m_threadPool = Executors.newFixedThreadPool(10);
public class CompositeLogViewService extends BaseCompositeModelService<String> {
public CompositeLogViewService() {
super("logview");
}
@Override
public ModelResponse<String> invoke(final ModelRequest request) {
int size = m_services.size();
final List<ModelResponse<String>> responses = new ArrayList<ModelResponse<String>>(size);
final Semaphore semaphore = new Semaphore(0);
int count = 0;
for (final ModelService<String> service : m_services) {
if (service.isEligable(request)) {
m_threadPool.submit(new Runnable() {
@Override
public void run() {
try {
responses.add(service.invoke(request));
} catch (Exception e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}
});
count++;
}
}
try {
semaphore.tryAcquire(count, 5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// ignore it
}
for (ModelResponse<String> response : responses) {
if (response != null && response.getModel() != null) {
return response;
}
}
return new ModelResponse<String>();
protected BaseRemoteModelService<String> createRemoteService() {
return new RemoteLogViewService();
}
@Override
public boolean isEligable(ModelRequest request) {
for (ModelService<String> service : m_services) {
if (service.isEligable(request)) {
return true;
}
}
return false;
}
public void setSerivces(ModelService<String>... services) {
for (ModelService<String> service : services) {
m_services.add(service);
}
}
/**
* Inject remote servers to load transaction model.
* <p>
*
* For example, servers: 192.168.1.1:2281,192.168.1.2,192.168.1.3
*
* @param servers
* server list separated by comma(',')
*/
public void setRemoteServers(String servers) {
List<String> endpoints = Splitters.by(',').split(servers);
String localAddress = null;
String localHost = null;
try {
localAddress = InetAddress.getLocalHost().getHostAddress();
localHost = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
// ignore it
}
for (String endpoint : endpoints) {
int pos = endpoint.indexOf(':');
String host = (pos > 0 ? endpoint.substring(0, pos) : endpoint);
int port = (pos > 0 ? Integer.parseInt(endpoint.substring(pos) + 1) : 2281);
protected String merge(List<ModelResponse<String>> responses) {
for (ModelResponse<String> response : responses) {
if (response != null) {
String model = response.getModel();
if (port == 2281) {
if ("localhost".equals(host) || "127.0.0.1".equals(host)) {
// exclude localhost
continue;
} else if (host.equals(localAddress) || host.equals(localHost)) {
// exclude itself
continue;
if (model != null) {
return model;
}
}
RemoteLogViewService remote = new RemoteLogViewService();
remote.setHost(host);
remote.setPort(port);
m_services.add(remote);
}
return null;
}
}
......@@ -10,62 +10,54 @@ import com.dianping.cat.message.internal.MessageId;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.report.page.model.spi.ModelRequest;
import com.dianping.cat.report.page.model.spi.ModelResponse;
import com.dianping.cat.report.page.model.spi.ModelService;
import com.dianping.cat.report.page.model.spi.internal.BaseHistoricalModelService;
import com.dianping.cat.storage.Bucket;
import com.dianping.cat.storage.BucketManager;
import com.site.lookup.annotation.Inject;
public class HdfsLogViewService implements ModelService<String> {
public class HistoricalLogViewService extends BaseHistoricalModelService<String> {
@Inject
private BucketManager m_bucketManager;
@Inject(value = "html")
private MessageCodec m_codec;
@Override
public ModelResponse<String> invoke(ModelRequest request) {
public HistoricalLogViewService() {
super("logview");
}
protected String buildModel(ModelRequest request) throws Exception {
String messageId = request.getProperty("messageId");
String direction = request.getProperty("direction");
String tag = request.getProperty("tag");
MessageId id = MessageId.parse(messageId);
ModelResponse<String> response = new ModelResponse<String>();
try {
Bucket<MessageTree> bucket = m_bucketManager.getMessageBucket(new Date(id.getTimestamp()), id.getDomain(), "remote");
MessageTree tree = null;
Date timestamp = new Date(id.getTimestamp());
Bucket<MessageTree> bucket = m_bucketManager.getMessageBucket(timestamp, id.getDomain(), "remote");
MessageTree tree = null;
if (tag != null && direction != null) {
Boolean d = Boolean.valueOf(direction);
if (d.booleanValue()) {
tree = bucket.findNextById(messageId, tag);
} else {
tree = bucket.findPreviousById(messageId, tag);
}
}
if (tag != null && direction != null) {
Boolean d = Boolean.valueOf(direction);
// if not found, use current instead
if (tree == null) {
tree = bucket.findById(messageId);
if (d.booleanValue()) {
tree = bucket.findNextById(messageId, tag);
} else {
tree = bucket.findPreviousById(messageId, tag);
}
}
if (tree != null) {
ChannelBuffer buf = ChannelBuffers.dynamicBuffer(8192);
m_codec.encode(tree, buf);
buf.readInt(); // get rid of length
response.setModel(buf.toString(Charset.forName("utf-8")));
}
} catch (Exception e) {
response.setException(e);
// if not found, use current instead
if (tree == null) {
tree = bucket.findById(messageId);
}
return response;
}
if (tree != null) {
ChannelBuffer buf = ChannelBuffers.dynamicBuffer(8192);
@Override
public boolean isEligable(ModelRequest request) {
return request.getPeriod().isHistorical();
m_codec.encode(tree, buf);
buf.readInt(); // get rid of length
return buf.toString(Charset.forName("utf-8"));
} else {
return null;
}
}
}
......@@ -9,64 +9,57 @@ import org.jboss.netty.buffer.ChannelBuffers;
import com.dianping.cat.message.internal.MessageId;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.report.page.model.spi.ModelPeriod;
import com.dianping.cat.report.page.model.spi.ModelRequest;
import com.dianping.cat.report.page.model.spi.ModelResponse;
import com.dianping.cat.report.page.model.spi.ModelService;
import com.dianping.cat.report.page.model.spi.internal.BaseLocalModelService;
import com.dianping.cat.storage.Bucket;
import com.dianping.cat.storage.BucketManager;
import com.site.lookup.annotation.Inject;
public class LocalLogViewService implements ModelService<String> {
public class LocalLogViewService extends BaseLocalModelService<String> {
@Inject
private BucketManager m_bucketManager;
@Inject(value = "html")
private MessageCodec m_codec;
public LocalLogViewService() {
super("logview");
}
@Override
public ModelResponse<String> invoke(ModelRequest request) {
protected String getReport(ModelRequest request, ModelPeriod period, String domain) throws Exception {
String messageId = request.getProperty("messageId");
String direction = request.getProperty("direction");
String tag = request.getProperty("tag");
MessageId id = MessageId.parse(messageId);
ModelResponse<String> response = new ModelResponse<String>();
try {
Bucket<MessageTree> bucket = m_bucketManager.getMessageBucket(new Date(id.getTimestamp()), id.getDomain(), "local");
MessageTree tree = null;
Date timestamp = new Date(id.getTimestamp());
Bucket<MessageTree> bucket = m_bucketManager.getMessageBucket(timestamp, id.getDomain(), "local");
MessageTree tree = null;
if (tag != null && direction != null) {
Boolean d = Boolean.valueOf(direction);
if (tag != null && direction != null) {
Boolean d = Boolean.valueOf(direction);
if (d.booleanValue()) {
tree = bucket.findNextById(messageId, tag);
} else {
tree = bucket.findPreviousById(messageId, tag);
}
if (d.booleanValue()) {
tree = bucket.findNextById(messageId, tag);
} else {
tree = bucket.findPreviousById(messageId, tag);
}
}
// if not found, use current instead
if (tree == null) {
tree = bucket.findById(messageId);
}
if (tree != null) {
ChannelBuffer buf = ChannelBuffers.dynamicBuffer(8096);
m_codec.encode(tree, buf);
buf.readInt(); // get rid of length
response.setModel(buf.toString(Charset.forName("utf-8")));
}
} catch (Exception e) {
e.printStackTrace();
response.setException(e);
// if not found, use current instead
if (tree == null) {
tree = bucket.findById(messageId);
}
return response;
}
if (tree != null) {
ChannelBuffer buf = ChannelBuffers.dynamicBuffer(8096);
@Override
public boolean isEligable(ModelRequest request) {
return !request.getPeriod().isHistorical();
m_codec.encode(tree, buf);
buf.readInt(); // get rid of length
return buf.toString(Charset.forName("utf-8"));
} else {
return null;
}
}
}
package com.dianping.cat.report.page.model.logview;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Map;
import java.util.Map.Entry;
import java.io.IOException;
import com.dianping.cat.report.page.model.spi.ModelPeriod;
import com.dianping.cat.report.page.model.spi.ModelRequest;
import com.dianping.cat.report.page.model.spi.ModelResponse;
import com.dianping.cat.report.page.model.spi.ModelService;
import com.site.helper.Files;
import com.site.helper.Joiners;
import com.site.helper.Joiners.IBuilder;
import com.site.lookup.annotation.Inject;
import org.xml.sax.SAXException;
public class RemoteLogViewService implements ModelService<String> {
@Inject
private String m_host;
import com.dianping.cat.report.page.model.spi.internal.BaseRemoteModelService;
@Inject
private int m_port = 2281; // default admin port
@Inject
private String m_serviceUri = "/cat/r/model";
URL buildUrl(ModelRequest request) throws MalformedURLException {
String pairs = Joiners.by('&').prefixDelimiter()
.join(request.getProperties().entrySet(), new IBuilder<Map.Entry<String, String>>() {
@Override
public String asString(Entry<String, String> e) {
return e.getKey() + "=" + e.getValue();
}
});
String url = String.format("http://%s:%s%s/%s/%s/%s?op=xml%s", m_host, m_port, m_serviceUri, "logview",
request.getDomain(), request.getPeriod(), pairs);
return new URL(url);
}
@Override
public ModelResponse<String> invoke(ModelRequest request) {
ModelResponse<String> response = new ModelResponse<String>();
try {
URL url = buildUrl(request);
String html = Files.forIO().readFrom(url.openStream(), "utf-8");
response.setModel(html);
} catch (Exception e) {
response.setException(e);
}
return response;
public class RemoteLogViewService extends BaseRemoteModelService<String> {
public RemoteLogViewService() {
super("logview");
}
@Override
public boolean isEligable(ModelRequest request) {
ModelPeriod period = request.getPeriod();
return !period.isHistorical();
}
public void setHost(String host) {
m_host = host;
}
public void setPort(int port) {
m_port = port;
}
public void setServiceUri(String serviceUri) {
m_serviceUri = serviceUri;
protected String buildModel(String content) throws SAXException, IOException {
return content;
}
}
package com.dianping.cat.report.page.model.problem;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import com.dianping.cat.Cat;
import com.dianping.cat.consumer.problem.model.entity.ProblemReport;
import com.dianping.cat.consumer.problem.model.transform.DefaultMerger;
import com.dianping.cat.message.Event;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.internal.DefaultEvent;
import com.dianping.cat.report.page.model.spi.ModelRequest;
import com.dianping.cat.report.page.model.spi.ModelResponse;
import com.dianping.cat.report.page.model.spi.ModelService;
import com.site.helper.Splitters;
import com.site.lookup.annotation.Inject;
public class CompositeProblemService implements ModelService<ProblemReport>, Initializable {
@Inject
private List<ModelService<ProblemReport>> m_services;
import com.dianping.cat.report.page.model.spi.internal.BaseCompositeModelService;
import com.dianping.cat.report.page.model.spi.internal.BaseRemoteModelService;
private ExecutorService m_threadPool;
public class CompositeProblemService extends BaseCompositeModelService<ProblemReport> {
public CompositeProblemService() {
super("problem");
}
@Override
public void initialize() throws InitializationException {
m_threadPool = Executors.newFixedThreadPool(10);
protected BaseRemoteModelService<ProblemReport> createRemoteService() {
return new RemoteProblemService();
}
@Override
public ModelResponse<ProblemReport> invoke(final ModelRequest request) {
int size = m_services.size();
final List<ModelResponse<ProblemReport>> responses = new ArrayList<ModelResponse<ProblemReport>>(size);
final Semaphore semaphore = new Semaphore(0);
final Transaction t = Cat.getProducer().newTransaction("ModelService", "Problem");
int count = 0;
t.setStatus(Message.SUCCESS);
t.addData("request", request);
for (final ModelService<ProblemReport> service : m_services) {
if (!service.isEligable(request)) {
continue;
}
m_threadPool.submit(new Runnable() {
@Override
public void run() {
try {
responses.add(service.invoke(request));
t.addData(service.toString());
} catch (Exception e) {
logError(t, e);
t.setStatus(e);
} finally {
semaphore.release();
}
}
void logError(Transaction t, Throwable cause) {
StringWriter writer = new StringWriter(2048);
cause.printStackTrace(new PrintWriter(writer));
if (cause instanceof Error) {
logEvent(t, "Error", cause.getClass().getName(), "ERROR", writer.toString());
} else if (cause instanceof RuntimeException) {
logEvent(t, "RuntimeException", cause.getClass().getName(), "ERROR", writer.toString());
} else {
logEvent(t, "Exception", cause.getClass().getName(), "ERROR", writer.toString());
}
}
void logEvent(Transaction t, String type, String name, String status, String nameValuePairs) {
Event event = new DefaultEvent(type, name);
if (nameValuePairs != null && nameValuePairs.length() > 0) {
event.addData(nameValuePairs);
}
event.setStatus(status);
event.complete();
t.addChild(event);
}
});
count++;
}
try {
semaphore.tryAcquire(count, 5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// ignore it
t.setStatus(e);
} finally {
t.complete();
}
ModelResponse<ProblemReport> aggregated = new ModelResponse<ProblemReport>();
protected ProblemReport merge(List<ModelResponse<ProblemReport>> responses) {
DefaultMerger merger = null;
for (ModelResponse<ProblemReport> response : responses) {
......@@ -125,66 +36,6 @@ public class CompositeProblemService implements ModelService<ProblemReport>, Ini
}
}
aggregated.setModel(merger == null ? null : merger.getProblemReport());
return aggregated;
}
@Override
public boolean isEligable(ModelRequest request) {
for (ModelService<ProblemReport> service : m_services) {
if (service.isEligable(request)) {
return true;
}
}
return false;
}
public void setSerivces(ModelService<ProblemReport>... services) {
m_services = Arrays.asList(services);
}
/**
* Inject remote servers to load transaction model.
* <p>
*
* For example, servers: 192.168.1.1:2281,192.168.1.2,192.168.1.3
*
* @param servers
* server list separated by comma(',')
*/
public void setRemoteServers(String servers) {
List<String> endpoints = Splitters.by(',').split(servers);
String localAddress = null;
String localHost = null;
try {
localAddress = InetAddress.getLocalHost().getHostAddress();
localHost = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
// ignore it
}
for (String endpoint : endpoints) {
int pos = endpoint.indexOf(':');
String host = (pos > 0 ? endpoint.substring(0, pos) : endpoint);
int port = (pos > 0 ? Integer.parseInt(endpoint.substring(pos) + 1) : 2281);
if (port == 2281) {
if ("localhost".equals(host) || host.startsWith("127.0.")) {
// exclude localhost
continue;
} else if (host.equals(localAddress) || host.equals(localHost)) {
// exclude itself
continue;
}
}
RemoteProblemService remote = new RemoteProblemService();
remote.setHost(host);
remote.setPort(port);
m_services.add(remote);
}
return merger == null ? null : merger.getProblemReport();
}
}
......@@ -5,46 +5,41 @@ import java.util.Date;
import com.dianping.cat.consumer.problem.model.entity.ProblemReport;
import com.dianping.cat.consumer.problem.model.transform.DefaultXmlParser;
import com.dianping.cat.report.page.model.spi.ModelRequest;
import com.dianping.cat.report.page.model.spi.ModelResponse;
import com.dianping.cat.report.page.model.spi.ModelService;
import com.dianping.cat.report.page.model.spi.internal.BaseHistoricalModelService;
import com.dianping.cat.storage.Bucket;
import com.dianping.cat.storage.BucketManager;
import com.site.lookup.annotation.Inject;
public class HdfsProblemService implements ModelService<ProblemReport> {
public class HistoricalProblemService extends BaseHistoricalModelService<ProblemReport> {
@Inject
private BucketManager m_bucketManager;
public HistoricalProblemService() {
super("problem");
}
@Override
public ModelResponse<ProblemReport> invoke(ModelRequest request) {
protected ProblemReport buildModel(ModelRequest request) throws Exception {
String domain = request.getDomain();
long date = Long.parseLong(request.getProperty("date"));
ModelResponse<ProblemReport> response = new ModelResponse<ProblemReport>();
Bucket<String> bucket = null;
try {
bucket = m_bucketManager.getReportBucket(new Date(date), domain, "remote");
bucket = m_bucketManager.getReportBucket(new Date(date), getName(), "remote");
String xml = bucket.findById("problem-" + domain);
String xml = bucket.findById(domain);
if (xml == null) {
if (xml != null) {
ProblemReport report = new DefaultXmlParser().parse(xml);
response.setModel(report);
return report;
} else {
return null;
}
} catch (Exception e) {
response.setException(e);
} finally {
if (bucket != null) {
m_bucketManager.closeBucket(bucket);
}
}
return response;
}
@Override
public boolean isEligable(ModelRequest request) {
return request.getPeriod().isHistorical();
}
}
package com.dianping.cat.report.page.model.problem;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.dianping.cat.consumer.RealtimeConsumer;
import com.dianping.cat.consumer.problem.ProblemAnalyzer;
import com.dianping.cat.consumer.problem.model.entity.AllDomains;
import com.dianping.cat.consumer.problem.model.entity.ProblemReport;
import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.report.page.model.spi.ModelPeriod;
import com.dianping.cat.report.page.model.spi.ModelRequest;
import com.dianping.cat.report.page.model.spi.ModelResponse;
import com.dianping.cat.report.page.model.spi.ModelService;
import com.site.lookup.annotation.Inject;
public class LocalProblemService implements ModelService<ProblemReport> {
@Inject(type = MessageConsumer.class, value = "realtime")
private RealtimeConsumer m_consumer;
@Override
public ModelResponse<ProblemReport> invoke(ModelRequest request) {
ProblemAnalyzer analyzer = getAnalyzer(request.getPeriod());
ModelResponse<ProblemReport> response = new ModelResponse<ProblemReport>();
if (analyzer != null) {
Map<String, ProblemReport> reports = analyzer.getReports();
List<String> domains = getDomains(reports.keySet());
String d = request.getDomain();
String domain = d != null && d.length() > 0 ? d : domains.isEmpty() ? null : domains.get(0);
ProblemReport report = reports.get(domain);
if (report != null) {
AllDomains allDomains = new AllDomains();
allDomains.getDomains().addAll(domains);
report.setAllDomains(allDomains);
}
response.setModel(report);
}
return response;
}
List<String> getDomains(Set<String> keys) {
List<String> domains = new ArrayList<String>(keys);
Collections.sort(domains, new Comparator<String>() {
@Override
public int compare(String d1, String d2) {
if (d1.equals("Cat")) {
return 1;
}
return d1.compareTo(d2);
}
});
return domains;
}
private ProblemAnalyzer getAnalyzer(ModelPeriod period) {
if (period.isCurrent() || period.isFuture()) {
return (ProblemAnalyzer) m_consumer.getCurrentAnalyzer("problem");
} else if (period.isLast()) {
return (ProblemAnalyzer) m_consumer.getLastAnalyzer("problem");
} else {
return null;
}
}
@Override
public boolean isEligable(ModelRequest request) {
ModelPeriod period = request.getPeriod();
import com.dianping.cat.report.page.model.spi.internal.BaseLocalModelService;
return !period.isHistorical();
public class LocalProblemService extends BaseLocalModelService<ProblemReport> {
public LocalProblemService() {
super("problem");
}
}
package com.dianping.cat.report.page.model.problem;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Map;
import java.util.Map.Entry;
import java.io.IOException;
import org.xml.sax.SAXException;
import com.dianping.cat.consumer.problem.model.entity.ProblemReport;
import com.dianping.cat.consumer.problem.model.transform.DefaultXmlParser;
import com.dianping.cat.report.page.model.spi.ModelPeriod;
import com.dianping.cat.report.page.model.spi.ModelRequest;
import com.dianping.cat.report.page.model.spi.ModelResponse;
import com.dianping.cat.report.page.model.spi.ModelService;
import com.site.helper.Files;
import com.site.helper.Joiners;
import com.site.helper.Joiners.IBuilder;
import com.site.lookup.annotation.Inject;
public class RemoteProblemService implements ModelService<ProblemReport> {
@Inject
private String m_host;
@Inject
private int m_port = 2281; // default admin port
@Inject
private String m_serviceUri = "/cat/r/model";
URL buildUrl(ModelRequest request) throws MalformedURLException {
String pairs = Joiners.by('&').prefixDelimiter()
.join(request.getProperties().entrySet(), new IBuilder<Map.Entry<String, String>>() {
@Override
public String asString(Entry<String, String> e) {
return e.getKey() + "=" + e.getValue();
}
});
String url = String.format("http://%s:%s%s/%s/%s/%s?op=xml%s", m_host, m_port, m_serviceUri, "problem",
request.getDomain(), request.getPeriod(), pairs);
import com.dianping.cat.report.page.model.spi.internal.BaseRemoteModelService;
return new URL(url);
public class RemoteProblemService extends BaseRemoteModelService<ProblemReport> {
public RemoteProblemService() {
super("problem");
}
@Override
public ModelResponse<ProblemReport> invoke(ModelRequest request) {
ModelResponse<ProblemReport> response = new ModelResponse<ProblemReport>();
try {
URL url = buildUrl(request);
String xml = Files.forIO().readFrom(url.openStream(), "utf-8");
if (xml != null && xml.trim().length() > 0) {
ProblemReport report = new DefaultXmlParser().parse(xml);
response.setModel(report);
}
} catch (Exception e) {
response.setException(e);
}
return response;
}
@Override
public boolean isEligable(ModelRequest request) {
ModelPeriod period = request.getPeriod();
return period.isCurrent() || period.isLast();
}
public void setHost(String host) {
m_host = host;
}
public void setPort(int port) {
m_port = port;
}
public void setServiceUri(String serviceUri) {
m_serviceUri = serviceUri;
protected ProblemReport buildModel(String xml) throws SAXException, IOException {
return new DefaultXmlParser().parse(xml);
}
}
package com.dianping.cat.report.page.model.spi.internal;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import com.dianping.cat.Cat;
import com.dianping.cat.message.Event;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.internal.DefaultEvent;
import com.dianping.cat.report.page.model.spi.ModelRequest;
import com.dianping.cat.report.page.model.spi.ModelResponse;
import com.dianping.cat.report.page.model.spi.ModelService;
import com.site.helper.Splitters;
import com.site.lookup.annotation.Inject;
public abstract class BaseCompositeModelService<T> implements ModelService<T>, Initializable {
@Inject
private List<ModelService<T>> m_services;
private ExecutorService m_threadPool;
private String m_name;
// introduce another list is due to a bug inside Plexus ComponentList
private List<ModelService<T>> m_allServices = new ArrayList<ModelService<T>>();
public BaseCompositeModelService(String name) {
m_name = name;
}
protected abstract BaseRemoteModelService<T> createRemoteService();
public String getName() {
return m_name;
}
@Override
public void initialize() throws InitializationException {
m_threadPool = Executors.newFixedThreadPool(10);
m_allServices.addAll(m_services);
}
@Override
public ModelResponse<T> invoke(final ModelRequest request) {
int size = m_allServices.size();
final List<ModelResponse<T>> responses = new ArrayList<ModelResponse<T>>(size);
final Semaphore semaphore = new Semaphore(0);
final Transaction t = Cat.getProducer().newTransaction("ModelService", m_name);
int count = 0;
t.setStatus(Message.SUCCESS);
t.addData("request", request);
for (final ModelService<T> service : m_allServices) {
if (!service.isEligable(request)) {
continue;
}
m_threadPool.submit(new Runnable() {
@Override
public void run() {
try {
responses.add(service.invoke(request));
t.addData(service.toString());
} catch (Exception e) {
logError(t, e);
t.setStatus(e);
} finally {
semaphore.release();
}
}
});
count++;
}
try {
semaphore.tryAcquire(count, 5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// ignore it
t.setStatus(e);
} finally {
t.complete();
}
ModelResponse<T> aggregated = new ModelResponse<T>();
T report = merge(responses);
aggregated.setModel(report);
return aggregated;
}
@Override
public boolean isEligable(ModelRequest request) {
for (ModelService<T> service : m_allServices) {
if (service.isEligable(request)) {
return true;
}
}
return false;
}
void logError(Transaction t, Throwable cause) {
StringWriter writer = new StringWriter(2048);
cause.printStackTrace(new PrintWriter(writer));
if (cause instanceof Error) {
logEvent(t, "Error", cause.getClass().getName(), "ERROR", writer.toString());
} else if (cause instanceof RuntimeException) {
logEvent(t, "RuntimeException", cause.getClass().getName(), "ERROR", writer.toString());
} else {
logEvent(t, "Exception", cause.getClass().getName(), "ERROR", writer.toString());
}
}
void logEvent(Transaction t, String type, String name, String status, String nameValuePairs) {
Event event = new DefaultEvent(type, name);
if (nameValuePairs != null && nameValuePairs.length() > 0) {
event.addData(nameValuePairs);
}
event.setStatus(status);
event.complete();
t.addChild(event);
}
protected abstract T merge(final List<ModelResponse<T>> responses);
/**
* Inject remote servers to load report model.
* <p>
*
* For example, servers: 192.168.1.1:2281,192.168.1.2,192.168.1.3
*
* @param servers
* server list separated by comma(',')
*/
public void setRemoteServers(String servers) {
List<String> endpoints = Splitters.by(',').noEmptyItem().trim().split(servers);
String localAddress = null;
String localHost = null;
try {
localAddress = InetAddress.getLocalHost().getHostAddress();
localHost = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
// ignore it
}
for (String endpoint : endpoints) {
int pos = endpoint.indexOf(':');
String host = (pos > 0 ? endpoint.substring(0, pos) : endpoint);
int port = (pos > 0 ? Integer.parseInt(endpoint.substring(pos) + 1) : 2281);
if (port == 2281) {
if ("localhost".equals(host) || host.startsWith("127.0.")) {
// exclude localhost
continue;
} else if (host.equals(localAddress) || host.equals(localHost)) {
// exclude itself
continue;
}
}
BaseRemoteModelService<T> remote = createRemoteService();
remote.setHost(host);
remote.setPort(port);
m_allServices.add(remote);
}
}
public void setSerivces(ModelService<T>... services) {
for (ModelService<T> service : services) {
m_allServices.add(service);
}
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder(64);
sb.append(getClass().getSimpleName()).append('[');
sb.append("name=").append(m_name);
sb.append(']');
return sb.toString();
}
}
package com.dianping.cat.report.page.model.spi.internal;
import com.dianping.cat.Cat;
import com.dianping.cat.report.page.model.spi.ModelRequest;
import com.dianping.cat.report.page.model.spi.ModelResponse;
import com.dianping.cat.report.page.model.spi.ModelService;
public abstract class BaseHistoricalModelService<T> implements ModelService<T> {
private String m_name;
public BaseHistoricalModelService(String name) {
m_name = name;
}
protected abstract T buildModel(ModelRequest request) throws Exception;
public String getName() {
return m_name;
}
@Override
public ModelResponse<T> invoke(ModelRequest request) {
ModelResponse<T> response = new ModelResponse<T>();
try {
T model = buildModel(request);
response.setModel(model);
} catch (Exception e) {
Cat.getProducer().logError(e);
response.setException(e);
}
return response;
}
@Override
public boolean isEligable(ModelRequest request) {
return request.getPeriod().isHistorical();
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder(64);
sb.append(getClass().getSimpleName()).append('[');
sb.append("name=").append(m_name);
sb.append(']');
return sb.toString();
}
}
package com.dianping.cat.report.page.model.spi.internal;
import java.util.List;
import com.dianping.cat.Cat;
import com.dianping.cat.consumer.RealtimeConsumer;
import com.dianping.cat.consumer.transaction.TransactionAnalyzer;
import com.dianping.cat.consumer.transaction.model.entity.TransactionReport;
import com.dianping.cat.message.spi.AbstractMessageAnalyzer;
import com.dianping.cat.message.spi.MessageAnalyzer;
import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.report.page.model.spi.ModelPeriod;
import com.dianping.cat.report.page.model.spi.ModelRequest;
......@@ -12,39 +11,53 @@ import com.dianping.cat.report.page.model.spi.ModelResponse;
import com.dianping.cat.report.page.model.spi.ModelService;
import com.site.lookup.annotation.Inject;
public class BaseLocalModelService implements ModelService<TransactionReport> {
public class BaseLocalModelService<T> implements ModelService<T> {
@Inject(type = MessageConsumer.class, value = "realtime")
private RealtimeConsumer m_consumer;
@Override
public ModelResponse<TransactionReport> invoke(ModelRequest request) {
TransactionAnalyzer analyzer = getAnalyzer(request.getPeriod());
ModelResponse<TransactionReport> response = new ModelResponse<TransactionReport>();
private String m_name;
if (analyzer != null) {
List<String> domains = analyzer.getDomains();
String d = request.getDomain();
String domain = d != null && d.length() > 0 ? d : domains.isEmpty() ? null : domains.get(0);
TransactionReport report = analyzer.getReport(domain);
public BaseLocalModelService(String name) {
m_name = name;
}
if (report != null) {
report.getDomains().addAll(domains);
}
public String getName() {
return m_name;
}
response.setModel(report);
@SuppressWarnings("unchecked")
protected T getReport(ModelRequest request, ModelPeriod period, String domain) throws Exception {
MessageAnalyzer analyzer = null;
if (period.isCurrent() || period.isFuture()) {
analyzer = m_consumer.getCurrentAnalyzer(m_name);
} else if (period.isLast()) {
analyzer = m_consumer.getLastAnalyzer(m_name);
}
return response;
if (analyzer instanceof AbstractMessageAnalyzer) {
AbstractMessageAnalyzer<T> a = (AbstractMessageAnalyzer<T>) analyzer;
return a.getReport(domain);
}
throw new RuntimeException("Internal error: this should not be reached!");
}
private TransactionAnalyzer getAnalyzer(ModelPeriod period) {
if (period.isCurrent() || period.isFuture()) {
return (TransactionAnalyzer) m_consumer.getCurrentAnalyzer("transaction");
} else if (period.isLast()) {
return (TransactionAnalyzer) m_consumer.getLastAnalyzer("transaction");
} else {
throw new RuntimeException("Internal error: this method should not be called!");
@Override
public ModelResponse<T> invoke(ModelRequest request) {
ModelResponse<T> response = new ModelResponse<T>();
try {
T report = getReport(request, request.getPeriod(), request.getDomain());
response.setModel(report);
} catch (Exception e) {
Cat.getProducer().logError(e);
response.setException(e);
}
return response;
}
@Override
......@@ -53,4 +66,15 @@ public class BaseLocalModelService implements ModelService<TransactionReport> {
return !period.isHistorical();
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder(64);
sb.append(getClass().getSimpleName()).append('[');
sb.append("name=").append(m_name);
sb.append(']');
return sb.toString();
}
}
......@@ -3,18 +3,19 @@ package com.dianping.cat.report.page.model.spi.internal;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Map;
import java.util.Map.Entry;
import org.xml.sax.SAXException;
import com.dianping.cat.Cat;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.report.page.model.spi.ModelPeriod;
import com.dianping.cat.report.page.model.spi.ModelRequest;
import com.dianping.cat.report.page.model.spi.ModelResponse;
import com.dianping.cat.report.page.model.spi.ModelService;
import com.site.helper.Files;
import com.site.helper.Joiners;
import com.site.helper.Joiners.IBuilder;
import com.site.lookup.annotation.Inject;
public abstract class BaseRemoteModelService<T> implements ModelService<T> {
......@@ -33,35 +34,59 @@ public abstract class BaseRemoteModelService<T> implements ModelService<T> {
m_name = name;
}
protected URL buildUrl(ModelRequest request) throws MalformedURLException {
String pairs = Joiners.by('&').prefixDelimiter()
.join(request.getProperties().entrySet(), new IBuilder<Map.Entry<String, String>>() {
@Override
public String asString(Entry<String, String> e) {
return e.getKey() + "=" + e.getValue();
}
});
protected abstract T buildModel(String xml) throws SAXException, IOException;
public URL buildUrl(ModelRequest request) throws MalformedURLException {
StringBuilder sb = new StringBuilder(64);
for (Entry<String, String> e : request.getProperties().entrySet()) {
if (e.getValue() != null) {
sb.append('&');
// TODO do url encode here
sb.append(e.getKey()).append('=').append(e.getValue());
}
}
String url = String.format("http://%s:%s%s/%s/%s/%s?op=xml%s", m_host, m_port, m_serviceUri, m_name,
request.getDomain(), request.getPeriod(), pairs);
request.getDomain(), request.getPeriod(), sb.toString());
return new URL(url);
}
public String getName() {
return m_name;
}
@Override
public ModelResponse<T> invoke(ModelRequest request) {
ModelResponse<T> response = new ModelResponse<T>();
MessageProducer cat = Cat.getProducer();
Transaction t = cat.newTransaction("RemoteModel", m_name);
t.addData("request", request);
try {
URL url = buildUrl(request);
t.addData("url", url);
String xml = Files.forIO().readFrom(url.openStream(), "utf-8");
int len = xml == null ? 0 : xml.length();
if (xml != null && xml.trim().length() > 0) {
T report = parse(xml);
t.addData("length", len);
if (len > 0) {
T report = buildModel(xml);
response.setModel(report);
}
t.setStatus(Message.SUCCESS);
} catch (Exception e) {
cat.logError(e);
t.setStatus(e);
response.setException(e);
} finally {
t.complete();
}
return response;
......@@ -74,8 +99,6 @@ public abstract class BaseRemoteModelService<T> implements ModelService<T> {
return !period.isHistorical();
}
protected abstract T parse(String xml) throws SAXException, IOException;
public void setHost(String host) {
m_host = host;
}
......@@ -87,4 +110,15 @@ public abstract class BaseRemoteModelService<T> implements ModelService<T> {
public void setServiceUri(String serviceUri) {
m_serviceUri = serviceUri;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder(64);
sb.append(getClass().getSimpleName()).append('[');
sb.append("name=").append(m_name);
sb.append(']');
return sb.toString();
}
}
package com.dianping.cat.report.page.model.transaction;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import com.dianping.cat.consumer.transaction.model.entity.TransactionReport;
import com.dianping.cat.report.page.model.spi.ModelRequest;
import com.dianping.cat.report.page.model.spi.ModelResponse;
import com.dianping.cat.report.page.model.spi.ModelService;
import com.site.helper.Splitters;
import com.site.lookup.annotation.Inject;
public class CompositeTransactionService implements ModelService<TransactionReport>, Initializable {
@Inject
private List<ModelService<TransactionReport>> m_services = new ArrayList<ModelService<TransactionReport>>();
import com.dianping.cat.report.page.model.spi.internal.BaseCompositeModelService;
import com.dianping.cat.report.page.model.spi.internal.BaseRemoteModelService;
private ExecutorService m_threadPool;
public class CompositeTransactionService extends BaseCompositeModelService<TransactionReport> {
public CompositeTransactionService() {
super("transaction");
}
@Override
public void initialize() throws InitializationException {
m_threadPool = Executors.newFixedThreadPool(10);
protected BaseRemoteModelService<TransactionReport> createRemoteService() {
return new RemoteTransactionService();
}
@Override
public ModelResponse<TransactionReport> invoke(final ModelRequest request) {
int size = m_services.size();
final List<ModelResponse<TransactionReport>> responses = new ArrayList<ModelResponse<TransactionReport>>(size);
final Semaphore semaphore = new Semaphore(0);
int count = 0;
for (final ModelService<TransactionReport> service : m_services) {
if (service.isEligable(request)) {
m_threadPool.submit(new Runnable() {
@Override
public void run() {
try {
responses.add(service.invoke(request));
} catch (Exception e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}
});
count++;
}
}
try {
semaphore.tryAcquire(count, 5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// ignore it
}
ModelResponse<TransactionReport> aggregated = new ModelResponse<TransactionReport>();
protected TransactionReport merge(List<ModelResponse<TransactionReport>> responses) {
TransactionReportMerger merger = null;
for (ModelResponse<TransactionReport> response : responses) {
......@@ -78,68 +35,6 @@ public class CompositeTransactionService implements ModelService<TransactionRepo
}
}
aggregated.setModel(merger == null ? null : merger.getTransactionReport());
return aggregated;
}
@Override
public boolean isEligable(ModelRequest request) {
for (ModelService<TransactionReport> service : m_services) {
if (service.isEligable(request)) {
return true;
}
}
return false;
}
public void setSerivces(ModelService<TransactionReport>... services) {
for (ModelService<TransactionReport> service : services) {
m_services.add(service);
}
}
/**
* Inject remote servers to load transaction model.
* <p>
*
* For example, servers: 192.168.1.1:2281,192.168.1.2,192.168.1.3
*
* @param servers
* server list separated by comma(',')
*/
public void setRemoteServers(String servers) {
List<String> endpoints = Splitters.by(',').split(servers);
String localAddress = null;
String localHost = null;
try {
localAddress = InetAddress.getLocalHost().getHostAddress();
localHost = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
// ignore it
}
for (String endpoint : endpoints) {
int pos = endpoint.indexOf(':');
String host = (pos > 0 ? endpoint.substring(0, pos) : endpoint);
int port = (pos > 0 ? Integer.parseInt(endpoint.substring(pos) + 1) : 2281);
if (port == 2281) {
if ("localhost".equals(host) || "127.0.0.1".equals(host)) {
// exclude localhost
continue;
} else if (host.equals(localAddress) || host.equals(localHost)) {
// exclude itself
continue;
}
}
RemoteTransactionModelService remote = new RemoteTransactionModelService();
remote.setHost(host);
remote.setPort(port);
m_services.add(remote);
}
return merger == null ? null : merger.getTransactionReport();
}
}
......@@ -5,46 +5,41 @@ import java.util.Date;
import com.dianping.cat.consumer.transaction.model.entity.TransactionReport;
import com.dianping.cat.consumer.transaction.model.transform.DefaultXmlParser;
import com.dianping.cat.report.page.model.spi.ModelRequest;
import com.dianping.cat.report.page.model.spi.ModelResponse;
import com.dianping.cat.report.page.model.spi.ModelService;
import com.dianping.cat.report.page.model.spi.internal.BaseHistoricalModelService;
import com.dianping.cat.storage.Bucket;
import com.dianping.cat.storage.BucketManager;
import com.site.lookup.annotation.Inject;
public class HdfsTransactionService implements ModelService<TransactionReport> {
public class HistoricalTransactionService extends BaseHistoricalModelService<TransactionReport> {
@Inject
private BucketManager m_bucketManager;
public HistoricalTransactionService() {
super("transaction");
}
@Override
public ModelResponse<TransactionReport> invoke(ModelRequest request) {
protected TransactionReport buildModel(ModelRequest request) throws Exception {
String domain = request.getDomain();
long date = Long.parseLong(request.getProperty("date"));
ModelResponse<TransactionReport> response = new ModelResponse<TransactionReport>();
Bucket<String> bucket = null;
try {
bucket = m_bucketManager.getReportBucket(new Date(date), domain, "remote");
bucket = m_bucketManager.getReportBucket(new Date(date), getName(), "remote");
String xml = bucket.findById(domain);
if (xml != null) {
TransactionReport report = new DefaultXmlParser().parse(xml);
response.setModel(report);
return report;
} else {
return null;
}
} catch (Exception e) {
response.setException(e);
} finally {
if (bucket != null) {
m_bucketManager.closeBucket(bucket);
}
}
return response;
}
@Override
public boolean isEligable(ModelRequest request) {
return request.getPeriod().isHistorical();
}
}
package com.dianping.cat.report.page.model.transaction;
import java.util.List;
import com.dianping.cat.consumer.RealtimeConsumer;
import com.dianping.cat.consumer.transaction.TransactionAnalyzer;
import com.dianping.cat.consumer.transaction.model.entity.TransactionReport;
import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.report.page.model.spi.ModelPeriod;
import com.dianping.cat.report.page.model.spi.ModelRequest;
import com.dianping.cat.report.page.model.spi.ModelResponse;
import com.dianping.cat.report.page.model.spi.ModelService;
import com.site.lookup.annotation.Inject;
public class LocalTransactionService implements ModelService<TransactionReport> {
@Inject(type = MessageConsumer.class, value = "realtime")
private RealtimeConsumer m_consumer;
@Override
public ModelResponse<TransactionReport> invoke(ModelRequest request) {
TransactionAnalyzer analyzer = getAnalyzer(request.getPeriod());
ModelResponse<TransactionReport> response = new ModelResponse<TransactionReport>();
if (analyzer != null) {
List<String> domains = analyzer.getDomains();
String d = request.getDomain();
String domain = d != null && d.length() > 0 ? d : domains.isEmpty() ? null : domains.get(0);
TransactionReport report = analyzer.getReport(domain);
if (report != null) {
report.getDomains().addAll(domains);
}
response.setModel(report);
}
return response;
}
private TransactionAnalyzer getAnalyzer(ModelPeriod period) {
if (period.isCurrent() || period.isFuture()) {
return (TransactionAnalyzer) m_consumer.getCurrentAnalyzer("transaction");
} else if (period.isLast()) {
return (TransactionAnalyzer) m_consumer.getLastAnalyzer("transaction");
} else {
throw new RuntimeException("Internal error: this method should not be called!");
}
}
@Override
public boolean isEligable(ModelRequest request) {
ModelPeriod period = request.getPeriod();
import com.dianping.cat.report.page.model.spi.internal.BaseLocalModelService;
return !period.isHistorical();
public class LocalTransactionService extends BaseLocalModelService<TransactionReport> {
public LocalTransactionService() {
super("transaction");
}
}
package com.dianping.cat.report.page.model.transaction;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Map;
import java.util.Map.Entry;
import com.dianping.cat.consumer.transaction.model.entity.TransactionReport;
import com.dianping.cat.consumer.transaction.model.transform.DefaultXmlParser;
import com.dianping.cat.report.page.model.spi.ModelPeriod;
import com.dianping.cat.report.page.model.spi.ModelRequest;
import com.dianping.cat.report.page.model.spi.ModelResponse;
import com.dianping.cat.report.page.model.spi.ModelService;
import com.site.helper.Files;
import com.site.helper.Joiners;
import com.site.helper.Joiners.IBuilder;
import com.site.lookup.annotation.Inject;
public class RemoteTransactionModelService implements ModelService<TransactionReport> {
@Inject
private String m_host;
@Inject
private int m_port = 2281; // default admin port
@Inject
private String m_serviceUri = "/cat/r/model";
URL buildUrl(ModelRequest request) throws MalformedURLException {
String pairs = Joiners.by('&').prefixDelimiter()
.join(request.getProperties().entrySet(), new IBuilder<Map.Entry<String, String>>() {
@Override
public String asString(Entry<String, String> e) {
return e.getKey() + "=" + e.getValue();
}
});
String url = String.format("http://%s:%s%s/%s/%s/%s?op=xml%s", m_host, m_port, m_serviceUri, "transaction",
request.getDomain(), request.getPeriod(), pairs);
return new URL(url);
}
@Override
public ModelResponse<TransactionReport> invoke(ModelRequest request) {
ModelResponse<TransactionReport> response = new ModelResponse<TransactionReport>();
try {
URL url = buildUrl(request);
String xml = Files.forIO().readFrom(url.openStream(), "utf-8");
if (xml != null && xml.trim().length() > 0) {
TransactionReport report = new DefaultXmlParser().parse(xml);
response.setModel(report);
}
} catch (Exception e) {
response.setException(e);
}
return response;
}
@Override
public boolean isEligable(ModelRequest request) {
ModelPeriod period = request.getPeriod();
return !period.isHistorical();
}
public void setHost(String host) {
m_host = host;
}
public void setPort(int port) {
m_port = port;
}
public void setServiceUri(String serviceUri) {
m_serviceUri = serviceUri;
}
}
package com.dianping.cat.report.page.model.transaction;
import java.io.IOException;
import org.xml.sax.SAXException;
import com.dianping.cat.consumer.transaction.model.entity.TransactionReport;
import com.dianping.cat.consumer.transaction.model.transform.DefaultXmlParser;
import com.dianping.cat.report.page.model.spi.internal.BaseRemoteModelService;
public class RemoteTransactionService extends BaseRemoteModelService<TransactionReport> {
public RemoteTransactionService() {
super("transaction");
}
@Override
protected TransactionReport buildModel(String xml) throws SAXException, IOException {
return new DefaultXmlParser().parse(xml);
}
}
......@@ -4,6 +4,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import com.dianping.cat.consumer.problem.model.entity.AllDomains;
import com.dianping.cat.consumer.problem.model.entity.Entry;
import com.dianping.cat.consumer.problem.model.entity.ProblemReport;
import com.dianping.cat.report.page.AbstractReportModel;
......@@ -16,9 +17,9 @@ public class Model extends AbstractReportModel<Action, Context> {
private String m_ipAddress;
private int m_hour;
private List<Entry> m_entries;
private List<ProblemStatistics> m_statistics;
public Model(Context ctx) {
......@@ -44,7 +45,13 @@ public class Model extends AbstractReportModel<Action, Context> {
if (m_report == null) {
return Collections.emptySet();
} else {
return m_report.getAllDomains().getDomains();
AllDomains allDomains = m_report.getAllDomains();
if (allDomains == null) {
return Collections.emptySet();
} else {
return allDomains.getDomains();
}
}
}
......@@ -81,18 +88,18 @@ public class Model extends AbstractReportModel<Action, Context> {
}
public List<Entry> getEntries() {
return m_entries;
}
return m_entries;
}
public void setEntries(List<Entry> entries) {
m_entries = entries;
}
m_entries = entries;
}
public List<ProblemStatistics> getStatistics() {
return m_statistics;
}
return m_statistics;
}
public void setStatistics(List<ProblemStatistics> statistics) {
m_statistics = statistics;
}
m_statistics = statistics;
}
}
......@@ -39,8 +39,8 @@
</component>
<component>
<role>com.dianping.cat.report.page.model.spi.ModelService</role>
<role-hint>transaction-hdfs</role-hint>
<implementation>com.dianping.cat.report.page.model.transaction.HdfsTransactionService</implementation>
<role-hint>transaction-historical</role-hint>
<implementation>com.dianping.cat.report.page.model.transaction.HistoricalTransactionService</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.storage.BucketManager</role>
......@@ -51,12 +51,15 @@
<role>com.dianping.cat.report.page.model.spi.ModelService</role>
<role-hint>transaction</role-hint>
<implementation>com.dianping.cat.report.page.model.transaction.CompositeTransactionService</implementation>
<configuration>
<remoteServers>192.168.7.43</remoteServers>
</configuration>
<requirements>
<requirement>
<role>com.dianping.cat.report.page.model.spi.ModelService</role>
<role-hints>
<role-hint>transaction-local</role-hint>
<role-hint>transaction-hdfs</role-hint>
<role-hint>transaction-historical</role-hint>
</role-hints>
<field-name>m_services</field-name>
</requirement>
......@@ -75,8 +78,8 @@
</component>
<component>
<role>com.dianping.cat.report.page.model.spi.ModelService</role>
<role-hint>problem-hdfs</role-hint>
<implementation>com.dianping.cat.report.page.model.problem.HdfsProblemService</implementation>
<role-hint>problem-historical</role-hint>
<implementation>com.dianping.cat.report.page.model.problem.HistoricalProblemService</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.storage.BucketManager</role>
......@@ -87,12 +90,15 @@
<role>com.dianping.cat.report.page.model.spi.ModelService</role>
<role-hint>problem</role-hint>
<implementation>com.dianping.cat.report.page.model.problem.CompositeProblemService</implementation>
<configuration>
<remoteServers>192.168.7.43</remoteServers>
</configuration>
<requirements>
<requirement>
<role>com.dianping.cat.report.page.model.spi.ModelService</role>
<role-hints>
<role-hint>problem-local</role-hint>
<role-hint>problem-hdfs</role-hint>
<role-hint>problem-historical</role-hint>
</role-hints>
<field-name>m_services</field-name>
</requirement>
......@@ -113,6 +119,9 @@
<role>com.dianping.cat.report.page.model.spi.ModelService</role>
<role-hint>ip</role-hint>
<implementation>com.dianping.cat.report.page.model.ip.CompositeIpService</implementation>
<configuration>
<remoteServers>192.168.7.43</remoteServers>
</configuration>
<requirements>
<requirement>
<role>com.dianping.cat.report.page.model.spi.ModelService</role>
......@@ -128,6 +137,10 @@
<role-hint>logview-local</role-hint>
<implementation>com.dianping.cat.report.page.model.logview.LocalLogViewService</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessageConsumer</role>
<role-hint>realtime</role-hint>
</requirement>
<requirement>
<role>com.dianping.cat.storage.BucketManager</role>
</requirement>
......@@ -139,8 +152,8 @@
</component>
<component>
<role>com.dianping.cat.report.page.model.spi.ModelService</role>
<role-hint>logview-hdfs</role-hint>
<implementation>com.dianping.cat.report.page.model.logview.HdfsLogViewService</implementation>
<role-hint>logview-historical</role-hint>
<implementation>com.dianping.cat.report.page.model.logview.HistoricalLogViewService</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.storage.BucketManager</role>
......@@ -148,12 +161,6 @@
<requirement>
<role>com.dianping.cat.message.spi.MessageCodec</role>
<role-hint>html</role-hint>
<field-name>m_htmlCodec</field-name>
</requirement>
<requirement>
<role>com.dianping.cat.message.spi.MessageCodec</role>
<role-hint>plain-text</role-hint>
<field-name>m_plainCodec</field-name>
</requirement>
</requirements>
</component>
......@@ -161,12 +168,15 @@
<role>com.dianping.cat.report.page.model.spi.ModelService</role>
<role-hint>logview</role-hint>
<implementation>com.dianping.cat.report.page.model.logview.CompositeLogViewService</implementation>
<configuration>
<remoteServers>192.168.7.43</remoteServers>
</configuration>
<requirements>
<requirement>
<role>com.dianping.cat.report.page.model.spi.ModelService</role>
<role-hints>
<role-hint>logview-local</role-hint>
<role-hint>logview-hdfs</role-hint>
<role-hint>logview-historical</role-hint>
</role-hints>
<field-name>m_services</field-name>
</requirement>
......@@ -369,6 +379,11 @@
<role>com.dianping.cat.message.spi.MessageCodec</role>
<role-hint>html</role-hint>
</requirement>
<requirement>
<role>com.dianping.cat.message.spi.MessageConsumer</role>
<role-hint>realtime</role-hint>
<field-name>m_consumer</field-name>
</requirement>
</requirements>
</component>
<component>
......
......@@ -102,7 +102,7 @@ public class TestServer extends SimpleServerSupport {
@Test
public void startServer() throws Exception {
// open the page in the default browser
//s_adaptor.display("/cat/r");
s_adaptor.display("/cat/r");
System.out.println(String.format("[%s] Press any key to stop server ... ", getTimestamp()));
System.in.read();
......
......@@ -20,7 +20,7 @@ public class TransactionModelServiceTest extends ComponentTestCase {
ModelService<?> composite = lookup(ModelService.class, "transaction");
Assert.assertEquals(LocalTransactionService.class, local.getClass());
Assert.assertEquals(RemoteTransactionModelService.class, localhost.getClass());
Assert.assertEquals(RemoteTransactionService.class, localhost.getClass());
Assert.assertEquals(CompositeTransactionService.class, composite.getClass());
}
......@@ -35,7 +35,7 @@ public class TransactionModelServiceTest extends ComponentTestCase {
@Test
public void testRemote() throws Exception {
RemoteTransactionModelService remote = (RemoteTransactionModelService) lookup(ModelService.class,
RemoteTransactionService remote = (RemoteTransactionService) lookup(ModelService.class,
"transaction-localhost");
ModelRequest request = ModelRequest.from("Cat", "CURRENT");
......
......@@ -48,7 +48,7 @@
<dependency>
<groupId>com.site.common</groupId>
<artifactId>test-framework</artifactId>
<version>1.0.0-a1</version>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>com.site.common</groupId>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册