提交 6f940543 编写于 作者: X xiong-gang 提交者: GitHub

Detach resource group slot on segment

QE detach resource group slot at the end of transaction, the
last QE of the slot release the slot, and release the overused
memory if resource group config has been changed.
上级 fb1448d0
......@@ -2841,8 +2841,8 @@ CommitTransaction(void)
freeGangsForPortal(NULL);
/* Release resource group slot at the end of a transaction */
if (ShouldAssignResGroupOnMaster())
UnassignResGroupOnMaster();
if (ShouldUnassignResGroup())
UnassignResGroup();
}
......@@ -3394,8 +3394,8 @@ CleanupTransaction(void)
finishDistributedTransactionContext("CleanupTransaction", true);
/* Release resource group slot at the end of a transaction */
if (ShouldAssignResGroupOnMaster())
UnassignResGroupOnMaster();
if (ShouldUnassignResGroup())
UnassignResGroup();
}
/*
......
......@@ -348,6 +348,10 @@ DropResourceGroup(DropResourceGroupStmt *stmt)
errmsg("cannot drop default resource group \"%s\"",
stmt->name)));
/* check before dispatch to segment */
if (IsResGroupActivated())
ResGroupCheckForDrop(groupid, stmt->name);
/*
* Check to see if any roles are in this resource group.
*/
......@@ -399,8 +403,6 @@ DropResourceGroup(DropResourceGroupStmt *stmt)
if (IsResGroupActivated())
{
ResGroupCheckForDrop(groupid, stmt->name);
/* Argument of callback function should be allocated in heap region */
callbackArg = (Oid *)MemoryContextAlloc(TopMemoryContext, sizeof(Oid));
*callbackArg = groupid;
......
......@@ -205,15 +205,15 @@ static void ResGroupWait(ResGroupData *group);
static ResGroupData *ResGroupCreate(Oid groupId, const ResGroupCaps *caps);
static void AtProcExit_ResGroup(int code, Datum arg);
static void ResGroupWaitCancel(void);
static void groupAssginChunks(ResGroupData *group,
int32 chunks,
const ResGroupCaps *caps);
static int32 groupIncMemUsage(ResGroupData *group,
ResGroupSlotData *slot,
int32 chunks);
static void groupDecMemUsage(ResGroupData *group,
ResGroupSlotData *slot,
int32 chunks);
static void initSlot(ResGroupSlotData *slot, ResGroupCaps *caps, int sessionId);
static void selfAttachToSlot(ResGroupData *group, ResGroupSlotData *slot);
static void selfDetachSlot(ResGroupData *group, ResGroupSlotData *slot);
static int getFreeSlot(ResGroupData *group);
static int getSlot(ResGroupData *group);
static void putSlot(void);
......@@ -234,7 +234,7 @@ static bool selfHasSlot(void);
static bool selfHasGroup(void);
static void selfSetGroup(ResGroupData *group);
static void selfUnsetGroup(void);
static void selfSetSlot(void);
static void selfSetSlot(int slotId);
static void selfUnsetSlot(void);
static bool procIsInWaitQueue(const PGPROC *proc);
#ifdef USE_ASSERT_CHECKING
......@@ -1132,6 +1132,41 @@ groupDecMemUsage(ResGroupData *group, ResGroupSlotData *slot, int32 chunks)
}
}
/*
* Attach a process (QD or QE) to a slot.
*/
static void
selfAttachToSlot(ResGroupData *group, ResGroupSlotData *slot)
{
AssertImply(slot->nProcs == 0, slot->memUsage == 0);
groupIncMemUsage(group, slot, self->memUsage);
slot->nProcs++;
}
/*
* Detach a process (QD or QE) from a slot.
*/
static void
selfDetachSlot(ResGroupData *group, ResGroupSlotData *slot)
{
groupDecMemUsage(group, slot, self->memUsage);
slot->nProcs--;
AssertImply(slot->nProcs == 0, slot->memUsage == 0);
}
/*
* Initialize the members of a slot
*/
static void
initSlot(ResGroupSlotData *slot, ResGroupCaps *caps, int sessionId)
{
Assert(slot->inUse);
slot->sessionId = sessionId;
slot->caps = *caps;
slot->memQuota = slotGetMemQuotaExpected(caps);
slot->memUsage = 0;
}
/*
* Get a free resource group slot.
*
......@@ -1171,7 +1206,6 @@ getFreeSlot(ResGroupData *group)
static int
getSlot(ResGroupData *group)
{
ResGroupSlotData *slot;
int32 slotMemQuota;
int32 memQuotaUsed;
int slotId;
......@@ -1195,7 +1229,6 @@ getSlot(ResGroupData *group)
/* Calculate the expected per slot quota */
slotMemQuota = slotGetMemQuotaExpected(caps);
Assert(slotMemQuota > 0);
Assert(group->memQuotaUsed >= 0);
Assert(group->memQuotaUsed <= group->memQuotaGranted);
......@@ -1215,17 +1248,7 @@ getSlot(ResGroupData *group)
slotId = getFreeSlot(group);
Assert(slotId != InvalidSlotId);
slot = &group->slots[slotId];
Assert(slot->inUse);
/* Grant the memory quota to it */
slot->memQuota = slotMemQuota;
/* Store the config snapshot to it */
slot->caps = *caps;
/* And finally increase nRunning */
pg_atomic_add_fetch_u32((pg_atomic_uint32*)&group->nRunning, 1);
group->nRunning++;
return slotId;
}
......@@ -1255,7 +1278,6 @@ putSlot(void)
selfUnsetSlot();
Assert(slot->inUse);
Assert(slot->memQuota > 0);
/* Return the memory quota granted to this slot */
#ifdef USE_ASSERT_CHECKING
......@@ -1273,7 +1295,7 @@ putSlot(void)
slot->inUse = false;
/* And finally decrease nRunning */
pg_atomic_sub_fetch_u32((pg_atomic_uint32*)&group->nRunning, 1);
group->nRunning--;
}
/*
......@@ -1324,17 +1346,17 @@ retry:
if (!group->lockedForDrop)
{
/* try to get a slot directly */
MyProc->resSlotId = getSlot(group);
int slotId = getSlot(group);
if (MyProc->resSlotId != InvalidSlotId)
if (slotId != InvalidSlotId)
{
/* got one, lucky */
selfSetSlot();
initSlot(&group->slots[slotId], &group->caps, gp_session_id);
selfSetSlot(slotId);
group->totalExecuted++;
pgstat_report_resgroup(0, group->groupId);
LWLockRelease(ResGroupLock);
Assert(selfIsAssignedValidGroup());
return;
}
}
......@@ -1365,14 +1387,13 @@ retry:
* The waking process has granted us a valid slot.
* Update the statistic information of the resource group.
*/
selfSetSlot();
selfSetSlot(MyProc->resSlotId);
LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);
addTotalQueueDuration(group);
group->totalExecuted++;
LWLockRelease(ResGroupLock);
pgstat_report_resgroup(0, group->groupId);
Assert(selfIsAssignedValidGroup());
}
/* Update the total queued time of this group */
......@@ -1617,7 +1638,7 @@ static int32
slotGetMemQuotaExpected(const ResGroupCaps *caps)
{
Assert(caps->concurrency.proposed != 0);
return Max(1, groupGetMemQuotaExpected(caps) / caps->concurrency.proposed);
return groupGetMemQuotaExpected(caps) / caps->concurrency.proposed;
}
/*
......@@ -1653,6 +1674,7 @@ wakeupSlots(ResGroupData *group)
Assert(waitProc->resWaiting != false);
Assert(waitProc->resSlotId == InvalidSlotId);
initSlot(&group->slots[slotId], &group->caps, waitProc->mppSessionId);
waitProc->resWaiting = false;
waitProc->resSlotId = slotId;
SetLatch(&waitProc->procLatch);
......@@ -1797,8 +1819,7 @@ ResGroupSlotRelease(void)
ResGroupData *group = self->group;
Assert(selfIsAssignedValidGroup());
LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);
Assert(LWLockHeldExclusiveByMe(ResGroupLock));
putSlot();
Assert(!selfHasSlot());
......@@ -1822,12 +1843,11 @@ ResGroupSlotRelease(void)
Assert(waitProc->resWaiting != false);
Assert(waitProc->resSlotId == InvalidSlotId);
initSlot(&group->slots[slotId], &group->caps, waitProc->mppSessionId);
waitProc->resSlotId = slotId; /* pass the slot to new query */
waitProc->resWaiting = false;
SetLatch(&waitProc->procLatch);
}
LWLockRelease(ResGroupLock);
}
/*
......@@ -1871,7 +1891,10 @@ SerializeResGroupInfo(StringInfo str)
*/
void
DeserializeResGroupInfo(struct ResGroupCaps *capsOut,
const char *buf, int len)
Oid *groupId,
int *slotId,
const char *buf,
int len)
{
int i;
int tmp;
......@@ -1880,15 +1903,13 @@ DeserializeResGroupInfo(struct ResGroupCaps *capsOut,
Assert(len > 0);
/* TODO: don't deserialize into self directly */
memcpy(&tmp, ptr, sizeof(*groupId));
*groupId = ntohl(tmp);
ptr += sizeof(*groupId);
memcpy(&tmp, ptr, sizeof(self->groupId));
self->groupId = ntohl(tmp);
ptr += sizeof(self->groupId);
memcpy(&tmp, ptr, sizeof(self->slotId));
self->slotId = ntohl(tmp);
ptr += sizeof(self->slotId);
memcpy(&tmp, ptr, sizeof(*slotId));
*slotId = ntohl(tmp);
ptr += sizeof(*slotId);
for (i = 0; i < RESGROUP_LIMIT_TYPE_COUNT; i++)
{
......@@ -1916,6 +1937,18 @@ ShouldAssignResGroupOnMaster(void)
!AmIInSIGUSR1Handler();
}
/*
* UnassignResGroup() is called on both master and segments
*/
bool
ShouldUnassignResGroup(void)
{
return IsResGroupActivated() &&
IsNormalProcessingMode() &&
(Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE) &&
!AmIInSIGUSR1Handler();
}
/*
* On master, QD is assigned to a resource group at the beginning of a transaction.
* It will first acquire a slot from the resource group, and then, it will get the
......@@ -1933,26 +1966,21 @@ AssignResGroupOnMaster(void)
PG_TRY();
{
/* Acquire slot */
Assert(pResGroupControl != NULL);
Assert(pResGroupControl->segmentsOnMaster > 0);
Assert(selfIsUnassigned());
ResGroupSlotAcquire();
Assert(selfIsAssignedValidGroup());
Assert(selfHasSlot());
Assert(!self->doMemCheck);
group = self->group;
/* Init slot */
slot = self->slot;
Assert(slot->memQuota > 0);
slot->sessionId = gp_session_id;
pg_atomic_add_fetch_u32((pg_atomic_uint32*)&slot->nProcs, 1);
/* Add proc memory accounting info into group and slot */
selfAttachToSlot(group, slot);
/* Init self */
self->caps = slot->caps;
Assert(pResGroupControl != NULL);
Assert(pResGroupControl->segmentsOnMaster > 0);
/* Add proc memory accounting info into group and slot */
groupIncMemUsage(group, slot, self->memUsage);
/* Start memory limit checking */
self->doMemCheck = true;
......@@ -1961,14 +1989,14 @@ AssignResGroupOnMaster(void)
SIMPLE_FAULT_INJECTOR(ResGroupAssignedOnMaster);
/* Add into cgroup */
ResGroupOps_AssignGroup(group->groupId, MyProcPid);
ResGroupOps_AssignGroup(self->groupId, MyProcPid);
/* Set spill guc */
ResGroupSetMemorySpillRatio(&slot->caps);
}
PG_CATCH();
{
UnassignResGroupOnMaster();
UnassignResGroup();
PG_RE_THROW();
}
PG_END_TRY();
......@@ -1978,7 +2006,7 @@ AssignResGroupOnMaster(void)
* Detach from a resource group at the end of the transaction.
*/
void
UnassignResGroupOnMaster(void)
UnassignResGroup(void)
{
ResGroupData *group = self->group;
ResGroupSlotData *slot = self->slot;
......@@ -1994,22 +2022,42 @@ UnassignResGroupOnMaster(void)
/* Stop memory limit checking */
self->doMemCheck = false;
/* Sub proc memory accounting info from group and slot */
groupDecMemUsage(group, slot, self->memUsage);
/* Cleanup self */
if (self->memUsage > 10)
LOG_RESGROUP_DEBUG(LOG, "Idle proc memory usage: %d", self->memUsage);
/* Cleanup slotInfo */
pg_atomic_sub_fetch_u32((pg_atomic_uint32*)&slot->nProcs, 1);
LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);
/* Release the slot */
ResGroupSlotRelease();
Assert(!selfHasSlot());
/* Sub proc memory accounting info from group and slot */
selfDetachSlot(group, slot);
if (Gp_role == GP_ROLE_DISPATCH)
{
/* Release the slot */
ResGroupSlotRelease();
Assert(!selfHasSlot());
}
else
{
Assert(Gp_role == GP_ROLE_EXECUTE);
selfUnsetSlot();
if (slot->nProcs == 0)
{
/* Release the slot memory */
groupReleaseMemQuota(group, slot);
/* Mark the slot as free */
slot->inUse = false;
/* And finally decrease nRunning */
group->nRunning--;
}
}
/* Cleanup group */
selfUnsetGroup();
LWLockRelease(ResGroupLock);
Assert(selfIsUnassigned());
}
......@@ -2022,134 +2070,68 @@ UnassignResGroupOnMaster(void)
void
SwitchResGroupOnSegment(const char *buf, int len)
{
Oid prevGroupId;
int prevSlotId;
Oid newGroupId;
int newSlotId;
ResGroupCaps caps;
ResGroupData *group;
ResGroupSlotData *slot;
ResGroupData *prevGroup = NULL;
ResGroupSlotData *prevSlot = NULL;
selfValidateResGroupInfo();
prevGroupId = self->groupId;
prevSlotId = self->slotId;
prevGroup = self->group;
prevSlot = self->slot;
/* Stop memory limit checking */
self->doMemCheck = false;
DeserializeResGroupInfo(&caps, buf, len);
DeserializeResGroupInfo(&caps, &newGroupId, &newSlotId, buf, len);
AssertImply(newGroupId != InvalidOid,
newSlotId != InvalidSlotId);
AssertImply(self->groupId != InvalidOid,
self->slotId != InvalidSlotId);
AssertImply(prevGroupId != InvalidOid,
prevSlotId != InvalidSlotId);
LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);
if (self->groupId == InvalidOid)
if (newGroupId == InvalidOid)
{
/* about to switch to a none resgroup state ... */
self->group = NULL;
if (prevGroup)
{
/* from another resgroup, so detach from it */
Assert(prevGroup->groupId == prevGroupId);
/* Sub proc memory accounting info from group and slot */
groupDecMemUsage(prevGroup, prevSlot, self->memUsage);
/* Update info in previous slot */
pg_atomic_sub_fetch_u32((pg_atomic_uint32*)&prevSlot->nProcs, 1);
groupReleaseMemQuota(prevGroup, prevSlot);
}
else
{
/* from a none resgroup state, so nothing to do */
Assert(prevGroupId == InvalidOid);
Assert(prevSlotId == InvalidSlotId);
}
LWLockRelease(ResGroupLock);
UnassignResGroup();
Assert(selfIsUnassigned());
return;
}
/* now we know we are about to switch to some resgroup ... */
if (prevGroup && prevGroup->groupId != prevGroupId)
if (self->groupId != InvalidOid)
{
/* from a dropped resgroup, so behave like we are from
* a none resgroup state */
prevGroup = NULL;
prevSlot = NULL;
prevGroupId = InvalidOid;
prevSlotId = InvalidSlotId;
/* it's not the first dispatch in the same transaction */
Assert(self->groupId == newGroupId);
Assert(self->slotId == newSlotId);
Assert(!memcmp((void*)&self->caps, (void*)&caps, sizeof(caps)));
self->doMemCheck = true;
return;
}
/* now we are sure to switch to a valid resgroup */
group = ResGroupHashFind(self->groupId);
LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);
group = ResGroupHashFind(newGroupId);
Assert(group != NULL);
/* we don't set this group to self until end of this function */
LWLockRelease(ResGroupLock);
/* Init self */
Assert(host_segments > 0);
Assert(caps.concurrency.proposed > 0);
Assert(self->slotId != InvalidSlotId);
selfSetGroup(group);
selfSetSlot(newSlotId);
self->caps = caps;
self->slot = &group->slots[self->slotId];
Assert(selfHasSlot());
/* Init slot */
slot = self->slot;
slot->sessionId = gp_session_id;
slot->caps = caps;
slot->memQuota = slotGetMemQuotaExpected(&caps);
ResGroupSetMemorySpillRatio(&caps);
Assert(slot->memQuota > 0);
if (prevGroup != group || prevSlot != slot)
if (slot->nProcs != 0)
{
/* we are switching between different resgroups */
LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);
if (prevGroup)
{
/* the previous one is valid, so detach from it */
Assert(prevSlot != NULL);
/* Sub proc memory accounting info from group and slot */
groupDecMemUsage(prevGroup, prevSlot, self->memUsage);
/* Update info in previous slot */
pg_atomic_sub_fetch_u32((pg_atomic_uint32*)&prevSlot->nProcs, 1);
groupReleaseMemQuota(prevGroup, prevSlot);
}
/* now attach to the new one */
groupAcquireMemQuota(group, &slot->caps);
LWLockRelease(ResGroupLock);
/* Add proc memory accounting info into group and slot */
groupIncMemUsage(group, slot, self->memUsage);
/* Update info in new slot */
pg_atomic_add_fetch_u32((pg_atomic_uint32*)&slot->nProcs, 1);
Assert(slot->sessionId == gp_session_id);
Assert(slot->memQuota == slotGetMemQuotaExpected(&caps));
}
else
{
Assert(!slot->inUse);
slot->inUse = true;
initSlot(slot, &caps, gp_session_id);
group->nRunning++;
}
selfAttachToSlot(group, slot);
self->group = group;
ResGroupSetMemorySpillRatio(&caps);
groupAcquireMemQuota(group, &slot->caps);
LWLockRelease(ResGroupLock);
/* finally we can say we are in a valid resgroup */
Assert(selfIsAssignedValidGroup());
......@@ -2638,10 +2620,8 @@ selfUnsetGroup(void)
* - self must has not been set a slot before set;
*/
static void
selfSetSlot(void)
selfSetSlot(int slotId)
{
int slotId = MyProc->resSlotId;
Assert(selfHasGroup());
Assert(!selfHasSlot());
Assert(slotId != InvalidSlotId);
......
......@@ -109,11 +109,15 @@ extern void FreeResGroupEntry(Oid groupId);
extern void SerializeResGroupInfo(StringInfo str);
extern void DeserializeResGroupInfo(struct ResGroupCaps *capsOut,
const char *buf, int len);
Oid *groupId,
int *slotId,
const char *buf,
int len);
extern bool ShouldAssignResGroupOnMaster(void);
extern bool ShouldUnassignResGroup(void);
extern void AssignResGroupOnMaster(void);
extern void UnassignResGroupOnMaster(void);
extern void UnassignResGroup(void);
extern void SwitchResGroupOnSegment(const char *buf, int len);
/* Retrieve statistic information of type from resource group */
......
......@@ -124,7 +124,7 @@ SET
SELECT * FROM memory_result;
rsgname |ismaster|avg_mem
---------------+--------+-------
rg1_memory_test|0 |20
rg1_memory_test|0 |0
rg1_memory_test|1 |0
rg2_memory_test|0 |0
rg2_memory_test|1 |0
......@@ -162,7 +162,7 @@ SET
SELECT * FROM memory_result;
rsgname |ismaster|avg_mem
---------------+--------+-------
rg1_memory_test|0 |20
rg1_memory_test|0 |0
rg1_memory_test|1 |0
rg2_memory_test|0 |0
rg2_memory_test|1 |0
......@@ -179,7 +179,7 @@ rsgname |ismaster|avg_mem
---------------+--------+-------
rg1_memory_test|0 |0
rg1_memory_test|1 |0
rg2_memory_test|0 |40
rg2_memory_test|0 |0
rg2_memory_test|1 |0
(4 rows)
1q: ... <quitting>
......@@ -195,7 +195,7 @@ SET
SELECT * FROM memory_result;
rsgname |ismaster|avg_mem
---------------+--------+-------
rg1_memory_test|0 |40
rg1_memory_test|0 |0
rg1_memory_test|1 |0
rg2_memory_test|0 |0
rg2_memory_test|1 |0
......@@ -231,7 +231,7 @@ SET
SELECT * FROM memory_result;
rsgname |ismaster|avg_mem
---------------+--------+-------
rg1_memory_test|0 |80
rg1_memory_test|0 |0
rg1_memory_test|1 |0
rg2_memory_test|0 |0
rg2_memory_test|1 |0
......@@ -258,9 +258,9 @@ count
SELECT * FROM memory_result;
rsgname |ismaster|avg_mem
---------------+--------+-------
rg1_memory_test|0 |40
rg1_memory_test|0 |0
rg1_memory_test|1 |0
rg2_memory_test|0 |40
rg2_memory_test|0 |0
rg2_memory_test|1 |0
(4 rows)
1q: ... <quitting>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册