提交 957c4e50 编写于 作者: H Heikki Linnakangas

Replace NTupleStore with upstream tuplestore in ShareInputScans.

To make that possible, add functions to tuplestore.c to share the store
across processes, similar to how NTupleStore can be shared.
Reviewed-by: NHubert Zhang <hzhang@pivotal.io>
上级 d3809fcc
......@@ -66,7 +66,7 @@
#include "utils/faultinjector.h"
#include "utils/memutils.h"
#include "utils/resowner.h"
#include "utils/tuplestorenew.h"
#include "utils/tuplestore.h"
/*
* In a cross-slice ShareinputScan, the producer and consumer processes
......@@ -156,7 +156,7 @@ typedef struct shareinput_local_state
PlanState *childState;
/* Tuplestore that holds the result */
NTupleStore *ts_state;
Tuplestorestate *ts_state;
} shareinput_local_state;
static shareinput_Xslice_reference *get_shareinput_reference(int share_id);
......@@ -171,6 +171,8 @@ static void shareinput_reader_waitready(shareinput_Xslice_reference *ref);
static void shareinput_reader_notifydone(shareinput_Xslice_reference *ref, int nconsumers);
static void shareinput_writer_waitdone(shareinput_Xslice_reference *ref, int nconsumers);
static void ExecShareInputScanExplainEnd(PlanState *planstate, struct StringInfoData *buf);
/*
* init_tuplestore_state
......@@ -183,13 +185,13 @@ init_tuplestore_state(ShareInputScanState *node)
EState *estate = node->ss.ps.state;
ShareInputScan *sisc = (ShareInputScan *) node->ss.ps.plan;
shareinput_local_state *local_state = node->local_state;
NTupleStore *ts;
NTupleStoreAccessor *tsa;
Tuplestorestate *ts;
int tsptrno;
TupleTableSlot *outerslot;
Assert(!node->isready);
Assert(node->ts_state == NULL);
Assert(node->ts_pos == NULL);
Assert(node->ts_pos == -1);
if (sisc->cross_slice)
{
......@@ -208,15 +210,41 @@ init_tuplestore_state(ShareInputScanState *node)
elog(DEBUG1, "SISC writer (shareid=%d, slice=%d): No tuplestore yet, creating tuplestore",
sisc->share_id, currentSliceId);
ts = tuplestore_begin_heap(true, /* randomAccess */
false, /* interXact */
10); /* maxKBytes FIXME */
shareinput_create_bufname_prefix(rwfile_prefix, sizeof(rwfile_prefix), sisc->share_id);
ts = ntuplestore_create_readerwriter(rwfile_prefix, PlanStateOperatorMemKB((PlanState *)node) * 1024, true);
tsa = ntuplestore_create_accessor(ts, true);
tuplestore_make_shared(ts, rwfile_prefix);
}
else
{
/* intra-slice */
ts = ntuplestore_create(PlanStateOperatorMemKB((PlanState *) node) * 1024, "Materialize");
tsa = ntuplestore_create_accessor(ts, true /* isWriter */);
ts = tuplestore_begin_heap(true, /* randomAccess */
false, /* interXact */
PlanStateOperatorMemKB((PlanState *) node));
/*
* Offer extra memory usage info for EXPLAIN ANALYZE.
*
* If this is a cross-slice share, the tuplestore uses very
* little memory, because it has to materialize the result on
* a file anyway, so that it can be shared across processes.
* In that case, reporting memory usage doesn't make much
* sense. The "work_mem wanted" value would be particularly
* nonsensical, as we would write to a file regardless of
* work_mem. So only track memory usage in the non-cross-slice
* case.
*/
if (node->ss.ps.instrument && node->ss.ps.instrument->need_cdb)
{
/* Let the tuplestore share our Instrumentation object. */
tuplestore_set_instrument(ts, node->ss.ps.instrument);
/* Request a callback at end of query. */
node->ss.ps.cdbexplainfun = ExecShareInputScanExplainEnd;
}
}
for (;;)
......@@ -224,14 +252,16 @@ init_tuplestore_state(ShareInputScanState *node)
outerslot = ExecProcNode(local_state->childState);
if (TupIsNull(outerslot))
break;
ntuplestore_acc_put_tupleslot(tsa, outerslot);
tuplestore_puttupleslot(ts, outerslot);
}
if (sisc->cross_slice)
{
ntuplestore_flush(ts);
tuplestore_freeze(ts);
shareinput_writer_notifyready(node->ref);
}
tuplestore_rescan(ts);
}
else
{
......@@ -246,29 +276,24 @@ init_tuplestore_state(ShareInputScanState *node)
shareinput_reader_waitready(node->ref);
shareinput_create_bufname_prefix(rwfile_prefix, sizeof(rwfile_prefix), sisc->share_id);
ts = ntuplestore_create_readerwriter(rwfile_prefix, 0, false);
tsa = ntuplestore_create_accessor(ts, false);
ts = tuplestore_open_shared(rwfile_prefix, false /* interXact */);
}
local_state->ts_state = ts;
local_state->ready = true;
tsptrno = 0;
}
else
{
/* Another local reader */
ts = local_state->ts_state;
tsa = (void *) ntuplestore_create_accessor(ts, false);
}
tsptrno = tuplestore_alloc_read_pointer(ts, (EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND));
/* Offer extra info for EXPLAIN ANALYZE. */
if (node->ss.ps.instrument && node->ss.ps.instrument->need_cdb)
{
/* Let the tuplestore share our Instrumentation object. */
ntuplestore_setinstrument(ts, node->ss.ps.instrument);
tuplestore_select_read_pointer(ts, tsptrno);
tuplestore_rescan(ts);
}
ntuplestore_acc_seek_bof(tsa);
node->ts_state = ts;
node->ts_pos = tsa;
node->ts_pos = tsptrno;
node->isready = true;
}
......@@ -306,12 +331,12 @@ ExecShareInputScan(ShareInputScanState *node)
Assert(!node->local_state->closed);
tuplestore_select_read_pointer(node->ts_state, node->ts_pos);
while(1)
{
bool gotOK;
ntuplestore_acc_advance(node->ts_pos, forward ? 1 : -1);
gotOK = ntuplestore_acc_current_tupleslot(node->ts_pos, slot);
gotOK = tuplestore_gettupleslot(node->ts_state, forward, false, slot);
if (!gotOK)
return NULL;
......@@ -345,7 +370,7 @@ ExecInitShareInputScan(ShareInputScan *node, EState *estate, int eflags)
sisstate->ss.ps.state = estate;
sisstate->ts_state = NULL;
sisstate->ts_pos = NULL;
sisstate->ts_pos = -1;
/*
* init child node.
......@@ -436,6 +461,30 @@ ExecInitShareInputScan(ShareInputScan *node, EState *estate, int eflags)
return sisstate;
}
/*
 * ExecShareInputScanExplainEnd
 *		Called before ExecutorEnd to finish EXPLAIN ANALYZE reporting.
 *
 * Some of the cleanup that ordinarily would occur during
 * ExecEndShareInputScan() needs to be done earlier in order to report
 * statistics to EXPLAIN ANALYZE.  Note that ExecEndShareInputScan() will
 * still be called during ExecutorEnd().
 *
 * The tuplestore is released here only for non-cross-slice shares; the
 * pointer is cleared afterwards so that the later tuplestore_end() call in
 * ExecEndShareInputScan() is skipped.  'buf' is not used.
 */
static void
ExecShareInputScanExplainEnd(PlanState *planstate, struct StringInfoData *buf)
{
	ShareInputScanState *scanstate = (ShareInputScanState *) planstate;
	ShareInputScan *plan = (ShareInputScan *) planstate->plan;
	shareinput_local_state *shared = scanstate->local_state;

	/* Cross-slice shares are left untouched by this callback. */
	if (plan->cross_slice)
		return;

	/* Nothing to release if there is no local state or no tuplestore. */
	if (shared == NULL || shared->ts_state == NULL)
		return;

	/* Release tuplestore resources. */
	tuplestore_end(shared->ts_state);
	shared->ts_state = NULL;
}
/* ------------------------------------------------------------------
* ExecEndShareInputScan
* ------------------------------------------------------------------
......@@ -479,11 +528,9 @@ ExecEndShareInputScan(ShareInputScanState *node)
node->ref = NULL;
}
/* Don't free the accessors, they will go away with the NTupleStore instance. */
if (local_state && local_state->ts_state)
{
ntuplestore_destroy(local_state->ts_state);
tuplestore_end(local_state->ts_state);
local_state->ts_state = NULL;
}
......@@ -507,9 +554,10 @@ ExecReScanShareInputScan(ShareInputScanState *node)
init_tuplestore_state(node);
ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
Assert(NULL != node->ts_pos);
Assert(node->ts_pos != -1);
ntuplestore_acc_seek_bof(node->ts_pos);
tuplestore_select_read_pointer(node->ts_state, node->ts_pos);
tuplestore_rescan(node->ts_state);
}
/*
......
......@@ -43,6 +43,27 @@
* before switching to the other state or activating a different read pointer.
*
*
* Greenplum changes
* -----------------
*
* In Greenplum, tuplestores have one extra capability: a tuplestore can
* be created and filled in one process, and opened for reading in another
* process. To do this, call tuplestore_make_shared() immediately
* after creating the tuplestore, in the writer process. Then populate the
* tuplestore as usual, by calling tuplestore_puttupleslot(). When you're
* finished writing to it, call tuplestore_freeze(). tuplestore_freeze()
* flushes all the tuples to the file. No new rows may be added after
* freezing it.
*
* After freezing, you can open the tuplestore for reading in the other
* process by calling tuplestore_open_shared(). It may be opened for reading
* as many times as you want, in different processes, until it is destroyed
* by the original writer process by calling tuplestore_end().
*
* Note that tuplestore doesn't do any synchronization across processes!
* It is up to the calling code to do the freezing, opening for reading, and
* destroying the tuplestore in the right order!
*
* Portions Copyright (c) 2007-2010, Greenplum Inc.
* Portions Copyright (c) 2012-Present Pivotal Software, Inc.
* Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
......@@ -65,10 +86,11 @@
#include "storage/buffile.h"
#include "utils/memutils.h"
#include "utils/resowner.h"
#include "utils/tuplestore.h"
#include "cdb/cdbvars.h"
#include "access/memtup.h"
#include "executor/instrument.h" /* struct Instrumentation */
#include "utils/workfile_mgr.h"
/*
* Possible states of a Tuplestore object. These denote the states that
......@@ -118,6 +140,10 @@ struct Tuplestorestate
MemoryContext context; /* memory context for holding tuples */
ResourceOwner resowner; /* resowner for holding temp files */
bool frozen;
bool shared;
workfile_set *work_set; /* workfile set to use when using workfile manager */
/*
* These function pointers decouple the routines that must know what kind
* of tuple we are handling from the routines that don't need to know it.
......@@ -506,6 +532,8 @@ tuplestore_end(Tuplestorestate *state)
if (state->myfile)
BufFileClose(state->myfile);
if (state->work_set)
workfile_mgr_close_set(state->work_set);
if (state->memtuples)
{
for (i = state->memtupdeleted; i < state->memtupcount; i++)
......@@ -814,6 +842,9 @@ tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
int i;
ResourceOwner oldowner;
if (state->frozen)
elog(ERROR, "cannot write new tuples to frozen tuplestore");
switch (state->status)
{
case TSS_INMEM:
......@@ -1432,6 +1463,10 @@ tuplestore_trim(Tuplestorestate *state)
if (state->eflags & EXEC_FLAG_REWIND)
return;
/* Cannot trim tuplestore if another process might be reading it */
if (state->frozen)
return;
/*
* We don't bother trimming temp files since it usually would mean more
* work than just letting them sit in kernel buffers until they age out.
......@@ -1687,3 +1722,102 @@ tuplestore_set_instrument(Tuplestorestate *state,
state->instrument = instrument;
} /* tuplestore_set_instrument */
/* Extra GPDB functions for sharing tuplestores across processes */
/*
 * tuplestore_make_shared
 *
 * Make a tuplestore available for sharing later.  This must be called
 * immediately after tuplestore_begin_heap(), before any tuples have been
 * added.
 *
 * state:    the freshly created tuplestore, in the writer process.
 * filename: name of the backing temp file; reader processes pass the same
 *           name to tuplestore_open_shared().
 *
 * On return the store is in TSS_WRITEFILE state, writing to a named temp
 * file that belongs to a newly created workfile set.
 */
