提交 cbe3b08b 编写于 作者: O Omer Arap

Move hyperloglog from contrib to catalog

This commit moves hyperloglog aggregates and functions to catalog as
well as migrates any hyperloglog related functions into
`src/backend/utils`.
上级 211fa919
......@@ -27,7 +27,6 @@ all:
$(MAKE) -C contrib/pg_upgrade_support all
$(MAKE) -C contrib/pg_upgrade all
$(MAKE) -C contrib/hstore all
$(MAKE) -C contrib/postgres_hyperloglog all
$(MAKE) -C contrib/pgcrypto all
$(MAKE) -C gpAux/extensions all
$(MAKE) -C gpAux/gpperfmon all
......@@ -61,7 +60,6 @@ install:
$(MAKE) -C contrib/pg_upgrade_support $@
$(MAKE) -C contrib/pg_upgrade $@
$(MAKE) -C contrib/hstore $@
$(MAKE) -C contrib/postgres_hyperloglog $@
$(MAKE) -C contrib/pgcrypto $@
$(MAKE) -C gpMgmt $@
$(MAKE) -C gpAux/extensions $@
......
Copyright 2015, Conversant, Inc. All rights reserved.
============================================================================================
For: HyperLogLog
--------------------------------------------------------------------------------------------
Copyright 2012, Tomas Vondra (tv@fuzzy.cz). All rights reserved.
Copyright 2015, Conversant, Inc. All rights reserved.
Copyright 2018, Pivotal Software, Inc. All rights reserved.
Redistribution and use in source and binary forms, with or without modification, are
permitted provided that the following conditions are met:
......@@ -11,16 +15,17 @@ permitted provided that the following conditions are met:
of conditions and the following disclaimer in the documentation and/or other materials
provided with the distribution.
THIS SOFTWARE IS PROVIDED BY CONVERSANT INC ''AS IS'' AND ANY EXPRESS OR IMPLIED
WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL TOMAS VONDRA OR
CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
THIS SOFTWARE IS PROVIDED BY TOMAS VONDRA, CONVERSANT INC, PIVOTAL SOFTWARE INC. AND ANY
OTHER CONTRIBUTORS (THE "AUTHORS") ''AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY DIRECT,
INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
The views and conclusions contained in the software and documentation are those of the
authors and should not be interpreted as representing official policies, either expressed
or implied, of Conversant Inc.
Authors and should not be interpreted as representing official policies, either expressed
or implied, of the Authors.
============================================================================================
{
"name": "hyperloglog_estimator",
"abstract": "Estimates number of distinct elements in a data set (aggregate and a data type).",
"description": "Provides an alternative to COUNT(DISTINCT) aggregate, computing an estimate of number of distinct values, and a data type that may be used within a table (and updated continuously). This implementation is based on HyperLogLog algorithm, an enhancement of LogLog (see the paper 'HyperLogLog: the analysis of near-optimal cardinality estimation algorithm' by Flajolet, Fusy, Gandouet and Meunier, published in 2007).",
"version": "2.0.0",
"maintainer": "Tomas Vondra <tv@fuzzy.cz>",
"license": "bsd",
"prereqs": {
"runtime": {
"requires": {
"PostgreSQL": "8.2"
}
}
},
"provides": {
"hyperloglog_counter": {
"file": "sql/postgres.sql",
"docfile" : "README.md",
"version": "2.0.0"
}
},
"resources": {
"repository": {
"url": "https://tvondra@github.com/tvondra/distinct_estimators.git",
"web": "http://github.com/tvondra/distinct_estimators",
"type": "git"
}
},
"tags" : ["distinct", "count", "aggregate", "estimate", "hyperloglog"],
"meta-spec": {
"version": "1.0.0",
"url": "http://pgxn.org/meta/spec.txt"
},
"release_status" : "testing"
}
# Build description for the hyperloglog_counter extension module.
# The variables below follow the PGXS naming scheme: one shared library
# (MODULE_big) built from OBJS, plus the extension name and its SQL script.
MODULE_big = hyperloglog_counter
OBJS = src/hyperloglog_counter.o
EXTENSION = hyperloglog_counter
DATA = sql/hyperloglog_counter--2.0.0.sql
# Out-of-tree build: ask pg_config where the PGXS makefile lives and include it.
ifdef USE_PGXS
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
include $(PGXS)
else
# In-tree build: pull in the global and contrib makefiles from the source tree.
subdir = contrib/postgres_hyperloglog
top_builddir = ../..
include $(top_builddir)/src/Makefile.global
include $(top_srcdir)/contrib/contrib-global.mk
endif
# The rules below only declare prerequisites and carry no recipes.
# NOTE(review): presumably the compile/link commands come from the makefiles
# included above -- confirm these extra rules are still needed.
all: hyperloglog_counter.so
hyperloglog_counter.so: $(OBJS)
# NOTE(review): recipe-less pattern rule; OBJS already carries the src/ prefix,
# so verify this rule matches anything.
%.o : src/%.c
# HyperLogLog estimator control
# Extension metadata: description, default version, relocatability and the
# shared-library path that the SQL script's C functions are loaded from.
comment = 'Aggregation functions and data type for distinct estimation based on HyperLogLog.'
default_version = '2.0.0'
relocatable = true
module_pathname = '$libdir/hyperloglog_counter'
-- HyperLogLog
-- Create a dedicated schema and put it first on the search path so that the
-- unqualified objects created below are placed in it.
CREATE SCHEMA gp_hyperloglog;
SET search_path = gp_hyperloglog, public, pg_catalog;
-- HyperLogLog counter (shell type)
-- Declared first so the I/O functions can reference the type before it is fully defined.
CREATE TYPE hyperloglog_estimator;
/* input/output functions (text representation <-> counter), implemented in C */
CREATE FUNCTION hyperloglog_in(value cstring) RETURNS hyperloglog_estimator
AS '$libdir/hyperloglog_counter', 'hyperloglog_in'
LANGUAGE C STRICT IMMUTABLE;
CREATE FUNCTION hyperloglog_out(counter hyperloglog_estimator) RETURNS cstring
AS '$libdir/hyperloglog_counter', 'hyperloglog_out'
LANGUAGE C STRICT IMMUTABLE;
-- full definition of the HyperLogLog counter data type:
-- variable length, int4 alignment, EXTENDED storage
CREATE TYPE hyperloglog_estimator (
INPUT = hyperloglog_in,
OUTPUT = hyperloglog_out,
STORAGE = EXTENDED,
ALIGNMENT = int4,
INTERNALLENGTH = VARIABLE
);
-- allow cast from bytea to hyperloglog_estimator (no conversion function)
CREATE CAST (bytea as hyperloglog_estimator) WITHOUT FUNCTION AS ASSIGNMENT;
-- allow cast from hyperloglog_estimator to bytea (no conversion function)
CREATE CAST (hyperloglog_estimator as bytea) WITHOUT FUNCTION AS ASSIGNMENT;
-- text conversion helpers: the text form is the base64 encoding of the bytea image
CREATE OR REPLACE FUNCTION hyperloglog_to_text(hyperloglog_estimator) RETURNS text
AS $$ select pg_catalog.encode($1::bytea, 'base64'); $$ LANGUAGE SQL IMMUTABLE;
CREATE OR REPLACE FUNCTION hyperloglog_from_text(text) RETURNS hyperloglog_estimator
AS $$ select pg_catalog.decode($1, 'base64')::hyperloglog_estimator; $$ LANGUAGE SQL IMMUTABLE;
-- explicit casts between hyperloglog_estimator and text, using the helpers above
CREATE CAST (hyperloglog_estimator as text) WITH FUNCTION hyperloglog_to_text(hyperloglog_estimator);
CREATE CAST (text as hyperloglog_estimator) WITH FUNCTION hyperloglog_from_text(text);
/* compression function for the counter's inner data */
CREATE OR REPLACE FUNCTION hyperloglog_comp(counter hyperloglog_estimator) RETURNS hyperloglog_estimator
AS '$libdir/hyperloglog_counter', 'hyperloglog_comp'
LANGUAGE C IMMUTABLE STRICT;
COMMENT ON FUNCTION hyperloglog_comp(counter hyperloglog_estimator) IS 'Uses an internal quicklz/group varint encoding algorithm to compress each hyperloglog_estimator individually';
/* Utility functions */
-- creates a new HyperLogLog estimator with default error_rate 0.8125% and default 2^64 ndistinct
CREATE OR REPLACE FUNCTION hyperloglog_init() RETURNS hyperloglog_estimator
AS '$libdir/hyperloglog_counter', 'hyperloglog_init_default'
LANGUAGE C IMMUTABLE;
-- merges the second estimator into a copy of the first one
-- (not STRICT: NULL arguments are handled by the C function)
CREATE OR REPLACE FUNCTION hyperloglog_merge(estimator1 hyperloglog_estimator, estimator2 hyperloglog_estimator) RETURNS hyperloglog_estimator
AS '$libdir/hyperloglog_counter', 'hyperloglog_merge'
LANGUAGE C IMMUTABLE;
COMMENT ON FUNCTION hyperloglog_merge(estimator1 hyperloglog_estimator, estimator2 hyperloglog_estimator) IS 'Merge two seperate hyperloglog_estimators into one. This varies from the aggregate hyperloglog_merge since it merges columns together instead of rows.';
-- merge function used by the aggregates below; it is bound to the same C
-- symbol ('hyperloglog_merge') as the function above
CREATE OR REPLACE FUNCTION hyperloglog_merge_agg(estimator1 hyperloglog_estimator, estimator2 hyperloglog_estimator) RETURNS hyperloglog_estimator
AS '$libdir/hyperloglog_counter', 'hyperloglog_merge'
LANGUAGE C IMMUTABLE;
-- get current estimate of the distinct values (as a real number)
CREATE OR REPLACE FUNCTION hyperloglog_get_estimate(counter hyperloglog_estimator) RETURNS double precision
AS '$libdir/hyperloglog_counter', 'hyperloglog_get_estimate'
LANGUAGE C STRICT IMMUTABLE;
COMMENT ON FUNCTION hyperloglog_get_estimate(counter hyperloglog_estimator) IS 'Estimates the cardinality of the provided hyperloglog_estimator';
-- length of the estimator (about the same as hyperloglog_size with existing estimator)
CREATE OR REPLACE FUNCTION length(counter hyperloglog_estimator) RETURNS int
AS '$libdir/hyperloglog_counter', 'hyperloglog_length'
LANGUAGE C STRICT IMMUTABLE;
COMMENT ON FUNCTION length(counter hyperloglog_estimator) IS 'Returns the length of the provided hyperloglog_estimator which is equivalent to its size in bytes';
/* support functions used by the aggregates */
-- transition function: adds one item to the counter
-- (not STRICT: NULL counter/item are handled by the C function)
CREATE OR REPLACE FUNCTION hyperloglog_add_item_agg_default(counter hyperloglog_estimator, item anyelement) RETURNS hyperloglog_estimator
AS '$libdir/hyperloglog_counter', 'hyperloglog_add_item_agg_default'
LANGUAGE C IMMUTABLE;
-- final function: rounds the estimate to bigint and maps a NULL estimate to 0
CREATE OR REPLACE FUNCTION hyperloglog_get_estimate_bigint(hyperloglog_estimator) RETURNS bigint
AS $$ select coalesce(round(hyperloglog_get_estimate($1))::bigint, 0) $$
LANGUAGE SQL IMMUTABLE;
COMMENT ON FUNCTION hyperloglog_get_estimate_bigint(hyperloglog_estimator) IS 'Gets the estimated cardinality of the hyperloglog_estimator and rounds to the nearest whole number (bigint)';
-- HyperLogLog based count distinct (item): accumulates items and returns the estimate
DROP AGGREGATE IF EXISTS hyperloglog_distinct(anyelement);
CREATE AGGREGATE hyperloglog_distinct(anyelement)
(
sfunc = hyperloglog_add_item_agg_default,
stype = hyperloglog_estimator,
prefunc = hyperloglog_merge_agg,
finalfunc = hyperloglog_get_estimate
);
COMMENT ON AGGREGATE hyperloglog_distinct(anyelement) IS 'Uses the hyperloglog_estimator to estimate the distinct count of the column with default capacity and accuracy values (0.8125% accuracy, 1.84*10^19 capacity)';
-- builds the counter from elements but does not perform the final estimation;
-- the final function compresses the counter instead
DROP AGGREGATE IF EXISTS hyperloglog_accum(anyelement);
CREATE AGGREGATE hyperloglog_accum(anyelement)
(
sfunc = hyperloglog_add_item_agg_default,
prefunc = hyperloglog_merge_agg,
stype = hyperloglog_estimator,
finalfunc = hyperloglog_comp
);
COMMENT ON AGGREGATE hyperloglog_accum(anyelement) IS 'Accumulates anyelement into a hyperloglog_estimator of default size (14 index bits, 6 bits/bucket)';
-- mirrors the regular sum(): merges the counters and returns the rounded estimate
DROP AGGREGATE IF EXISTS sum(hyperloglog_estimator);
CREATE AGGREGATE sum(hyperloglog_estimator)
(
sfunc = hyperloglog_merge_agg,
stype = hyperloglog_estimator,
prefunc = hyperloglog_merge_agg,
finalfunc = hyperloglog_get_estimate_bigint
);
COMMENT ON AGGREGATE sum(hyperloglog_estimator) IS 'First merges all the hyperloglog_estimators, then evaluates the resulting counter, and then rounds the result to the nearest whole number (bigint)';
-- merges all the counters into just a single one (e.g. after running hyperloglog_accum)
DROP AGGREGATE IF EXISTS hyperloglog_merge(hyperloglog_estimator);
CREATE AGGREGATE hyperloglog_merge(hyperloglog_estimator)
(
sfunc = hyperloglog_merge_agg,
prefunc = hyperloglog_merge_agg,
stype = hyperloglog_estimator,
finalfunc = hyperloglog_comp
);
COMMENT ON AGGREGATE hyperloglog_merge(hyperloglog_estimator) IS 'Merges/unions hyperloglog_estimators into a unified estimator';
-- let every role reference the objects in the schema
GRANT USAGE ON SCHEMA gp_hyperloglog TO public;
/* File contains functions that interact directly with the postgres api */
#include <stdio.h>
#include <math.h>
#include <string.h>
#include <sys/time.h>
#include "postgres.h"
#include "fmgr.h"
#include "utils/builtins.h"
#include "utils/bytea.h"
#include "utils/lsyscache.h"
#include "lib/stringinfo.h"
#include "libpq/pqformat.h"
#include "utils/hyperloglog/hyperloglog.h"
#ifdef PG_MODULE_MAGIC
PG_MODULE_MAGIC;
#endif
/* PG_GETARG macros for HLLCounters that also perform version checking */
#define PG_GETARG_HLL_P(n) pg_check_hll_version((HLLCounter) PG_GETARG_BYTEA_P(n))
#define PG_GETARG_HLL_P_COPY(n) pg_check_hll_version((HLLCounter) PG_GETARG_BYTEA_P_COPY(n))
/*
 * Defaults: capacity of 2^63 distinct items (1ULL << 63) and 0.8125% error
 * rate. The shift is parenthesized so the macro keeps its value when it is
 * expanded inside a larger expression (e.g. DEFAULT_NDISTINCT - 1).
 */
