提交 abe13c79 编写于 作者: A Asim R P

Use block number instead of LSN to batch changed blocks in filerep

The filerep resync logic that fetches changed blocks from the
changetracking (CT) log is changed.  LSN is no longer used to filter
blocks from the CT log.  If the number of a relation's changed blocks
exceeds the threshold number of blocks that can be fetched at a time,
the last fetched block number is remembered and used to form the
subsequent batch.
上级 8e59eea3
......@@ -1649,8 +1649,18 @@ FileRepResync_CheckProgress(void)
return countProgress;
}
/*
*
/*
* This function returns either of two output parameters. For relations that
* can be resynchronized using changed blocks recorded in CT log, "request"
* object is created and returned as output parameter. These are the relations
* whose mirrorDataSynchronizationState in persistent table is PageIncremental.
* E.g. regular heap tables on which insert/update/delete was run while primary
* was in changetracking. The second category consists of appendonly tables
* or heap tables whose mirrorDataSynchronizationState is ScanIncremental.
* These are persistent tables and heap tables that were truncated when primary
* was in changetracking. Changed blocks recorded in CT log are NOT used for
* resynchronizing these tables. The CT hash entry is directly returned in
* this case.
*/
FileRepResyncHashEntry_s*
FileRepPrimary_GetResyncEntry(ChangeTrackingRequest **request)
......@@ -1681,7 +1691,7 @@ FileRepPrimary_GetResyncEntry(ChangeTrackingRequest **request)
{
changedPageCount += entry->mirrorBufpoolResyncChangedPageCount;
if (changedPageCount > gp_filerep_ct_batch_size )
if (changedPageCount > gp_filerep_ct_batch_size)
{
if (NumberOfRelations == 0)
{
......@@ -1726,8 +1736,7 @@ FileRepPrimary_GetResyncEntry(ChangeTrackingRequest **request)
if (NumberOfRelations > 0)
{
int count = 0;
XLogRecPtr endResyncLSN = FileRepResync_GetEndIncrResyncLSN();
requestLocal = ChangeTracking_FormRequest(NumberOfRelations);
hash_seq_init(&hash_status, fileRepResyncShmem->fileRepResyncHash);
......@@ -1744,26 +1753,30 @@ FileRepPrimary_GetResyncEntry(ChangeTrackingRequest **request)
{
/*
* When a single SN has more than 64K, it must be called in GetChanges by itself
* (without other relfilenodes). The GetChanges() routine will return the first 64K changes,
* plus an indication that GetChanges() did not finish with this relation. When the consumer
* sees this “not done yet” flag, he needs to call GetChanges() for this relfilenode again
* (when ready) and pass the end lsn from the previous call as the beginning lsn for this call.
* GetChanges() will then return the next 64K changes, etc, etc.
* When a single relation has more changed blocks than
* gp_filerep_ct_batch_size, it must be called in
* ChangeTracking_GetChanges() by itself (with no other
* relation). The GetChanges() routine will return the
* first batch of changes, plus an indication that
* GetChanges() did not finish with this relation
* (ask_for_more flag). The caller must invoke
* GetChanges() for this relation again (when ready) and
* pass the last block number from the previous call as the
* beginning block number for this call. GetChanges() will
* then return the next batch of changes and this will
* continue until ask_for_more flag is returned as false.
*
* If GetChanges() was called with more than 1 relfilenode/SN at a time AND it sees more than
* 64K changes it will be an internal error.
* If GetChanges() was called with more than 1 relation
* (persistent serial number) at a time AND it sees more
* than gp_filerep_ct_batch_size changes it will be an
* internal error.
*/
if (entry->mirrorBufpoolResyncChangedPageCount > gp_filerep_ct_batch_size)
{
Assert(NumberOfRelations == 1);
}
ChangeTracking_AddRequestEntry(requestLocal,
entry->relFileNode,
&entry->mirrorBufpoolResyncCkptLoc, //beginIncrResyncLSN,
&endResyncLSN);
ChangeTracking_AddRequestEntry(requestLocal, entry->relFileNode);
found = TRUE;
fileRepResyncShmem->writeCount--;
fileRepResyncShmem->resyncInProgressCount++;
......
......@@ -525,7 +525,14 @@ FileRepPrimary_ResyncBufferPoolIncrementalWrite(ChangeTrackingRequest *request)
while (1)
{
/* allow flushing buffers from buffer pool during scan */
/*
* ChangeTracking_GetChanges() invokes a SQL query using SPI, which
* could result in dirty buffers being written out. Setting
* readBufferRequest indicates that these writes are performed on
* primary as well as mirror. When readBufferRequest flag is unset,
* resync workers send changed blocks only to mirror without writing
* them on primary.
*/
FileRepResync_SetReadBufferRequest();
if ((result = ChangeTracking_GetChanges(request)) != NULL)
{
......@@ -577,7 +584,14 @@ FileRepPrimary_ResyncBufferPoolIncrementalWrite(ChangeTrackingRequest *request)
goto flush_check;
}
/* allow flushing buffers from buffer pool during scan */
/*
* ReadBuffer() may need to write out a dirty buffer to make
* room in buffer cache. Setting readBufferRequest indicates
* that resync worker process should perform writes on primary.
* When readBufferRequest flag is unset, resync workers send
* changed blocks only to mirror without writing them on
* primary.
*/
FileRepResync_SetReadBufferRequest();
buf = ReadBuffer_Resync(smgr_relation,
result->entries[ii].block_num);
......@@ -684,7 +698,34 @@ FileRepPrimary_ResyncBufferPoolIncrementalWrite(ChangeTrackingRequest *request)
break;
}
}
else
{
Assert(result->count == gp_filerep_ct_batch_size);
Assert(request->count == 1);
/*
* Update last_fetched block in request so that the
* next call to GetChanges() knows where to start
* fetching changed blocks from.
*/
if (RelFileNodeEquals(request->entries[0].relFileNode,
result->entries[ii].relFileNode))
{
request->entries[0].last_fetched =
result->entries[ii].block_num;
elog(LOG, "%u/%u/%u last fetched block %d",
request->entries[0].relFileNode.spcNode,
request->entries[0].relFileNode.dbNode,
request->entries[0].relFileNode.relNode,
request->entries[0].last_fetched);
}
else
elog(ERROR, "changetracking request not found for "
"%u/%u/%u, block %d",
result->entries[ii].relFileNode.spcNode,
result->entries[ii].relFileNode.dbNode,
result->entries[ii].relFileNode.relNode,
result->entries[ii].block_num);
}
}
if (count > thresholdCount)
......@@ -706,17 +747,8 @@ FileRepPrimary_ResyncBufferPoolIncrementalWrite(ChangeTrackingRequest *request)
} // if ((result = ChangeTracking_GetChanges(request)) != NULL)
FileRepResync_ResetReadBufferRequest();
if (result != NULL && result->ask_for_more == true)
{
Assert(request->count == 1);
request->entries[0].lsn_start = result->next_start_lsn;
}
else
{
if (result == NULL || result->ask_for_more == false)
break;
}
} // while(1)
ChangeTracking_FreeRequest(request);
......
......@@ -1481,14 +1481,13 @@ ChangeTrackingRequest* ChangeTracking_FormRequest(int max_count)
}
/*
* Add an entry to the request. An entry is a specification of a relation
* and the start/end LSN of interest. If start LSN isn't needed, the caller
* must specify 0/0 for now.
* Add an entry to CT request. An entry is a relfilenode of a relation whose
* changed blocks recorded in CT log need to be resynchronized with mirror.
* The block number last_fetched indicates the last block number of this
* relation fetched and sent to mirror by a resync worker.
*/
void ChangeTracking_AddRequestEntry(ChangeTrackingRequest *request,
RelFileNode relFileNode,
XLogRecPtr* lsn_start,
XLogRecPtr* lsn_end)
void ChangeTracking_AddRequestEntry(ChangeTrackingRequest *request,
RelFileNode relFileNode)
{
ChangeTrackingRequestEntry* entry;
......@@ -1499,9 +1498,7 @@ void ChangeTracking_AddRequestEntry(ChangeTrackingRequest *request,
entry->relFileNode.relNode = relFileNode.relNode;
entry->relFileNode.spcNode = relFileNode.spcNode;
entry->relFileNode.dbNode = relFileNode.dbNode;
entry->lsn_start = *lsn_start;
entry->lsn_end = *lsn_end;
entry->last_fetched = 0;
request->count++;
}
......@@ -1527,8 +1524,6 @@ static ChangeTrackingResult* ChangeTracking_FormResult(int max_count)
result->count = 0;
result->max_count = max_count;
result->ask_for_more = false;
result->next_start_lsn.xlogid = 0;
result->next_start_lsn.xrecoff = 0;
result->entries = (ChangeTrackingResultEntry*) palloc(sizeof(ChangeTrackingResultEntry) * max_count);
return result;
......@@ -1565,15 +1560,14 @@ static void ChangeTracking_AddResultEntry(ChangeTrackingResult *result,
* found in the change tracking log file this routine will return
* the list of block numbers and the end LSN of each.
*
* We restrict the total number of changes that this routine returns
* to gp_filerep_ct_batch_size, in order to not overflow memory.
* If a specific relation is expected to have more than this number
* of changes, this routine will return the first gp_filerep_ct_batch_size
* change, along with setting the 'ask_for_more' flag in the result to
* indicate to the caller that a request with the same relation should
* be issued when ready. When this happens the caller should use the
* value returned in 'next_start_lsn' in each subsequent calls as the
* begin lsn value in the request entry for this relation.
* We restrict the total number of changes that this routine returns to
* gp_filerep_ct_batch_size, in order to not overflow memory. If a specific
* relation is expected to have more than this number of changes, this routine
* will return the first gp_filerep_ct_batch_size changes, along with setting
* the 'ask_for_more' flag in the result to indicate to the caller that a
* request with the same relation should be issued when ready. When this
* happens the caller should set last_fetched in the relation's request to the
* highest block number seen so far.
*/
ChangeTrackingResult* ChangeTracking_GetChanges(ChangeTrackingRequest *request)
{
......@@ -1598,25 +1592,23 @@ ChangeTrackingResult* ChangeTracking_GetChanges(ChangeTrackingRequest *request)
for(i = 0 ; i < request->count ; i++)
{
XLogRecPtr lsn_start = request->entries[i].lsn_start;
XLogRecPtr lsn_end = request->entries[i].lsn_end;
Oid space = request->entries[i].relFileNode.spcNode;
Oid db = request->entries[i].relFileNode.dbNode;
Oid rel = request->entries[i].relFileNode.relNode;
BlockNumber last_fetched = request->entries[i].last_fetched;
if(i != 0)
appendStringInfo(&sqlstmt, "OR ");
appendStringInfo(&sqlstmt, "(space = %u AND "
"db = %u AND "
"rel = %u AND "
"xlogloc <= '(%X/%X)' AND "
"xlogloc >= '(%X/%X)') ",
space, db, rel, lsn_end.xlogid, lsn_end.xrecoff,
lsn_start.xlogid, lsn_start.xrecoff);
/* TODO: use gpxloglocout() to format the '(%X/%X)' instead of doing it manually here */
"rel = %u",
space, db, rel);
if (last_fetched > 0)
appendStringInfo(&sqlstmt, " AND blocknum > %u) ", last_fetched);
else
appendStringInfo(&sqlstmt, ") ");
}
appendStringInfo(&sqlstmt, "GROUP BY space, db, rel, blocknum "
"ORDER BY space, db, rel, blocknum ");
......@@ -1733,16 +1725,13 @@ ChangeTrackingResult* ChangeTracking_GetChanges(ChangeTrackingRequest *request)
/* TODO: in the above should use DatumGetXLogLoc instead, but it's not public */
/*
* skip the last "extra" entry if satisfied_request is false, and suggest
* the lsn to use in the next request for this same relation.
* skip the last "extra" entry if satisfied_request is false
*/
if(i == gp_filerep_ct_batch_size)
{
Assert(!satisfied_request);
Assert(result->ask_for_more);
result->next_start_lsn = *endlsn;
MemoryContextSwitchTo(cxt_save);
break;
}
......
......@@ -299,10 +299,16 @@ void ChangeTracking_FreeIncrementalChangeList(IncrementalChangeList* iclist);
typedef struct ChangeTrackingRequestEntry
{
XLogRecPtr lsn_start;
XLogRecPtr lsn_end;
RelFileNode relFileNode; /* The tablespace, database, and relation OIDs for the requested relation. */
/* Changed relation. */
RelFileNode relFileNode;
/*
* Block number of the last block fetched from CT log. This is used only
* when the number of changed blocks for a relation is greater than
* gp_filerep_ct_batch_size. The changed blocks are obtained in fixed
* sized batches. The last_fetched block number determines where to start
* the next batch from.
*/
BlockNumber last_fetched;
} ChangeTrackingRequestEntry;
typedef struct ChangeTrackingRequest
......@@ -313,10 +319,28 @@ typedef struct ChangeTrackingRequest
} ChangeTrackingRequest;
/*
* Changed block of a buffer pooled relation (heap). All the information in
* this object is obtained from CT log.
*/
typedef struct ChangeTrackingResultEntry
{
RelFileNode relFileNode; /* The tablespace, database, and relation OIDs for the requested relation. */
RelFileNode relFileNode;
BlockNumber block_num;
/*
* Most recent location in XLOG for a change made to this block while a
* primary segment was tracking changes. If a new change happens to the
* block after primary segment has transitioned from changetracking to
* resync, the block's LSN value will be newer than lsn_end. If so, the
* block will be mirrored already at the time the change was written on
* primary and doesn't need to be shipped to mirror during resync.
*
* In theory, a single global "last change-tracked LSN"
* (XLogLastChangeTrackedLoc() or fileRepResyncShmem->endIncrResyncLSN)
* should suffice to decide whether a page should be sent to mirror
* during resync or not. Keeping track of lsn_end for each block of every
* relation seems redundant. We leave this as TODO.
*/
XLogRecPtr lsn_end;
} ChangeTrackingResultEntry;
......@@ -327,15 +351,11 @@ typedef struct ChangeTrackingResult
int count;
int max_count;
bool ask_for_more; /* there are more results for this rel. ask for them */
XLogRecPtr next_start_lsn; /* when asking again, use this lsn as start lsn */
} ChangeTrackingResult;
extern ChangeTrackingRequest* ChangeTracking_FormRequest(int count);
extern void ChangeTracking_AddRequestEntry(ChangeTrackingRequest* request,
RelFileNode relFileNode,
XLogRecPtr* lsn_start,
XLogRecPtr* lsn_end);
RelFileNode relFileNode);
extern ChangeTrackingResult* ChangeTracking_GetChanges(ChangeTrackingRequest* request);
extern void ChangeTracking_FreeRequest(ChangeTrackingRequest* request);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册