Commit 50e67f10 authored by George Caragea

Removing the Workfile Caching feature from the executor

This removes the gp_workfile_caching GUC and the executor code paths that looked up, reloaded, and persisted cached workfile sets in HashAgg, HashJoin, Sort, and Material/tuplestore, including the per-operator metadata state files, the cached_workfiles_* and workfiles_created executor flags, and the workfile manager's support for caching and reusing spill file sets.

Parent 7065077b
......@@ -340,7 +340,6 @@ int gp_hashagg_compress_spill_files = 0;
int gp_workfile_compress_algorithm = 0;
bool gp_workfile_checksumming = false;
bool gp_workfile_caching = false;
int gp_workfile_caching_loglevel = DEBUG1;
int gp_sessionstate_loglevel = DEBUG1;
/* Maximum disk space to use for workfiles on a segment, in kilobytes */
......
......@@ -54,11 +54,6 @@ struct BatchFileInfo
#define BATCHFILE_METADATA \
(sizeof(BatchFileInfo) + sizeof(bfz_t) + sizeof(struct bfz_freeable_stuff))
#define FREEABLE_BATCHFILE_METADATA (sizeof(struct bfz_freeable_stuff))
/*
* Number of batchfile metadata to reserve during spilling in order to have
* enough memory to open them at reuse.
*/
#define NO_RESERVED_BATCHFILE_METADATA 256
/* Used for padding */
static char padding_dummy[MAXIMUM_ALIGNOF];
......@@ -96,7 +91,6 @@ typedef enum InputRecordType
/* Methods that handle batch files */
static SpillSet *createSpillSet(unsigned branching_factor, unsigned parent_hash_bit);
static SpillSet *read_spill_set(AggState *aggstate);
static int closeSpillFile(AggState *aggstate, SpillSet *spill_set, int file_no);
static int closeSpillFiles(AggState *aggstate, SpillSet *spill_set);
static int suspendSpillFiles(SpillSet *spill_set);
......@@ -120,13 +114,6 @@ static void reset_agg_hash_table(AggState *aggstate);
static bool agg_hash_reload(AggState *aggstate);
static inline void *mpool_cxt_alloc(void *manager, Size len);
/* Methods for state file */
static void create_state_file(HashAggTable *hashtable);
static void agg_hash_save_spillfile_info(ExecWorkFile *state_file, SpillFile *spill_file);
static bool agg_hash_load_spillfile_info(ExecWorkFile *state_file, char **spill_file_name, unsigned *batch_hash_bit);
static void agg_hash_write_string(ExecWorkFile *ewf, const char *str, size_t len);
static char *agg_hash_read_string(ExecWorkFile *ewf);
static inline void *mpool_cxt_alloc(void *manager, Size len)
{
return mpool_alloc((MPool *)manager, len);
......@@ -694,28 +681,7 @@ create_agg_hash_table(AggState *aggstate)
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
bool can_reuse_workfiles = false;
workfile_set *work_set = NULL;
if (gp_workfile_caching)
{
/* Look up SFR for existing spill set. Mark here if found */
work_set = workfile_mgr_find_set(&aggstate->ss.ps);
/*
* Workaround for case when reusing an existing spill set would
* use too much metadata memory and might cause respilling:
* don't allow reusing for these sets for now.
*/
can_reuse_workfiles = (work_set != NULL) &&
work_set->metadata.num_leaf_files <= NO_RESERVED_BATCHFILE_METADATA;
}
uint64 operatorMemKB = PlanStateOperatorMemKB( (PlanState *) aggstate);
if (gp_workfile_caching && ! can_reuse_workfiles)
{
uint64 reservedMem = NO_RESERVED_BATCHFILE_METADATA * (BATCHFILE_METADATA - FREEABLE_BATCHFILE_METADATA);
operatorMemKB = operatorMemKB - reservedMem / 1024;
elog(gp_workfile_caching_loglevel, "HashAgg: reserved " INT64_FORMAT "KB for spilling", reservedMem / 1024);
}
if (!calcHashAggTableSizes(1024.0 * (double) operatorMemKB,
(double)agg->numGroups,
......@@ -730,16 +696,6 @@ create_agg_hash_table(AggState *aggstate)
elog(ERROR, ERRMSG_GP_INSUFFICIENT_STATEMENT_MEMORY);
}
if (can_reuse_workfiles)
{
aggstate->cached_workfiles_found = true;
hashtable->work_set = work_set;
/* Initialize hashtable parameters from the cached workfile */
hashtable->hats.nbatches = work_set->metadata.num_leaf_files;
hashtable->hats.nbuckets = work_set->metadata.buckets;
hashtable->num_batches = work_set->metadata.num_leaf_files;
}
/* Initialize the hash buckets */
hashtable->nbuckets = hashtable->hats.nbuckets;
hashtable->total_buckets = hashtable->nbuckets;
......@@ -808,38 +764,6 @@ agg_hash_initial_pass(AggState *aggstate)
elog(HHA_MSG_LVL,
"HashAgg: initial pass -- beginning to load hash table");
/* If we found cached workfiles, initialize and load the batch data here */
if (gp_workfile_caching && aggstate->cached_workfiles_found)
{
elog(HHA_MSG_LVL, "Found existing SFS, reloading data from %s", hashtable->work_set->path);
/* Initialize all structures as if we just spilled everything */
hashtable->spill_set = read_spill_set(aggstate);
aggstate->hhashtable->is_spilling = true;
aggstate->cached_workfiles_loaded = true;
elog(gp_workfile_caching_loglevel, "HashAgg reusing cached workfiles, initiating Squelch walker");
PlanState *outerNode = outerPlanState(aggstate);
ExecSquelchNode(outerNode);
/* tuple table initialization */
ScanState *scanstate = & aggstate->ss;
PlanState *outerPlan = outerPlanState(scanstate);
TupleDesc tupDesc = ExecGetResultType(outerPlan);
if (aggstate->ss.ps.instrument)
{
aggstate->ss.ps.instrument->workfileReused = true;
}
/* Initialize hashslot by cloning input slot. */
ExecSetSlotDescriptor(aggstate->hashslot, tupDesc);
ExecStoreAllNullTuple(aggstate->hashslot);
mt_bind = aggstate->hashslot->tts_mt_bind;
return tuple_remaining;
}
/*
* Check if an input tuple has been read, but not processed
* because of lack of space before streaming the results
......@@ -1143,17 +1067,6 @@ closeSpillFile(AggState *aggstate, SpillSet *spill_set, int file_no)
spill_file = &spill_set->spill_files[file_no];
if (spill_file->file_info &&
gp_workfile_caching &&
aggstate->workfiles_created &&
!spill_file->respilled)
{
Assert(hashtable->state_file);
/* closing "leaf" spill file; save it's name to the state file for re-using */
agg_hash_save_spillfile_info(hashtable->state_file, spill_file);
hashtable->work_set->metadata.num_leaf_files++;
}
if (spill_file->spill_set != NULL)
{
freedspace += closeSpillFiles(aggstate, spill_file->spill_set);
......@@ -1258,112 +1171,6 @@ obtain_spill_set(HashAggTable *hashtable)
return *p_spill_set;
}
/*
* read_spill_set
* Read a previously written spill file set.
*
* The statistics in the hashtable is also updated.
*/
static SpillSet *
read_spill_set(AggState *aggstate)
{
Assert(aggstate != NULL);
Assert(aggstate->hhashtable != NULL);
Assert(aggstate->hhashtable->spill_set == NULL);
Assert(aggstate->hhashtable->curr_spill_file == NULL);
Assert(aggstate->hhashtable->work_set);
Assert(aggstate->hhashtable->work_set->metadata.num_leaf_files == aggstate->hhashtable->num_batches);
Assert(aggstate->hhashtable->work_set->metadata.buckets == aggstate->hhashtable->nbuckets);
workfile_set *work_set = aggstate->hhashtable->work_set;
uint32 alloc_size = 0;
HashAggTable *hashtable = aggstate->hhashtable;
/*
* Create spill set. Initialize each batch hash bit with 0. We'll set them to the right
* value individually below.
*/
int default_hash_bit = 0;
SpillSet *spill_set = createSpillSet(work_set->metadata.num_leaf_files, default_hash_bit);
/*
* Read metadata file to determine number and name of work files in the set
* Format of state file:
* - [name_of_leaf_workfile | batch_hash_bit] x N
*/
hashtable->state_file = workfile_mgr_open_fileno(work_set, WORKFILE_NUM_HASHAGG_METADATA);
Assert(hashtable->state_file != NULL);
/*
* Read, allocate and open all spill files.
* The spill files are opened in reverse order when saving tuples,
* so re-open them in the same order.
*/
uint32 no_filenames_read = 0;
while(true)
{
char *batch_filename = NULL;
unsigned read_batch_hashbit;
bool more_spillfiles = agg_hash_load_spillfile_info(hashtable->state_file, &batch_filename, &read_batch_hashbit);
if (!more_spillfiles)
{
break;
}
uint32 current_spill_file_no = work_set->metadata.num_leaf_files - no_filenames_read - 1;
Assert(current_spill_file_no >= 0);
SpillFile *spill_file = &spill_set->spill_files[current_spill_file_no];
Assert(spill_file->index_in_parent == current_spill_file_no);
spill_file->batch_hash_bit = read_batch_hashbit;
spill_file->file_info = (BatchFileInfo *)palloc(sizeof(BatchFileInfo));
spill_file->file_info->wfile =
ExecWorkFile_Open(batch_filename, BFZ, false /* delOnClose */,
work_set->metadata.bfz_compress_type);
Assert(spill_file->file_info->wfile != NULL);
Assert(batch_filename != NULL);
pfree(batch_filename);
spill_file->file_info->total_bytes = ExecWorkFile_GetSize(spill_file->file_info->wfile);
/* Made up values for this since we don't know it at this point */
spill_file->file_info->ntuples = 1;
elog(HHA_MSG_LVL, "HashAgg: OPEN %d level batch file %d with compression %d",
spill_set->level, no_filenames_read, work_set->metadata.bfz_compress_type);
/*
* bfz_open automatically frees up the freeable_stuff structure.
* Subtract that from the allocated size here.
*/
alloc_size += BATCHFILE_METADATA - FREEABLE_BATCHFILE_METADATA;
no_filenames_read++;
}
Assert(work_set->metadata.num_leaf_files == no_filenames_read);
/* Update statistics */
hashtable->num_overflows++;
hashtable->mem_for_metadata +=
sizeof(SpillSet) +
(hashtable->hats.nbatches - 1) * sizeof(SpillFile);
SANITY_CHECK_METADATA_SIZE(hashtable);
hashtable->mem_for_metadata += alloc_size;
if (alloc_size > 0)
{
Gpmon_M_Incr(GpmonPktFromAggState(aggstate), GPMON_AGG_SPILLBATCH);
CheckSendPlanStateGpmonPkt(&aggstate->ss.ps);
}
return spill_set;
}
/* Spill all entries from the hash table to file in order to make room
* for new hash entries.
*
......@@ -1393,11 +1200,7 @@ spill_hash_table(AggState *aggstate)
{
hashtable->work_set = workfile_mgr_create_set(BFZ, true /* can_be_reused */, &aggstate->ss.ps, NULL_SNAPSHOT);
hashtable->work_set->metadata.buckets = hashtable->nbuckets;
if (gp_workfile_caching)
{
create_state_file(hashtable);
}
aggstate->workfiles_created = true;
//aggstate->workfiles_created = true;
}
/* Book keeping. */
......@@ -1475,30 +1278,6 @@ spill_hash_table(AggState *aggstate)
MemoryContextSwitchTo(oldcxt);
}
/*
* Create and open state file holding metadata. Used for workfile re-using.
*/
static void
create_state_file(HashAggTable *hashtable)
{
Assert(hashtable != NULL);
hashtable->state_file = workfile_mgr_create_fileno(hashtable->work_set, WORKFILE_NUM_HASHAGG_METADATA);
Assert(hashtable->state_file != NULL);
}
/*
* Close state file holding spill set metadata. Used for workfile re-using.
*/
void
agg_hash_close_state_file(HashAggTable *hashtable)
{
if (hashtable->state_file != NULL)
{
workfile_mgr_close_file(hashtable->work_set, hashtable->state_file);
hashtable->state_file = NULL;
}
}
/*
* writeHashEntry -- write an hash entry to a batch file.
*
......@@ -1786,11 +1565,8 @@ agg_hash_reload(AggState *aggstate)
{
spill_file->file_info->ntuples--;
Assert(spill_file->parent_spill_set != NULL);
/* The following asserts the mapping between a hashkey bucket and the index in parent.
* This assertion does not hold for reloaded workfiles, since there the index in
* parent is different. */
AssertImply(!aggstate->cached_workfiles_loaded,
(hashkey >> spill_file->batch_hash_bit) %
/* The following asserts the mapping between a hashkey bucket and the index in parent. */
Assert((hashkey >> spill_file->batch_hash_bit) %
spill_file->parent_spill_set->num_spill_files ==
spill_file->index_in_parent);
......@@ -1801,7 +1577,6 @@ agg_hash_reload(AggState *aggstate)
else
{
/* Check we processed all tuples, only when not reading from disk */
AssertImply(!aggstate->cached_workfiles_loaded, spill_file->file_info->ntuples == 0);
break;
}
......@@ -1815,7 +1590,6 @@ agg_hash_reload(AggState *aggstate)
if (entry == NULL)
{
Assert(!aggstate->cached_workfiles_loaded && "no re-spilling allowed when re-using cached workfiles");
Assert(hashtable->curr_spill_file != NULL);
Assert(hashtable->curr_spill_file->parent_spill_set != NULL);
......@@ -2127,14 +1901,11 @@ agg_hash_next_pass(AggState *aggstate)
hashtable->num_output_groups,
hashtable->num_batches);
if (!aggstate->cached_workfiles_loaded)
{
appendStringInfo(hbuf,
appendStringInfo(hbuf,
"; %d overflows"
"; " INT64_FORMAT " spill groups",
hashtable->num_overflows,
hashtable->num_spill_groups);
}
appendStringInfo(hbuf, ".\n");
......@@ -2213,12 +1984,9 @@ void destroy_agg_hash_table(AggState *aggstate)
if (NULL != aggstate->hhashtable->work_set)
{
agg_hash_close_state_file(aggstate->hhashtable);
workfile_mgr_close_set(aggstate->hhashtable->work_set);
}
agg_hash_reset_workfile_state(aggstate);
mpool_delete(aggstate->hhashtable->group_buf);
pfree(aggstate->hhashtable);
......@@ -2227,16 +1995,8 @@ void destroy_agg_hash_table(AggState *aggstate)
}
/*
* Reset workfile caching state
* Marks workfile set as complete
*/
void
agg_hash_reset_workfile_state(AggState *aggstate)
{
aggstate->workfiles_created = false;
aggstate->cached_workfiles_found = false;
aggstate->cached_workfiles_loaded = false;
}
void
agg_hash_mark_spillset_complete(AggState *aggstate)
{
......@@ -2246,118 +2006,7 @@ agg_hash_mark_spillset_complete(AggState *aggstate)
Assert(aggstate->hhashtable->work_set != NULL);
workfile_set *work_set = aggstate->hhashtable->work_set;
bool workset_metadata_too_big = work_set->metadata.num_leaf_files > NO_RESERVED_BATCHFILE_METADATA;
if (workset_metadata_too_big)
{
work_set->can_be_reused = false;
elog(gp_workfile_caching_loglevel, "HashAgg: spill set contains too many files: %d. Not caching",
work_set->metadata.num_leaf_files);
}
workfile_mgr_mark_complete(work_set);
}
/*
* Save a spill file information to the state file.
* Format is: [name_length|name|hash_bit].
* The same format must be expected in agg_hash_load_spillfile_info
*/
static void
agg_hash_save_spillfile_info(ExecWorkFile *state_file, SpillFile *spill_file)
{
if (WorkfileDiskspace_IsFull())
{
/*
* We exceeded the amount of diskspace for spilling. Don't try to
* write anything anymore, as we're in the cleanup stage.
*/
return;
}
Assert(NULL != spill_file->file_info);
Assert(NULL != spill_file->file_info->wfile);
agg_hash_write_string(state_file,
spill_file->file_info->wfile->fileName,
strlen(spill_file->file_info->wfile->fileName));
ExecWorkFile_Write(state_file,
(char *) &spill_file->batch_hash_bit,
sizeof(spill_file->batch_hash_bit));
}
/*
* Load a spill file information to the state file.
* Format is: [name_length|name|hash_bit].
* The same format must be written in agg_hash_save_spillfile_info
* Sets spill_file_name to point to the read file name, which is palloc-ed in
* the current context.
* Return FALSE at EOF, TRUE otherwise.
*/
static bool
agg_hash_load_spillfile_info(ExecWorkFile *state_file, char **spill_file_name, unsigned *batch_hash_bit)
{
*spill_file_name = agg_hash_read_string(state_file);
if (*spill_file_name == NULL)
{
/* EOF. No more file names to read. */
return false;
}
unsigned read_hash_bit;
#ifdef USE_ASSERT_CHECKING
int res =
#endif
ExecWorkFile_Read(state_file, (char *) &read_hash_bit, sizeof(read_hash_bit));
Assert(res == sizeof(read_hash_bit));
*batch_hash_bit = read_hash_bit;
return true;
}
/* Writing string to a bfz file
* Format: [length|data]
* This must be the same format used in agg_hash_read_string_bfz
*/
static void
agg_hash_write_string(ExecWorkFile *ewf, const char *str, size_t len)
{
Assert(ewf != NULL);
ExecWorkFile_Write(ewf, (char *) &len, sizeof(len));
/* Terminating null character is not written to disk */
ExecWorkFile_Write(ewf, (char *) str, len);
}
/* Reading a string from a bfz file
* Format: [length|string]
* This must be the same format used in agg_hash_write_string_bfz
* Returns the palloc-ed string in the current context, NULL if error occurs.
*/
static char *
agg_hash_read_string(ExecWorkFile *ewf)
{
Assert(ewf != NULL);
size_t slen = 0;
int res = ExecWorkFile_Read(ewf, (char *) &slen, sizeof(slen));
if (res != sizeof(slen))
{
return NULL;
}
char *read_string = palloc(slen+1);
res = ExecWorkFile_Read(ewf, read_string, slen);
if (res < slen)
{
pfree(read_string);
return NULL;
}
read_string[slen]='\0';
return read_string;
}
/* EOF */
......@@ -1080,18 +1080,6 @@ ExecAgg(AggState *node)
case HASHAGG_END_OF_PASSES:
node->agg_done = true;
if (gp_workfile_caching && node->workfiles_created)
{
/*
* HashAgg closes each spill file after it is done with
* them. Since we got here on the regular path, all
* files should be closed.
*/
Assert(node->hhashtable->work_set);
Assert(node->hhashtable->spill_set == NULL);
agg_hash_close_state_file(node->hhashtable);
agg_hash_mark_spillset_complete(node);
}
ExecEagerFreeAgg(node);
return NULL;
......@@ -1738,7 +1726,6 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
aggstate->pergroup = NULL;
aggstate->grp_firstTuple = NULL;
aggstate->hashtable = NULL;
agg_hash_reset_workfile_state(aggstate);
/*
* Create expression contexts. We need two, one for per-input-tuple
......
......@@ -273,7 +273,7 @@ ExecEndHash(HashState *node)
* ----------------------------------------------------------------
*/
HashJoinTable
ExecHashTableCreate(HashState *hashState, HashJoinState *hjstate, List *hashOperators, uint64 operatorMemKB, workfile_set * workfile_set)
ExecHashTableCreate(HashState *hashState, HashJoinState *hjstate, List *hashOperators, uint64 operatorMemKB)
{
HashJoinTable hashtable;
Plan *outerNode;
......@@ -339,20 +339,8 @@ ExecHashTableCreate(HashState *hashState, HashJoinState *hjstate, List *hashOper
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
if (workfile_set != NULL)
{
hashtable->work_set = workfile_set;
ExecHashJoinLoadBucketsBatches(hashtable);
Assert(hjstate->nbatch_loaded_state == -1);
Assert(hjstate->cached_workfiles_batches_buckets_loaded);
hjstate->nbatch_loaded_state = hashtable->nbatch;
}
else
{
ExecChooseHashTableSize(outerNode->plan_rows, outerNode->plan_width,
&hashtable->nbuckets, &hashtable->nbatch, operatorMemKB);
}
ExecChooseHashTableSize(outerNode->plan_rows, outerNode->plan_width,
&hashtable->nbuckets, &hashtable->nbatch, operatorMemKB);
nbuckets = hashtable->nbuckets;
nbatch = hashtable->nbatch;
......@@ -799,7 +787,6 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
/* dump it out */
Assert(batchno > curbatch);
Assert(batchno >= hashtable->hjstate->nbatch_loaded_state);
ExecHashJoinSaveTuple(NULL, HJTUPLE_MINTUPLE(tuple),
tuple->hashvalue,
hashtable,
......@@ -992,15 +979,9 @@ ExecHashTableInsert(HashState *hashState, HashJoinTable hashtable,
}
else
{
/*
* put the tuple into a temp file for later batches, only when the cached
* workfile is not used.
*/
if (!hashtable->hjstate->cached_workfiles_found)
{
Assert(batchno > hashtable->curbatch);
ExecHashJoinSaveTuple(ps, tuple, hashvalue, hashtable, &batch->innerside, hashtable->bfCxt);
}
/* put the tuple into a temp file for later batches */
Assert(batchno > hashtable->curbatch);
ExecHashJoinSaveTuple(ps, tuple, hashvalue, hashtable, &batch->innerside, hashtable->bfCxt);
}
}
END_MEMORY_ACCOUNT();
......@@ -1391,29 +1372,22 @@ ExecHashTableExplainEnd(PlanState *planstate, struct StringInfoData *buf)
/* Report workfile I/O statistics. */
if (hashtable->nbatch > 1)
{
if (!hjstate->cached_workfiles_loaded)
{
ExecHashTableExplainBatches(hashtable, buf, 0, 1, "Initial");
ExecHashTableExplainBatches(hashtable,
buf,
1,
hashtable->nbatch_original,
"Initial");
ExecHashTableExplainBatches(hashtable,
buf,
hashtable->nbatch_original,
hashtable->nbatch_outstart,
"Overflow");
ExecHashTableExplainBatches(hashtable,
buf,
hashtable->nbatch_outstart,
hashtable->nbatch,
"Secondary Overflow");
}
else
{
ExecHashTableExplainBatches(hashtable, buf, 0, hashtable->nbatch, "Reuse");
}
ExecHashTableExplainBatches(hashtable, buf, 0, 1, "Initial");
ExecHashTableExplainBatches(hashtable,
buf,
1,
hashtable->nbatch_original,
"Initial");
ExecHashTableExplainBatches(hashtable,
buf,
hashtable->nbatch_original,
hashtable->nbatch_outstart,
"Overflow");
ExecHashTableExplainBatches(hashtable,
buf,
hashtable->nbatch_outstart,
hashtable->nbatch,
"Secondary Overflow");
}
/* Report hash chain statistics. */
......@@ -1609,37 +1583,33 @@ ExecHashTableExplainBatchEnd(HashState *hashState, HashJoinTable hashtable)
batchstats->ordbytes =
ExecWorkFile_Tell64(hashtable->batches[curbatch]->outerside.workfile);
/*
/*
* How much was written to workfiles for the remaining batches?
* In workfile caching, there is no need to write to the remaining batches.
*/
if (!hashtable->hjstate->cached_workfiles_loaded)
for (i = curbatch + 1; i < hashtable->nbatch; i++)
{
for (i = curbatch + 1; i < hashtable->nbatch; i++)
{
HashJoinBatchData *batch = hashtable->batches[i];
HashJoinBatchStats *bs = &stats->batchstats[i];
uint64 filebytes = 0;
if (batch->outerside.workfile != NULL)
filebytes = ExecWorkFile_Tell64(batch->outerside.workfile);
Assert(filebytes >= bs->outerfilesize);
owrbytes += filebytes - bs->outerfilesize;
bs->outerfilesize = filebytes;
filebytes = 0;
if (batch->innerside.workfile)
filebytes = ExecWorkFile_Tell64(batch->innerside.workfile);
Assert(filebytes >= bs->innerfilesize);
iwrbytes += filebytes - bs->innerfilesize;
bs->innerfilesize = filebytes;
}
batchstats->owrbytes = owrbytes;
batchstats->iwrbytes = iwrbytes;
HashJoinBatchData *batch = hashtable->batches[i];
HashJoinBatchStats *bs = &stats->batchstats[i];
uint64 filebytes = 0;
if (batch->outerside.workfile != NULL)
filebytes = ExecWorkFile_Tell64(batch->outerside.workfile);
Assert(filebytes >= bs->outerfilesize);
owrbytes += filebytes - bs->outerfilesize;
bs->outerfilesize = filebytes;
filebytes = 0;
if (batch->innerside.workfile)
filebytes = ExecWorkFile_Tell64(batch->innerside.workfile);
Assert(filebytes >= bs->innerfilesize);
iwrbytes += filebytes - bs->innerfilesize;
bs->innerfilesize = filebytes;
}
batchstats->owrbytes = owrbytes;
batchstats->iwrbytes = iwrbytes;
} /* give workfile I/O statistics */
/* Collect hash chain statistics. */
......
......@@ -37,12 +37,8 @@ static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinBatchSide *side,
TupleTableSlot *tupleSlot);
static int ExecHashJoinNewBatch(HashJoinState *hjstate);
static bool isNotDistinctJoin(List *qualList);
static bool ExecHashJoinLoadBatchFiles(HashJoinTable hashtable);
static void ReleaseHashTable(HashJoinState *node);
static void ExecHashJoinResetWorkfileState(HashJoinState *node);
static void ExecHashJoinSaveState(HashJoinTable hashtable);
static bool isHashtableEmpty(HashJoinTable hashtable);
/* ----------------------------------------------------------------
......@@ -170,31 +166,13 @@ ExecHashJoin(HashJoinState *node)
node->hj_FirstOuterTupleSlot = NULL;
}
workfile_set *work_set = NULL;
if (gp_workfile_caching)
{
Assert(!node->cached_workfiles_batches_buckets_loaded);
Assert(!node->cached_workfiles_loaded);
/* Look for cached workfiles. Mark here if found */
work_set = workfile_mgr_find_set(&node->js.ps);
if (work_set != NULL)
{
elog(gp_workfile_caching_loglevel, "HashJoin found matching existing spill file set");
node->cached_workfiles_found = true;
}
}
/*
* create the hash table
*/
hashtable = ExecHashTableCreate(hashNode,
node,
node->hj_HashOperators,
PlanStateOperatorMemKB((PlanState *) hashNode),
work_set);
PlanStateOperatorMemKB((PlanState *) hashNode));
node->hj_HashTable = hashtable;
/*
......@@ -219,47 +197,12 @@ ExecHashJoin(HashJoinState *node)
* the HashJoin plan when creating the spill file set */
hashtable->hjstate = node;
/* Workfile caching: If possible, load the hashtable state
* from cached workfiles first.
*/
if (gp_workfile_caching && node->cached_workfiles_found)
{
Assert(node->cached_workfiles_batches_buckets_loaded);
Assert(!node->cached_workfiles_loaded);
Assert(hashtable->work_set != NULL);
elog(gp_workfile_caching_loglevel, "In ExecHashJoin, loading hashtable from cached workfiles");
ExecHashJoinLoadBatchFiles(hashtable);
node->cached_workfiles_loaded = true;
elog(gp_workfile_caching_loglevel, "HashJoin reusing cached workfiles, initiating Squelch walker on inner and outer subplans");
ExecSquelchNode(outerNode);
ExecSquelchNode((PlanState *)hashNode);
if (node->js.ps.instrument)
{
node->js.ps.instrument->workfileReused = true;
}
/* Open the first batch and build hashtable from it. */
hashtable->curbatch = -1;
ExecHashJoinNewBatch(node);
#ifdef HJDEBUG
elog(gp_workfile_caching_loglevel, "HashJoin built table with %.1f tuples by loading from disk for batch 0",
hashtable->totalTuples);
#endif
}
else
{
/* No cached workfiles found. Execute the Hash node and build the hashtable */
(void) MultiExecProcNode((PlanState *) hashNode);
/* Execute the Hash node and build the hashtable */
(void) MultiExecProcNode((PlanState *) hashNode);
#ifdef HJDEBUG
elog(gp_workfile_caching_loglevel, "HashJoin built table with %.1f tuples by executing subplan for batch 0", hashtable->totalTuples);
#endif
}
/**
* If LASJ_NOTIN and a null was found on the inner side, then clean out.
......@@ -361,25 +304,11 @@ ExecHashJoin(HashJoinState *node)
&node->hj_CurBucketNo, &batchno);
node->hj_CurTuple = NULL;
/*
* Save outer tuples for the batch 0 to disk if workfile caching is
* enabled. We do this only when there is spilling.
*/
if (gp_workfile_caching && batchno == 0 && hashtable->nbatch > 1 && !node->cached_workfiles_loaded)
{
Assert(batchno >= node->nbatch_loaded_state);
ExecHashJoinSaveTuple(&node->js.ps, ExecFetchSlotMemTuple(outerTupleSlot, false),
hashvalue,
hashtable,
&hashtable->batches[batchno]->outerside,
hashtable->bfCxt);
}
/*
* Now we've got an outer tuple and the corresponding hash bucket,
* but this tuple may not belong to the current batch.
*/
if (batchno != hashtable->curbatch && !node->cached_workfiles_found)
if (batchno != hashtable->curbatch)
{
/*
* Need to postpone this outer tuple to a later batch. Save it
......@@ -387,7 +316,6 @@ ExecHashJoin(HashJoinState *node)
*/
Assert(batchno != 0);
Assert(batchno > hashtable->curbatch);
Assert(batchno >= node->nbatch_loaded_state);
ExecHashJoinSaveTuple(&node->js.ps, ExecFetchSlotMemTuple(outerTupleSlot, false),
hashvalue,
hashtable,
......@@ -687,8 +615,6 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
hjstate->hj_MatchedOuter = false;
hjstate->hj_OuterNotEmpty = false;
ExecHashJoinResetWorkfileState(hjstate);
initGpmonPktForHashJoin((Plan *)node, &hjstate->js.ps.gpmon_pkt, estate);
return hjstate;
......@@ -769,9 +695,8 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode,
HashState *hashState = (HashState *) innerPlanState(hjstate);
/* Read tuples from outer relation only if it's the first batch
* and we're not loading from cached workfiles. */
if (curbatch == 0 && !hjstate->cached_workfiles_loaded)
/* Read tuples from outer relation only if it's the first batch */
if (curbatch == 0)
{
for (;;)
{
......@@ -819,16 +744,6 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode,
*/
} /* for (;;) */
/*
* We have just reached the end of the first pass. Write out the first
* inner batch so that we can reuse it when the workfile caching is
* enabled.
*/
if (gp_workfile_caching)
{
ExecHashJoinSaveFirstInnerBatch(hashtable);
}
/*
* We have just reached the end of the first pass. Try to switch to a
* saved batch.
......@@ -879,14 +794,6 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode,
CheckSendPlanStateGpmonPkt(&hjstate->js.ps);
}
/* Write spill file state to disk. */
ExecHashJoinSaveState(hashtable);
if (gp_workfile_caching && hjstate->workfiles_created)
{
workfile_mgr_mark_complete(hashtable->work_set);
}
/* Out of batches... */
return NULL;
}
......@@ -934,21 +841,13 @@ start_over:
/*
* We no longer need the previous outer batch file; close it right
* away to free disk space.
*
* However, if workfile caching is enabled, and this is the first
* time to create cached workfiles, we can not close the batch file
* here, since we need to save the workfile names at the end.
*/
if (!(gp_workfile_caching &&
!hjstate->cached_workfiles_found))
batch = hashtable->batches[curbatch];
if (batch->outerside.workfile != NULL)
{
batch = hashtable->batches[curbatch];
if (batch->outerside.workfile != NULL)
{
workfile_mgr_close_file(hashtable->work_set, batch->outerside.workfile);
}
batch->outerside.workfile = NULL;
workfile_mgr_close_file(hashtable->work_set, batch->outerside.workfile);
}
batch->outerside.workfile = NULL;
}
/*
......@@ -1019,16 +918,12 @@ start_over:
if (batch->innerside.workfile != NULL)
{
/* Rewind batch file only if it was created by this operator.
* If we're loading from cached workfiles, no need to rewind. */
if (!hjstate->cached_workfiles_loaded)
/* Rewind batch file */
bool result = ExecWorkFile_Rewind(batch->innerside.workfile);
if (!result)
{
bool result = ExecWorkFile_Rewind(batch->innerside.workfile);
if (!result)
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not access temporary file")));
}
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not access temporary file")));
}
for (;;)
......@@ -1056,23 +951,16 @@ start_over:
* after we build the hash table, the inner batch file is no longer
* needed.
*
* However, if workfile caching is enabled, and this is the first
* time to create cached workfiles, we can not close the batch file
* here, since we need to save the workfile names at the end.
*/
if (!(gp_workfile_caching &&
!hjstate->cached_workfiles_found))
{
if (hjstate->js.ps.instrument)
{
Assert(hashtable->stats);
hashtable->stats->batchstats[curbatch].innerfilesize =
ExecWorkFile_Tell64(hashtable->batches[curbatch]->innerside.workfile);
}
workfile_mgr_close_file(hashtable->work_set, batch->innerside.workfile);
batch->innerside.workfile = NULL;
if (hjstate->js.ps.instrument)
{
Assert(hashtable->stats);
hashtable->stats->batchstats[curbatch].innerfilesize =
ExecWorkFile_Tell64(hashtable->batches[curbatch]->innerside.workfile);
}
}
workfile_mgr_close_file(hashtable->work_set, batch->innerside.workfile);
batch->innerside.workfile = NULL;
}
/*
* If there's no outer batch file, advance to next batch.
......@@ -1082,16 +970,12 @@ start_over:
/*
* Rewind outer batch file, so that we can start reading it.
* We only need to do that if we created those files, and not using cached workfiles.
*/
if (!hjstate->cached_workfiles_loaded)
bool result = ExecWorkFile_Rewind(batch->outerside.workfile);
if (!result)
{
bool result = ExecWorkFile_Rewind(batch->outerside.workfile);
if (!result)
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not access temporary file")));
}
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not access temporary file")));
}
return curbatch;
......@@ -1322,19 +1206,6 @@ static void ReleaseHashTable(HashJoinState *node)
node->hj_MatchedOuter = false;
node->hj_FirstOuterTupleSlot = NULL;
ExecHashJoinResetWorkfileState(node);
}
/*
* Reset workfile caching state
*/
static void ExecHashJoinResetWorkfileState(HashJoinState *node)
{
node->cached_workfiles_batches_buckets_loaded = false;
node->cached_workfiles_loaded = false;
node->cached_workfiles_found = false;
node->workfiles_created = false;
node->nbatch_loaded_state = -1;
}
/* Is this an IS-NOT-DISTINCT-join qual list (as opposed the an equijoin)?
......@@ -1465,287 +1336,6 @@ ExecHashJoinSaveFirstInnerBatch(HashJoinTable hashtable)
}
}
/* Writing a string to a Workfile.
* Format: [length|string]
* This must be the same format used in ExecHashJoinReadStringStateFile.
* Terminating null character is not written to disk
*/
static bool
WriteStringWorkFile(ExecWorkFile *workfile, const char *str)
{
bool res = false;
size_t slen = strlen(str);
res = ExecWorkFile_Write(workfile, & slen, sizeof(slen));
if (res == false)
{
return false;
}
return(ExecWorkFile_Write(workfile, (void *) str, slen));
}
/*
* Reads a string from a workfile.
* Format: [length|string]
* This must be the same format used in ExecHashJoinWriteStringStateFile.
* Returns the palloc-ed string in the current context, NULL if error occurs.
*/
static char *
ReadStringWorkFile(ExecWorkFile *workfile)
{
size_t slen = 0;
bool res = ExecWorkFile_Read(workfile, & slen, sizeof(slen));
if (res == false)
{
return NULL;
}
char * read_string = palloc(slen+1);
res = ExecWorkFile_Read(workfile, read_string, slen);
if (res == false)
{
pfree(read_string);
return NULL;
}
read_string[slen]='\0';
return read_string;
}
/*
* SaveBatchFileName
* Save the batch file name to the state file, and close the batch file.
*/
static void
SaveBatchFileNameAndClose(HashJoinTable hashtable, ExecWorkFile *workfile)
{
char *batch_file_name = EMPTY_WORKFILE_NAME;
bool free_name = false;
if (workfile != NULL)
{
batch_file_name = pstrdup(ExecWorkFile_GetFileName(workfile));
free_name = true;
workfile_mgr_close_file(hashtable->work_set, workfile);
}
bool res = WriteStringWorkFile(hashtable->state_file, batch_file_name);
if (!res)
{
workfile_mgr_report_error();
}
if (free_name)
{
pfree(batch_file_name);
}
}
/*
* Workfile caching: dump hashtable spill files state to disk after reading
* all inner and outer relation tuples. This can be used to re-load the spill file set
* at a later time.
*/
static void
ExecHashJoinSaveState(HashJoinTable hashtable)
{
/* What do we need to save:
* - nbuckets
* - nbatches
* - names of each file corresponding to each inner batch, in order
* - names of each file corresponding to each inner batch, in order
*/
if (!gp_workfile_caching)
{
return;
}
if (!hashtable->hjstate->workfiles_created)
{
return;
}
/*
* If this is called when a spill set is used, we only need to save
* the spill file state when the number of batches is changed during execution.
*/
if (hashtable->hjstate->cached_workfiles_found &&
hashtable->nbatch == hashtable->nbatch_original)
{
Assert(!hashtable->hjstate->workfiles_created);
return;
}
elog(gp_workfile_caching_loglevel, "Saving HashJoin inner and outer relation spill file state to disk");
bool res = false;
res = ExecWorkFile_Write(hashtable->state_file,
& hashtable->nbuckets, sizeof(hashtable->nbuckets));
if(!res)
{
workfile_mgr_report_error();
}
res = ExecWorkFile_Write(hashtable->state_file,
& hashtable->nbatch, sizeof(hashtable->nbatch));
if(!res)
{
workfile_mgr_report_error();
}
int i;
for (i=0; i < hashtable->nbatch; i++)
{
SaveBatchFileNameAndClose(hashtable,
hashtable->batches[i]->innerside.workfile);
hashtable->batches[i]->innerside.workfile = NULL;
elog(gp_workfile_caching_loglevel, "HashJoin inner batch %d: innerspace=%d, spaceAllowed=%d, innertuples=%d",
i, (int)hashtable->batches[i]->innerspace, (int)hashtable->spaceAllowed, hashtable->batches[i]->innertuples);
SaveBatchFileNameAndClose(hashtable,
hashtable->batches[i]->outerside.workfile);
hashtable->batches[i]->outerside.workfile = NULL;
}
workfile_mgr_close_file(hashtable->work_set, hashtable->state_file);
hashtable->state_file = NULL;
}
/*
* Opens the state workfile from a cached workfile set and reads nbuckets and
* nbatch from it.
*/
bool
ExecHashJoinLoadBucketsBatches(HashJoinTable hashtable)
{
/* What do we need to load:
* - nbuckets
* - nbatches
*/
Assert(hashtable != NULL);
Assert(hashtable->work_set != NULL);
Assert(!hashtable->hjstate->cached_workfiles_batches_buckets_loaded);
/*
* We allocate the workfile data structures in the longer-lived context hashtable->bfCxt.
* This way we can find them and close them at transaction abort, even after hashtable
* went away.
*/
MemoryContext oldcxt;
oldcxt = MemoryContextSwitchTo(hashtable->bfCxt);
hashtable->state_file = workfile_mgr_open_fileno(hashtable->work_set,
WORKFILE_NUM_HASHJOIN_METADATA);
elog(gp_workfile_caching_loglevel, "Loading HashJoin spill state from disk file %s",
ExecWorkFile_GetFileName(hashtable->state_file));
Assert(NULL != hashtable->state_file);
int loaded_nbuckets = 0;
int loaded_nbatch = 0;
uint64 bytes_read = 0;
bytes_read = ExecWorkFile_Read(hashtable->state_file,
& loaded_nbuckets, sizeof(loaded_nbuckets));
insist_log(bytes_read == sizeof(loaded_nbuckets),
"Could not read from temporary work file: %m");
hashtable->nbuckets = loaded_nbuckets;
bytes_read = ExecWorkFile_Read(hashtable->state_file,
& loaded_nbatch, sizeof(loaded_nbatch));
insist_log(bytes_read == sizeof(loaded_nbatch),
"Could not read from temporary work file: %m");
hashtable->nbatch = loaded_nbatch;
hashtable->hjstate->cached_workfiles_batches_buckets_loaded = true;
MemoryContextSwitchTo(oldcxt);
return true;
}
/*
* OpenBatchFile
* Open a batch file that is stored in state_file.
*/
static ExecWorkFile*
OpenBatchFile(HashJoinTable hashtable, int batch_no)
{
ExecWorkFile *workfile = NULL;
/*
* We allocate the workfile data structures in the longer-lived context hashtable->bfCxt.
* This way we can find them and close them at transaction abort, even after hashtable
* went away.
*/
MemoryContext oldcxt;
oldcxt = MemoryContextSwitchTo(hashtable->bfCxt);
char * batch_file_name = ReadStringWorkFile(hashtable->state_file);
insist_log(batch_file_name != NULL, "Could not read from temporary work file: %m");
if (strncmp(batch_file_name, EMPTY_WORKFILE_NAME, sizeof(EMPTY_WORKFILE_NAME)) != 0)
{
workfile = ExecWorkFile_Open(batch_file_name,
hashtable->work_set->metadata.type,
false /* delOnClose */,
hashtable->work_set->metadata.bfz_compress_type);
Assert(NULL != workfile);
elog(gp_workfile_caching_loglevel, "opened for re-use batch file %s for batch #%d",
batch_file_name, batch_no);
}
pfree(batch_file_name);
MemoryContextSwitchTo(oldcxt);
return workfile;
}
static bool
ExecHashJoinLoadBatchFiles(HashJoinTable hashtable)
{
/* We already read:
* - nbuckets
* - nbatches
*
* What do we need to load:
* - names of each file corresponding to each inner batch, in order
* - names of each file corresponding to each outer batch, in order
*/
Assert(hashtable != NULL);
Assert(hashtable->work_set != NULL);
Assert(hashtable->state_file != NULL);
Assert(hashtable->hjstate->cached_workfiles_batches_buckets_loaded);
MemoryContext oldcxt = MemoryContextSwitchTo(hashtable->bfCxt);
for (int i=0; i < hashtable->nbatch; i++)
{
Assert(hashtable->batches[i]->innerside.workfile == NULL);
hashtable->batches[i]->innerside.workfile = OpenBatchFile(hashtable, i);
Assert(hashtable->batches[i]->outerside.workfile == NULL);
hashtable->batches[i]->outerside.workfile = OpenBatchFile(hashtable, i);
}
MemoryContextSwitchTo(oldcxt);
return true;
}
/*
* isHashtableEmpty
*
......
......@@ -101,53 +101,10 @@ ExecMaterial(MaterialState *node)
else
{
/* Non-shared Materialize node */
bool isWriter = true;
workfile_set *work_set = NULL;
workfile_set *work_set = workfile_mgr_create_set(BUFFILE, false /* can_reuse */, &node->ss.ps, NULL_SNAPSHOT);
if (gp_workfile_caching)
{
work_set = workfile_mgr_find_set( &node->ss.ps);
if (NULL != work_set)
{
/* Reusing cached workfiles. Tell subplan we won't be needing any tuples */
elog(gp_workfile_caching_loglevel, "Materialize reusing cached workfiles, initiating Squelch walker");
isWriter = false;
ExecSquelchNode(outerPlanState(node));
node->eof_underlying = true;
node->cached_workfiles_found = true;
if (node->ss.ps.instrument)
{
node->ss.ps.instrument->workfileReused = true;
}
}
}
if (NULL == work_set)
{
/*
* No work_set found, this is because:
* a. workfile caching is enabled but we didn't find any reusable set
* b. workfile caching is disabled
* Creating new empty workset
*/
Assert(!node->cached_workfiles_found);
/* Don't try to cache when running under a ShareInputScan node */
bool can_reuse = (ma->share_type == SHARE_NOTSHARED);
work_set = workfile_mgr_create_set(BUFFILE, can_reuse, &node->ss.ps, NULL_SNAPSHOT);
isWriter = true;
}
Assert(NULL != work_set);
AssertEquivalent(node->cached_workfiles_found, !isWriter);
ts = ntuplestore_create_workset(work_set, node->cached_workfiles_found,
PlanStateOperatorMemKB((PlanState *) node) * 1024);
tsa = ntuplestore_create_accessor(ts, isWriter);
ts = ntuplestore_create_workset(work_set, PlanStateOperatorMemKB((PlanState *) node) * 1024);
tsa = ntuplestore_create_accessor(ts, true /* isWriter */);
}
Assert(ts && tsa);
......@@ -190,13 +147,6 @@ ExecMaterial(MaterialState *node)
if (TupIsNull(outerslot))
{
node->eof_underlying = true;
if (ntuplestore_created_reusable_workfiles(ts))
{
ntuplestore_flush(ts);
ntuplestore_mark_workset_complete(ts);
}
ntuplestore_acc_seek_bof(tsa);
break;
......@@ -284,12 +234,6 @@ ExecMaterial(MaterialState *node)
if (TupIsNull(outerslot))
{
node->eof_underlying = true;
if (ntuplestore_created_reusable_workfiles(ts))
{
ntuplestore_flush(ts);
ntuplestore_mark_workset_complete(ts);
}
if (!node->ss.ps.delayEagerFree)
{
ExecEagerFreeMaterial(node);
......
......@@ -27,7 +27,6 @@
#include "utils/faultinjector.h"
static void ExecSortExplainEnd(PlanState *planstate, struct StringInfoData *buf);
static void ExecSortResetWorkfileState(SortState *node);
/* ----------------------------------------------------------------
* ExecSort
......@@ -54,7 +53,6 @@ ExecSort(SortState *node)
Sort *plannode = NULL;
PlanState *outerNode = NULL;
TupleDesc tupDesc = NULL;
workfile_set *work_set = NULL;
/*
* get state info from node
......@@ -99,17 +97,6 @@ ExecSort(SortState *node)
SO1_printf("ExecSort: %s\n",
"sorting subplan");
if (gp_workfile_caching)
{
/* Look for cached workfile set. Mark here if found */
work_set = workfile_mgr_find_set(&node->ss.ps);
if (work_set != NULL)
{
elog(gp_workfile_caching_loglevel, "Sort found matching cached workfile set");
node->cached_workfiles_found = true;
}
}
/*
* Want to scan subplan in the forward direction while creating the
* sorted data.
......@@ -263,42 +250,7 @@ ExecSort(SortState *node)
}
/*
* Before reading any tuples from below, check if we can re-use
* existing spill files.
* Only mk_sort supports spill file caching.
*/
if (!node->sort_Done && gp_enable_mk_sort && gp_workfile_caching)
{
Assert(tuplesortstate_mk != NULL);
if (node->cached_workfiles_found && !node->cached_workfiles_loaded)
{
Assert(work_set != NULL);
elog(gp_workfile_caching_loglevel, "nodeSort: loading cached workfile metadata");
tuplesort_set_spillfile_set_mk(tuplesortstate_mk, work_set);
tuplesort_read_spill_metadata_mk(tuplesortstate_mk);
node->cached_workfiles_loaded = true;
if (node->ss.ps.instrument)
{
node->ss.ps.instrument->workfileReused = true;
}
/* Loaded sorted data from cached workfile, therefore
* no need to sort anymore!
*/
node->sort_Done = true;
elog(gp_workfile_caching_loglevel, "Sort reusing cached workfiles, initiating Squelch walker");
ExecSquelchNode(outerNode);
}
}
/*
* If first time through and no cached workfiles can be used,
* If first time through,
* read all tuples from outer plan and pass them to
* tuplesort.c. Subsequent calls just fetch tuples from tuplesort.
*/
......@@ -450,7 +402,6 @@ ExecInitSort(Sort *node, EState *estate, int eflags)
sortstate->sort_Done = false;
sortstate->tuplesortstate = palloc0(sizeof(GenericTupStore));
sortstate->share_lk_ctxt = NULL;
ExecSortResetWorkfileState(sortstate);
/* CDB */
......@@ -794,16 +745,5 @@ ExecEagerFreeSort(SortState *node)
}
ExecSortResetWorkfileState(node);
}
}
/*
* Reset workfile caching state
*/
static void
ExecSortResetWorkfileState(SortState *node)
{
node->cached_workfiles_found = false;
node->cached_workfiles_loaded = false;
}
......@@ -675,16 +675,6 @@ struct config_bool ConfigureNamesBool_gp[] =
&gp_workfile_checksumming,
true, NULL, NULL
},
{
{"gp_workfile_caching", PGC_SUSET, QUERY_TUNING_OTHER,
gettext_noop("Enable work file caching"),
gettext_noop("When enabled, work files are persistent "
"and their contents can be reused."),
GUC_GPDB_ADDOPT | GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE
},
&gp_workfile_caching,
false, NULL, NULL
},
{
{"force_bitmap_table_scan", PGC_USERSET, DEVELOPER_OPTIONS,
gettext_noop("Forces bitmap table scan instead of bitmap heap/ao/aoco scan."),
......
......@@ -1333,22 +1333,6 @@ tuplesort_performsort_mk(Tuplesortstate_mk *state)
state->pos.markpos.tapepos.offset = 0;
state->pos.markpos_eof = false;
/*
* If we're planning to reuse the spill files from this sort,
* save metadata here and mark work_set complete.
*/
if (gp_workfile_caching && state->work_set &&
!QueryFinishPending)
{
tuplesort_write_spill_metadata_mk(state);
/* We don't know how to handle TSS_FINALMERGE yet */
Assert(state->status == TSS_SORTEDONTAPE);
Assert(state->work_set);
workfile_mgr_mark_complete(state->work_set);
}
break;
default:
......@@ -1738,27 +1722,14 @@ inittapes_mk(Tuplesortstate_mk *state, const char* rwfile_prefix)
* inaccurate.)
*/
tapeSpace = maxTapes * TAPE_BUFFER_OVERHEAD;
Assert(state->work_set == NULL);
PlanState *ps = NULL;
bool can_be_reused = false;
if (state->ss != NULL)
{
ps = &state->ss->ps;
Sort *node = (Sort *) ps->plan;
if (node->share_type == SHARE_NOTSHARED)
{
/* Only attempt to cache when not shared under a ShareInputScan */
can_be_reused = true;
}
}
/*
* Create the tape set and allocate the per-tape data arrays.
*/
if(!rwfile_prefix)
{
state->work_set = workfile_mgr_create_set(BUFFILE, can_be_reused, ps, NULL_SNAPSHOT);
state->work_set = workfile_mgr_create_set(BUFFILE, false /* can_be_reused */, NULL /* ps */, NULL_SNAPSHOT);
state->tapeset_state_file = workfile_mgr_create_fileno(state->work_set, WORKFILE_NUM_MKSORT_METADATA);
ExecWorkFile *tape_file = workfile_mgr_create_fileno(state->work_set, WORKFILE_NUM_MKSORT_TAPESET);
......@@ -1956,14 +1927,7 @@ mergeruns(Tuplesortstate_mk *state)
* tape, we can stop at this point and do the final merge on-the-fly.
*/
/* If workfile caching is enabled, always do the final merging
* and store the sorted result on disk, instead of stopping before the
* last merge iteration.
* This can cause some slowdown compared to no workfile caching, but
* it enables us to re-use the mechanism to dump and restore logical
* tape set information as-is.
*/
if (!state->randomAccess && !gp_workfile_caching)
if (!state->randomAccess)
{
bool allOneRun = true;
......
......@@ -161,8 +161,6 @@ struct NTupleStore
int rwflag; /* if I am ordinary store, or a reader, or a writer of readerwriter (share input) */
bool cached_workfiles_found; /* true if found matching and usable cached workfiles */
bool cached_workfiles_loaded; /* set after loading cached workfiles */
bool workfiles_created; /* set if the operator created workfiles */
workfile_set *work_set; /* workfile set to use when using workfile manager */
......@@ -668,8 +666,6 @@ ntuplestore_create(int maxBytes)
store->lobbytes = 0;
store->work_set = NULL;
store->cached_workfiles_found = false;
store->cached_workfiles_loaded = false;
store->workfiles_created = false;
Assert(maxBytes >= 0);
......@@ -733,8 +729,6 @@ ntuplestore_create_readerwriter(const char *filename, int maxBytes, bool isWrite
{
store = (NTupleStore *) check_malloc(sizeof(NTupleStore));
store->work_set = NULL;
store->cached_workfiles_found = false;
store->cached_workfiles_loaded = false;
store->workfiles_created = false;
store->pfile = ExecWorkFile_Open(filenameprefix, BUFFILE,
......@@ -803,36 +797,16 @@ ntuplestore_init_reader(NTupleStore *store, int maxBytes)
* The workSet needs to be initialized by the caller.
*/
NTupleStore *
ntuplestore_create_workset(workfile_set *workSet, bool cachedWorkfilesFound, int maxBytes)
ntuplestore_create_workset(workfile_set *workSet, int maxBytes)
{
elog(gp_workfile_caching_loglevel, "Creating tuplestore with workset in directory %s", workSet->path);
NTupleStore *store = ntuplestore_create(maxBytes);
store->work_set = workSet;
store->cached_workfiles_found = cachedWorkfilesFound;
/* Creating new workset */
store->rwflag = NTS_IS_WRITER;
if (store->cached_workfiles_found)
{
Assert(store->work_set != NULL);
/* Reusing existing files. Load data from spill files here */
MemoryContext oldcxt;
oldcxt = MemoryContextSwitchTo(TopMemoryContext);
store->pfile = workfile_mgr_open_fileno(store->work_set, WORKFILE_NUM_TUPLESTORE_DATA);
store->plobfile = workfile_mgr_open_fileno(store->work_set, WORKFILE_NUM_TUPLESTORE_LOB);
MemoryContextSwitchTo(oldcxt);
ntuplestore_init_reader(store, maxBytes);
store->cached_workfiles_loaded = true;
}
else
{
/* Creating new workset */
store->rwflag = NTS_IS_WRITER;
}
return store;
}
......@@ -1582,7 +1556,6 @@ static void
ntuplestore_create_spill_files(NTupleStore *nts)
{
Assert(nts->work_set != NULL);
Assert(!nts->cached_workfiles_found);
MemoryContext oldcxt;
oldcxt = MemoryContextSwitchTo(TopMemoryContext);
......@@ -1594,17 +1567,7 @@ ntuplestore_create_spill_files(NTupleStore *nts)
}
/*
* Returns true if this tuplestore created workfiles that can potentially be
* reused by other queries.
*/
bool
ntuplestore_created_reusable_workfiles(NTupleStore *nts)
{
Assert(nts);
return gp_workfile_caching && nts->workfiles_created && nts->work_set && nts->work_set->can_be_reused;
}
/*
* Mark the associated workfile set as complete, allowing it to be cached for reuse.
* Mark the associated workfile set as complete
*/
void
ntuplestore_mark_workset_complete(NTupleStore *nts)
......
......@@ -47,11 +47,10 @@ workfile_mgr_create_fileno(workfile_set *work_set, uint32 file_no)
char file_name[MAXPGPATH];
retrieve_file_no(work_set, file_no, file_name, sizeof(file_name));
bool del_on_close = !work_set->can_be_reused;
ExecWorkFile *ewfile = ExecWorkFile_Create(file_name,
work_set->metadata.type,
del_on_close,
true /* del_on_close */,
work_set->metadata.bfz_compress_type);
ExecWorkfile_SetWorkset(ewfile, work_set);
......@@ -72,11 +71,9 @@ workfile_mgr_open_fileno(workfile_set *work_set, uint32 file_no)
char file_name[MAXPGPATH];
retrieve_file_no(work_set, file_no, file_name, sizeof(file_name));
bool del_on_close = !work_set->can_be_reused;
ExecWorkFile *ewfile = ExecWorkFile_Open(file_name,
work_set->metadata.type,
del_on_close,
true /* del_on_close */,
work_set->metadata.bfz_compress_type);
ExecWorkfile_SetWorkset(ewfile, work_set);
......
......@@ -41,7 +41,6 @@ typedef struct workset_info
uint64 operator_work_mem;
char *dir_path;
bool can_be_reused;
bool on_disk;
} workset_info;
/* Forward declarations */
......@@ -204,8 +203,6 @@ workfile_mgr_create_set(enum ExecWorkFileType type, bool can_be_reused, PlanStat
set_info.dir_path = dir_path;
set_info.session_start_time = GetCurrentTimestamp();
set_info.operator_work_mem = get_operator_work_mem(ps);
set_info.on_disk = true;
CacheEntry *newEntry = NULL;
PG_TRY();
......@@ -227,19 +224,9 @@ workfile_mgr_create_set(enum ExecWorkFileType type, bool can_be_reused, PlanStat
Assert(NULL != newEntry);
workfile_set *work_set = CACHE_ENTRY_PAYLOAD(newEntry);
Assert(work_set != NULL);
if (work_set->can_be_reused)
{
Assert(plan != NULL);
Assert(nodeTag(plan) >= T_Plan && nodeTag(plan) < T_PlanInvalItem);
workfile_set_plan *s_plan = workfile_mgr_serialize_plan(ps);
work_set->key = workfile_mgr_hash_key(s_plan);
workfile_mgr_save_plan(work_set, s_plan);
workfile_mgr_free_plan(s_plan);
}
elog(gp_workfile_caching_loglevel, "new spill file set. key=0x%x can_be_reused=%d prefix=%s opMemKB=" INT64_FORMAT,
work_set->key, work_set->can_be_reused, work_set->path, work_set->metadata.operator_work_mem);
elog(gp_workfile_caching_loglevel, "new spill file set. key=0x%x prefix=%s opMemKB=" INT64_FORMAT,
work_set->key, work_set->path, work_set->metadata.operator_work_mem);
return work_set;
}
......@@ -344,36 +331,22 @@ workfile_mgr_populate_set(const void *resource, const void *param)
work_set->metadata.operator_work_mem = set_info->operator_work_mem;
work_set->set_plan = NULL;
if (!set_info->on_disk)
{
/* This is for a "virtual" work_set, used for look-ups. No need to populate further */
Assert(NULL == set_info->dir_path);
work_set->on_disk = false;
}
else
{
work_set->complete = false;
work_set->no_files = 0;
work_set->size = 0L;
work_set->in_progress_size = 0L;
work_set->node_type = set_info->nodeType;
work_set->metadata.type = set_info->file_type;
work_set->metadata.bfz_compress_type = gp_workfile_compress_algorithm;
work_set->metadata.snapshot = set_info->snapshot;
work_set->metadata.num_leaf_files = 0;
work_set->slice_id = currentSliceId;
work_set->session_id = gp_session_id;
work_set->command_count = gp_command_count;
work_set->session_start_time = set_info->session_start_time;
/* If workfile caching is disabled, nothing should be re-used, so override whatever the caller says */
work_set->can_be_reused = gp_workfile_caching && set_info->can_be_reused;
Assert(strlen(set_info->dir_path) < MAXPGPATH);
strncpy(work_set->path, set_info->dir_path, MAXPGPATH);
work_set->on_disk = true;
}
work_set->complete = false;
work_set->no_files = 0;
work_set->size = 0L;
work_set->in_progress_size = 0L;
work_set->node_type = set_info->nodeType;
work_set->metadata.type = set_info->file_type;
work_set->metadata.bfz_compress_type = gp_workfile_compress_algorithm;
work_set->metadata.snapshot = set_info->snapshot;
work_set->metadata.num_leaf_files = 0;
work_set->slice_id = currentSliceId;
work_set->session_id = gp_session_id;
work_set->command_count = gp_command_count;
work_set->session_start_time = set_info->session_start_time;
Assert(strlen(set_info->dir_path) < MAXPGPATH);
strncpy(work_set->path, set_info->dir_path, MAXPGPATH);
}
/*
......@@ -595,7 +568,6 @@ workfile_mgr_lookup_set(PlanState *ps)
workset_info set_info;
set_info.dir_path = NULL;
set_info.operator_work_mem = get_operator_work_mem(ps);
set_info.on_disk = false;
CacheEntry *localEntry = acquire_entry_retry(workfile_mgr_cache, &set_info);
Assert(localEntry != NULL);
......@@ -757,54 +729,38 @@ workfile_mgr_cleanup_set(const void *resource)
{
workfile_set *work_set = (workfile_set *) resource;
if (work_set->on_disk)
{
ereport(gp_workfile_caching_loglevel,
(errmsg("workfile mgr cleanup deleting set: key=0x%0xd, size=" INT64_FORMAT
" in_progress_size=" INT64_FORMAT " path=%s",
work_set->key,
work_set->size,
work_set->in_progress_size,
work_set->path),
errprintstack(true)));
ereport(gp_workfile_caching_loglevel,
(errmsg("workfile mgr cleanup deleting set: key=0x%0xd, size=" INT64_FORMAT
" in_progress_size=" INT64_FORMAT " path=%s",
work_set->key,
work_set->size,
work_set->in_progress_size,
work_set->path),
errprintstack(true)));
Assert(NULL == work_set->set_plan);
Assert(NULL == work_set->set_plan);
workfile_mgr_delete_set_directory(work_set->path);
workfile_mgr_delete_set_directory(work_set->path);
/*
* The most accurate size of a workset is recorded in work_set->in_progress_size.
* work_set->size is only updated when we close a file, so it lags behind
*/
Assert(work_set->in_progress_size >= work_set->size);
int64 size_to_delete = work_set->in_progress_size;
/*
* The most accurate size of a workset is recorded in work_set->in_progress_size.
* work_set->size is only updated when we close a file, so it lags behind
*/
elog(gp_workfile_caching_loglevel, "Subtracting " INT64_FORMAT " from workfile diskspace", size_to_delete);
Assert(work_set->in_progress_size >= work_set->size);
int64 size_to_delete = work_set->in_progress_size;
/*
* When subtracting the size of this workset from our accounting,
* only update the per-query counter if we created the workset.
* In that case, the state is ACQUIRED, otherwise is CACHED or DELETED
*/
CacheEntry *cacheEntry = CACHE_ENTRY_HEADER(resource);
bool update_query_space = (cacheEntry->state == CACHE_ENTRY_ACQUIRED);
elog(gp_workfile_caching_loglevel, "Subtracting " INT64_FORMAT " from workfile diskspace", size_to_delete);
WorkfileDiskspace_Commit(0, size_to_delete, update_query_space);
}
else
{
/* Non-physical workfile set, we need to free up the plan memory */
if (NULL != work_set->set_plan->serialized_plan)
{
pfree(work_set->set_plan->serialized_plan);
}
/*
* When subtracting the size of this workset from our accounting,
* only update the per-query counter if we created the workset.
* In that case, the state is ACQUIRED, otherwise is CACHED or DELETED
*/
CacheEntry *cacheEntry = CACHE_ENTRY_HEADER(resource);
bool update_query_space = (cacheEntry->state == CACHE_ENTRY_ACQUIRED);
if (NULL != work_set->set_plan)
{
pfree(work_set->set_plan);
}
}
WorkfileDiskspace_Commit(0, size_to_delete, update_query_space);
}
/*
......@@ -816,35 +772,12 @@ workfile_mgr_close_set(workfile_set *work_set)
{
Assert(work_set!=NULL);
elog(gp_workfile_caching_loglevel, "closing workfile set: can_be_reused=%d complete=%d location: %s, size=" INT64_FORMAT
elog(gp_workfile_caching_loglevel, "closing workfile set: complete=%d location: %s, size=" INT64_FORMAT
" in_progress_size=" INT64_FORMAT,
work_set->can_be_reused, work_set->complete, work_set->path,
work_set->complete, work_set->path,
work_set->size, work_set->in_progress_size);
CacheEntry *cache_entry = CACHE_ENTRY_HEADER(work_set);
if (Cache_IsCached(cache_entry))
{
/* Workset came from cache. Just release it, nothing to do */
Cache_Release(workfile_mgr_cache, cache_entry);
return;
}
if (work_set->complete && work_set->can_be_reused)
{
cache_entry->size = work_set->size;
/* We want to keep this one around. Insert into cache */
Cache_Insert(workfile_mgr_cache, cache_entry);
Cache_Release(workfile_mgr_cache, cache_entry);
return;
}
/*
* Fall-through case: We need to delete this work_set, as it's not reusable.
*/
Assert(!work_set->complete || !work_set->can_be_reused);
Cache_Release(workfile_mgr_cache, cache_entry);
}
......@@ -989,7 +922,6 @@ workfile_set_equivalent(const void *virtual_resource, const void *physical_resou
return false;
}
Assert(!virtual_workset->on_disk && physical_workset->on_disk && "comparing two physical or two virtual worksets not supported");
Assert(NULL != virtual_workset->set_plan);
return workfile_mgr_compare_plan(physical_workset, virtual_workset->set_plan);
......
......@@ -203,18 +203,14 @@ gp_workfile_mgr_cache_entries(PG_FUNCTION_ARGS)
*/
Cache_LockEntry(cache, crtEntry);
if (!should_list_entry(crtEntry) || !work_set->on_disk)
if (!should_list_entry(crtEntry))
{
Cache_UnlockEntry(cache, crtEntry);
continue;
}
values[0] = Int32GetDatum(Gp_segment);
if (work_set->on_disk)
{
/* Only physical sets have a meaningful path */
strncpy(work_set_path, work_set->path, MAXPGPATH);
}
strncpy(work_set_path, work_set->path, MAXPGPATH);
values[2] = UInt32GetDatum(crtEntry->hashvalue);
......
......@@ -1066,7 +1066,6 @@ extern const char *gpvars_show_gp_gpperfmon_log_alert_level(void);
extern int gp_hashagg_compress_spill_files;
extern int gp_workfile_compress_algorithm;
extern bool gp_workfile_checksumming;
extern bool gp_workfile_caching;
extern double gp_workfile_limit_per_segment;
extern double gp_workfile_limit_per_query;
extern int gp_workfile_limit_files_per_query;
......
......@@ -224,9 +224,7 @@ extern bool agg_hash_stream(AggState *aggstate);
extern bool agg_hash_next_pass(AggState *aggstate);
extern bool agg_hash_continue_pass(AggState *aggstate);
extern void destroy_agg_hash_table(AggState *aggstate);
extern void agg_hash_reset_workfile_state(AggState *aggstate);
extern void agg_hash_mark_spillset_complete(AggState *aggstate);
extern void agg_hash_close_state_file(HashAggTable *hashtable);
extern HashAggEntry *agg_hash_iter(AggState *aggstate);
......
......@@ -26,7 +26,7 @@ extern Node *MultiExecHash(HashState *node);
extern void ExecEndHash(HashState *node);
extern void ExecReScanHash(HashState *node, ExprContext *exprCtxt);
extern HashJoinTable ExecHashTableCreate(HashState *hashState, HashJoinState *hjstate, List *hashOperators, uint64 operatorMemKB, workfile_set * sfs);
extern HashJoinTable ExecHashTableCreate(HashState *hashState, HashJoinState *hjstate, List *hashOperators, uint64 operatorMemKB);
extern void ExecHashTableDestroy(HashState *hashState, HashJoinTable hashtable);
extern void ExecHashTableInsert(HashState *hashState, HashJoinTable hashtable,
struct TupleTableSlot *slot,
......
......@@ -32,7 +32,6 @@ extern void ExecHashJoinSaveTuple(PlanState *ps, MemTuple tuple, uint32 hashvalu
extern void ExecEagerFreeHashJoin(HashJoinState *node);
extern void ExecHashJoinSaveFirstInnerBatch(HashJoinTable hashtable);
extern bool ExecHashJoinLoadBucketsBatches(HashJoinTable hashtable);
enum
{
......
......@@ -2139,16 +2139,8 @@ typedef struct HashJoinState
bool prefetch_inner;
bool hj_nonequijoin;
/* true if found matching and usable cached workfiles */
bool cached_workfiles_found;
/* set after loading nbatch and nbuckets from cached workfile */
bool cached_workfiles_batches_buckets_loaded;
/* set after loading cached workfiles */
bool cached_workfiles_loaded;
/* set if the operator created workfiles */
bool workfiles_created;
/* number of batches when we loaded from the state. -1 means not loaded yet */
int nbatch_loaded_state;
} HashJoinState;
......@@ -2240,8 +2232,6 @@ typedef struct SortState
void *share_lk_ctxt;
bool cached_workfiles_found; /* true if found matching and usable cached workfiles */
bool cached_workfiles_loaded; /* set after loading cached workfiles */
} SortState;
/* ---------------------
......@@ -2301,10 +2291,6 @@ typedef struct AggState
bool *doReplace;
List *percs; /* all PercentileExpr nodes in targetlist & quals */
/* true if found matching and usable cached workfiles */
bool cached_workfiles_found;
/* set after loading cached workfiles */
bool cached_workfiles_loaded;
/* set if the operator created workfiles */
bool workfiles_created;
......
......@@ -139,9 +139,6 @@ extern void tuplesort_finalize_stats_mk(Tuplesortstate_mk *state);
extern int tuplesort_merge_order(long allowedMem);
extern void tuplesort_write_spill_metadata_mk(Tuplesortstate_mk *state);
extern void tuplesort_read_spill_metadata_mk(Tuplesortstate_mk *state);
extern void tuplesort_set_spillfile_set_mk(Tuplesortstate_mk * state, workfile_set * sfs);
/*
* These routines may only be called if randomAccess was specified 'true'.
......
......@@ -27,7 +27,7 @@ void ntuplestore_setinstrument(NTupleStore* ts, struct Instrumentation *ins);
/* Tuple store method */
extern NTupleStore *ntuplestore_create(int maxBytes);
extern NTupleStore *ntuplestore_create_readerwriter(const char* filename, int maxBytes, bool isWriter);
extern NTupleStore *ntuplestore_create_workset(workfile_set *workSet, bool cachedWorkfilesFound, int maxBytes);
extern NTupleStore *ntuplestore_create_workset(workfile_set *workSet, int maxBytes);
extern bool ntuplestore_is_readerwriter_reader(NTupleStore* nts);
extern bool ntuplestore_is_readerwriter_writer(NTupleStore* nts);
extern bool ntuplestore_is_readerwriter(NTupleStore* nts);
......
......@@ -120,15 +120,9 @@ typedef struct workfile_set
/* Operator-specific metadata */
workfile_set_op_metadata metadata;
/* Set to true for workfile sets that are associated with physical files */
bool on_disk;
/* For non-physical workfile sets, pointer to the serialized plan */
workfile_set_plan *set_plan;
/* Indicates if this set can be reused */
bool can_be_reused;
/* Set to true during operator execution once set is complete */
bool complete;
......