#define DEFAULT_NDISTINCT (1ULL << 63)
#define DEFAULT_ERROR 0.008125
/* Use the PG_FUNCTION_INFO_V1 macro to declare each SQL-callable function to postgres */
PG_FUNCTION_INFO_V1(hyperloglog_add_item_agg_default);
PG_FUNCTION_INFO_V1(hyperloglog_merge);
PG_FUNCTION_INFO_V1(hyperloglog_get_estimate);
PG_FUNCTION_INFO_V1(hyperloglog_init_default);
PG_FUNCTION_INFO_V1(hyperloglog_length);
PG_FUNCTION_INFO_V1(hyperloglog_in);
PG_FUNCTION_INFO_V1(hyperloglog_out);
PG_FUNCTION_INFO_V1(hyperloglog_comp);
/* ------ prototypes for the functions defined in this file -------------- */
Datum hyperloglog_add_item_agg_default(PG_FUNCTION_ARGS);
Datum hyperloglog_get_estimate(PG_FUNCTION_ARGS);
Datum hyperloglog_merge(PG_FUNCTION_ARGS);
Datum hyperloglog_init_default(PG_FUNCTION_ARGS);
Datum hyperloglog_length(PG_FUNCTION_ARGS);
Datum hyperloglog_in(PG_FUNCTION_ARGS);
Datum hyperloglog_out(PG_FUNCTION_ARGS);
Datum hyperloglog_comp(PG_FUNCTION_ARGS);
/* file-local helper behind the PG_GETARG_HLL_P* macros */
static HLLCounter pg_check_hll_version(HLLCounter hloglog);
/* ---------------------- function definitions --------------------------- */
/*
 * Check that a counter carries the struct version this code was built for.
 * Raises an ERROR on mismatch; otherwise returns the same pointer, so the
 * call can be wrapped directly around an argument fetch.
 */