void
tuplestore_make_shared(Tuplestorestate *state, const char *filename)
{
	ResourceOwner oldowner;

	/*
	 * Enforce the "immediately after tuplestore_begin_heap()" contract.  The
	 * code below unconditionally replaces state->myfile and state->status,
	 * so tuples already buffered in memory, or an already-open temp file,
	 * would not be handled correctly by the switch.
	 */
	Assert(state->status == TSS_INMEM);
	Assert(state->memtupcount == 0);
	Assert(state->myfile == NULL);

	state->work_set = workfile_mgr_create_set("SharedTupleStore", filename);
	state->shared = true;

	/*
	 * Switch to tape-based operation, like in tuplestore_puttuple_common().
	 * We could delay this until tuplestore_freeze(), but we know we'll have
	 * to write everything to the file anyway, so let's not waste memory
	 * buffering the tuples in the meanwhile.
	 */
	PrepareTempTablespaces();

	/* associate the file with the store's resource owner */
	oldowner = CurrentResourceOwner;
	CurrentResourceOwner = state->resowner;

	state->myfile = BufFileCreateNamedTemp(filename,
										   state->interXact,
										   state->work_set);

	CurrentResourceOwner = oldowner;

	/*
	 * For now, be conservative and always use trailing length words for
	 * cross-process tuplestores. It's important that the writer and the
	 * reader processes agree on this, and forcing it to true is the
	 * simplest way to achieve that.
	 */
	state->backward = true;
	state->status = TSS_WRITEFILE;
}
/*
 * writetup_forbidden
 *
 * writetup callback that always raises an error and never writes anything.
 * tuplestore_open_shared() installs it as state->writetup for stores opened
 * for reading.  Neither argument is examined.
 */
