提交 b1be1294 编写于 作者: T Tom Lane

Fix O(N^2) behavior in pg_dump when many objects are in dependency loops.

Combining the loop workspace with the record of already-processed objects
might have been a cute trick, but it behaves horridly if there are many
dependency loops to repair: the time spent in the first step of findLoop()
grows as O(N^2).  Instead use a separate flag array indexed by dump ID,
which we can check in constant time.  The length of the workspace array
is now never more than the actual length of a dependency chain, which
should be reasonably short in all cases of practical interest.  The code
is noticeably easier to understand this way, too.

Per gripe from Mike Roest.  Since this is a longstanding performance bug,
backpatch to all supported versions.
上级 55eb2567
......@@ -100,11 +100,11 @@ static bool TopoSort(DumpableObject **objs,
static void addHeapElement(int val, int *heap, int heapLength);
static int removeHeapElement(int *heap, int heapLength);
static void findDependencyLoops(DumpableObject **objs, int nObjs, int totObjs);
static bool findLoop(DumpableObject *obj,
static int findLoop(DumpableObject *obj,
DumpId startPoint,
bool *processed,
DumpableObject **workspace,
int depth,
int *newDepth);
int depth);
static void repairDependencyLoop(DumpableObject **loop,
int nLoop);
static void describeDumpableObject(DumpableObject *obj,
......@@ -474,58 +474,52 @@ static void
findDependencyLoops(DumpableObject **objs, int nObjs, int totObjs)
{
/*
* We use a workspace array, the initial part of which stores objects
* already processed, and the rest of which is used as temporary space to
* try to build a loop in. This is convenient because we do not care
* about loops involving already-processed objects (see notes above); we
* can easily reject such loops in findLoop() because of this
* representation. After we identify and process a loop, we can add it to
* the initial part of the workspace just by moving the boundary pointer.
*
* When we determine that an object is not part of any interesting loop,
* we also add it to the initial part of the workspace. This is not
* necessary for correctness, but saves later invocations of findLoop()
* from uselessly chasing references to such an object.
*
* We make the workspace large enough to hold all objects in the original
* universe. This is probably overkill, but it's provably enough space...
* We use two data structures here. One is a bool array processed[],
* which is indexed by dump ID and marks the objects already processed
* during this invocation of findDependencyLoops(). The other is a
* workspace[] array of DumpableObject pointers, in which we try to build
* lists of objects constituting loops. We make workspace[] large enough
* to hold all the objects, which is huge overkill in most cases but could
* theoretically be necessary if there is a single dependency chain
* linking all the objects.
*/
bool *processed;
DumpableObject **workspace;
int initiallen;
bool fixedloop;
int i;
processed = (bool *) calloc(getMaxDumpId() + 1, sizeof(bool));
workspace = (DumpableObject **) malloc(totObjs * sizeof(DumpableObject *));
if (workspace == NULL)
if (processed == NULL || workspace == NULL)
exit_horribly(NULL, modulename, "out of memory\n");
initiallen = 0;
fixedloop = false;
for (i = 0; i < nObjs; i++)
{
DumpableObject *obj = objs[i];
int newlen;
int looplen;
int j;
workspace[initiallen] = NULL; /* see test below */
looplen = findLoop(obj, obj->dumpId, processed, workspace, 0);
if (findLoop(obj, obj->dumpId, workspace, initiallen, &newlen))
if (looplen > 0)
{
/* Found a loop of length newlen - initiallen */
repairDependencyLoop(&workspace[initiallen], newlen - initiallen);
/* Add loop members to workspace */
initiallen = newlen;
/* Found a loop, repair it */
repairDependencyLoop(workspace, looplen);
fixedloop = true;
/* Mark loop members as processed */
for (j = 0; j < looplen; j++)
processed[workspace[j]->dumpId] = true;
}
else
{
/*
* Didn't find a loop, but add this object to workspace anyway,
* unless it's already present. We piggyback on the test that
* findLoop() already did: it won't have tentatively added obj to
* workspace if it's already present.
* There's no loop starting at this object, but mark it processed
* anyway. This is not necessary for correctness, but saves later
* invocations of findLoop() from uselessly chasing references to
* such an object.
*/
if (workspace[initiallen] == obj)
initiallen++;
processed[obj->dumpId] = true;
}
}
......@@ -534,45 +528,51 @@ findDependencyLoops(DumpableObject **objs, int nObjs, int totObjs)
exit_horribly(NULL, modulename, "could not identify dependency loop\n");
free(workspace);
free(processed);
}
/*
* Recursively search for a circular dependency loop that doesn't include
* any existing workspace members.
* any already-processed objects.
*
* obj: object we are examining now
* startPoint: dumpId of starting object for the hoped-for circular loop
* workspace[]: work array for previously processed and current objects
* processed[]: flag array marking already-processed objects
* workspace[]: work array in which we are building list of loop members
* depth: number of valid entries in workspace[] at call
* newDepth: if successful, set to new number of workspace[] entries
*
* On success, *newDepth is set and workspace[] entries depth..*newDepth-1
* are filled with pointers to the members of the loop.
* On success, the length of the loop is returned, and workspace[] is filled
* with pointers to the members of the loop. On failure, we return 0.
*
* Note: it is possible that the given starting object is a member of more
* than one cycle; if so, we will find an arbitrary one of the cycles.
*/
static bool
static int
findLoop(DumpableObject *obj,
DumpId startPoint,
bool *processed,
DumpableObject **workspace,
int depth,
int *newDepth)
int depth)
{
int i;
/*
* Reject if obj is already present in workspace. This test serves three
* purposes: it prevents us from finding loops that overlap
* previously-processed loops, it prevents us from going into infinite
* recursion if we are given a startPoint object that links to a cycle
* it's not a member of, and it guarantees that we can't overflow the
* allocated size of workspace[].
* Reject if obj is already processed. This test prevents us from finding
* loops that overlap previously-processed loops.
*/
if (processed[obj->dumpId])
return 0;
/*
* Reject if obj is already present in workspace. This test prevents us
* from going into infinite recursion if we are given a startPoint object
* that links to a cycle it's not a member of, and it guarantees that we
* can't overflow the allocated size of workspace[].
*/
for (i = 0; i < depth; i++)
{
if (workspace[i] == obj)
return false;
return 0;
}
/*
......@@ -586,10 +586,7 @@ findLoop(DumpableObject *obj,
for (i = 0; i < obj->nDeps; i++)
{
if (obj->dependencies[i] == startPoint)
{
*newDepth = depth;
return true;
}
return depth;
}
/*
......@@ -598,18 +595,20 @@ findLoop(DumpableObject *obj,
for (i = 0; i < obj->nDeps; i++)
{
DumpableObject *nextobj = findObjectByDumpId(obj->dependencies[i]);
int newDepth;
if (!nextobj)
continue; /* ignore dependencies on undumped objects */
if (findLoop(nextobj,
startPoint,
workspace,
depth,
newDepth))
return true;
newDepth = findLoop(nextobj,
startPoint,
processed,
workspace,
depth);
if (newDepth > 0)
return newDepth;
}
return false;
return 0;
}
/*
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册