static HLLCounter
pg_check_hll_version(HLLCounter hloglog)
{
	if (hloglog->version == STRUCT_VERSION)
		return hloglog;

	elog(ERROR,"ERROR: The stored counter is version %u while the library is version %u. Please change library version or use upgrade function to upgrade the counter",hloglog->version,STRUCT_VERSION);

	/* same fall-through return as the matching-version path */
	return hloglog;
}
/*
 * Aggregate transition function: fold one input value into the running
 * HyperLogLog counter.
 *
 * Argument 0 is the current counter and argument 1 the value to add; either
 * may be NULL. Returns NULL when both are NULL, otherwise the counter.
 */
Datum
hyperloglog_add_item_agg_default(PG_FUNCTION_ARGS)
{
	/* type and value of the anyelement argument */
	Oid			item_type = get_fn_expr_argtype(fcinfo->flinfo, 1);
	Datum		item = PG_GETARG_DATUM(1);
	int16		item_len;
	bool		item_byval;
	char		item_align;
	bool		have_counter = !PG_ARGISNULL(0);
	bool		have_item = !PG_ARGISNULL(1);
	HLLCounter	counter;

	/* nothing accumulated and nothing to add: do not create an empty counter */
	if (!have_counter && !have_item)
		PG_RETURN_NULL();

	/* reuse the incoming counter, or start a new one with the defaults */
	if (have_counter)
		counter = PG_GETARG_HLL_P(0);
	else
		counter = hll_create(DEFAULT_NDISTINCT, DEFAULT_ERROR, PACKED);

	/* NULL items are skipped */
	if (have_item)
	{
		/* look up length/by-value/alignment of the item's type */
		get_typlenbyvalalign(item_type, &item_len, &item_byval, &item_align);
		counter = hyperloglog_add_item(counter, item, item_len, item_byval, item_align);
	}

	/* hand back the (possibly updated) counter */
	PG_RETURN_BYTEA_P(counter);
}
Datum
hyperloglog_merge(PG_FUNCTION_ARGS)
{
HLLCounter counter1;
HLLCounter counter2;
if (PG_ARGISNULL(0) && PG_ARGISNULL(1)){
/* if both counters are null return null */
PG_RETURN_NULL();
} else if (PG_ARGISNULL(0)) {
/* if first counter is null just copy the second estimator into the
* first one */
counter1 = PG_GETARG_HLL_P(1);
} else if (PG_ARGISNULL(1)) {
/* if second counter is null just return the the first estimator */
counter1 = PG_GETARG_HLL_P(0);
} else {
/* ok, we already have the estimator - merge the second one into it */
counter1 = PG_GETARG_HLL_P_COPY(0);
counter2 = PG_GETARG_HLL_P_COPY(1);
counter1 = hyperloglog_merge_counters(counter1, counter2);
}
/* return the updated bytea */
PG_RETURN_BYTEA_P(counter1);
}
/*
 * Return the estimate computed from a copy of the given counter,
 * as a float8.
 */