static void
writetup_forbidden(Tuplestorestate *state, void *tup)
{
elog(ERROR, "cannot write to tuplestore, it is already frozen");
}
/*
 * tuplestore_freeze
 *
 * Flush the current buffer to disk, and forbid further inserts. This
 * prepares the tuplestore for reading from a different process.
 *
 * The store must have been set up with tuplestore_make_shared(), which is
 * what sets state->shared.  Once state->frozen is set,
 * tuplestore_puttuple_common() raises an error and tuplestore_trim()
 * returns without doing anything.
 */
void
tuplestore_freeze(Tuplestorestate *state)
{
/* Only stores prepared by tuplestore_make_shared() may be frozen. */
Assert(state->shared);
/* NOTE(review): presumably writes any tuples still held in memory to the file -- confirm against dumptuples(). */
dumptuples(state);
/* Flush the BufFile's buffer, so the data is on disk before the store is marked frozen. */
BufFileFlush(state->myfile);
/* From here on, no new rows may be added. */
state->frozen = true;
}
/*
 * tuplestore_open_shared
 *
 * Open, for reading, a shared tuplestore that was populated in another
 * process.
 *
 * filename:  name of the backing temp file, as given to
 *            tuplestore_make_shared() by the writer.
 * interXact: passed through to tuplestore_begin_common() and
 *            BufFileOpenNamedTemp().
 *
 * Returns a new Tuplestorestate in TSS_READFILE state whose read pointer 0
 * is positioned at the start of the file.  Attempts to write tuples out
 * through the returned store fail via writetup_forbidden().
 */
