提交 9463e27b 编写于 作者: S Sebastian Kruse 提交者: Stephan Ewen

[FLINK-1096] Correction to histogram accumulator

This closes #117
* each key is associated with the number of times it was inserted into the accumulator
* backed histogram with a tree map to present the entries sorted by key
上级 e17575b5
......@@ -29,7 +29,8 @@ import com.google.common.collect.Maps;
/**
* Histogram for discrete-data. Let's you populate a histogram distributedly.
* Implemented as a Integer->Integer HashMap
* Implemented as a Integer->Integer TreeMap, so that the entries are sorted
* according to the values.
*
* Could be extended to continuous values later, but then we need to dynamically
* decide about the bin size in an online algorithm (or ask the user)
......@@ -38,21 +39,18 @@ public class Histogram implements Accumulator<Integer, Map<Integer, Integer>> {
private static final long serialVersionUID = 1L;
private Map<Integer, Integer> hashMap = Maps.newHashMap();
private Map<Integer, Integer> treeMap = Maps.newTreeMap();
@Override
public void add(Integer value) {
Integer current = hashMap.get(value);
Integer newValue = value;
if (current != null) {
newValue = current + newValue;
}
this.hashMap.put(value, newValue);
Integer current = treeMap.get(value);
Integer newValue = (current != null ? current : 0) + 1;
this.treeMap.put(value, newValue);
}
@Override
public Map<Integer, Integer> getLocalValue() {
return this.hashMap;
return this.treeMap;
}
@Override
......@@ -60,29 +58,29 @@ public class Histogram implements Accumulator<Integer, Map<Integer, Integer>> {
// Merge the values into this map
for (Map.Entry<Integer, Integer> entryFromOther : ((Histogram) other).getLocalValue()
.entrySet()) {
Integer ownValue = this.hashMap.get(entryFromOther.getKey());
Integer ownValue = this.treeMap.get(entryFromOther.getKey());
if (ownValue == null) {
this.hashMap.put(entryFromOther.getKey(), entryFromOther.getValue());
this.treeMap.put(entryFromOther.getKey(), entryFromOther.getValue());
} else {
this.hashMap.put(entryFromOther.getKey(), entryFromOther.getValue() + ownValue);
this.treeMap.put(entryFromOther.getKey(), entryFromOther.getValue() + ownValue);
}
}
}
@Override
public void resetLocal() {
this.hashMap.clear();
this.treeMap.clear();
}
@Override
public String toString() {
return this.hashMap.toString();
return this.treeMap.toString();
}
@Override
public void write(DataOutputView out) throws IOException {
out.writeInt(hashMap.size());
for (Map.Entry<Integer, Integer> entry : hashMap.entrySet()) {
out.writeInt(treeMap.size());
for (Map.Entry<Integer, Integer> entry : treeMap.entrySet()) {
out.writeInt(entry.getKey());
out.writeInt(entry.getValue());
}
......@@ -92,7 +90,7 @@ public class Histogram implements Accumulator<Integer, Map<Integer, Integer>> {
public void read(DataInputView in) throws IOException {
int size = in.readInt();
for (int i = 0; i < size; ++i) {
hashMap.put(in.readInt(), in.readInt());
treeMap.put(in.readInt(), in.readInt());
}
}
......
......@@ -107,7 +107,7 @@ public class AccumulatorITCase extends RecordAPITestBase {
// Test histogram (words per line distribution)
Map<Integer, Integer> dist = Maps.newHashMap();
dist.put(1, 1); dist.put(2, 2); dist.put(3, 3);
dist.put(1, 1); dist.put(2, 1); dist.put(3, 1);
Assert.assertEquals(dist, res.getAccumulatorResult("words-per-line"));
// Test distinct words (custom accumulator)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册