Datum
hyperloglog_get_estimate(PG_FUNCTION_ARGS)
{
	HLLCounter	counter = PG_GETARG_HLL_P_COPY(0);

	PG_RETURN_FLOAT8(hyperloglog_estimate(counter));
}
/*
 * Return a new counter as produced by hyperloglog_init_def().
 */
Datum
hyperloglog_init_default(PG_FUNCTION_ARGS)
{
	PG_RETURN_BYTEA_P(hyperloglog_init_def());
}
Datum
hyperloglog_length(PG_FUNCTION_ARGS)
{
PG_RETURN_INT32(hyperloglog_len(PG_GETARG_HLL_P(0)));
}
/*
 * Type output function: encode the counter's bytes with hll_b64_encode()
 * and return the result as a NUL-terminated cstring.
 *
 * The buffer is sized from b64_enc_len(); if the encoder reports having
 * written more than that estimate, memory has already been overwritten and
 * the backend is terminated with FATAL.
 */
Datum
hyperloglog_out(PG_FUNCTION_ARGS)
{
	/*
	 * Lengths are kept in plain int. The previous int16 silently truncated
	 * any length above 32767, which would size the buffer wrongly.
	 */
	int		datalen, resultlen, res;
	char   *result;
	bytea  *data = PG_GETARG_BYTEA_P(0);

	datalen = VARSIZE_ANY_EXHDR(data);
	resultlen = b64_enc_len(VARDATA_ANY(data), datalen);
	/* one extra byte for the terminating NUL */
	result = palloc(resultlen + 1);
	res = hll_b64_encode(VARDATA_ANY(data),datalen, result);

	/* Make this FATAL 'cause we've trodden on memory ... */
	if (res > resultlen)
		elog(FATAL, "overflow - encode estimate too small");

	result[res] = '\0';
	PG_RETURN_CSTRING(result);
}
/*
 * Type input function: decode the cstring with hll_b64_decode() into a
 * freshly allocated varlena and return it.
 *
 * The buffer is sized from b64_dec_len(); if the decoder reports having
 * written more than that estimate, memory has already been overwritten and
 * the backend is terminated with FATAL.
 */
