提交 041e0547 编写于 作者: S Shreedhar Hardikar 提交者: Shreedhar Hardikar

Fix used_segspace counter leak

GPDB currently has 2 APIs of creating work files
  1. Using workfile_mgr :

    1. This uses a workfile set to contain all the related workfiles.
    2. All workfiles are created under the same directory. As a result,
       when it is time to delete the workfiles, we can simply delete the directory.
    3. There is no mechanism to control the exact name of the workfile created.
       All workfiles are referenced using an ID. `"%s/spillfile\_f%u", work\_set-\>path, file\_no);`
    4. Deletion of workiles is automatically handled in case of an exception,
       as all the files are contained in one directory, thus no counter/files are leaked.

  2. Using ExecWorkfile :

    1. There is a Create/Open/Read/Write/Flush/Close API.
    2. This is the lower-level API also used by workfile_mgr to manage the workfiles directly.
    3. Each operator is resposible to correctly close the workfile it creates.

  When we write to a workfile, we reserve and add the number of bytes to used_segspace,
  and add that amount to `workfile->size`. Then, we call `workfile_update_in_progress_size`.
  If the workfile has an owning workset, we update the workset's `in_progress_size` counter.
  However, if it has no such workset, we completely ignore the count -
  and this is what ultimately leaks the `used_segspace` counter.

  This is a fix to account for bytes written in workfiles that are not part of any workfile set -
  that is those created using the lower level API. Hence when we abort or error out,
  we subtract this amount from the `used_segspace` counter.

