提交 16d8177c 编写于 作者: B bobbeyreese 提交者: Matteo Merli

Use historical data from old API in new API (#329)

上级 e34b595b
......@@ -31,7 +31,9 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.yahoo.pulsar.broker.TimeAverageMessageData;
import com.yahoo.pulsar.broker.loadbalance.LoadManager;
import com.yahoo.pulsar.common.policies.data.ResourceQuota;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
......@@ -89,6 +91,9 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
// The number of effective samples to keep for observing short term data.
public static final int NUM_SHORT_SAMPLES = 10;
// Path to ZNode whose children contain ResourceQuota jsons.
public static final String RESOURCE_QUOTA_ZPATH = "/loadbalance/resource-quota/namespace";
// Path to ZNode containing TimeAverageBrokerData jsons for each broker.
public static final String TIME_AVERAGE_BROKER_ZPATH = "/loadbalance/broker-time-average";
......@@ -265,8 +270,29 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
BundleData bundleData = null;
try {
final String bundleZPath = getBundleDataZooKeeperPath(bundle);
final String quotaZPath = String.format("%s/%s", RESOURCE_QUOTA_ZPATH, bundle);
if (zkClient.exists(bundleZPath, null) != null) {
bundleData = readJson(zkClient.getData(bundleZPath, null, null), BundleData.class);
} else if (zkClient.exists(quotaZPath, null) != null) {
final ResourceQuota quota = readJson(zkClient.getData(quotaZPath, null, null), ResourceQuota.class);
bundleData = new BundleData(NUM_SHORT_SAMPLES, NUM_LONG_SAMPLES);
// Initialize from existing resource quotas if new API ZNodes do not exist.
final TimeAverageMessageData shortTermData = bundleData.getShortTermData();
final TimeAverageMessageData longTermData = bundleData.getLongTermData();
shortTermData.setMsgRateIn(quota.getMsgRateIn());
shortTermData.setMsgRateOut(quota.getMsgRateOut());
shortTermData.setMsgThroughputIn(quota.getBandwidthIn());
shortTermData.setMsgThroughputOut(quota.getBandwidthOut());
longTermData.setMsgRateIn(quota.getMsgRateIn());
longTermData.setMsgRateOut(quota.getMsgRateOut());
longTermData.setMsgThroughputIn(quota.getBandwidthIn());
longTermData.setMsgThroughputOut(quota.getBandwidthOut());
// Assume ample history.
shortTermData.setNumSamples(NUM_SHORT_SAMPLES);
longTermData.setNumSamples(NUM_LONG_SAMPLES);
}
} catch (Exception e) {
log.warn("Error when trying to find bundle {} on zookeeper: {}", bundle, e);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册