Datum
hyperloglog_in(PG_FUNCTION_ARGS)
{
	bytea  *result;
	char   *data = PG_GETARG_CSTRING(0);
	/*
	 * Lengths are kept in plain int. The previous int16 truncated strlen()
	 * for inputs longer than 32767 characters, yielding bogus (even
	 * negative) lengths.
	 */
	int		datalen, resultlen, res;

	datalen = strlen(data);
	resultlen = b64_dec_len(data,datalen);
	result = palloc(VARHDRSZ + resultlen);
	res = hll_b64_decode(data, datalen, VARDATA(result));

	/* Make this FATAL 'cause we've trodden on memory ... */
	if (res > resultlen)
		elog(FATAL, "overflow - decode estimate too small");

	/* the decoder's byte count determines the stored size */
	SET_VARSIZE(result, VARHDRSZ + res);
	PG_RETURN_BYTEA_P(result);
}
/*
 * Return hll_compress() applied to a copy of the counter; a NULL argument
 * yields NULL.
 */
Datum
hyperloglog_comp(PG_FUNCTION_ARGS)
{
	if (!PG_ARGISNULL(0))
	{
		HLLCounter	counter = PG_GETARG_HLL_P_COPY(0);

		PG_RETURN_BYTEA_P(hll_compress(counter));
	}

	PG_RETURN_NULL();
}
......@@ -109,7 +109,6 @@ install-data: $(BKIFILES) installdirs
$(INSTALL_DATA) $(call vpathsearch,cdb_schema.sql) '$(DESTDIR)$(datadir)/cdb_init.d/cdb_schema.sql'
$(INSTALL_DATA) $(srcdir)/sql_features.txt '$(DESTDIR)$(datadir)/sql_features.txt'
$(INSTALL_DATA) $(call vpathsearch,gp_toolkit.sql) '$(DESTDIR)$(datadir)/cdb_init.d/gp_toolkit.sql'
$(INSTALL_DATA) $(call vpathsearch,hyperloglog_counter.sql) '$(DESTDIR)$(datadir)/cdb_init.d/hyperloglog_counter.sql'
installdirs:
$(MKDIR_P) '$(DESTDIR)$(datadir)'
......
......@@ -1716,7 +1716,7 @@ acquire_hll_by_query(Relation onerel, int nattrs, VacAttrStats **attrstats)
for (i = 0; i < nattrs; i++)
{
const char *attname = quote_identifier(NameStr(attrstats[i]->attr->attname));
appendStringInfo(&columnStr, "gp_hyperloglog.hyperloglog_accum(%s)", attname);
appendStringInfo(&columnStr, "pg_catalog.hyperloglog_accum(%s)", attname);
if(i != nattrs-1)
appendStringInfo(&columnStr, ", ");
}
......
/*
* Copyright 2012, Tomas Vondra (tv@fuzzy.cz). All rights reserved.
* Copyright 2015, Conversant, Inc. All rights reserved.
* Copyright 2018, Pivotal Software, Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification, are
* permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice, this list of
* conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice, this list
* of conditions and the following disclaimer in the documentation and/or other materials
* provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY TOMAS VONDRA, CONVERSANT INC, PIVOTAL SOFTWARE INC. AND ANY
* OTHER CONTRIBUTORS (THE "AUTHORS") ''AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
* BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
* USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
* The views and conclusions contained in the software and documentation are those of the
* Authors and should not be interpreted as representing official policies, either expressed
* or implied, of the Authors.
*/
/*
* 07/26/2018
*
* We have updated, removed and modified the content of this file.
*
* 1. We have removed some function definitions as we did not need
* them for implementing incremental analyze.
* 2. We modified utility function definitions as most of them are used in the
* code internally and not exposed to the user. Function parameters are
* no longer of type `PG_FUNCTION_ARGS` and extracted in the
* function body; they are passed in directly by the caller.
* 3. We kept the definitions of user facing functions that are necessary for
* full scan incremental analyze.
* 4. We abandoned the sparse representation of the hyperloglog and for
* simplicity this version only supports dense representation.
*/
/* This file contains internal functions and several functions exposed to the
* outside via hyperloglog.h. The functions are for the manipulation/creation/
* evaluation of HLLCounters.
* evaluation of HLLCounters that are necessary for implementing incremental
* analyze in GPDB. This file is modified from its original content and we removed
* the code that was unnecessary for our purpose.
*/
#include <stdio.h>
#include <stdlib.h>
......@@ -9,7 +59,13 @@
#include <string.h>
#include "postgres.h"
#include "fmgr.h"
#include "utils/pg_lzcompress.h"
#include "utils/builtins.h"
#include "utils/bytea.h"
#include "utils/lsyscache.h"
#include "lib/stringinfo.h"
#include "libpq/pqformat.h"
#include "utils/hyperloglog/hyperloglog.h"
......@@ -993,3 +1049,192 @@ b64_dec_len(const char *src, unsigned srclen)
return (srclen * 3) >> 2;
}
/* PG_GETARG macros for HLLCounters that also perform version checking */
#define PG_GETARG_HLL_P(n) pg_check_hll_version((HLLCounter) PG_GETARG_BYTEA_P(n))
#define PG_GETARG_HLL_P_COPY(n) pg_check_hll_version((HLLCounter) PG_GETARG_BYTEA_P_COPY(n))
/*
 * Defaults: capacity of 2^63 distinct items (1ULL << 63) and 0.8125% error
 * rate. The shift is parenthesized so the macro keeps its value when it is
 * expanded inside a larger expression (e.g. DEFAULT_NDISTINCT - 1).
 */
