提交 e7f83714 编写于 作者: Y youyong205

refactor cat

上级 99cca7ba
......@@ -17,7 +17,7 @@ public class CatConsumerModule extends AbstractModule {
TcpSocketReceiver receiver = ctx.lookup(TcpSocketReceiver.class);
ServerConfigManager manager = ctx.lookup(ServerConfigManager.class);
ctx.lookup(AggregationConfigManager.class);
int encodeThreadNumber = 10;
int encodeThreadNumber = 12;
if (manager.isLocalMode()) {
encodeThreadNumber = 1;
......
......@@ -135,7 +135,7 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
.req(BucketManager.class, HourlyReportDao.class, HourlyReportContentDao.class) //
.config(E("name").value(ID)));
all.add(C(ReportDelegate.class, ID, ProblemDelegate.class) //
.req(ProblemReportAggregation.class, TaskManager.class));
.req(ProblemReportAggregation.class, TaskManager.class, ServerConfigManager.class));
return all;
}
......@@ -180,7 +180,7 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
.req(ReportDelegate.class, ID) //
.req(BucketManager.class, HourlyReportDao.class, HourlyReportContentDao.class) //
.config(E("name").value(ID)));
all.add(C(ReportDelegate.class, ID, TransactionDelegate.class).req(TaskManager.class));
all.add(C(ReportDelegate.class, ID, TransactionDelegate.class).req(TaskManager.class, ServerConfigManager.class));
return all;
}
......
......@@ -8,6 +8,7 @@ import org.unidal.lookup.annotation.Inject;
import com.dianping.cat.Cat;
import com.dianping.cat.Constants;
import com.dianping.cat.ServerConfigManager;
import com.dianping.cat.consumer.problem.model.transform.DefaultNativeBuilder;
import com.dianping.cat.consumer.problem.model.transform.DefaultNativeParser;
import com.dianping.cat.consumer.problem.model.entity.ProblemReport;
......@@ -24,6 +25,9 @@ public class ProblemDelegate implements ReportDelegate<ProblemReport> {
@Inject
private TaskManager m_taskManager;
@Inject
private ServerConfigManager m_manager;
@Override
public void afterLoad(Map<String, ProblemReport> reports) {
}
......@@ -57,17 +61,13 @@ public class ProblemDelegate implements ReportDelegate<ProblemReport> {
return m_problemReportAggregation.getReport();
}
private boolean validateDomain(String domain) {
return !domain.equals(Constants.FRONT_END);
}
public ProblemReport createAggregatedReport(Map<String, ProblemReport> reports) {
ProblemReport report = new ProblemReport(Constants.ALL);
ProblemReportAllBuilder visitor = new ProblemReportAllBuilder(report);
try {
for (ProblemReport temp : reports.values()) {
if (validateDomain(temp.getDomain())) {
if (m_manager.validateDomain(temp.getDomain())) {
report.getIps().add(temp.getDomain());
report.getDomainNames().add(temp.getDomain());
visitor.visitProblemReport(temp);
......@@ -86,7 +86,13 @@ public class ProblemDelegate implements ReportDelegate<ProblemReport> {
@Override
public boolean createHourlyTask(ProblemReport report) {
return m_taskManager.createTask(report.getStartTime(), report.getDomain(), ProblemAnalyzer.ID, TaskProlicy.ALL);
String domain = report.getDomain();
if (m_manager.validateDomain(domain)) {
return m_taskManager.createTask(report.getStartTime(), domain, ProblemAnalyzer.ID, TaskProlicy.ALL);
} else {
return true;
}
}
@Override
......
......@@ -10,6 +10,7 @@ import org.unidal.lookup.annotation.Inject;
import com.dianping.cat.Cat;
import com.dianping.cat.Constants;
import com.dianping.cat.ServerConfigManager;
import com.dianping.cat.consumer.transaction.model.entity.TransactionReport;
import com.dianping.cat.consumer.transaction.model.transform.DefaultNativeBuilder;
import com.dianping.cat.consumer.transaction.model.transform.DefaultNativeParser;
......@@ -23,6 +24,9 @@ public class TransactionDelegate implements ReportDelegate<TransactionReport> {
@Inject
private TaskManager m_taskManager;
@Inject
private ServerConfigManager m_manager;
@Override
public void afterLoad(Map<String, TransactionReport> reports) {
}
......@@ -79,8 +83,14 @@ public class TransactionDelegate implements ReportDelegate<TransactionReport> {
@Override
public boolean createHourlyTask(TransactionReport report) {
return m_taskManager.createTask(report.getStartTime(), report.getDomain(), TransactionAnalyzer.ID,
TaskProlicy.ALL);
String domain = report.getDomain();
if (m_manager.validateDomain(domain)) {
return m_taskManager.createTask(report.getStartTime(), report.getDomain(), TransactionAnalyzer.ID,
TaskProlicy.ALL);
} else {
return true;
}
}
@Override
......
......@@ -62,6 +62,9 @@
<requirement>
<role>com.dianping.cat.task.TaskManager</role>
</requirement>
<requirement>
<role>com.dianping.cat.ServerConfigManager</role>
</requirement>
</requirements>
</component>
<component>
......@@ -211,6 +214,9 @@
<requirement>
<role>com.dianping.cat.task.TaskManager</role>
</requirement>
<requirement>
<role>com.dianping.cat.ServerConfigManager</role>
</requirement>
</requirements>
</component>
<component>
......
......@@ -324,7 +324,7 @@ public class ServerConfigManager implements Initializable, LogEnabled {
}
public boolean validateDomain(String domain) {
return !domain.equals("PhoenixAgent") && !domain.equals(Constants.FRONT_END);
return !domain.equals("PhoenixAgent") && !domain.equals(Constants.FRONT_END) && !domain.equals(Constants.ALL);
}
}
......@@ -63,7 +63,7 @@ public class TcpSocketReceiver implements LogEnabled {
private BlockingQueue<ChannelBuffer> m_queue;
private int m_queueSize = 100000;
private int m_queueSize = 300000;
private volatile int m_errorCount;
......
......@@ -7,11 +7,8 @@ import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.GZIPOutputStream;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.unidal.helper.Files;
import org.unidal.lookup.annotation.Inject;
import com.dianping.cat.message.internal.MessageId;
......@@ -19,7 +16,7 @@ import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
public class LocalMessageBucket implements MessageBucket, LogEnabled {
public class LocalMessageBucket implements MessageBucket {
public static final String ID = "local";
private static final int MAX_BLOCK_SIZE = 1 << 16; // 64K
......@@ -48,31 +45,18 @@ public class LocalMessageBucket implements MessageBucket, LogEnabled {
private int m_blockSize;
private Logger m_logger;
public void archive() throws IOException {
File from = new File(m_baseDir, m_dataFile);
File outbox = new File(m_baseDir, "outbox");
File to = new File(outbox, m_dataFile);
File fromIndex = new File(m_baseDir, m_dataFile + ".idx");
File toIndex = new File(outbox, m_dataFile + ".idx");
File parentFile = from.getParentFile();
to.getParentFile().mkdirs();
Files.forDir().copyFile(from, to);
Files.forDir().copyFile(fromIndex, toIndex);
boolean flag = Files.forDir().delete(from);
boolean indexFlag = Files.forDir().delete(fromIndex);
if (flag == false) {
m_logger.error("delete data file error " + from);
}
if (indexFlag == false) {
m_logger.error("delete index file error " + fromIndex);
}
File parentFile = from.getParentFile();
toIndex.getParentFile().mkdirs();
from.renameTo(to);
fromIndex.renameTo(toIndex);
parentFile.delete(); // delete it if empty
parentFile.getParentFile().delete(); // delete it if empty
......@@ -182,9 +166,4 @@ public class LocalMessageBucket implements MessageBucket, LogEnabled {
}
}
@Override
public void enableLogging(Logger logger) {
m_logger = logger;
}
}
......@@ -23,7 +23,7 @@
<query-defs>
<query name="insert" type="INSERT">
<statement><![CDATA[
INSERT INTO <TABLE/>
INSERT IGNORE INTO <TABLE/>
(<FIELDS/>)
VALUES
(<VALUES/>)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册