提交 40adffaa 编写于 作者: Y youyong205

fix bug

上级 dc93bfa8
package com.dianping.cat.broker.api.app;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ConcurrentHashMap;
import org.codehaus.plexus.logging.LogEnabled;
......@@ -16,7 +18,7 @@ import com.dianping.cat.config.app.AppDataService;
public class AppDataConsumer implements Initializable, LogEnabled {
public static final long DURATION = 5 * 60 * 1000L;
@Inject
private AppDataService m_appDataService;
......@@ -99,12 +101,15 @@ public class AppDataConsumer implements Initializable, LogEnabled {
private class BucketThreadController implements Task {
private SimpleDateFormat m_sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
private void closeLastTask(long currentDuration) {
Long last = new Long(currentDuration - DURATION);
BucketHandler lastBucketHandler = m_tasks.get(last);
if (lastBucketHandler != null) {
lastBucketHandler.shutdown();
m_logger.info("closed bucket handler ,time " + m_sdf.format(new Date(currentDuration)));
}
}
......@@ -126,7 +131,7 @@ public class AppDataConsumer implements Initializable, LogEnabled {
try {
long currentDuration = curTime - curTime % DURATION;
removeLastLastTask(currentDuration);
closeLastTask(currentDuration);
startCurrentTask(currentDuration);
......@@ -151,21 +156,26 @@ public class AppDataConsumer implements Initializable, LogEnabled {
Long cur = new Long(currentDuration);
if (m_tasks.get(cur) == null) {
BucketHandler curBucketHandler = new BucketHandler(cur, m_appDataService);
m_logger.info("starting bucket handler ,time " + m_sdf.format(new Date(currentDuration)));
Threads.forGroup("Cat").start(curBucketHandler);
m_tasks.put(cur, curBucketHandler);
m_logger.info("started bucket handler ,time " + m_sdf.format(new Date(currentDuration)));
}
}
private void startNextTask(long currentDuration) {
Long next = new Long(currentDuration + DURATION);
if (m_tasks.get(next) == null) {
BucketHandler nextBucketHandler = new BucketHandler(next, m_appDataService);
m_logger.info("starting bucket handler ,time " + m_sdf.format(new Date(next)));
Threads.forGroup("Cat").start(nextBucketHandler);
m_tasks.put(next, nextBucketHandler);
m_logger.info("started bucket handler ,time " + m_sdf.format(new Date(next)));
}
}
}
}
......@@ -94,13 +94,11 @@ public class BucketHandler implements Task {
@Override
public String getName() {
return "BucketHandler";
return "BucketHandler-" + m_startTime;
}
public boolean isActive() {
synchronized (this) {
return m_isActive;
}
return m_isActive;
}
private void processEntity(AppData appData) {
......@@ -144,7 +142,11 @@ public class BucketHandler implements Task {
AppData appData = m_appDataQueue.poll();
if (appData != null) {
processEntity(appData);
try {
processEntity(appData);
} catch (Exception e) {
Cat.logError(e);
}
}
}
......@@ -163,9 +165,7 @@ public class BucketHandler implements Task {
@Override
public void shutdown() {
synchronized (this) {
m_isActive = false;
}
m_isActive = false;
}
}
......@@ -156,7 +156,6 @@ public class MonitorManager implements Initializable, LogEnabled {
buildMessage(entity, url, ipInfo);
} else {
Cat.logEvent("ip", "notFound", Event.SUCCESS, ip);
m_logger.error(String.format("can't find ip for %s", ip));
}
t.setStatus(Transaction.SUCCESS);
} catch (Exception e) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册