#define DEFAULT_NDISTINCT (1ULL << 63)
#define DEFAULT_ERROR 0.008125
/* Use the PG_FUNCTION_INFO_V1 macro to declare each SQL-callable function to postgres */
PG_FUNCTION_INFO_V1(hyperloglog_add_item_agg_default);
PG_FUNCTION_INFO_V1(hyperloglog_merge);
PG_FUNCTION_INFO_V1(hyperloglog_get_estimate);
PG_FUNCTION_INFO_V1(hyperloglog_in);
PG_FUNCTION_INFO_V1(hyperloglog_out);
PG_FUNCTION_INFO_V1(hyperloglog_comp);
/* ------ prototypes for the SQL-callable functions defined below -------- */
extern Datum hyperloglog_add_item_agg_default(PG_FUNCTION_ARGS);
extern Datum hyperloglog_get_estimate(PG_FUNCTION_ARGS);
extern Datum hyperloglog_merge(PG_FUNCTION_ARGS);
extern Datum hyperloglog_in(PG_FUNCTION_ARGS);
extern Datum hyperloglog_out(PG_FUNCTION_ARGS);
extern Datum hyperloglog_comp(PG_FUNCTION_ARGS);
/* file-local helper behind the PG_GETARG_HLL_P* macros */
static HLLCounter pg_check_hll_version(HLLCounter hloglog);
/* ---------------------- function definitions --------------------------- */
/*
 * Check that a counter carries the struct version this code was built for.
 * Raises an ERROR on mismatch; otherwise returns the same pointer, so the
 * call can be wrapped directly around an argument fetch.
 */
static HLLCounter
pg_check_hll_version(HLLCounter hloglog)
{
	if (hloglog->version == STRUCT_VERSION)
		return hloglog;

	elog(ERROR,"ERROR: The stored counter is version %u while the library is version %u. Please change library version or use upgrade function to upgrade the counter",hloglog->version,STRUCT_VERSION);

	/* same fall-through return as the matching-version path */
	return hloglog;
}
/*
 * Aggregate transition function: fold one input value into the running
 * HyperLogLog counter.
 *
 * Argument 0 is the current counter and argument 1 the value to add; either
 * may be NULL. Returns NULL when both are NULL, otherwise the counter.
 */
Datum
hyperloglog_add_item_agg_default(PG_FUNCTION_ARGS)
{
	/* type and value of the anyelement argument */
	Oid			item_type = get_fn_expr_argtype(fcinfo->flinfo, 1);
	Datum		item = PG_GETARG_DATUM(1);
	int16		item_len;
	bool		item_byval;
	char		item_align;
	bool		have_counter = !PG_ARGISNULL(0);
	bool		have_item = !PG_ARGISNULL(1);
	HLLCounter	counter;

	/* nothing accumulated and nothing to add: do not create an empty counter */
	if (!have_counter && !have_item)
		PG_RETURN_NULL();

	/* reuse the incoming counter, or start a new one with the defaults */
	if (have_counter)
		counter = PG_GETARG_HLL_P(0);
	else
		counter = hll_create(DEFAULT_NDISTINCT, DEFAULT_ERROR, PACKED);

	/* NULL items are skipped */
	if (have_item)
	{
		/* look up length/by-value/alignment of the item's type */
		get_typlenbyvalalign(item_type, &item_len, &item_byval, &item_align);
		counter = hyperloglog_add_item(counter, item, item_len, item_byval, item_align);
	}

	/* hand back the (possibly updated) counter */
	PG_RETURN_BYTEA_P(counter);
}
Datum
hyperloglog_merge(PG_FUNCTION_ARGS)
{
HLLCounter counter1;
HLLCounter counter2;
if (PG_ARGISNULL(0) && PG_ARGISNULL(1)){
/* if both counters are null return null */
PG_RETURN_NULL();
} else if (PG_ARGISNULL(0)) {
/* if first counter is null just copy the second estimator into the
* first one */
counter1 = PG_GETARG_HLL_P(1);
} else if (PG_ARGISNULL(1)) {
/* if second counter is null just return the the first estimator */
counter1 = PG_GETARG_HLL_P(0);
} else {
/* ok, we already have the estimator - merge the second one into it */
counter1 = PG_GETARG_HLL_P_COPY(0);
counter2 = PG_GETARG_HLL_P_COPY(1);
counter1 = hyperloglog_merge_counters(counter1, counter2);
}
/* return the updated bytea */
PG_RETURN_BYTEA_P(counter1);
}
/*
 * Return the estimate computed from a copy of the given counter,
 * as a float8.
 */
Datum
hyperloglog_get_estimate(PG_FUNCTION_ARGS)
{
	HLLCounter	counter = PG_GETARG_HLL_P_COPY(0);

	PG_RETURN_FLOAT8(hyperloglog_estimate(counter));
}
/*
 * Type output function: encode the counter's bytes with hll_b64_encode()
 * and return the result as a NUL-terminated cstring.
 *
 * The buffer is sized from b64_enc_len(); if the encoder reports having
 * written more than that estimate, memory has already been overwritten and
 * the backend is terminated with FATAL.
 */
