From 6f940543973d950b11d9ea2597a0059a53beef25 Mon Sep 17 00:00:00 2001 From: xiong-gang Date: Wed, 27 Sep 2017 10:45:53 +0800 Subject: [PATCH] Detach resource group slot on segment QE detach resource group slot at the end of transaction, the last QE of the slot release the slot, and release the overused memory if resource group config has been changed. --- src/backend/access/transam/xact.c | 8 +- src/backend/commands/resgroupcmds.c | 6 +- src/backend/utils/resgroup/resgroup.c | 304 ++++++++---------- src/include/utils/resgroup.h | 8 +- .../resgroup/resgroup_memory_statistic.source | 14 +- 5 files changed, 163 insertions(+), 177 deletions(-) diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index d322b8cc12..012d175e9f 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -2841,8 +2841,8 @@ CommitTransaction(void) freeGangsForPortal(NULL); /* Release resource group slot at the end of a transaction */ - if (ShouldAssignResGroupOnMaster()) - UnassignResGroupOnMaster(); + if (ShouldUnassignResGroup()) + UnassignResGroup(); } @@ -3394,8 +3394,8 @@ CleanupTransaction(void) finishDistributedTransactionContext("CleanupTransaction", true); /* Release resource group slot at the end of a transaction */ - if (ShouldAssignResGroupOnMaster()) - UnassignResGroupOnMaster(); + if (ShouldUnassignResGroup()) + UnassignResGroup(); } /* diff --git a/src/backend/commands/resgroupcmds.c b/src/backend/commands/resgroupcmds.c index ec7178c851..ebe216aee4 100644 --- a/src/backend/commands/resgroupcmds.c +++ b/src/backend/commands/resgroupcmds.c @@ -348,6 +348,10 @@ DropResourceGroup(DropResourceGroupStmt *stmt) errmsg("cannot drop default resource group \"%s\"", stmt->name))); + /* check before dispatch to segment */ + if (IsResGroupActivated()) + ResGroupCheckForDrop(groupid, stmt->name); + /* * Check to see if any roles are in this resource group. */ @@ -399,8 +403,6 @@ DropResourceGroup(DropResourceGroupStmt *stmt) if (IsResGroupActivated()) { - ResGroupCheckForDrop(groupid, stmt->name); - /* Argument of callback function should be allocated in heap region */ callbackArg = (Oid *)MemoryContextAlloc(TopMemoryContext, sizeof(Oid)); *callbackArg = groupid; diff --git a/src/backend/utils/resgroup/resgroup.c b/src/backend/utils/resgroup/resgroup.c index 886a34fd3d..d9317c71ed 100644 --- a/src/backend/utils/resgroup/resgroup.c +++ b/src/backend/utils/resgroup/resgroup.c @@ -205,15 +205,15 @@ static void ResGroupWait(ResGroupData *group); static ResGroupData *ResGroupCreate(Oid groupId, const ResGroupCaps *caps); static void AtProcExit_ResGroup(int code, Datum arg); static void ResGroupWaitCancel(void); -static void groupAssginChunks(ResGroupData *group, - int32 chunks, - const ResGroupCaps *caps); static int32 groupIncMemUsage(ResGroupData *group, ResGroupSlotData *slot, int32 chunks); static void groupDecMemUsage(ResGroupData *group, ResGroupSlotData *slot, int32 chunks); +static void initSlot(ResGroupSlotData *slot, ResGroupCaps *caps, int sessionId); +static void selfAttachToSlot(ResGroupData *group, ResGroupSlotData *slot); +static void selfDetachSlot(ResGroupData *group, ResGroupSlotData *slot); static int getFreeSlot(ResGroupData *group); static int getSlot(ResGroupData *group); static void putSlot(void); @@ -234,7 +234,7 @@ static bool selfHasSlot(void); static bool selfHasGroup(void); static void selfSetGroup(ResGroupData *group); static void selfUnsetGroup(void); -static void selfSetSlot(void); +static void selfSetSlot(int slotId); static void selfUnsetSlot(void); static bool procIsInWaitQueue(const PGPROC *proc); #ifdef USE_ASSERT_CHECKING @@ -1132,6 +1132,41 @@ groupDecMemUsage(ResGroupData *group, ResGroupSlotData *slot, int32 chunks) } } +/* + * Attach a process (QD or QE) to a slot. + */ +static void +selfAttachToSlot(ResGroupData *group, ResGroupSlotData *slot) +{ + AssertImply(slot->nProcs == 0, slot->memUsage == 0); + groupIncMemUsage(group, slot, self->memUsage); + slot->nProcs++; +} + +/* + * Detach a process (QD or QE) from a slot. + */ +static void +selfDetachSlot(ResGroupData *group, ResGroupSlotData *slot) +{ + groupDecMemUsage(group, slot, self->memUsage); + slot->nProcs--; + AssertImply(slot->nProcs == 0, slot->memUsage == 0); +} + +/* + * Initialize the members of a slot + */ +static void +initSlot(ResGroupSlotData *slot, ResGroupCaps *caps, int sessionId) +{ + Assert(slot->inUse); + + slot->sessionId = sessionId; + slot->caps = *caps; + slot->memQuota = slotGetMemQuotaExpected(caps); + slot->memUsage = 0; +} /* * Get a free resource group slot. * @@ -1171,7 +1206,6 @@ getFreeSlot(ResGroupData *group) static int getSlot(ResGroupData *group) { - ResGroupSlotData *slot; int32 slotMemQuota; int32 memQuotaUsed; int slotId; @@ -1195,7 +1229,6 @@ getSlot(ResGroupData *group) /* Calculate the expected per slot quota */ slotMemQuota = slotGetMemQuotaExpected(caps); - Assert(slotMemQuota > 0); Assert(group->memQuotaUsed >= 0); Assert(group->memQuotaUsed <= group->memQuotaGranted); @@ -1215,17 +1248,7 @@ getSlot(ResGroupData *group) slotId = getFreeSlot(group); Assert(slotId != InvalidSlotId); - slot = &group->slots[slotId]; - Assert(slot->inUse); - - /* Grant the memory quota to it */ - slot->memQuota = slotMemQuota; - - /* Store the config snapshot to it */ - slot->caps = *caps; - - /* And finally increase nRunning */ - pg_atomic_add_fetch_u32((pg_atomic_uint32*)&group->nRunning, 1); + group->nRunning++; return slotId; } @@ -1255,7 +1278,6 @@ putSlot(void) selfUnsetSlot(); Assert(slot->inUse); - Assert(slot->memQuota > 0); /* Return the memory quota granted to this slot */ #ifdef USE_ASSERT_CHECKING @@ -1273,7 +1295,7 @@ putSlot(void) slot->inUse = false; /* And finally decrease nRunning */ - pg_atomic_sub_fetch_u32((pg_atomic_uint32*)&group->nRunning, 1); + group->nRunning--; } /* @@ -1324,17 +1346,17 @@ retry: if (!group->lockedForDrop) { /* try to get a slot directly */ - MyProc->resSlotId = getSlot(group); + int slotId = getSlot(group); - if (MyProc->resSlotId != InvalidSlotId) + if (slotId != InvalidSlotId) { /* got one, lucky */ - selfSetSlot(); + initSlot(&group->slots[slotId], &group->caps, gp_session_id); + selfSetSlot(slotId); group->totalExecuted++; pgstat_report_resgroup(0, group->groupId); LWLockRelease(ResGroupLock); - Assert(selfIsAssignedValidGroup()); return; } } @@ -1365,14 +1387,13 @@ retry: * The waking process has granted us a valid slot. * Update the statistic information of the resource group. */ - selfSetSlot(); + selfSetSlot(MyProc->resSlotId); LWLockAcquire(ResGroupLock, LW_EXCLUSIVE); addTotalQueueDuration(group); group->totalExecuted++; LWLockRelease(ResGroupLock); pgstat_report_resgroup(0, group->groupId); - Assert(selfIsAssignedValidGroup()); } /* Update the total queued time of this group */ @@ -1617,7 +1638,7 @@ static int32 slotGetMemQuotaExpected(const ResGroupCaps *caps) { Assert(caps->concurrency.proposed != 0); - return Max(1, groupGetMemQuotaExpected(caps) / caps->concurrency.proposed); + return groupGetMemQuotaExpected(caps) / caps->concurrency.proposed; } /* @@ -1653,6 +1674,7 @@ wakeupSlots(ResGroupData *group) Assert(waitProc->resWaiting != false); Assert(waitProc->resSlotId == InvalidSlotId); + initSlot(&group->slots[slotId], &group->caps, waitProc->mppSessionId); waitProc->resWaiting = false; waitProc->resSlotId = slotId; SetLatch(&waitProc->procLatch); @@ -1797,8 +1819,7 @@ ResGroupSlotRelease(void) ResGroupData *group = self->group; Assert(selfIsAssignedValidGroup()); - - LWLockAcquire(ResGroupLock, LW_EXCLUSIVE); + Assert(LWLockHeldExclusiveByMe(ResGroupLock)); putSlot(); Assert(!selfHasSlot()); @@ -1822,12 +1843,11 @@ ResGroupSlotRelease(void) Assert(waitProc->resWaiting != false); Assert(waitProc->resSlotId == InvalidSlotId); + initSlot(&group->slots[slotId], &group->caps, waitProc->mppSessionId); waitProc->resSlotId = slotId; /* pass the slot to new query */ waitProc->resWaiting = false; SetLatch(&waitProc->procLatch); } - - LWLockRelease(ResGroupLock); } /* @@ -1871,7 +1891,10 @@ SerializeResGroupInfo(StringInfo str) */ void DeserializeResGroupInfo(struct ResGroupCaps *capsOut, - const char *buf, int len) + Oid *groupId, + int *slotId, + const char *buf, + int len) { int i; int tmp; @@ -1880,15 +1903,13 @@ DeserializeResGroupInfo(struct ResGroupCaps *capsOut, Assert(len > 0); - /* TODO: don't deserialize into self directly */ + memcpy(&tmp, ptr, sizeof(*groupId)); + *groupId = ntohl(tmp); + ptr += sizeof(*groupId); - memcpy(&tmp, ptr, sizeof(self->groupId)); - self->groupId = ntohl(tmp); - ptr += sizeof(self->groupId); - - memcpy(&tmp, ptr, sizeof(self->slotId)); - self->slotId = ntohl(tmp); - ptr += sizeof(self->slotId); + memcpy(&tmp, ptr, sizeof(*slotId)); + *slotId = ntohl(tmp); + ptr += sizeof(*slotId); for (i = 0; i < RESGROUP_LIMIT_TYPE_COUNT; i++) { @@ -1916,6 +1937,18 @@ ShouldAssignResGroupOnMaster(void) !AmIInSIGUSR1Handler(); } +/* + * UnassignResGroup() is called on both master and segments + */ +bool +ShouldUnassignResGroup(void) +{ + return IsResGroupActivated() && + IsNormalProcessingMode() && + (Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE) && + !AmIInSIGUSR1Handler(); +} + /* * On master, QD is assigned to a resource group at the beginning of a transaction. * It will first acquire a slot from the resource group, and then, it will get the @@ -1933,26 +1966,21 @@ AssignResGroupOnMaster(void) PG_TRY(); { /* Acquire slot */ + Assert(pResGroupControl != NULL); + Assert(pResGroupControl->segmentsOnMaster > 0); Assert(selfIsUnassigned()); ResGroupSlotAcquire(); Assert(selfIsAssignedValidGroup()); + Assert(selfHasSlot()); Assert(!self->doMemCheck); group = self->group; - - /* Init slot */ slot = self->slot; - Assert(slot->memQuota > 0); - slot->sessionId = gp_session_id; - pg_atomic_add_fetch_u32((pg_atomic_uint32*)&slot->nProcs, 1); + /* Add proc memory accounting info into group and slot */ + selfAttachToSlot(group, slot); /* Init self */ self->caps = slot->caps; - Assert(pResGroupControl != NULL); - Assert(pResGroupControl->segmentsOnMaster > 0); - - /* Add proc memory accounting info into group and slot */ - groupIncMemUsage(group, slot, self->memUsage); /* Start memory limit checking */ self->doMemCheck = true; @@ -1961,14 +1989,14 @@ AssignResGroupOnMaster(void) SIMPLE_FAULT_INJECTOR(ResGroupAssignedOnMaster); /* Add into cgroup */ - ResGroupOps_AssignGroup(group->groupId, MyProcPid); + ResGroupOps_AssignGroup(self->groupId, MyProcPid); /* Set spill guc */ ResGroupSetMemorySpillRatio(&slot->caps); } PG_CATCH(); { - UnassignResGroupOnMaster(); + UnassignResGroup(); PG_RE_THROW(); } PG_END_TRY(); @@ -1978,7 +2006,7 @@ AssignResGroupOnMaster(void) * Detach from a resource group at the end of the transaction. */ void -UnassignResGroupOnMaster(void) +UnassignResGroup(void) { ResGroupData *group = self->group; ResGroupSlotData *slot = self->slot; @@ -1994,22 +2022,42 @@ UnassignResGroupOnMaster(void) /* Stop memory limit checking */ self->doMemCheck = false; - /* Sub proc memory accounting info from group and slot */ - groupDecMemUsage(group, slot, self->memUsage); - /* Cleanup self */ if (self->memUsage > 10) LOG_RESGROUP_DEBUG(LOG, "Idle proc memory usage: %d", self->memUsage); - /* Cleanup slotInfo */ - pg_atomic_sub_fetch_u32((pg_atomic_uint32*)&slot->nProcs, 1); + LWLockAcquire(ResGroupLock, LW_EXCLUSIVE); - /* Release the slot */ - ResGroupSlotRelease(); - Assert(!selfHasSlot()); + /* Sub proc memory accounting info from group and slot */ + selfDetachSlot(group, slot); + + if (Gp_role == GP_ROLE_DISPATCH) + { + /* Release the slot */ + ResGroupSlotRelease(); + Assert(!selfHasSlot()); + } + else + { + Assert(Gp_role == GP_ROLE_EXECUTE); + selfUnsetSlot(); + + if (slot->nProcs == 0) + { + /* Release the slot memory */ + groupReleaseMemQuota(group, slot); + + /* Mark the slot as free */ + slot->inUse = false; + + /* And finally decrease nRunning */ + group->nRunning--; + } + } /* Cleanup group */ selfUnsetGroup(); + LWLockRelease(ResGroupLock); Assert(selfIsUnassigned()); } @@ -2022,134 +2070,68 @@ UnassignResGroupOnMaster(void) void SwitchResGroupOnSegment(const char *buf, int len) { - Oid prevGroupId; - int prevSlotId; + Oid newGroupId; + int newSlotId; ResGroupCaps caps; ResGroupData *group; ResGroupSlotData *slot; - ResGroupData *prevGroup = NULL; - ResGroupSlotData *prevSlot = NULL; - - selfValidateResGroupInfo(); - - prevGroupId = self->groupId; - prevSlotId = self->slotId; - prevGroup = self->group; - prevSlot = self->slot; /* Stop memory limit checking */ self->doMemCheck = false; - DeserializeResGroupInfo(&caps, buf, len); + DeserializeResGroupInfo(&caps, &newGroupId, &newSlotId, buf, len); + AssertImply(newGroupId != InvalidOid, + newSlotId != InvalidSlotId); - AssertImply(self->groupId != InvalidOid, - self->slotId != InvalidSlotId); - AssertImply(prevGroupId != InvalidOid, - prevSlotId != InvalidSlotId); - - LWLockAcquire(ResGroupLock, LW_EXCLUSIVE); - - if (self->groupId == InvalidOid) + if (newGroupId == InvalidOid) { - /* about to switch to a none resgroup state ... */ - - self->group = NULL; - - if (prevGroup) - { - /* from another resgroup, so detach from it */ - - Assert(prevGroup->groupId == prevGroupId); - - /* Sub proc memory accounting info from group and slot */ - groupDecMemUsage(prevGroup, prevSlot, self->memUsage); - - /* Update info in previous slot */ - pg_atomic_sub_fetch_u32((pg_atomic_uint32*)&prevSlot->nProcs, 1); - - groupReleaseMemQuota(prevGroup, prevSlot); - } - else - { - /* from a none resgroup state, so nothing to do */ - Assert(prevGroupId == InvalidOid); - Assert(prevSlotId == InvalidSlotId); - } - - LWLockRelease(ResGroupLock); + UnassignResGroup(); Assert(selfIsUnassigned()); return; } - /* now we know we are about to switch to some resgroup ... */ - - if (prevGroup && prevGroup->groupId != prevGroupId) + if (self->groupId != InvalidOid) { - /* from a dropped resgroup, so behave like we are from - * a none resgroup state */ - - prevGroup = NULL; - prevSlot = NULL; - prevGroupId = InvalidOid; - prevSlotId = InvalidSlotId; + /* it's not the first dispatch in the same transaction */ + Assert(self->groupId == newGroupId); + Assert(self->slotId == newSlotId); + Assert(!memcmp((void*)&self->caps, (void*)&caps, sizeof(caps))); + self->doMemCheck = true; + return; } - /* now we are sure to switch to a valid resgroup */ - - group = ResGroupHashFind(self->groupId); + LWLockAcquire(ResGroupLock, LW_EXCLUSIVE); + group = ResGroupHashFind(newGroupId); Assert(group != NULL); - /* we don't set this group to self until end of this function */ - - LWLockRelease(ResGroupLock); /* Init self */ Assert(host_segments > 0); Assert(caps.concurrency.proposed > 0); - Assert(self->slotId != InvalidSlotId); + selfSetGroup(group); + selfSetSlot(newSlotId); self->caps = caps; - self->slot = &group->slots[self->slotId]; Assert(selfHasSlot()); /* Init slot */ slot = self->slot; - slot->sessionId = gp_session_id; - slot->caps = caps; - slot->memQuota = slotGetMemQuotaExpected(&caps); - ResGroupSetMemorySpillRatio(&caps); - Assert(slot->memQuota > 0); - - if (prevGroup != group || prevSlot != slot) + if (slot->nProcs != 0) { - /* we are switching between different resgroups */ - - LWLockAcquire(ResGroupLock, LW_EXCLUSIVE); - if (prevGroup) - { - /* the previous one is valid, so detach from it */ - Assert(prevSlot != NULL); - - /* Sub proc memory accounting info from group and slot */ - groupDecMemUsage(prevGroup, prevSlot, self->memUsage); - - /* Update info in previous slot */ - pg_atomic_sub_fetch_u32((pg_atomic_uint32*)&prevSlot->nProcs, 1); - - groupReleaseMemQuota(prevGroup, prevSlot); - } - - /* now attach to the new one */ - - groupAcquireMemQuota(group, &slot->caps); - LWLockRelease(ResGroupLock); - - /* Add proc memory accounting info into group and slot */ - groupIncMemUsage(group, slot, self->memUsage); - - /* Update info in new slot */ - pg_atomic_add_fetch_u32((pg_atomic_uint32*)&slot->nProcs, 1); + Assert(slot->sessionId == gp_session_id); + Assert(slot->memQuota == slotGetMemQuotaExpected(&caps)); } + else + { + Assert(!slot->inUse); + slot->inUse = true; + initSlot(slot, &caps, gp_session_id); + group->nRunning++; + } + selfAttachToSlot(group, slot); - self->group = group; + ResGroupSetMemorySpillRatio(&caps); + + groupAcquireMemQuota(group, &slot->caps); + LWLockRelease(ResGroupLock); /* finally we can say we are in a valid resgroup */ Assert(selfIsAssignedValidGroup()); @@ -2638,10 +2620,8 @@ selfUnsetGroup(void) * - self must has not been set a slot before set; */ static void -selfSetSlot(void) +selfSetSlot(int slotId) { - int slotId = MyProc->resSlotId; - Assert(selfHasGroup()); Assert(!selfHasSlot()); Assert(slotId != InvalidSlotId); diff --git a/src/include/utils/resgroup.h b/src/include/utils/resgroup.h index 4906913260..a050751b53 100644 --- a/src/include/utils/resgroup.h +++ b/src/include/utils/resgroup.h @@ -109,11 +109,15 @@ extern void FreeResGroupEntry(Oid groupId); extern void SerializeResGroupInfo(StringInfo str); extern void DeserializeResGroupInfo(struct ResGroupCaps *capsOut, - const char *buf, int len); + Oid *groupId, + int *slotId, + const char *buf, + int len); extern bool ShouldAssignResGroupOnMaster(void); +extern bool ShouldUnassignResGroup(void); extern void AssignResGroupOnMaster(void); -extern void UnassignResGroupOnMaster(void); +extern void UnassignResGroup(void); extern void SwitchResGroupOnSegment(const char *buf, int len); /* Retrieve statistic information of type from resource group */ diff --git a/src/test/isolation2/output/resgroup/resgroup_memory_statistic.source b/src/test/isolation2/output/resgroup/resgroup_memory_statistic.source index 5c0b754748..e81be41737 100644 --- a/src/test/isolation2/output/resgroup/resgroup_memory_statistic.source +++ b/src/test/isolation2/output/resgroup/resgroup_memory_statistic.source @@ -124,7 +124,7 @@ SET SELECT * FROM memory_result; rsgname |ismaster|avg_mem ---------------+--------+------- -rg1_memory_test|0 |20 +rg1_memory_test|0 |0 rg1_memory_test|1 |0 rg2_memory_test|0 |0 rg2_memory_test|1 |0 @@ -162,7 +162,7 @@ SET SELECT * FROM memory_result; rsgname |ismaster|avg_mem ---------------+--------+------- -rg1_memory_test|0 |20 +rg1_memory_test|0 |0 rg1_memory_test|1 |0 rg2_memory_test|0 |0 rg2_memory_test|1 |0 @@ -179,7 +179,7 @@ rsgname |ismaster|avg_mem ---------------+--------+------- rg1_memory_test|0 |0 rg1_memory_test|1 |0 -rg2_memory_test|0 |40 +rg2_memory_test|0 |0 rg2_memory_test|1 |0 (4 rows) 1q: ... @@ -195,7 +195,7 @@ SET SELECT * FROM memory_result; rsgname |ismaster|avg_mem ---------------+--------+------- -rg1_memory_test|0 |40 +rg1_memory_test|0 |0 rg1_memory_test|1 |0 rg2_memory_test|0 |0 rg2_memory_test|1 |0 @@ -231,7 +231,7 @@ SET SELECT * FROM memory_result; rsgname |ismaster|avg_mem ---------------+--------+------- -rg1_memory_test|0 |80 +rg1_memory_test|0 |0 rg1_memory_test|1 |0 rg2_memory_test|0 |0 rg2_memory_test|1 |0 @@ -258,9 +258,9 @@ count SELECT * FROM memory_result; rsgname |ismaster|avg_mem ---------------+--------+------- -rg1_memory_test|0 |40 +rg1_memory_test|0 |0 rg1_memory_test|1 |0 -rg2_memory_test|0 |40 +rg2_memory_test|0 |0 rg2_memory_test|1 |0 (4 rows) 1q: ... -- GitLab