diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index d322b8cc124b116a5b35d9d774476e89b7348872..012d175e9fb44f59053085540412ff64d08e6681 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -2841,8 +2841,8 @@ CommitTransaction(void) freeGangsForPortal(NULL); /* Release resource group slot at the end of a transaction */ - if (ShouldAssignResGroupOnMaster()) - UnassignResGroupOnMaster(); + if (ShouldUnassignResGroup()) + UnassignResGroup(); } @@ -3394,8 +3394,8 @@ CleanupTransaction(void) finishDistributedTransactionContext("CleanupTransaction", true); /* Release resource group slot at the end of a transaction */ - if (ShouldAssignResGroupOnMaster()) - UnassignResGroupOnMaster(); + if (ShouldUnassignResGroup()) + UnassignResGroup(); } /* diff --git a/src/backend/commands/resgroupcmds.c b/src/backend/commands/resgroupcmds.c index ec7178c8510da755611c928514012b63687f4628..ebe216aee4f522ea30be2dafa82ea318b318649b 100644 --- a/src/backend/commands/resgroupcmds.c +++ b/src/backend/commands/resgroupcmds.c @@ -348,6 +348,10 @@ DropResourceGroup(DropResourceGroupStmt *stmt) errmsg("cannot drop default resource group \"%s\"", stmt->name))); + /* check before dispatch to segment */ + if (IsResGroupActivated()) + ResGroupCheckForDrop(groupid, stmt->name); + /* * Check to see if any roles are in this resource group. */ @@ -399,8 +403,6 @@ DropResourceGroup(DropResourceGroupStmt *stmt) if (IsResGroupActivated()) { - ResGroupCheckForDrop(groupid, stmt->name); - /* Argument of callback function should be allocated in heap region */ callbackArg = (Oid *)MemoryContextAlloc(TopMemoryContext, sizeof(Oid)); *callbackArg = groupid; diff --git a/src/backend/utils/resgroup/resgroup.c b/src/backend/utils/resgroup/resgroup.c index 886a34fd3d0d0a0164123212c90622dc7d32c695..d9317c71ed1ef5e8163e1b31ef221ea556152b4c 100644 --- a/src/backend/utils/resgroup/resgroup.c +++ b/src/backend/utils/resgroup/resgroup.c @@ -205,15 +205,15 @@ static void ResGroupWait(ResGroupData *group); static ResGroupData *ResGroupCreate(Oid groupId, const ResGroupCaps *caps); static void AtProcExit_ResGroup(int code, Datum arg); static void ResGroupWaitCancel(void); -static void groupAssginChunks(ResGroupData *group, - int32 chunks, - const ResGroupCaps *caps); static int32 groupIncMemUsage(ResGroupData *group, ResGroupSlotData *slot, int32 chunks); static void groupDecMemUsage(ResGroupData *group, ResGroupSlotData *slot, int32 chunks); +static void initSlot(ResGroupSlotData *slot, ResGroupCaps *caps, int sessionId); +static void selfAttachToSlot(ResGroupData *group, ResGroupSlotData *slot); +static void selfDetachSlot(ResGroupData *group, ResGroupSlotData *slot); static int getFreeSlot(ResGroupData *group); static int getSlot(ResGroupData *group); static void putSlot(void); @@ -234,7 +234,7 @@ static bool selfHasSlot(void); static bool selfHasGroup(void); static void selfSetGroup(ResGroupData *group); static void selfUnsetGroup(void); -static void selfSetSlot(void); +static void selfSetSlot(int slotId); static void selfUnsetSlot(void); static bool procIsInWaitQueue(const PGPROC *proc); #ifdef USE_ASSERT_CHECKING @@ -1132,6 +1132,41 @@ groupDecMemUsage(ResGroupData *group, ResGroupSlotData *slot, int32 chunks) } } +/* + * Attach a process (QD or QE) to a slot. + */ +static void +selfAttachToSlot(ResGroupData *group, ResGroupSlotData *slot) +{ + AssertImply(slot->nProcs == 0, slot->memUsage == 0); + groupIncMemUsage(group, slot, self->memUsage); + slot->nProcs++; +} + +/* + * Detach a process (QD or QE) from a slot. + */ +static void +selfDetachSlot(ResGroupData *group, ResGroupSlotData *slot) +{ + groupDecMemUsage(group, slot, self->memUsage); + slot->nProcs--; + AssertImply(slot->nProcs == 0, slot->memUsage == 0); +} + +/* + * Initialize the members of a slot + */ +static void +initSlot(ResGroupSlotData *slot, ResGroupCaps *caps, int sessionId) +{ + Assert(slot->inUse); + + slot->sessionId = sessionId; + slot->caps = *caps; + slot->memQuota = slotGetMemQuotaExpected(caps); + slot->memUsage = 0; +} /* * Get a free resource group slot. * @@ -1171,7 +1206,6 @@ getFreeSlot(ResGroupData *group) static int getSlot(ResGroupData *group) { - ResGroupSlotData *slot; int32 slotMemQuota; int32 memQuotaUsed; int slotId; @@ -1195,7 +1229,6 @@ getSlot(ResGroupData *group) /* Calculate the expected per slot quota */ slotMemQuota = slotGetMemQuotaExpected(caps); - Assert(slotMemQuota > 0); Assert(group->memQuotaUsed >= 0); Assert(group->memQuotaUsed <= group->memQuotaGranted); @@ -1215,17 +1248,7 @@ getSlot(ResGroupData *group) slotId = getFreeSlot(group); Assert(slotId != InvalidSlotId); - slot = &group->slots[slotId]; - Assert(slot->inUse); - - /* Grant the memory quota to it */ - slot->memQuota = slotMemQuota; - - /* Store the config snapshot to it */ - slot->caps = *caps; - - /* And finally increase nRunning */ - pg_atomic_add_fetch_u32((pg_atomic_uint32*)&group->nRunning, 1); + group->nRunning++; return slotId; } @@ -1255,7 +1278,6 @@ putSlot(void) selfUnsetSlot(); Assert(slot->inUse); - Assert(slot->memQuota > 0); /* Return the memory quota granted to this slot */ #ifdef USE_ASSERT_CHECKING @@ -1273,7 +1295,7 @@ putSlot(void) slot->inUse = false; /* And finally decrease nRunning */ - pg_atomic_sub_fetch_u32((pg_atomic_uint32*)&group->nRunning, 1); + group->nRunning--; } /* @@ -1324,17 +1346,17 @@ retry: if (!group->lockedForDrop) { /* try to get a slot directly */ - MyProc->resSlotId = getSlot(group); + int slotId = getSlot(group); - if (MyProc->resSlotId != InvalidSlotId) + if (slotId != InvalidSlotId) { /* got one, lucky */ - selfSetSlot(); + initSlot(&group->slots[slotId], &group->caps, gp_session_id); + selfSetSlot(slotId); group->totalExecuted++; pgstat_report_resgroup(0, group->groupId); LWLockRelease(ResGroupLock); - Assert(selfIsAssignedValidGroup()); return; } } @@ -1365,14 +1387,13 @@ retry: * The waking process has granted us a valid slot. * Update the statistic information of the resource group. */ - selfSetSlot(); + selfSetSlot(MyProc->resSlotId); LWLockAcquire(ResGroupLock, LW_EXCLUSIVE); addTotalQueueDuration(group); group->totalExecuted++; LWLockRelease(ResGroupLock); pgstat_report_resgroup(0, group->groupId); - Assert(selfIsAssignedValidGroup()); } /* Update the total queued time of this group */ @@ -1617,7 +1638,7 @@ static int32 slotGetMemQuotaExpected(const ResGroupCaps *caps) { Assert(caps->concurrency.proposed != 0); - return Max(1, groupGetMemQuotaExpected(caps) / caps->concurrency.proposed); + return groupGetMemQuotaExpected(caps) / caps->concurrency.proposed; } /* @@ -1653,6 +1674,7 @@ wakeupSlots(ResGroupData *group) Assert(waitProc->resWaiting != false); Assert(waitProc->resSlotId == InvalidSlotId); + initSlot(&group->slots[slotId], &group->caps, waitProc->mppSessionId); waitProc->resWaiting = false; waitProc->resSlotId = slotId; SetLatch(&waitProc->procLatch); @@ -1797,8 +1819,7 @@ ResGroupSlotRelease(void) ResGroupData *group = self->group; Assert(selfIsAssignedValidGroup()); - - LWLockAcquire(ResGroupLock, LW_EXCLUSIVE); + Assert(LWLockHeldExclusiveByMe(ResGroupLock)); putSlot(); Assert(!selfHasSlot()); @@ -1822,12 +1843,11 @@ ResGroupSlotRelease(void) Assert(waitProc->resWaiting != false); Assert(waitProc->resSlotId == InvalidSlotId); + initSlot(&group->slots[slotId], &group->caps, waitProc->mppSessionId); waitProc->resSlotId = slotId; /* pass the slot to new query */ waitProc->resWaiting = false; SetLatch(&waitProc->procLatch); } - - LWLockRelease(ResGroupLock); } /* @@ -1871,7 +1891,10 @@ SerializeResGroupInfo(StringInfo str) */ void DeserializeResGroupInfo(struct ResGroupCaps *capsOut, - const char *buf, int len) + Oid *groupId, + int *slotId, + const char *buf, + int len) { int i; int tmp; @@ -1880,15 +1903,13 @@ DeserializeResGroupInfo(struct ResGroupCaps *capsOut, Assert(len > 0); - /* TODO: don't deserialize into self directly */ + memcpy(&tmp, ptr, sizeof(*groupId)); + *groupId = ntohl(tmp); + ptr += sizeof(*groupId); - memcpy(&tmp, ptr, sizeof(self->groupId)); - self->groupId = ntohl(tmp); - ptr += sizeof(self->groupId); - - memcpy(&tmp, ptr, sizeof(self->slotId)); - self->slotId = ntohl(tmp); - ptr += sizeof(self->slotId); + memcpy(&tmp, ptr, sizeof(*slotId)); + *slotId = ntohl(tmp); + ptr += sizeof(*slotId); for (i = 0; i < RESGROUP_LIMIT_TYPE_COUNT; i++) { @@ -1916,6 +1937,18 @@ ShouldAssignResGroupOnMaster(void) !AmIInSIGUSR1Handler(); } +/* + * UnassignResGroup() is called on both master and segments + */ +bool +ShouldUnassignResGroup(void) +{ + return IsResGroupActivated() && + IsNormalProcessingMode() && + (Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE) && + !AmIInSIGUSR1Handler(); +} + /* * On master, QD is assigned to a resource group at the beginning of a transaction. * It will first acquire a slot from the resource group, and then, it will get the @@ -1933,26 +1966,21 @@ AssignResGroupOnMaster(void) PG_TRY(); { /* Acquire slot */ + Assert(pResGroupControl != NULL); + Assert(pResGroupControl->segmentsOnMaster > 0); Assert(selfIsUnassigned()); ResGroupSlotAcquire(); Assert(selfIsAssignedValidGroup()); + Assert(selfHasSlot()); Assert(!self->doMemCheck); group = self->group; - - /* Init slot */ slot = self->slot; - Assert(slot->memQuota > 0); - slot->sessionId = gp_session_id; - pg_atomic_add_fetch_u32((pg_atomic_uint32*)&slot->nProcs, 1); + /* Add proc memory accounting info into group and slot */ + selfAttachToSlot(group, slot); /* Init self */ self->caps = slot->caps; - Assert(pResGroupControl != NULL); - Assert(pResGroupControl->segmentsOnMaster > 0); - - /* Add proc memory accounting info into group and slot */ - groupIncMemUsage(group, slot, self->memUsage); /* Start memory limit checking */ self->doMemCheck = true; @@ -1961,14 +1989,14 @@ AssignResGroupOnMaster(void) SIMPLE_FAULT_INJECTOR(ResGroupAssignedOnMaster); /* Add into cgroup */ - ResGroupOps_AssignGroup(group->groupId, MyProcPid); + ResGroupOps_AssignGroup(self->groupId, MyProcPid); /* Set spill guc */ ResGroupSetMemorySpillRatio(&slot->caps); } PG_CATCH(); { - UnassignResGroupOnMaster(); + UnassignResGroup(); PG_RE_THROW(); } PG_END_TRY(); @@ -1978,7 +2006,7 @@ AssignResGroupOnMaster(void) * Detach from a resource group at the end of the transaction. */ void -UnassignResGroupOnMaster(void) +UnassignResGroup(void) { ResGroupData *group = self->group; ResGroupSlotData *slot = self->slot; @@ -1994,22 +2022,42 @@ UnassignResGroupOnMaster(void) /* Stop memory limit checking */ self->doMemCheck = false; - /* Sub proc memory accounting info from group and slot */ - groupDecMemUsage(group, slot, self->memUsage); - /* Cleanup self */ if (self->memUsage > 10) LOG_RESGROUP_DEBUG(LOG, "Idle proc memory usage: %d", self->memUsage); - /* Cleanup slotInfo */ - pg_atomic_sub_fetch_u32((pg_atomic_uint32*)&slot->nProcs, 1); + LWLockAcquire(ResGroupLock, LW_EXCLUSIVE); - /* Release the slot */ - ResGroupSlotRelease(); - Assert(!selfHasSlot()); + /* Sub proc memory accounting info from group and slot */ + selfDetachSlot(group, slot); + + if (Gp_role == GP_ROLE_DISPATCH) + { + /* Release the slot */ + ResGroupSlotRelease(); + Assert(!selfHasSlot()); + } + else + { + Assert(Gp_role == GP_ROLE_EXECUTE); + selfUnsetSlot(); + + if (slot->nProcs == 0) + { + /* Release the slot memory */ + groupReleaseMemQuota(group, slot); + + /* Mark the slot as free */ + slot->inUse = false; + + /* And finally decrease nRunning */ + group->nRunning--; + } + } /* Cleanup group */ selfUnsetGroup(); + LWLockRelease(ResGroupLock); Assert(selfIsUnassigned()); } @@ -2022,134 +2070,68 @@ UnassignResGroupOnMaster(void) void SwitchResGroupOnSegment(const char *buf, int len) { - Oid prevGroupId; - int prevSlotId; + Oid newGroupId; + int newSlotId; ResGroupCaps caps; ResGroupData *group; ResGroupSlotData *slot; - ResGroupData *prevGroup = NULL; - ResGroupSlotData *prevSlot = NULL; - - selfValidateResGroupInfo(); - - prevGroupId = self->groupId; - prevSlotId = self->slotId; - prevGroup = self->group; - prevSlot = self->slot; /* Stop memory limit checking */ self->doMemCheck = false; - DeserializeResGroupInfo(&caps, buf, len); + DeserializeResGroupInfo(&caps, &newGroupId, &newSlotId, buf, len); + AssertImply(newGroupId != InvalidOid, + newSlotId != InvalidSlotId); - AssertImply(self->groupId != InvalidOid, - self->slotId != InvalidSlotId); - AssertImply(prevGroupId != InvalidOid, - prevSlotId != InvalidSlotId); - - LWLockAcquire(ResGroupLock, LW_EXCLUSIVE); - - if (self->groupId == InvalidOid) + if (newGroupId == InvalidOid) { - /* about to switch to a none resgroup state ... */ - - self->group = NULL; - - if (prevGroup) - { - /* from another resgroup, so detach from it */ - - Assert(prevGroup->groupId == prevGroupId); - - /* Sub proc memory accounting info from group and slot */ - groupDecMemUsage(prevGroup, prevSlot, self->memUsage); - - /* Update info in previous slot */ - pg_atomic_sub_fetch_u32((pg_atomic_uint32*)&prevSlot->nProcs, 1); - - groupReleaseMemQuota(prevGroup, prevSlot); - } - else - { - /* from a none resgroup state, so nothing to do */ - Assert(prevGroupId == InvalidOid); - Assert(prevSlotId == InvalidSlotId); - } - - LWLockRelease(ResGroupLock); + UnassignResGroup(); Assert(selfIsUnassigned()); return; } - /* now we know we are about to switch to some resgroup ... */ - - if (prevGroup && prevGroup->groupId != prevGroupId) + if (self->groupId != InvalidOid) { - /* from a dropped resgroup, so behave like we are from - * a none resgroup state */ - - prevGroup = NULL; - prevSlot = NULL; - prevGroupId = InvalidOid; - prevSlotId = InvalidSlotId; + /* it's not the first dispatch in the same transaction */ + Assert(self->groupId == newGroupId); + Assert(self->slotId == newSlotId); + Assert(!memcmp((void*)&self->caps, (void*)&caps, sizeof(caps))); + self->doMemCheck = true; + return; } - /* now we are sure to switch to a valid resgroup */ - - group = ResGroupHashFind(self->groupId); + LWLockAcquire(ResGroupLock, LW_EXCLUSIVE); + group = ResGroupHashFind(newGroupId); Assert(group != NULL); - /* we don't set this group to self until end of this function */ - - LWLockRelease(ResGroupLock); /* Init self */ Assert(host_segments > 0); Assert(caps.concurrency.proposed > 0); - Assert(self->slotId != InvalidSlotId); + selfSetGroup(group); + selfSetSlot(newSlotId); self->caps = caps; - self->slot = &group->slots[self->slotId]; Assert(selfHasSlot()); /* Init slot */ slot = self->slot; - slot->sessionId = gp_session_id; - slot->caps = caps; - slot->memQuota = slotGetMemQuotaExpected(&caps); - ResGroupSetMemorySpillRatio(&caps); - Assert(slot->memQuota > 0); - - if (prevGroup != group || prevSlot != slot) + if (slot->nProcs != 0) { - /* we are switching between different resgroups */ - - LWLockAcquire(ResGroupLock, LW_EXCLUSIVE); - if (prevGroup) - { - /* the previous one is valid, so detach from it */ - Assert(prevSlot != NULL); - - /* Sub proc memory accounting info from group and slot */ - groupDecMemUsage(prevGroup, prevSlot, self->memUsage); - - /* Update info in previous slot */ - pg_atomic_sub_fetch_u32((pg_atomic_uint32*)&prevSlot->nProcs, 1); - - groupReleaseMemQuota(prevGroup, prevSlot); - } - - /* now attach to the new one */ - - groupAcquireMemQuota(group, &slot->caps); - LWLockRelease(ResGroupLock); - - /* Add proc memory accounting info into group and slot */ - groupIncMemUsage(group, slot, self->memUsage); - - /* Update info in new slot */ - pg_atomic_add_fetch_u32((pg_atomic_uint32*)&slot->nProcs, 1); + Assert(slot->sessionId == gp_session_id); + Assert(slot->memQuota == slotGetMemQuotaExpected(&caps)); } + else + { + Assert(!slot->inUse); + slot->inUse = true; + initSlot(slot, &caps, gp_session_id); + group->nRunning++; + } + selfAttachToSlot(group, slot); - self->group = group; + ResGroupSetMemorySpillRatio(&caps); + + groupAcquireMemQuota(group, &slot->caps); + LWLockRelease(ResGroupLock); /* finally we can say we are in a valid resgroup */ Assert(selfIsAssignedValidGroup()); @@ -2638,10 +2620,8 @@ selfUnsetGroup(void) * - self must has not been set a slot before set; */ static void -selfSetSlot(void) +selfSetSlot(int slotId) { - int slotId = MyProc->resSlotId; - Assert(selfHasGroup()); Assert(!selfHasSlot()); Assert(slotId != InvalidSlotId); diff --git a/src/include/utils/resgroup.h b/src/include/utils/resgroup.h index 490691326037d11ba86386ea1ac8902e0955a90d..a050751b537f0a038307c63eba26456fcc28bed9 100644 --- a/src/include/utils/resgroup.h +++ b/src/include/utils/resgroup.h @@ -109,11 +109,15 @@ extern void FreeResGroupEntry(Oid groupId); extern void SerializeResGroupInfo(StringInfo str); extern void DeserializeResGroupInfo(struct ResGroupCaps *capsOut, - const char *buf, int len); + Oid *groupId, + int *slotId, + const char *buf, + int len); extern bool ShouldAssignResGroupOnMaster(void); +extern bool ShouldUnassignResGroup(void); extern void AssignResGroupOnMaster(void); -extern void UnassignResGroupOnMaster(void); +extern void UnassignResGroup(void); extern void SwitchResGroupOnSegment(const char *buf, int len); /* Retrieve statistic information of type from resource group */ diff --git a/src/test/isolation2/output/resgroup/resgroup_memory_statistic.source b/src/test/isolation2/output/resgroup/resgroup_memory_statistic.source index 5c0b754748c225bc7d3b73551fba50aa6464390d..e81be41737bdf773d3d4c9ddca16261e940fe668 100644 --- a/src/test/isolation2/output/resgroup/resgroup_memory_statistic.source +++ b/src/test/isolation2/output/resgroup/resgroup_memory_statistic.source @@ -124,7 +124,7 @@ SET SELECT * FROM memory_result; rsgname |ismaster|avg_mem ---------------+--------+------- -rg1_memory_test|0 |20 +rg1_memory_test|0 |0 rg1_memory_test|1 |0 rg2_memory_test|0 |0 rg2_memory_test|1 |0 @@ -162,7 +162,7 @@ SET SELECT * FROM memory_result; rsgname |ismaster|avg_mem ---------------+--------+------- -rg1_memory_test|0 |20 +rg1_memory_test|0 |0 rg1_memory_test|1 |0 rg2_memory_test|0 |0 rg2_memory_test|1 |0 @@ -179,7 +179,7 @@ rsgname |ismaster|avg_mem ---------------+--------+------- rg1_memory_test|0 |0 rg1_memory_test|1 |0 -rg2_memory_test|0 |40 +rg2_memory_test|0 |0 rg2_memory_test|1 |0 (4 rows) 1q: ... @@ -195,7 +195,7 @@ SET SELECT * FROM memory_result; rsgname |ismaster|avg_mem ---------------+--------+------- -rg1_memory_test|0 |40 +rg1_memory_test|0 |0 rg1_memory_test|1 |0 rg2_memory_test|0 |0 rg2_memory_test|1 |0 @@ -231,7 +231,7 @@ SET SELECT * FROM memory_result; rsgname |ismaster|avg_mem ---------------+--------+------- -rg1_memory_test|0 |80 +rg1_memory_test|0 |0 rg1_memory_test|1 |0 rg2_memory_test|0 |0 rg2_memory_test|1 |0 @@ -258,9 +258,9 @@ count SELECT * FROM memory_result; rsgname |ismaster|avg_mem ---------------+--------+------- -rg1_memory_test|0 |40 +rg1_memory_test|0 |0 rg1_memory_test|1 |0 -rg2_memory_test|0 |40 +rg2_memory_test|0 |0 rg2_memory_test|1 |0 (4 rows) 1q: ...