Datum
hyperloglog_out(PG_FUNCTION_ARGS)
{
	/*
	 * Lengths are kept in plain int. The previous int16 silently truncated
	 * any length above 32767, which would size the buffer wrongly.
	 */
	int		datalen, resultlen, res;
	char   *result;
	bytea  *data = PG_GETARG_BYTEA_P(0);

	datalen = VARSIZE_ANY_EXHDR(data);
	resultlen = b64_enc_len(VARDATA_ANY(data), datalen);
	/* one extra byte for the terminating NUL */
	result = palloc(resultlen + 1);
	res = hll_b64_encode(VARDATA_ANY(data),datalen, result);

	/* Make this FATAL 'cause we've trodden on memory ... */
	if (res > resultlen)
		elog(FATAL, "overflow - encode estimate too small");

	result[res] = '\0';
	PG_RETURN_CSTRING(result);
}
/*
 * Type input function: decode the cstring with hll_b64_decode() into a
 * freshly allocated varlena and return it.
 *
 * The buffer is sized from b64_dec_len(); if the decoder reports having
 * written more than that estimate, memory has already been overwritten and
 * the backend is terminated with FATAL.
 */
Datum
hyperloglog_in(PG_FUNCTION_ARGS)
{
	bytea  *result;
	char   *data = PG_GETARG_CSTRING(0);
	/*
	 * Lengths are kept in plain int. The previous int16 truncated strlen()
	 * for inputs longer than 32767 characters, yielding bogus (even
	 * negative) lengths.
	 */
	int		datalen, resultlen, res;

	datalen = strlen(data);
	resultlen = b64_dec_len(data,datalen);
	result = palloc(VARHDRSZ + resultlen);
	res = hll_b64_decode(data, datalen, VARDATA(result));

	/* Make this FATAL 'cause we've trodden on memory ... */
	if (res > resultlen)
		elog(FATAL, "overflow - decode estimate too small");

	/* the decoder's byte count determines the stored size */
	SET_VARSIZE(result, VARHDRSZ + res);
	PG_RETURN_BYTEA_P(result);
}
/*
 * Return hll_compress() applied to a copy of the counter; a NULL argument
 * yields NULL.
 */
