提交 9d4c814b 编写于 作者: A Alexey Milovidov

Aggregate function topK: style modifications [#CLICKHOUSE-2].

上级 5f1e65b2
......@@ -30,65 +30,69 @@ template <typename TKey, typename Hash = DefaultHash<TKey>>
class SpaceSaving
{
public:
// A single monitored key with its count, its error value, and its position
// (slot) in the owning SpaceSaving counter list.
struct Counter {
using Self = SpaceSaving<TKey, Hash>;
struct Counter
{
// Default constructor: initializes no members explicitly. In this file it is
// used by SpaceSaving::read(), which fills key/count/error via read() and
// sets slot via push().
Counter() {}
// Builds a counter for key k with initial count c and error e; slot starts at 0.
Counter(const TKey & k, UInt64 c = 0, UInt64 e = 0)
: key(k), slot(0), count(c), error(e) {}
// Serializes the key in binary form, followed by count and error as
// variable-length unsigned integers. The slot is not written.
void write(DB::WriteBuffer & wb) const
void write(WriteBuffer & wb) const
{
DB::writeBinary(key, wb);
DB::writeVarUInt(count, wb);
DB::writeVarUInt(error, wb);
writeBinary(key, wb);
writeVarUInt(count, wb);
writeVarUInt(error, wb);
}
// Deserializes key, count and error in the same order write() emits them.
// The slot is left untouched.
void read(DB::ReadBuffer & rb)
void read(ReadBuffer & rb)
{
DB::readBinary(key, rb);
DB::readVarUInt(count, rb);
DB::readVarUInt(error, rb);
readBinary(key, rb);
readVarUInt(count, rb);
readVarUInt(error, rb);
}
// Greater-than that takes the error into account: a higher count wins;
// on equal counts the counter with the smaller error wins.
bool operator >(const Counter &b) const
bool operator> (const Counter & b) const
{
return (count > b.count) || (count == b.count && error < b.error);
}
// The monitored key.
TKey key;
// Index of this counter inside the owning counter list
// (assigned in push(), swapped in percolate()).
size_t slot;
// Accumulated count and accumulated error for the key.
UInt64 count, error;
UInt64 count;
UInt64 error;
};
// Suggested constants in the paper "Finding top-k elements in data streams", chap 6. equation (24)
// Creates an empty structure with capacity c. The alpha map is sized to
// 6 entries per counter (ALPHA_MAP_ELEMENTS_PER_COUNTER in the newer line).
SpaceSaving(size_t c = 10) : counterMap(), counterList(), alphaMap(6 * c), cap(c) {}
SpaceSaving(size_t c = 10) : alpha_map(ALPHA_MAP_ELEMENTS_PER_COUNTER * c), m_capacity(c) {}
// Releases every Counter held by the list (see destroyElements()).
~SpaceSaving() { destroyElements(); }
// Returns the number of counters currently stored in the counter list.
inline size_t size() const
{
return counterList.size();
return counter_list.size();
}
// Returns the configured capacity, as set by the constructor or by resize().
inline size_t capacity() const
{
return cap;
return m_capacity;
}
// Changes the capacity: reserves storage in the counter list for that many
// counters and resizes the alpha map to 6 entries per counter.
// Counters already stored are not removed here.
void resize(size_t c)
void resize(size_t new_capacity)
{
counterList.reserve(c);
alphaMap.resize(c * 6);
cap = c;
counter_list.reserve(new_capacity);
alpha_map.resize(new_capacity * ALPHA_MAP_ELEMENTS_PER_COUNTER);
m_capacity = new_capacity;
}
Counter * insert(const TKey & key, UInt64 increment = 1, UInt64 error = 0)
{
// Increase weight of a key that already exists
// The hash table serves both as the key-to-counter mapping and as a presence test (c_i != 0)
auto hash = counterMap.hash(key);
auto it = counterMap.find(key, hash);
if (it != counterMap.end()) {
auto hash = counter_map.hash(key);
auto it = counter_map.find(key, hash);
if (it != counter_map.end())
{
auto c = it->second;
c->count += increment;
c->error += error;
......@@ -97,32 +101,36 @@ public:
}
// Key doesn't exist, but can fit in the top K
if (size() < capacity()) {
if (size() < capacity())
{
auto c = new Counter(key, increment, error);
push(c);
return c;
}
auto min = counterList.back();
auto & alpha = alphaMap[hash % alphaMap.size()];
if (alpha + increment < min->count) {
auto min = counter_list.back();
auto & alpha = alpha_map[hash % alpha_map.size()];
if (alpha + increment < min->count)
{
alpha += increment;
return nullptr;
}
// Erase the current minimum element
auto minHash = counterMap.hash(min->key);
it = counterMap.find(min->key, minHash);
if (it != counterMap.end()) {
auto minHash = counter_map.hash(min->key);
it = counter_map.find(min->key, minHash);
if (it != counter_map.end())
{
auto cell = it.getPtr();
cell->setZero();
}
// Replace minimum with newly inserted element
bool inserted = false;
counterMap.emplace(key, it, inserted, hash);
if (inserted) {
alphaMap[minHash % alphaMap.size()] = min->count;
counter_map.emplace(key, it, inserted, hash);
if (inserted)
{
alpha_map[minHash % alpha_map.size()] = min->count;
min->key = key;
min->count = alpha + increment;
min->error = alpha + error;
......@@ -137,14 +145,19 @@ public:
* Parallel Space Saving reduction and combine step from:
* https://arxiv.org/pdf/1401.0702.pdf
*/
void merge(const SpaceSaving<TKey, Hash> & rhs)
void merge(const Self & rhs)
{
UInt64 m1 = 0, m2 = 0;
if (size() == capacity()) {
m1 = counterList.back()->count;
UInt64 m1 = 0;
UInt64 m2 = 0;
if (size() == capacity())
{
m1 = counter_list.back()->count;
}
if (rhs.size() == rhs.capacity()) {
m2 = rhs.counterList.back()->count;
if (rhs.size() == rhs.capacity())
{
m2 = rhs.counter_list.back()->count;
}
/*
......@@ -153,21 +166,27 @@ public:
* in the first step we expect that no elements overlap
* and in the second sweep we correct the error if they do.
*/
if (m2 > 0) {
for (auto c : counterList) {
c->count += m2;
c->error += m2;
if (m2 > 0)
{
for (auto counter : counter_list)
{
counter->count += m2;
counter->error += m2;
}
}
// The list is sorted in descending order, we have to scan in reverse
for (auto c : boost::adaptors::reverse(rhs.counterList)) {
if (counterMap.find(c->key) != counterMap.end()) {
for (auto counter : boost::adaptors::reverse(rhs.counter_list))
{
if (counter_map.find(counter->key) != counter_map.end())
{
// Subtract m2 previously added, guaranteed not negative
insert(c->key, c->count - m2, c->error - m2);
} else {
insert(counter->key, counter->count - m2, counter->error - m2);
}
else
{
// Counters not monitored in S1
insert(c->key, c->count + m1, c->error + m1);
insert(counter->key, counter->count + m1, counter->error + m1);
}
}
}
......@@ -175,80 +194,88 @@ public:
// Returns copies of the first k counters of the counter list, in list order
// (fewer if the list holds less than k counters).
// NOTE(review): the size check runs after push_back, so with k == 0 it never
// matches and every counter is returned.
std::vector<Counter> topK(size_t k) const
{
std::vector<Counter> res;
for (auto c : counterList) {
res.push_back(*c);
if (res.size() == k) {
for (auto counter : counter_list)
{
res.push_back(*counter);
if (res.size() == k)
break;
}
}
return res;
}
// Serializes the structure: the number of counters as a variable-length
// unsigned integer, then each counter in list order, then every alpha map
// entry as a variable-length unsigned integer.
// Neither the capacity nor the alpha map length is written.
void write(DB::WriteBuffer & wb) const
void write(WriteBuffer & wb) const
{
DB::writeVarUInt(size(), wb);
for (auto c : counterList) {
c->write(wb);
}
for (auto a : alphaMap) {
DB::writeVarUInt(a, wb);
}
writeVarUInt(size(), wb);
for (auto counter : counter_list)
counter->write(wb);
for (auto alpha : alpha_map)
writeVarUInt(alpha, wb);
}
// Deserializes the structure from the layout produced by write().
// Existing contents (counters, map, list, alpha map) are discarded first.
// NOTE(review): the number of alpha values read is derived from this object's
// capacity, not from the stream, so the reader presumably must have the same
// capacity as the writer - confirm with callers.
// NOTE(review): the counter count read from the stream is not checked against
// capacity() here.
void read(DB::ReadBuffer & rb)
void read(ReadBuffer & rb)
{
destroyElements();
size_t count = 0;
DB::readVarUInt(count, rb);
readVarUInt(count, rb);
// Each counter is heap-allocated, filled from the stream, and inserted via push().
for (size_t i = 0; i < count; ++i) {
auto c = new Counter();
c->read(rb);
push(c);
for (size_t i = 0; i < count; ++i)
{
auto counter = new Counter();
counter->read(rb);
push(counter);
}
// The alpha map was cleared by destroyElements(); refill it entry by entry.
for (size_t i = 0; i < capacity() * 6; ++i) {
for (size_t i = 0; i < m_capacity * ALPHA_MAP_ELEMENTS_PER_COUNTER; ++i)
{
UInt64 alpha = 0;
DB::readVarUInt(alpha, rb);
alphaMap.push_back(alpha);
readVarUInt(alpha, rb);
alpha_map.push_back(alpha);
}
}
protected:
// Adds a counter: appends it at the end of the list (recording that index as
// its slot), registers it in the map under its key, then moves it towards
// the front with percolate() until the list order is restored.
void push(Counter * c) {
c->slot = counterList.size();
counterList.push_back(c);
counterMap[c->key] = c;
percolate(c);
void push(Counter * counter)
{
counter->slot = counter_list.size();
counter_list.push_back(counter);
counter_map[counter->key] = counter;
percolate(counter);
}
// This is equivalent to one step of bubble sort:
// the counter is moved towards the front of the list for as long as it
// compares greater (Counter::operator>) than the element just before it.
void percolate(Counter * c) {
while (c->slot > 0) {
auto next = counterList[c->slot - 1];
if (*c > *next) {
std::swap(next->slot, c->slot);
std::swap(counterList[next->slot], counterList[c->slot]);
} else {
break;
void percolate(Counter * counter)
{
while (counter->slot > 0)
{
// Despite its name, `next` is the element immediately before the counter in the list.
auto next = counter_list[counter->slot - 1];
if (*counter > *next)
{
// Swap the slot numbers first, then swap the list entries at those two
// indices, so that each pointer ends up stored at its own new slot.
std::swap(next->slot, counter->slot);
std::swap(counter_list[next->slot], counter_list[counter->slot]);
}
else
break;
}
}
private:
// Deletes every Counter referenced by the list, then empties the map, the
// list and the alpha map. The capacity value is left unchanged.
void destroyElements() {
for (auto c : counterList) {
delete c;
}
counterMap.clear();
counterList.clear();
alphaMap.clear();
void destroyElements()
{
for (auto counter : counter_list)
delete counter;
counter_map.clear();
counter_list.clear();
alpha_map.clear();
}
// Data members (both the older camelCase and the newer snake_case names appear below):
//  - counter map:  key -> pointer to its Counter; push() stores the same pointer it appends to the list.
//  - counter list: the Counter pointers, deleted in destroyElements(); kept ordered by
//                  percolate() so that greater counters come first.
//  - alpha map:    UInt64 values indexed by (hash % size) in insert(); holds 6 entries per counter.
//  - capacity:     value returned by capacity(); set by the constructor and resize().
HashMap<TKey, Counter *, Hash> counterMap;
std::vector<Counter *> counterList;
std::vector<UInt64> alphaMap;
size_t cap;
HashMap<TKey, Counter *, Hash> counter_map;
std::vector<Counter *> counter_list;
std::vector<UInt64> alpha_map;
size_t m_capacity;
// Suggested constants in the paper "Finding top-k elements in data streams", chap 6. equation (24)
// Number of alpha map entries allocated per unit of capacity.
enum { ALPHA_MAP_ELEMENTS_PER_COUNTER = 6 };
};
};
\ No newline at end of file
};
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册