Authors:
Shreedhar Hardikar <shardikar@pivotal.io>
Xin Zhang <xzhang@pivotal.io>
Karthikeyan Jambu Rajaraman <karthi.jrk@gmail.com>
Nikos Armenatzoglou <nikos.armenatzoglou@gmail.com>
上级 c7cd2999
......@@ -382,6 +382,7 @@ class GpInjectFaultProgram:
"malloc_failure (inject fault to simulate memory allocation failure), " \
"transaction_abort_failure (inject fault to simulate transaction abort failure), " \
"workfile_creation_failure (inject fault to simulate workfile creation failure), " \
"workfile_write_failure (inject fault to simulate workfile write failure), " \
"update_committed_eof_in_persistent_table (inject fault before committed EOF is updated in gp_persistent_relation_node for Append Only segment files), " \
"exec_simple_query_end_command (inject fault before EndCommand in exec_simple_query), " \
"multi_exec_hash_large_vmem (allocate large vmem using palloc inside MultiExecHash to attempt to exceed vmem limit), " \
......
......@@ -20,6 +20,8 @@
#include "utils/workfile_mgr.h"
#include "utils/memutils.h"
#include "utils/faultinjector.h"
/*
* Number of temporary files opened during the current session;
* this is used in generation of unique tempfile names.
......@@ -218,6 +220,8 @@ ExecWorkFile_Write(ExecWorkFile *workfile,
Assert(workfile != NULL);
uint64 bytes;
SIMPLE_FAULT_INJECTOR(WorkfileWriteFail);
if (data == NULL || size == 0)
{
return false;
......@@ -258,7 +262,7 @@ ExecWorkFile_Write(ExecWorkFile *workfile,
workfile->size = new_size;
WorkfileDiskspace_Commit( (new_size - current_size), size, true /* update_query_size */);
workfile_update_in_progress_size(workfile, new_size - current_size);
workfile_set_update_in_progress_size(workfile->work_set, new_size - current_size);
if (bytes != size)
{
......@@ -287,7 +291,7 @@ ExecWorkFile_Write(ExecWorkFile *workfile,
{
WorkfileDiskspace_Commit(size, size, true /* update_query_size */);
}
workfile_update_in_progress_size(workfile, size);
workfile_set_update_in_progress_size(workfile->work_set, size);
break;
default:
......@@ -553,7 +557,7 @@ ExecWorkFile_Seek(ExecWorkFile *workfile, uint64 offset, int whence)
if (additional_size > 0)
{
WorkfileDiskspace_Commit(additional_size, additional_size, true /* update_query_size */);
workfile_update_in_progress_size(workfile, additional_size);
workfile_set_update_in_progress_size(workfile->work_set, additional_size);
}
return result;
......@@ -717,7 +721,7 @@ ExecWorkFile_AdjustBFZSize(ExecWorkFile *workfile, int64 file_size)
*/
Assert(bfz_file->compression_index > 0 || WorkfileDiskspace_IsFull());
WorkfileDiskspace_Commit(file_size, workfile->size, true /* update_query_size */);
workfile_update_in_progress_size(workfile, file_size - workfile->size);
workfile_set_update_in_progress_size(workfile->work_set, file_size - workfile->size);
workfile->size = file_size;
}
......@@ -748,7 +752,7 @@ ExecWorkFile_AdjustBFZSize(ExecWorkFile *workfile, int64 file_size)
}
WorkfileDiskspace_Commit(extra_bytes, extra_bytes, true /* update_query_size */);
workfile_update_in_progress_size(workfile, extra_bytes);
workfile_set_update_in_progress_size(workfile->work_set, extra_bytes);
workfile->size = file_size;
}
}
......
......@@ -253,6 +253,8 @@ FaultInjectorIdentifierEnumToString[] = {
/* inject fault to simulate transaction abort failure */
_("workfile_creation_failure"),
/* inject fault to simulate workfile creation failure */
_("workfile_write_failure"),
/* inject fault to simulate workfile write failure */
_("update_committed_eof_in_persistent_table"),
/* inject fault before committed EOF is updated in gp_persistent_relation_node for Append Only segment files */
_("exec_simple_query_end_command"),
......@@ -1163,6 +1165,7 @@ FaultInjector_NewHashEntry(
case Checkpoint:
case AbortTransactionFail:
case WorkfileCreationFail:
case WorkfileWriteFail:
case UpdateCommittedEofInPersistentTable:
case ExecSortBeforeSorting:
case FaultDuringExecDynamicTableScan:
......
......@@ -146,10 +146,11 @@ adjust_size_temp_file_new(workfile_set *work_set, int64 size)
AssertImply((NULL != work_set), work_set->size == 0);
AssertImply((NULL != work_set), work_set->in_progress_size >= size);
if (NULL != work_set)
{
work_set->in_progress_size -= size;
}
/*
* Decrement the size with work_set->in_progress_size or
* used_segspace_not_in_workfile_set
*/
workfile_set_update_in_progress_size(work_set, -size);
WorkfileDiskspace_Commit(0 /* commit_bytes */, size, true /* update_query_size */);
elog(gp_workfile_caching_loglevel, "closed and deleted temp file, subtracted size " INT64_FORMAT " from disk space", size);
......
......@@ -37,6 +37,9 @@ typedef struct workset_info
char *dir_path;
} workset_info;
/* Counter to keep track of workfile segspace used without a workfile set. */
static int64 used_segspace_not_in_workfile_set;
/* Forward declarations */
static void workfile_mgr_populate_set(const void *resource, const void *param);
static void workfile_mgr_cleanup_set(const void *resource);
......@@ -86,6 +89,8 @@ workfile_mgr_cache_init(void)
* to track disk space usage
*/
WorkfileDiskspace_Init();
used_segspace_not_in_workfile_set = 0;
}
/*
......@@ -455,18 +460,25 @@ workfile_mgr_cleanup(void)
{
Assert(NULL != workfile_mgr_cache);
Cache_SurrenderClientEntries(workfile_mgr_cache);
WorkfileDiskspace_Commit(0, used_segspace_not_in_workfile_set, false /* update_query_space */);
used_segspace_not_in_workfile_set = 0;
}
/*
* Updates the in-progress size of a workset while it is being created.
*/
void
workfile_update_in_progress_size(ExecWorkFile *workfile, int64 size)
workfile_set_update_in_progress_size(workfile_set *work_set, int64 size)
{
if (NULL != workfile->work_set)
if (NULL != work_set)
{
work_set->in_progress_size += size;
Assert(work_set->in_progress_size >= 0);
}
else
{
workfile->work_set->in_progress_size += size;
Assert(workfile->work_set->in_progress_size >= 0);
used_segspace_not_in_workfile_set += size;
Assert(used_segspace_not_in_workfile_set >= 0);
}
}
......
......@@ -163,6 +163,7 @@ typedef enum FaultInjectorIdentifier_e {
MallocFailure,
AbortTransactionFail,
WorkfileCreationFail,
WorkfileWriteFail,
UpdateCommittedEofInPersistentTable,
......
......@@ -134,7 +134,7 @@ void workfile_mgr_cache_init(void);
void workfile_mgr_mark_complete(workfile_set *work_set);
Cache *workfile_mgr_get_cache(void);
int32 workfile_mgr_clear_cache(int seg_id);
void workfile_update_in_progress_size(ExecWorkFile *workfile, int64 size);
void workfile_set_update_in_progress_size(workfile_set *work_set, int64 size);
/* Workfile File operations */
ExecWorkFile *workfile_mgr_create_file(workfile_set *work_set);
......
--
-- Clean up for segspace.sql tests
--
-- start_ignore
\! gpconfig -r gp_workfile_limit_per_segment
\! gpstop -rai
-- end_ignore
--
-- Setup for segspace.sql tests
--
-- We are setting gp_workfile_limit_per_segment to be 5.2GB so that all tests in
-- segspace.sql can run without hitting workfile limit. The tests examine if
-- 'used_segspace' counter is changed and get reset to 0 at the end of
-- query execution. If gp_workfile_limit_per_segment == 0 (default value),
-- then we don't change 'used_segspace' counter.
-- start_ignore
\! gpconfig -c gp_workfile_limit_per_segment -v 5242880
\! gpstop -rai
-- end_ignore
......@@ -29,9 +29,11 @@ test: indexjoin as_alias regex_gp gpparams with_clause transient_types gang_mgmt
# dispatch should always run seperately from other cases.
test: dispatch
# 'segspace' relies on the segment spill space to be 0, and uses fault injectors
# so it needs to be in a group by itself
# This test will change the gp_workfile_limit_per_segment and will need to restart the DB.
# It will also use faultinjector - so it needs to be in a group by itself.
test: segspace_setup
test: segspace
test: segspace_cleanup
# 'query_finish_pending' sets QueryFinishPending flag to true during query execution using fault injectors
# so it needs to be in a group by itself
......
--
-- Tests the spill files disk space accounting mechanism
--
-- create view to read the segspace value
DROP VIEW IF EXISTS segspace_view_gp_workfile_segspace;
DROP FUNCTION IF EXISTS segspace_view_gp_workfile_segspace_f();
DROP VIEW IF EXISTS segspace_view_gp_workfile_mgr_reset_segspace;
DROP FUNCTION IF EXISTS segspace_view_gp_workfile_mgr_reset_segspace_f();
CREATE FUNCTION segspace_view_gp_workfile_segspace_f()
RETURNS SETOF record
AS '$libdir/gp_workfile_mgr', 'gp_workfile_mgr_used_diskspace'
LANGUAGE C IMMUTABLE;
CREATE VIEW segspace_view_gp_workfile_segspace AS
SELECT C.*
FROM gp_toolkit.__gp_localid, segspace_view_gp_workfile_segspace_f() AS C (
segid int,
size bigint
)
UNION ALL
SELECT C.*
FROM gp_toolkit.__gp_masterid, segspace_view_gp_workfile_segspace_f() AS C (
segid int,
size bigint
);
-- create helper UDF to reset the segpsace variable
CREATE FUNCTION segspace_view_gp_workfile_mgr_reset_segspace_f()
RETURNS SETOF bigint
AS '$libdir/gp_workfile_mgr', 'gp_workfile_mgr_reset_segspace'
LANGUAGE C IMMUTABLE;
CREATE VIEW segspace_view_gp_workfile_mgr_reset_segspace AS
SELECT * FROM gp_toolkit.__gp_localid, segspace_view_gp_workfile_mgr_reset_segspace_f()
UNION ALL
SELECT * FROM gp_toolkit.__gp_masterid, segspace_view_gp_workfile_mgr_reset_segspace_f();
--
-- check segspace before test
reset statement_mem;
select max(bytes) as max, min(bytes) as min from gp_toolkit.gp_workfile_mgr_used_diskspace;
--- create and populate the table
......@@ -61,12 +24,6 @@ INSERT INTO segspace_test_hj_skew SELECT i,i,i,i,i,i,i,i FROM
ANALYZE segspace_test_hj_skew;
-- reset the segspace value
-- start_ignore
select count(*) > 0 from segspace_view_gp_workfile_mgr_reset_segspace;
-- end_ignore
--
-- Testing that query cancelation during spilling updates the accounting
--
......@@ -88,19 +45,23 @@ begin;
SELECT t1.* FROM segspace_test_hj_skew AS t1, segspace_test_hj_skew AS t2 WHERE t1.i1=t2.i2;
rollback;
-- check used segspace after test
select max(bytes) as max, min(bytes) as min from gp_toolkit.gp_workfile_mgr_used_diskspace;
-- Run the test without fault injection
begin;
-- Doing select count so output file doesn't have 75000 rows.
select count(*) from
(SELECT t1.* FROM segspace_test_hj_skew AS t1, segspace_test_hj_skew AS t2 WHERE t1.i1=t2.i2) temp;
rollback;
-- check used segspace after test
reset statement_mem;
select max(size) from segspace_view_gp_workfile_segspace;
select max(bytes) as max, min(bytes) as min from gp_toolkit.gp_workfile_mgr_used_diskspace;
------------ Interrupting INSERT INTO query that spills -------------------
-- enable the fault injector
--start_ignore
\! gpfaultinjector -f exec_hashjoin_new_batch -y reset --seg_dbid 2
\! gpfaultinjector -f exec_hashjoin_new_batch -y interrupt --seg_dbid 2
--end_ignore
drop table if exists segspace_t1_created;
create table segspace_t1_created (i1 int, i2 int, i3 int, i4 int, i5 int, i6 int, i7 int, i8 int) DISTRIBUTED BY (i1);
......@@ -109,6 +70,23 @@ set statement_mem=2048;
set gp_autostats_mode = none;
set gp_hashjoin_metadata_memory_percent=0;
-- enable the fault injector
--start_ignore
\! gpfaultinjector -f exec_hashjoin_new_batch -y reset --seg_dbid 2
\! gpfaultinjector -f exec_hashjoin_new_batch -y interrupt --seg_dbid 2
--end_ignore
begin;
insert into segspace_t1_created
SELECT t1.* FROM segspace_test_hj_skew AS t1, segspace_test_hj_skew AS t2 WHERE t1.i1=t2.i2;
rollback;
-- check used segspace after test
select max(bytes) as max, min(bytes) as min from gp_toolkit.gp_workfile_mgr_used_diskspace;
-- Run the test without fault injection
begin;
insert into segspace_t1_created
......@@ -118,7 +96,7 @@ rollback;
-- check used segspace after test
reset statement_mem;
select max(size) from segspace_view_gp_workfile_segspace;
select max(bytes) as max, min(bytes) as min from gp_toolkit.gp_workfile_mgr_used_diskspace;
--start_ignore
drop table if exists segspace_t1_created;
......@@ -126,18 +104,29 @@ drop table if exists segspace_t1_created;
------------ Interrupting CREATE TABLE AS query that spills -------------------
drop table if exists segspace_t1_created;
set gp_workfile_type_hashjoin=buffile;
set statement_mem=2048;
set gp_autostats_mode = none;
set gp_hashjoin_metadata_memory_percent=0;
-- enable the fault injector
--start_ignore
\! gpfaultinjector -f exec_hashjoin_new_batch -y reset --seg_dbid 2
\! gpfaultinjector -f exec_hashjoin_new_batch -y interrupt --seg_dbid 2
--end_ignore
drop table if exists segspace_t1_created;
set gp_workfile_type_hashjoin=buffile;
set statement_mem=2048;
set gp_autostats_mode = none;
set gp_hashjoin_metadata_memory_percent=0;
begin;
create table segspace_t1_created AS
SELECT t1.* FROM segspace_test_hj_skew AS t1, segspace_test_hj_skew AS t2 WHERE t1.i1=t2.i2;
rollback;
-- check used segspace after test
select max(bytes) as max, min(bytes) as min from gp_toolkit.gp_workfile_mgr_used_diskspace;
-- Run the test without fault injection
begin;
create table segspace_t1_created AS
......@@ -147,9 +136,165 @@ rollback;
-- check used segspace after test
reset statement_mem;
select max(size) from segspace_view_gp_workfile_segspace;
select max(bytes) as max, min(bytes) as min from gp_toolkit.gp_workfile_mgr_used_diskspace;
------------ workfile_limit_per_segment leak check during ERROR on UPDATE with CTE and MK_SORT -------------------
drop table if exists testsisc;
drop table if exists foo;
create table testsisc (i1 int, i2 int, i3 int, i4 int);
insert into testsisc select i, i % 1000, i % 100000, i % 75 from
(select generate_series(1, nsegments * 100000) as i from
(select count(*) as nsegments from gp_segment_configuration where role='p' and content >= 0) foo) bar;
-- Disable faultinjectors
create table foo (i int, j int);
set statement_mem=1024; -- 1mb for 3 segment to get leak.
set gp_resqueue_print_operator_memory_limits=on;
set gp_enable_mk_sort=on;
set gp_cte_sharing=on;
-- enable the fault injector
--start_ignore
\! gpfaultinjector -f exec_hashjoin_new_batch -y reset --seg_dbid 2
\! gpfaultinjector -f workfile_write_failure -y reset --seg_dbid 2
\! gpfaultinjector -f workfile_write_failure -y error --seg_dbid 2
--end_ignore
-- LEAK in UPDATE: update with sisc xslice sort
update foo set j=m.cc1 from (
with ctesisc as
(select * from testsisc order by i2)
select t1.i1 as cc1, t1.i2 as cc2
from ctesisc as t1, ctesisc as t2
where t1.i1 = t2.i2 ) as m;
select max(bytes) as max, min(bytes) as min from gp_toolkit.gp_workfile_mgr_used_diskspace;
-- Run the test without fault injection
-- LEAK in UPDATE: update with sisc xslice sort
update foo set j=m.cc1 from (
with ctesisc as
(select * from testsisc order by i2)
select t1.i1 as cc1, t1.i2 as cc2
from ctesisc as t1, ctesisc as t2
where t1.i1 = t2.i2 ) as m;
select max(bytes) as max, min(bytes) as min from gp_toolkit.gp_workfile_mgr_used_diskspace;
reset statement_mem;
reset gp_resqueue_print_operator_memory_limits;
reset gp_enable_mk_sort;
reset gp_cte_sharing;
------------ workfile_limit_per_segment leak check during ERROR on DELETE with APPEND-ONLY table -------------------
drop table if exists testsisc;
drop table if exists foo;
create table testsisc (i1 int, i2 int, i3 int, i4 int) WITH (appendonly=true, compresstype=zlib) ;
insert into testsisc select i, i % 1000, i % 100000, i % 75 from
(select generate_series(1, nsegments * 100000) as i from
(select count(*) as nsegments from gp_segment_configuration where role='p' and content >= 0) foo) bar;
create table foo (i int, j int) WITH (appendonly=true, compresstype=zlib);
insert into foo select i, i % 1000 from
(select generate_series(1, nsegments * 100000) as i from
(select count(*) as nsegments from gp_segment_configuration where role='p' and content >= 0) foo) bar;
set statement_mem=1024; -- 1mb for 3 segment to get leak.
-- enable the fault injector
--start_ignore
\! gpfaultinjector -f workfile_write_failure -y reset --seg_dbid 2
\! gpfaultinjector -f workfile_write_failure -y error --seg_dbid 2
--end_ignore
-- LEAK in DELETE with APPEND ONLY tables
delete from testsisc using (
select *
from foo
) src where testsisc.i1 = src.i;
select max(bytes) as max, min(bytes) as min from gp_toolkit.gp_workfile_mgr_used_diskspace;
-- Run the test without fault injection
-- LEAK in DELETE with APPEND ONLY tables
delete from testsisc using (
select *
from foo
) src where testsisc.i1 = src.i;
reset statement_mem;
select max(bytes) as max, min(bytes) as min from gp_toolkit.gp_workfile_mgr_used_diskspace;
------------ workfile_limit_per_segment leak check during UPDATE of SORT -------------------
drop table if exists testsort;
create table testsort (i1 int, i2 int, i3 int, i4 int);
insert into testsort select i, i % 1000, i % 100000, i % 75 from generate_series(0,1000000) i;
set statement_mem="1MB";
set gp_enable_mk_sort=off;
drop table if exists foo;
create table foo (c int, d int);
-- enable the fault injector
--start_ignore
\! gpfaultinjector -f workfile_write_failure -y reset --seg_dbid 2
\! gpfaultinjector -f workfile_write_failure -y error --seg_dbid 2
--end_ignore
-- expect to see leak if we hit error
update foo set d = i1 from (select i1,i2 from testsort order by i2) x;
-- check counter leak
select max(bytes) as max, min(bytes) as min from gp_toolkit.gp_workfile_mgr_used_diskspace;
-- Run the test without fault injection
-- expect to see leak if we hit error
update foo set d = i1 from (select i1,i2 from testsort order by i2) x;
-- check counter leak
select max(bytes) as max, min(bytes) as min from gp_toolkit.gp_workfile_mgr_used_diskspace;
------------ workfile_limit_per_segment leak check during UPDATE of SHARE_SORT_XSLICE -------------------
drop table if exists testsisc;
create table testsisc (i1 int, i2 int, i3 int, i4 int);
insert into testsisc select i, i % 1000, i % 100000, i % 75 from generate_series(0,1000000) i;
set statement_mem="1MB";
set gp_enable_mk_sort=off;
set gp_cte_sharing=on;
drop table if exists foo;
create table foo (c int, d int);
-- expect to see leak if we hit error
-- enable the fault injector
--start_ignore
\! gpfaultinjector -f workfile_write_failure -y reset --seg_dbid 2
\! gpfaultinjector -f workfile_write_failure -y error --seg_dbid 2
--end_ignore
update foo set d = i1 from (with ctesisc as (select * from testsisc order by i2)
select * from
(select count(*) from ctesisc) x(a), ctesisc
where x.a = ctesisc.i1) y;
-- check counter leak
select max(bytes) as max, min(bytes) as min from gp_toolkit.gp_workfile_mgr_used_diskspace;
-- Run the test without fault injection
update foo set d = i1 from (with ctesisc as (select * from testsisc order by i2)
select * from
(select count(*) from ctesisc) x(a), ctesisc
where x.a = ctesisc.i1) y;
-- check counter leak
select max(bytes) as max, min(bytes) as min from gp_toolkit.gp_workfile_mgr_used_diskspace;
--
-- Clean up for segspace.sql tests
--
-- start_ignore
\! gpconfig -r gp_workfile_limit_per_segment
\! PGDATESTYLE="" gpstop -rai
-- end_ignore
--
-- Setup for segspace.sql tests
--
-- We are setting gp_workfile_limit_per_segment to be 5.2GB so that all tests in
-- segspace.sql can run without hitting workfile limit. The tests examine if
-- 'used_segspace' counter is changed and get reset to 0 at the end of
-- query execution. If gp_workfile_limit_per_segment == 0 (default value),
-- then we don't change 'used_segspace' counter.
-- start_ignore
\! gpconfig -c gp_workfile_limit_per_segment -v 5242880
\! PGDATESTYLE="" gpstop -rai
-- end_ignore
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册