Datum
hyperloglog_comp(PG_FUNCTION_ARGS)
{
	if (!PG_ARGISNULL(0))
	{
		HLLCounter	counter = PG_GETARG_HLL_P_COPY(0);

		PG_RETURN_BYTEA_P(hll_compress(counter));
	}

	PG_RETURN_NULL();
}
......@@ -315,6 +315,9 @@ DATA(insert ( 6130 o 1 ordered_set_transition - - - percentile_cont_timestamptz
/* text */
DATA(insert ( 3538 n 0 string_agg_transfn - - - string_agg_finalfn f 0 2281 _null_));
/* hyperloglog */
DATA(insert ( 7164 n 0 hyperloglog_add_item_agg_default - hyperloglog_merge - hyperloglog_comp f 0 7157 _null_));
/*
* prototypes for functions in pg_aggregate.c
*/
......
......@@ -531,3 +531,17 @@
CREATE FUNCTION complex_lte(complex, complex) RETURNS bool LANGUAGE internal IMMUTABLE STRICT AS 'complex_lte' WITH (OID=3595);
CREATE FUNCTION complex_gte(complex, complex) RETURNS bool LANGUAGE internal IMMUTABLE STRICT AS 'complex_gte' WITH (OID=7596);
CREATE FUNCTION hyperloglog_in(value cstring) RETURNS hyperloglog_estimator LANGUAGE internal IMMUTABLE STRICT AS 'hyperloglog_in' WITH (OID=7158, DESCRIPTION="Decode a bytea into hyperloglog_counter");
CREATE FUNCTION hyperloglog_out(counter hyperloglog_estimator) RETURNS cstring LANGUAGE internal IMMUTABLE STRICT AS 'hyperloglog_out' WITH (OID=7159, DESCRIPTION="Encode an hyperloglog_counter into a bytea");
CREATE FUNCTION hyperloglog_comp(counter hyperloglog_estimator) RETURNS hyperloglog_estimator LANGUAGE internal IMMUTABLE STRICT AS 'hyperloglog_comp' WITH (OID=7160, DESCRIPTION="Compress an hyperloglog counter");
CREATE FUNCTION hyperloglog_merge(estimator1 hyperloglog_estimator, estimator2 hyperloglog_estimator) RETURNS hyperloglog_estimator LANGUAGE internal IMMUTABLE AS 'hyperloglog_merge' WITH (OID=7161, DESCRIPTION="Merge two hyperloglog counters into one");
CREATE FUNCTION hyperloglog_get_estimate(counter hyperloglog_estimator) RETURNS float8 LANGUAGE internal IMMUTABLE STRICT AS 'hyperloglog_get_estimate' WITH (OID=7162, DESCRIPTION="Estimates the number of distinct values stored in an hyperloglog counter");
CREATE FUNCTION hyperloglog_add_item_agg_default(counter hyperloglog_estimator, item anyelement) RETURNS hyperloglog_estimator LANGUAGE internal IMMUTABLE AS 'hyperloglog_add_item_agg_default' WITH (OID=7163, DESCRIPTION="Includes a data value into a hyperloglog counter");
CREATE FUNCTION hyperloglog_accum(anyelement) RETURNS hyperloglog_estimator LANGUAGE internal IMMUTABLE AS 'aggregate_dummy' WITH (OID=7164, proisagg="t", DESCRIPTION="Adds every data value to a hyperloglog counter and returns the counter");
......@@ -22,7 +22,7 @@
WARNING: DO NOT MODIFY THE FOLLOWING SECTION:
Generated by catullus.pl version 8
on Mon Jun 25 18:03:46 2018
on Mon Jul 9 11:55:48 2018
Please make your changes in pg_proc.sql
*/
......@@ -1064,5 +1064,33 @@ DATA(insert OID = 3595 ( complex_lte PGNSP PGUID 12 1 0 0 f f f t f i 2 0 16 "7
/* complex_gte(complex, complex) => bool */
DATA(insert OID = 7596 ( complex_gte PGNSP PGUID 12 1 0 0 f f f t f i 2 0 16 "7198 7198" _null_ _null_ _null_ _null_ complex_gte _null_ _null_ _null_ n a ));
/* hyperloglog_in(value cstring) => hyperloglog_estimator */
DATA(insert OID = 7158 ( hyperloglog_in PGNSP PGUID 12 1 0 0 f f f t f i 1 0 7157 "2275" _null_ _null_ "{value}" _null_ hyperloglog_in _null_ _null_ _null_ n a ));
DESCR("Decode a bytea into hyperloglog_counter");
/* hyperloglog_out(counter hyperloglog_estimator) => cstring */
DATA(insert OID = 7159 ( hyperloglog_out PGNSP PGUID 12 1 0 0 f f f t f i 1 0 2275 "7157" _null_ _null_ "{counter}" _null_ hyperloglog_out _null_ _null_ _null_ n a ));
DESCR("Encode an hyperloglog_counter into a bytea");
/* hyperloglog_comp(counter hyperloglog_estimator) => hyperloglog_estimator */
DATA(insert OID = 7160 ( hyperloglog_comp PGNSP PGUID 12 1 0 0 f f f t f i 1 0 7157 "7157" _null_ _null_ "{counter}" _null_ hyperloglog_comp _null_ _null_ _null_ n a ));
DESCR("Compress an hyperloglog counter");
/* hyperloglog_merge(estimator1 hyperloglog_estimator, estimator2 hyperloglog_estimator) => hyperloglog_estimator */
DATA(insert OID = 7161 ( hyperloglog_merge PGNSP PGUID 12 1 0 0 f f f f f i 2 0 7157 "7157 7157" _null_ _null_ "{estimator1,estimator2}" _null_ hyperloglog_merge _null_ _null_ _null_ n a ));
DESCR("Merge two hyperloglog counters into one");
/* hyperloglog_get_estimate(counter hyperloglog_estimator) => float8 */
DATA(insert OID = 7162 ( hyperloglog_get_estimate PGNSP PGUID 12 1 0 0 f f f t f i 1 0 701 "7157" _null_ _null_ "{counter}" _null_ hyperloglog_get_estimate _null_ _null_ _null_ n a ));
DESCR("Estimates the number of distinct values stored in an hyperloglog counter");
/* hyperloglog_add_item_agg_default(counter hyperloglog_estimator, item anyelement) => hyperloglog_estimator */
DATA(insert OID = 7163 ( hyperloglog_add_item_agg_default PGNSP PGUID 12 1 0 0 f f f f f i 2 0 7157 "7157 2283" _null_ _null_ "{counter,item}" _null_ hyperloglog_add_item_agg_default _null_ _null_ _null_ n a ));
DESCR("Includes a data value into a hyperloglog counter");
/* hyperloglog_accum(anyelement) => hyperloglog_estimator */
DATA(insert OID = 7164 ( hyperloglog_accum PGNSP PGUID 12 1 0 0 t f f f f i 1 0 7157 "2283" _null_ _null_ _null_ _null_ aggregate_dummy _null_ _null_ _null_ n a ));
DESCR("Adds every data value to a hyperloglog counter and returns the counter");
/* TIDYCAT_END_PG_PROC_GEN */
......@@ -672,6 +672,10 @@ DATA(insert OID = 7053 ( anytable PGNSP PGUID -1 f p P f t \054 0 0 0 anytable_
DESCR("Represents a generic TABLE value expression");
#define ANYTABLEOID 7053
/* hyperloglog */
DATA(insert OID = 7157 ( hyperloglog_estimator PGNSP PGUID -1 f b X f t \054 0 0 7165 hyperloglog_in hyperloglog_out - - - - - i x f 0 -1 0 0 _null_ _null_ ));
DESCR("hyperloglog_estimator’s internal bytea representation for hyperloglog counter");
DATA(insert OID = 7165 ( _hyperloglog_estimator PGNSP PGUID -1 f b A f t \054 0 7157 0 array_in array_out array_recv array_send - - - i x f 0 -1 0 0 _null_ _null_ ));
/*
* macros
......
/*
* Copyright 2012, Tomas Vondra (tv@fuzzy.cz). All rights reserved.
* Copyright 2015, Conversant, Inc. All rights reserved.
* Copyright 2018, Pivotal Software, Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification, are
* permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice, this list of
* conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice, this list
* of conditions and the following disclaimer in the documentation and/or other materials
* provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY TOMAS VONDRA, CONVERSANT INC, PIVOTAL SOFTWARE INC. AND ANY
* OTHER CONTRIBUTORS (THE "AUTHORS") ''AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
* BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
* USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
* The views and conclusions contained in the software and documentation are those of the
* Authors and should not be interpreted as representing official policies, either expressed
* or implied, of the Authors.
*/
#ifndef _HYPERLOGLOG_H_
#define _HYPERLOGLOG_H_
/* This is an implementation of HyperLogLog algorithm as described in the
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册