Tuplestorestate *
tuplestore_open_shared(const char *filename, bool interXact)
{
	Tuplestorestate *reader;

	reader = tuplestore_begin_common(EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND,
									 interXact,
									 10 /* no need for memory buffers */);

	/*
	 * tuplestore_make_shared() forces trailing length words on in the
	 * writer, so the reader has to use the same setting.
	 */
	reader->backward = true;

	/* Heap-tuple callbacks; writing is not allowed on the reader side. */
	reader->readtup = readtup_heap;
	reader->copytup = copytup_heap;
	reader->writetup = writetup_forbidden;
	reader->mt_bind = NULL;

	/* Attach to the existing file and start reading from its beginning. */
	reader->myfile = BufFileOpenNamedTemp(filename, interXact);
	reader->readptrs[0].file = 0;
	reader->readptrs[0].offset = 0L;
	reader->status = TSS_READFILE;

	return reader;
}
......@@ -2480,8 +2480,8 @@ typedef struct ShareInputScanState
{
ScanState ss;
struct NTupleStore *ts_state;
struct NTupleStoreAccessor *ts_pos;
Tuplestorestate *ts_state;
int ts_pos;
struct shareinput_local_state *local_state;
struct shareinput_Xslice_reference *ref;
......
......@@ -90,4 +90,8 @@ extern void tuplestore_end(Tuplestorestate *state);
extern void tuplestore_set_instrument(Tuplestorestate *state,
struct Instrumentation *instrument);
extern void tuplestore_make_shared(Tuplestorestate *state, const char *filename);
extern void tuplestore_freeze(Tuplestorestate *state);
extern Tuplestorestate *tuplestore_open_shared(const char *filename, bool interXact);
#endif /* TUPLESTORE_H */
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册