Commit 7e2b761c authored by youyong205

modify the serverconfig

Parent 87c01975
......@@ -141,8 +141,8 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
final List<Component> all = new ArrayList<Component>();
final String ID = EventAnalyzer.ID;
all.add(C(MessageAnalyzer.class, ID, EventAnalyzer.class).is(PER_LOOKUP) //
.req(ReportManager.class, ID).req(ServerConfigManager.class, ServerFilterConfigManager.class));
all.add(C(MessageAnalyzer.class, ID, EventAnalyzer.class).is(PER_LOOKUP).req(ReportManager.class, ID)
.req(ServerConfigManager.class, ServerFilterConfigManager.class, EventDelegate.class));
all.add(C(ReportManager.class, ID, DefaultReportManager.class).is(PER_LOOKUP) //
.req(ReportDelegate.class, ID) //
.req(ReportBucketManager.class, HourlyReportDao.class, HourlyReportContentDao.class, DomainValidator.class) //
......
......@@ -229,11 +229,8 @@ public class LogviewUploader implements Task {
private void uploadFile(String path) {
File file = new File(m_baseDir, path);
boolean success = m_logviewUploader.uploadLogviewFile(path, file);
if (success) {
deleteFile(path);
}
m_logviewUploader.uploadLogviewFile(path, file);
}
}
\ No newline at end of file
package com.dianping.cat.consumer.event;
import java.util.List;
import java.util.Map;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.unidal.lookup.annotation.Inject;
import com.dianping.cat.Constants;
import com.dianping.cat.analysis.AbstractMessageAnalyzer;
import com.dianping.cat.config.server.ServerFilterConfigManager;
import com.dianping.cat.consumer.event.model.entity.EventName;
......@@ -16,14 +18,15 @@ import com.dianping.cat.message.Event;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.report.ReportManager;
import com.dianping.cat.report.DefaultReportManager.StoragePolicy;
import com.dianping.cat.report.ReportManager;
public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implements LogEnabled {
public static final String ID = "event";
private EventTpsStatisticsComputer m_computer = new EventTpsStatisticsComputer();
@Inject
private EventDelegate m_delegate;
@Inject(ID)
private ReportManager<EventReport> m_reportManager;
......@@ -31,6 +34,8 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
@Inject
private ServerFilterConfigManager m_serverFilterConfigManager;
private EventTpsStatisticsComputer m_computer = new EventTpsStatisticsComputer();
@Override
public synchronized void doCheckpoint(boolean atEnd) {
if (atEnd && !isLocalMode()) {
......@@ -52,19 +57,25 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
@Override
public EventReport getReport(String domain) {
long period = getStartTime();
long timestamp = System.currentTimeMillis();
long remainder = timestamp % 3600000;
long current = timestamp - remainder;
EventReport report = m_reportManager.getHourlyReport(period, domain, false);
report.getDomainNames().addAll(m_reportManager.getDomains(getStartTime()));
if (period == current) {
report.accept(m_computer.setDuration(remainder / 1000));
} else if (period < current) {
report.accept(m_computer.setDuration(3600));
if (!Constants.ALL.equals(domain)) {
long period = getStartTime();
long timestamp = System.currentTimeMillis();
long remainder = timestamp % 3600000;
long current = timestamp - remainder;
EventReport report = m_reportManager.getHourlyReport(period, domain, false);
report.getDomainNames().addAll(m_reportManager.getDomains(getStartTime()));
if (period == current) {
report.accept(m_computer.setDuration(remainder / 1000));
} else if (period < current) {
report.accept(m_computer.setDuration(3600));
}
return report;
} else {
Map<String, EventReport> reports = m_reportManager.getHourlyReports(getStartTime());
return m_delegate.createAggregatedReport(reports);
}
return report;
}
@Override
......
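The rewritten getReport() above does two things: for a concrete domain it keeps the original hour-bucket arithmetic (period is the hour being analyzed, remainder is how far the wall clock is into the current hour), while for Constants.ALL it now asks the injected EventDelegate to merge all hourly reports via createAggregatedReport(). The self-contained sketch below only illustrates the duration arithmetic fed into the TPS computer; the class and method names are invented for the example and are not CAT APIs.

import java.util.concurrent.TimeUnit;

// Illustrative only: a live hour gets its elapsed seconds, a closed hour a full 3600 seconds.
public class HourDurationSketch {
    private static final long HOUR_MS = TimeUnit.HOURS.toMillis(1);

    static long durationSeconds(long periodStartMs, long nowMs) {
        long remainder = nowMs % HOUR_MS;          // ms elapsed in the current hour
        long currentHourStart = nowMs - remainder; // start of the wall-clock hour

        if (periodStartMs == currentHourStart) {
            return remainder / 1000;               // hour still in progress
        } else if (periodStartMs < currentHourStart) {
            return 3600;                           // a past, complete hour
        }
        return 0;                                  // future period, nothing elapsed yet
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long currentHour = now - now % HOUR_MS;

        System.out.println("live hour   -> " + durationSeconds(currentHour, now) + "s");
        System.out.println("closed hour -> " + durationSeconds(currentHour - HOUR_MS, now) + "s");
    }
}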
......@@ -81,6 +81,9 @@
<requirement>
<role>com.dianping.cat.config.server.ServerFilterConfigManager</role>
</requirement>
<requirement>
<role>com.dianping.cat.consumer.event.EventDelegate</role>
</requirement>
</requirements>
</component>
<component>
......
......@@ -180,6 +180,16 @@ public class ServerConfigManager implements LogEnabled {
return 30;
}
}
public int getHdfsUploadThreadCount() {
if (m_config != null) {
StorageConfig storage = m_config.getStorage();
return storage.getUploadThread();
} else {
return 5;
}
}
public Map<String, Domain> getLongConfigDomains() {
if (m_config != null) {
......
......@@ -14,6 +14,7 @@
</entity>
<entity name="storage" class-name="StorageConfig">
<attribute name="local-base-dir" value-type="String" default-value="target/bucket" />
<attribute name="upload-thread" value-type="int" primitive="true" default-value="5" />
<attribute name="max-hdfs-storage-time" value-type="int" primitive="true" default-value="15" />
<attribute name="local-report-storage-time" value-type="int" primitive="true" default-value="3" />
<attribute name="local-logivew-storage-time" value-type="int" primitive="true" default-value="7" />
......
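The new getHdfsUploadThreadCount() reads the upload-thread value introduced in the storage entity above, falling back to 5 when no server configuration has been loaded. The sketch below restates that two-layer default in a self-contained form; ServerConfigStub and StorageConfigStub are hand-written stand-ins for CAT's generated model classes (the assumption being that default-value="5" becomes the field initializer in the generated code), not the real types.

// Stand-in stubs for the generated configuration model (illustrative only).
class StorageConfigStub {
    private int m_uploadThread = 5; // mirrors default-value="5" in the model XML

    public int getUploadThread() {
        return m_uploadThread;
    }
}

class ServerConfigStub {
    private StorageConfigStub m_storage = new StorageConfigStub();

    public StorageConfigStub getStorage() {
        return m_storage;
    }
}

public class UploadThreadCountSketch {
    private ServerConfigStub m_config; // may be null before the config file is parsed

    // Same shape as getHdfsUploadThreadCount(): prefer the configured value,
    // fall back to the hard-coded default of 5 otherwise.
    public int getHdfsUploadThreadCount() {
        if (m_config != null) {
            return m_config.getStorage().getUploadThread();
        } else {
            return 5;
        }
    }
}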
......@@ -30,7 +30,7 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
.req(ServerConfigManager.class));
all.add(C(HdfsUploader.class) //
.req(FileSystemManager.class));
.req(FileSystemManager.class, ServerConfigManager.class));
all.add(C(MessageBucket.class, HdfsMessageBucket.ID, HdfsMessageBucket.class) //
.is(PER_LOOKUP) //
......
......@@ -3,6 +3,9 @@ package com.dianping.cat.hadoop.hdfs;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
......@@ -12,9 +15,12 @@ import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.AccessControlException;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import org.unidal.helper.Files;
import org.unidal.helper.Files.AutoClose;
import org.unidal.helper.Formats;
import org.unidal.helper.Threads.Task;
import org.unidal.lookup.annotation.Inject;
import com.dianping.cat.Cat;
......@@ -22,18 +28,43 @@ import com.dianping.cat.config.server.ServerConfigManager;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
public class HdfsUploader implements LogEnabled {
public class HdfsUploader implements LogEnabled, Initializable {
@Inject
private FileSystemManager m_fileSystemManager;
@Inject
private ServerConfigManager m_serverConfigManager;
private ThreadPoolExecutor m_executors;
private File m_baseDir;
private Logger m_logger;
private void deleteFile(String path) {
File file = new File(m_baseDir, path);
File parent = file.getParentFile();
file.delete();
parent.delete(); // delete it if empty
parent.getParentFile().delete(); // delete it if empty
}
@Override
public void enableLogging(Logger logger) {
m_logger = logger;
}
@Override
public void initialize() throws InitializationException {
int thread = m_serverConfigManager.getHdfsUploadThreadCount();
m_baseDir = new File(m_serverConfigManager.getHdfsLocalBaseDir(ServerConfigManager.DUMP_DIR));
m_executors = new ThreadPoolExecutor(thread, thread, 10, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(5000), new ThreadPoolExecutor.CallerRunsPolicy());
}
private FSDataOutputStream makeHdfsOutputStream(String path) throws IOException {
StringBuilder baseDir = new StringBuilder(32);
FileSystem fs = m_fileSystemManager.getFileSystem(ServerConfigManager.DUMP_DIR, baseDir);
......@@ -54,7 +85,7 @@ public class HdfsUploader implements LogEnabled {
return out;
}
public boolean uploadLogviewFile(String path, File file) {
public boolean upload(String path, File file) {
Transaction t = Cat.newTransaction("System", "UploadDump");
t.addData("file", path);
......@@ -79,6 +110,7 @@ public class HdfsUploader implements LogEnabled {
if (!file.delete()) {
m_logger.warn("Can't delete file: " + file);
}
deleteFile(path);
return true;
} catch (AlreadyBeingCreatedException e) {
Cat.logError(e);
......@@ -98,11 +130,47 @@ public class HdfsUploader implements LogEnabled {
if (fdos != null) {
fdos.close();
}
} catch (IOException e) {
} catch (Exception e) {
Cat.logError(e);
} finally {
t.complete();
}
t.complete();
}
return false;
}
public void uploadLogviewFile(String path, File file) {
try {
m_executors.submit(new Uploader(path, file));
} catch (Exception e) {
Cat.logError(e);
}
}
public class Uploader implements Task {
private String m_path;
private File m_file;
public Uploader(String path, File file) {
m_path = path;
m_file = file;
}
@Override
public String getName() {
return "hdfs-uploader";
}
@Override
public void run() {
upload(m_path, m_file);
}
@Override
public void shutdown() {
}
}
}
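HdfsUploader now owns the whole upload lifecycle: initialize() builds a fixed-size ThreadPoolExecutor (sized by the new upload-thread setting) with a bounded queue of 5000 and CallerRunsPolicy, uploadLogviewFile() just enqueues an Uploader task, and upload() removes the local file and its now-empty parent directories once the HDFS write succeeds. The runnable sketch below demonstrates only the executor pattern itself, i.e. how a bounded LinkedBlockingQueue plus CallerRunsPolicy turns overload into backpressure on the submitting thread; the task body is a placeholder, not CAT's upload code, and the small sizes are chosen to make the rejection handling visible.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// A fixed number of workers, a bounded queue, and CallerRunsPolicy: once the
// queue is full, the submitting thread runs the task itself instead of
// dropping it or throwing RejectedExecutionException.
public class BoundedUploadPoolSketch {
    public static void main(String[] args) throws InterruptedException {
        int threads = 2;       // would come from getHdfsUploadThreadCount()
        int queueCapacity = 4; // HdfsUploader uses 5000; tiny here for demonstration

        ThreadPoolExecutor executor = new ThreadPoolExecutor(threads, threads, 10, TimeUnit.SECONDS,
              new LinkedBlockingQueue<Runnable>(queueCapacity), new ThreadPoolExecutor.CallerRunsPolicy());

        for (int i = 0; i < 12; i++) {
            final int id = i;
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    // Placeholder for upload(path, file): simulate slow I/O.
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    System.out.println("uploaded #" + id + " on " + Thread.currentThread().getName());
                }
            });
        }

        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
    }
}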
......@@ -16,6 +16,9 @@
<requirement>
<role>com.dianping.cat.hadoop.hdfs.FileSystemManager</role>
</requirement>
<requirement>
<role>com.dianping.cat.config.server.ServerConfigManager</role>
</requirement>
</requirements>
</component>
<component>
......