提交 2650f728 编写于 作者: P Pengzhou Tang

Implement resource group cpu rate limitation.

Resource group cpu rate limitation is implemented with cgroup on linux
system. When resource group is enabled via GUC we check whether cgroup
is available and properly configured on the system. A sub cgroup is
created for each resource group, cpu quota and share weight will be set
depends on the resource group configuration. The queries will run under
these cgroups, and the cpu usage will be restricted by cgroup.

The cgroups directory structures:
* /sys/fs/cgroup/{cpu,cpuacct}/gpdb: the toplevel gpdb cgroup
* /sys/fs/cgroup/{cpu,cpuacct}/gpdb/*/: cgroup for each resource group

The logic for cpu rate limitation:

* in toplevel gpdb cgroup we set the cpu quota and share weight as:

    cpu.cfs_quota_us := cpu.cfs_period_us * 256 * gp_resource_group_cpu_limit
    cpu.shares := 1024 * ncores

* for each sub group we set the cpu quota and share weight as:

    sub.cpu.cfs_quota_us := -1
    sub.cpu.shares := top.cpu.shares * sub.cpu_rate_limit

The minimum and maximum cpu percentage for a sub cgroup:

    sub.cpu.min_percentage := gp_resource_group_cpu_limit * sub.cpu_rate_limit
    sub.cpu.max_percentage := gp_resource_group_cpu_limit

The acutal percentage depends on how busy the system is.

gp_resource_group_cpu_limit is a GUC introduced to control the cpu
resgroups assigned on each host.

    gpconfig -c gp_resource_group_cpu_limit -v '0.9'

A new pipeline is created to perform the tests as we need privileged
permission to enable and setup cgroups on the system.
Signed-off-by: NNing Yu <nyu@pivotal.io>
上级 7e774f28
......@@ -148,6 +148,8 @@ installcheck-world:
$(MAKE) -C gpMgmt/bin installcheck
gpcheckcat -A
installcheck-resgroup:
$(MAKE) -C src/test/isolation2 $@
# Run mock tests, that don't require a running server. Arguably these should
# be part of [install]check-world, but we treat them more like part of
......
......@@ -11,7 +11,7 @@
# GNUmakefile won't exist yet, so we catch that case as well.
all check install installdirs installcheck installcheck-parallel installcheck-good installcheck-optfunctional uninstall clean distclean maintainer-clean dist distcheck world check-world install-world installcheck-world:
all check install installdirs installcheck installcheck-parallel installcheck-good installcheck-optfunctional uninstall clean distclean maintainer-clean dist distcheck world check-world install-world installcheck-world installcheck-resgroup:
@if [ ! -f GNUmakefile ] ; then \
echo "You need to run the 'configure' program first. See the file"; \
echo "'INSTALL' for installation instructions." ; \
......
......@@ -528,6 +528,26 @@ jobs:
params:
TEST_OS: centos
- name: regression_tests_resource_group
plan:
- aggregate:
- get: gpdb_src
passed: [compile_gpdb_centos6]
- get: bin_gpdb
resource: bin_gpdb_centos6
passed: [compile_gpdb_centos6]
trigger: true
- get: centos-gpdb-dev-6
- task: ic_gpdb_resgroup
file: gpdb_src/concourse/tasks/ic_gpdb_resgroup.yml
privileged: true
image: centos-gpdb-dev-6
params:
MAKE_TEST_COMMAND: PGOPTIONS='-c optimizer=off -c codegen=off' installcheck-resgroup
BLDWRAP_POSTGRES_CONF_ADDONS: "fsync=off"
TEST_OS: centos
- name: regression_tests_gpcloud_centos
plan:
- aggregate:
......
#!/bin/bash -l
set -eox pipefail
CWDIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
basedir=/sys/fs/cgroup
options=rw,nosuid,nodev,noexec,relatime
groups="hugetlb freezer pids devices cpuset blkio net_prio net_cls cpuacct cpu memory perf_event"
# mount cgroup base dir
mkdir -p $basedir
mount -t tmpfs tmpfs $basedir
# mount cgroup controllers
for group in $groups; do
mkdir -p $basedir/$group
mount -t cgroup -o $options,$group cgroup $basedir/$group
done
mkdir -p $basedir/cpu/gpdb
# set all dirs' permission to 777 to allow test cases to control
# when and how cgroup is enabled
find $basedir -type d | xargs chmod 777
# do the actual job
${CWDIR}/ic_gpdb.bash
platform: linux
image_resource:
type: docker-image
inputs:
- name: gpdb_src
- name: bin_gpdb
outputs:
params:
MAKE_TEST_COMMAND: ""
BLDWRAP_POSTGRES_CONF_ADDONS: ""
TEST_OS: ""
run:
path: gpdb_src/concourse/scripts/ic_gpdb_resgroup.bash
#!/usr/bin/env python
#
# Copyright (c) 2017, Pivotal Software Inc.
#
import os
import sys
class dummy(object):
def validate_all(self):
exit("resource group is not supported on this platform")
class cgroup(object):
mount_point = "/sys/fs/cgroup/"
tab = { 'r': os.R_OK, 'w': os.W_OK, 'x': os.X_OK, 'f': os.F_OK }
impl = "cgroup"
error_prefix = " is not properly configured: "
def validate_all(self):
"""
Check the permissions of the toplevel gpdb cgroup dirs.
The checks should keep in sync with
src/backend/utils/resgroup/resgroup-ops-cgroup.c
"""
self.validate_permission("cpu/gpdb/", "rwx")
self.validate_permission("cpu/gpdb/cgroup.procs", "rw")
self.validate_permission("cpu/gpdb/cpu.cfs_period_us", "rw")
self.validate_permission("cpu/gpdb/cpu.cfs_quota_us", "rw")
self.validate_permission("cpu/gpdb/cpu.shares", "rw")
self.validate_permission("cpuacct/gpdb/", "rwx")
self.validate_permission("cpuacct/gpdb/cgroup.procs", "rw")
self.validate_permission("cpuacct/gpdb/cpuacct.usage", "r")
self.validate_permission("cpuacct/gpdb/cpuacct.stat", "r")
def die(self, msg):
exit(self.impl + self.error_prefix + msg)
def validate_permission(self, path, mode):
"""
Validate permission on path.
If path is a dir it must ends with '/'.
"""
try:
fullpath = os.path.join(self.mount_point, path)
pathtype = path[-1] == "/" and "directory" or "file"
modebits = reduce(lambda x, y: x | y,
map(lambda x: self.tab[x], mode), 0)
if not os.path.exists(fullpath):
self.die("%s '%s' does not exist" % (pathtype, fullpath))
if not os.access(fullpath, modebits):
self.die("%s '%s' permission denied: require permission '%s'" \
% (pathtype, fullpath, mode))
except IOError, e:
self.die("can't check permission on %s '%s': %s" \
% (pathtype, fullpath, str(e)))
if __name__ == '__main__':
if sys.platform.startswith('linux'):
cgroup().validate_all()
else:
dummy().validate_all()
......@@ -20,6 +20,7 @@ try:
from gpconfig_modules.database_segment_guc import DatabaseSegmentGuc
from gpconfig_modules.file_segment_guc import FileSegmentGuc
from gpconfig_modules.guc_collection import GucCollection
from gppylib.gpresgroup import GpResGroup
except ImportError as err:
sys.exit('Cannot import modules. Please check that you have sourced '
'greenplum_path.sh. Detail: ' + str(err))
......@@ -137,6 +138,9 @@ class Guc:
LOGGER.warn("Managing queries with resource groups is an "
"experimental feature. A work-in-progress version is "
"enabled.")
msg = GpResGroup().validate()
if msg is not None:
return msg
elif newval != "queue":
return "the value of gp_resource_manager must be 'group' or 'queue'"
......
#!/usr/bin/env python
#
# Copyright (c) 2017, Pivotal Software Inc.
#
from gppylib.commands import base
from gppylib.commands.unix import *
from gppylib.commands.gp import *
from gppylib.gparray import GpArray
from gppylib.gplog import get_default_logger
from gppylib.gphostcache import *
class GpResGroup(object):
def __init__(self):
self.logger = get_default_logger()
def validate(self):
pool = base.WorkerPool()
gp_array = GpArray.initFromCatalog(dbconn.DbURL(), utility=True)
host_cache = GpHostCache(gp_array, pool)
msg = None
for h in host_cache.get_hosts():
cmd = Command(h.hostname, "gpcheckresgroupimpl", REMOTE, h.hostname)
pool.addCommand(cmd)
pool.join()
items = pool.getCompletedItems()
failed = []
for i in items:
if not i.was_successful():
failed.append("[%s:%s]"%(i.remoteHost, i.get_stderr()))
pool.haltWork()
pool.joinWorkers()
if failed:
msg = ",".join(failed)
return msg
#!/usr/bin/env python
#
# Copyright (c) 2017, Pivotal Software Inc.
#
import unittest2 as unittest
import os
import sys
import shutil
import tempfile
import imp
gpcheckresgroupimpl_path = os.path.abspath('gpcheckresgroupimpl')
gpcheckresgroupimpl = imp.load_source('gpcheckresgroupimpl', gpcheckresgroupimpl_path)
import gpcheckresgroupimpl
@unittest.skipUnless(sys.platform.startswith("linux"), "requires linux")
class GpCheckResGroupImplCGroup(unittest.TestCase):
cgroup_mntpnt = None
cgroup_default_mntpnt = "/sys/fs/cgroup"
def setUp(self):
self.cgroup_mntpnt = tempfile.mkdtemp(prefix='fake-cgroup-mnt-')
os.mkdir(os.path.join(self.cgroup_mntpnt, "cpu"), 0755)
os.mkdir(os.path.join(self.cgroup_mntpnt, "cpuacct"), 0755)
self.cgroup = gpcheckresgroupimpl.cgroup()
self.cgroup.mount_point = self.cgroup_mntpnt
self.cgroup.die = self.mock_cgroup_die
os.mkdir(os.path.join(self.cgroup_mntpnt, "cpu", "gpdb"), 0700)
self.touch(os.path.join(self.cgroup_mntpnt, "cpu", "gpdb", "cgroup.procs"), 0600)
self.touch(os.path.join(self.cgroup_mntpnt, "cpu", "gpdb", "cpu.cfs_period_us"), 0600)
self.touch(os.path.join(self.cgroup_mntpnt, "cpu", "gpdb", "cpu.cfs_quota_us"), 0600)
self.touch(os.path.join(self.cgroup_mntpnt, "cpu", "gpdb", "cpu.shares"), 0600)
os.mkdir(os.path.join(self.cgroup_mntpnt, "cpuacct", "gpdb"), 0700)
self.touch(os.path.join(self.cgroup_mntpnt, "cpuacct", "gpdb", "cgroup.procs"), 0600)
self.touch(os.path.join(self.cgroup_mntpnt, "cpuacct", "gpdb", "cpuacct.usage"), 0400)
self.touch(os.path.join(self.cgroup_mntpnt, "cpuacct", "gpdb", "cpuacct.stat"), 0400)
def tearDown(self):
shutil.rmtree(self.cgroup_mntpnt)
self.cgroup = None
def mock_cgroup_die(self, msg):
output = self.cgroup.impl + self.cgroup.error_prefix + msg
output = output.replace(self.cgroup_mntpnt, self.cgroup_default_mntpnt)
raise AssertionError(output)
def touch(self, path, mode):
with open(path, "w"):
pass
os.chmod(path, mode)
def test_proper_setup(self):
self.cgroup.validate_all()
def test_when_cpu_gpdb_dir_missing(self):
shutil.rmtree(os.path.join(self.cgroup_mntpnt, "cpu", "gpdb"))
with self.assertRaisesRegexp(AssertionError, "directory '/sys/fs/cgroup/cpu/gpdb/' does not exist"):
self.cgroup.validate_all()
def test_when_cpu_gpdb_dir_bad_permission(self):
os.chmod(os.path.join(self.cgroup_mntpnt, "cpu", "gpdb"), 0500)
with self.assertRaisesRegexp(AssertionError, "directory '/sys/fs/cgroup/cpu/gpdb/' permission denied: require permission 'rwx'"):
self.cgroup.validate_all()
# restore permission for the dir to be removed in tearDown()
os.chmod(os.path.join(self.cgroup_mntpnt, "cpu", "gpdb"), 0700)
def test_when_cpu_gpdb_cgroup_procs_missing(self):
os.unlink(os.path.join(self.cgroup_mntpnt, "cpu", "gpdb", "cgroup.procs"))
with self.assertRaisesRegexp(AssertionError, "file '/sys/fs/cgroup/cpu/gpdb/cgroup.procs' does not exist"):
self.cgroup.validate_all()
def test_when_cpu_gpdb_cgroup_procs_bad_permission(self):
os.chmod(os.path.join(self.cgroup_mntpnt, "cpu", "gpdb", "cgroup.procs"), 0100)
with self.assertRaisesRegexp(AssertionError, "file '/sys/fs/cgroup/cpu/gpdb/cgroup.procs' permission denied: require permission 'rw'"):
self.cgroup.validate_all()
def test_when_cpu_gpdb_cpu_cfs_period_us_missing(self):
os.unlink(os.path.join(self.cgroup_mntpnt, "cpu", "gpdb", "cpu.cfs_period_us"))
with self.assertRaisesRegexp(AssertionError, "file '/sys/fs/cgroup/cpu/gpdb/cpu.cfs_period_us' does not exist"):
self.cgroup.validate_all()
def test_when_cpu_gpdb_cpu_cfs_period_us_bad_permission(self):
os.chmod(os.path.join(self.cgroup_mntpnt, "cpu", "gpdb", "cpu.cfs_period_us"), 0100)
with self.assertRaisesRegexp(AssertionError, "file '/sys/fs/cgroup/cpu/gpdb/cpu.cfs_period_us' permission denied: require permission 'rw'"):
self.cgroup.validate_all()
def test_when_cpu_gpdb_cpu_cfs_quota_us_missing(self):
os.unlink(os.path.join(self.cgroup_mntpnt, "cpu", "gpdb", "cpu.cfs_quota_us"))
with self.assertRaisesRegexp(AssertionError, "file '/sys/fs/cgroup/cpu/gpdb/cpu.cfs_quota_us' does not exist"):
self.cgroup.validate_all()
def test_when_cpu_gpdb_cpu_cfs_quota_us_bad_permission(self):
os.chmod(os.path.join(self.cgroup_mntpnt, "cpu", "gpdb", "cpu.cfs_quota_us"), 0100)
with self.assertRaisesRegexp(AssertionError, "file '/sys/fs/cgroup/cpu/gpdb/cpu.cfs_quota_us' permission denied: require permission 'rw'"):
self.cgroup.validate_all()
def test_when_cpu_gpdb_cpu_shares_missing(self):
os.unlink(os.path.join(self.cgroup_mntpnt, "cpu", "gpdb", "cpu.shares"))
with self.assertRaisesRegexp(AssertionError, "file '/sys/fs/cgroup/cpu/gpdb/cpu.shares' does not exist"):
self.cgroup.validate_all()
def test_when_cpu_gpdb_cpu_shares_bad_permission(self):
os.chmod(os.path.join(self.cgroup_mntpnt, "cpu", "gpdb", "cpu.shares"), 0100)
with self.assertRaisesRegexp(AssertionError, "file '/sys/fs/cgroup/cpu/gpdb/cpu.shares' permission denied: require permission 'rw'"):
self.cgroup.validate_all()
def test_when_cpuacct_gpdb_dir_missing(self):
shutil.rmtree(os.path.join(self.cgroup_mntpnt, "cpuacct", "gpdb"))
with self.assertRaisesRegexp(AssertionError, "directory '/sys/fs/cgroup/cpuacct/gpdb/' does not exist"):
self.cgroup.validate_all()
def test_when_cpuacct_gpdb_dir_bad_permission(self):
os.chmod(os.path.join(self.cgroup_mntpnt, "cpuacct", "gpdb"), 0500)
with self.assertRaisesRegexp(AssertionError, "directory '/sys/fs/cgroup/cpuacct/gpdb/' permission denied: require permission 'rwx'"):
self.cgroup.validate_all()
# restore permission for the dir to be removed in tearDown()
os.chmod(os.path.join(self.cgroup_mntpnt, "cpuacct", "gpdb"), 0700)
def test_when_cpuacct_gpdb_cgroup_procs_missing(self):
os.unlink(os.path.join(self.cgroup_mntpnt, "cpuacct", "gpdb", "cgroup.procs"))
with self.assertRaisesRegexp(AssertionError, "file '/sys/fs/cgroup/cpuacct/gpdb/cgroup.procs' does not exist"):
self.cgroup.validate_all()
def test_when_cpuacct_gpdb_cgroup_procs_bad_permission(self):
os.chmod(os.path.join(self.cgroup_mntpnt, "cpuacct", "gpdb", "cgroup.procs"), 0100)
with self.assertRaisesRegexp(AssertionError, "file '/sys/fs/cgroup/cpuacct/gpdb/cgroup.procs' permission denied: require permission 'rw'"):
self.cgroup.validate_all()
def test_when_cpuacct_gpdb_cpuacct_usage_missing(self):
os.unlink(os.path.join(self.cgroup_mntpnt, "cpuacct", "gpdb", "cpuacct.usage"))
with self.assertRaisesRegexp(AssertionError, "file '/sys/fs/cgroup/cpuacct/gpdb/cpuacct.usage' does not exist"):
self.cgroup.validate_all()
def test_when_cpuacct_gpdb_cpuacct_usage_bad_permission(self):
os.chmod(os.path.join(self.cgroup_mntpnt, "cpuacct", "gpdb", "cpuacct.usage"), 0100)
with self.assertRaisesRegexp(AssertionError, "file '/sys/fs/cgroup/cpuacct/gpdb/cpuacct.usage' permission denied: require permission 'r'"):
self.cgroup.validate_all()
def test_when_cpuacct_gpdb_cpuacct_stat_missing(self):
os.unlink(os.path.join(self.cgroup_mntpnt, "cpuacct", "gpdb", "cpuacct.stat"))
with self.assertRaisesRegexp(AssertionError, "file '/sys/fs/cgroup/cpuacct/gpdb/cpuacct.stat' does not exist"):
self.cgroup.validate_all()
def test_when_cpuacct_gpdb_cpuacct_stat_bad_permission(self):
os.chmod(os.path.join(self.cgroup_mntpnt, "cpuacct", "gpdb", "cpuacct.stat"), 0100)
with self.assertRaisesRegexp(AssertionError, "file '/sys/fs/cgroup/cpuacct/gpdb/cpuacct.stat' permission denied: require permission 'r'"):
self.cgroup.validate_all()
if __name__ == '__main__':
unittest.main()
......@@ -27,6 +27,7 @@
#include "utils/memutils.h"
#include "utils/resource_manager.h"
#include "utils/resgroup.h"
#include "utils/resgroup-ops.h"
#include "storage/bfz.h"
#include "storage/proc.h"
#include "cdb/memquota.h"
......@@ -1206,7 +1207,10 @@ gpvars_assign_gp_resource_manager_policy(const char *newval, bool doit, GucSourc
else if (!pg_strcasecmp("queue", newval))
newtype = RESOURCE_MANAGER_POLICY_QUEUE;
else if (!pg_strcasecmp("group", newval))
{
ResGroupOps_Bless();
newtype = RESOURCE_MANAGER_POLICY_GROUP;
}
else
elog(ERROR, "unknown resource manager policy: current policy is '%s'", gpvars_show_gp_resource_manager_policy());
......
......@@ -13,6 +13,7 @@
#include "postgres.h"
#include "funcapi.h"
#include "gp-libpq-fe.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/xact.h"
......@@ -21,6 +22,7 @@
#include "catalog/pg_authid.h"
#include "catalog/pg_resgroup.h"
#include "cdb/cdbdisp_query.h"
#include "cdb/cdbdispatchresult.h"
#include "cdb/cdbvars.h"
#include "commands/comment.h"
#include "commands/defrem.h"
......@@ -30,6 +32,7 @@
#include "utils/datetime.h"
#include "utils/fmgroids.h"
#include "utils/resgroup.h"
#include "utils/resgroup-ops.h"
#include "utils/resource_manager.h"
#include "utils/syscache.h"
......@@ -48,6 +51,22 @@ typedef struct ResourceGroupOptions
float redzoneLimit;
} ResourceGroupOptions;
typedef struct ResourceGroupStatusRow
{
Datum oid;
double cpuAvgUsage;
} ResourceGroupStatusRow;
typedef struct ResourceGroupStatusContext
{
ResGroupStatType type;
int nrows;
ResourceGroupStatusRow rows[1];
} ResourceGroupStatusContext;
static float str2Float(const char *str, const char *prop);
static float text2Float(const text *text, const char *prop);
static int getResgroupOptionType(const char* defname);
static void parseStmtOptions(CreateResourceGroupStmt *stmt, ResourceGroupOptions *options);
......@@ -60,6 +79,8 @@ static void deleteResgroupCapabilities(Oid groupid);
static void createResGroupAbortCallback(ResourceReleasePhase phase, bool isCommit, bool isTopLevel, void *arg);
static void dropResGroupAbortCallback(ResourceReleasePhase phase, bool isCommit, bool isTopLevel, void *arg);
static void alterResGroupCommitCallback(ResourceReleasePhase phase, bool isCommit, bool isTopLevel, void *arg);
static ResGroupStatType propNameToType(const char *name);
static void getCpuUsage(ResourceGroupStatusContext *ctx);
/*
* CREATE RESOURCE GROUP
......@@ -182,6 +203,11 @@ CreateResourceGroup(CreateResourceGroupStmt *stmt)
AllocResGroupEntry(groupid);
/* Create os dependent part for this resource group */
ResGroupOps_CreateGroup(groupid);
ResGroupOps_SetCpuRateLimit(groupid, options.cpuRateLimit);
/* Argument of callback function should be allocated in heap region */
callbackArg = (Oid *)MemoryContextAlloc(TopMemoryContext, sizeof(Oid));
*callbackArg = groupid;
......@@ -385,7 +411,7 @@ AlterResourceGroup(AlterResourceGroupStmt *stmt)
}
/*
* Get 'concurrency' of on resource group in pg_resgroupcapability.
* Get 'concurrency' of one resource group in pg_resgroupcapability.
*/
void
GetConcurrencyForResGroup(int groupId, int *value, int *proposed)
......@@ -402,6 +428,21 @@ GetConcurrencyForResGroup(int groupId, int *value, int *proposed)
*proposed = pg_atoi(proposedStr, sizeof(int32), 0);
}
/*
* Get 'cpu_rate_limit' of one resource group in pg_resgroupcapability.
*/
float
GetCpuRateLimitForResGroup(int groupId)
{
char *valueStr;
char *proposedStr;
getResgroupCapabilityEntry(groupId, RESGROUP_LIMIT_TYPE_CPU,
&valueStr, &proposedStr);
return str2Float(valueStr, "cpu_rate_limit");
}
/*
* Get resource group id for a role in pg_authid
*/
......@@ -473,6 +514,169 @@ GetResGroupIdForRole(Oid roleid)
return groupId;
}
/*
* Convert from property name to ResGroupStatType.
*/
static ResGroupStatType
propNameToType(const char *name)
{
if (!strcmp(name, "num_running"))
return RES_GROUP_STAT_NRUNNING;
else if (!strcmp(name, "num_queueing"))
return RES_GROUP_STAT_NQUEUEING;
else if (!strcmp(name, "cpu_usage"))
return RES_GROUP_STAT_CPU_USAGE;
else if (!strcmp(name, "memory_usage"))
return RES_GROUP_STAT_MEM_USAGE;
else if (!strcmp(name, "total_queue_duration"))
return RES_GROUP_STAT_TOTAL_QUEUE_TIME;
else if (!strcmp(name, "num_queued"))
return RES_GROUP_STAT_TOTAL_QUEUED;
else if (!strcmp(name, "num_executed"))
return RES_GROUP_STAT_TOTAL_EXECUTED;
else
return RES_GROUP_STAT_UNKNOWN;
}
/*
* Get cpu usage.
*
* On QD this function dispatch the request to all QEs, collecting both
* QEs' and QD's cpu usage and calculate the average.
*
* On QE this function only collect the cpu usage on itself.
*
* Cpu usage is a ratio within [0%, 100%], however due to error the actual
* value might be greater than 100%, that's not a bug.
*/
static void
getCpuUsage(ResourceGroupStatusContext *ctx)
{
int64 *usages;
TimestampTz *timestamps;
int nsegs = 1;
int ncores;
int i, j;
if (!IsResGroupEnabled())
return;
usages = palloc(sizeof(*usages) * ctx->nrows);
timestamps = palloc(sizeof(*timestamps) * ctx->nrows);
ncores = ResGroupOps_GetCpuCores();
for (j = 0; j < ctx->nrows; j++)
{
ResourceGroupStatusRow *row = &ctx->rows[j];
Oid rsgid = DatumGetObjectId(row->oid);
usages[j] = ResGroupOps_GetCpuUsage(rsgid);
timestamps[j] = GetCurrentTimestamp();
}
if (Gp_role == GP_ROLE_DISPATCH)
{
CdbPgResults cdb_pgresults = {NULL, 0};
StringInfoData buffer;
initStringInfo(&buffer);
appendStringInfo(&buffer, "SELECT rsgid, value FROM pg_resgroup_get_status_kv('cpu_usage')");
CdbDispatchCommand(buffer.data, DF_WITH_SNAPSHOT, &cdb_pgresults);
if (cdb_pgresults.numResults == 0)
elog(ERROR, "gp_resgroup_status didn't get back any cpu usage statistics from the segDBs");
nsegs += cdb_pgresults.numResults;
for (i = 0; i < cdb_pgresults.numResults; i++)
{
struct pg_result *pg_result = cdb_pgresults.pg_results[i];
/*
* Any error here should have propagated into errbuf, so we shouldn't
* ever see anything other that tuples_ok here. But, check to be
* sure.
*/
if (PQresultStatus(pg_result) != PGRES_TUPLES_OK)
{
cdbdisp_clearCdbPgResults(&cdb_pgresults);
elog(ERROR, "gp_resgroup_status: resultStatus not tuples_Ok");
}
else
{
Assert(PQntuples(pg_result) == ctx->nrows);
for (j = 0; j < ctx->nrows; j++)
{
double usage;
const char *result;
ResourceGroupStatusRow *row = &ctx->rows[j];
Oid rsgid = pg_atoi(PQgetvalue(pg_result, j, 0),
sizeof(Oid), 0);
/*
* we assume QD and QE shall have the same order
* for all the resgroups, but in case this assumption
* failed we do a full lookup
*/
if (rsgid != DatumGetObjectId(row->oid))
{
int k;
for (k = 0; k < ctx->nrows; k++)
{
row = &ctx->rows[k];
if (rsgid == DatumGetObjectId(row->oid))
break;
}
if (k == ctx->nrows)
elog(ERROR, "gp_resgroup_status: inconsistent resgroups between QD and QE");
}
result = PQgetvalue(pg_result, j, 1);
sscanf(result, "%lf", &usage);
row->cpuAvgUsage += usage;
}
}
}
cdbdisp_clearCdbPgResults(&cdb_pgresults);
}
else
{
pg_usleep(300000);
}
for (j = 0; j < ctx->nrows; j++)
{
int64 duration;
long secs;
int usecs;
int64 usage;
ResourceGroupStatusRow *row = &ctx->rows[j];
Oid rsgid = DatumGetObjectId(row->oid);
usage = ResGroupOps_GetCpuUsage(rsgid) - usages[j];
TimestampDifference(timestamps[j], GetCurrentTimestamp(),
&secs, &usecs);
duration = secs * 1000000 + usecs;
/*
* usage is the cpu time (nano seconds) obtained by this group
* in the time duration (micro seconds), so cpu time on one core
* can be calculated as:
*
* usage / 1000 / duration / ncores
*
* To convert it to percentange we should multiple 100%.
*/
row->cpuAvgUsage += usage / 10.0 / duration / ncores;
row->cpuAvgUsage /= nsegs;
}
}
/*
* Get status of resource groups
*/
......@@ -480,6 +684,7 @@ Datum
pg_resgroup_get_status_kv(PG_FUNCTION_ARGS)
{
FuncCallContext *funcctx;
ResourceGroupStatusContext *ctx;
if (SRF_IS_FIRSTCALL())
{
......@@ -507,8 +712,13 @@ pg_resgroup_get_status_kv(PG_FUNCTION_ARGS)
Relation pg_resgroup_rel;
SysScanDesc sscan;
HeapTuple tuple;
char * prop = text_to_cstring(PG_GETARG_TEXT_P(0));
int ctxsize = sizeof(ResourceGroupStatusContext) +
sizeof(ResourceGroupStatusRow) * (MaxResourceGroups - 1);
funcctx->user_fctx = palloc0(sizeof(Datum) * MaxResourceGroups);
funcctx->user_fctx = palloc(ctxsize);
ctx = (ResourceGroupStatusContext *) funcctx->user_fctx;
pg_resgroup_rel = heap_open(ResGroupRelationId, AccessShareLock);
......@@ -517,11 +727,24 @@ pg_resgroup_get_status_kv(PG_FUNCTION_ARGS)
while (HeapTupleIsValid(tuple = systable_getnext(sscan)))
{
Assert(funcctx->max_calls < MaxResourceGroups);
((Datum *) funcctx->user_fctx)[funcctx->max_calls++] = ObjectIdGetDatum(HeapTupleGetOid(tuple));
ctx->rows[funcctx->max_calls].cpuAvgUsage = 0;
ctx->rows[funcctx->max_calls++].oid =
ObjectIdGetDatum(HeapTupleGetOid(tuple));
}
systable_endscan(sscan);
heap_close(pg_resgroup_rel, AccessShareLock);
ctx->nrows = funcctx->max_calls;
ctx->type = propNameToType(prop);
switch (ctx->type)
{
case RES_GROUP_STAT_CPU_USAGE:
getCpuUsage(ctx);
break;
default:
break;
}
}
MemoryContextSwitchTo(oldcontext);
......@@ -529,6 +752,7 @@ pg_resgroup_get_status_kv(PG_FUNCTION_ARGS)
/* stuff done on every call of the function */
funcctx = SRF_PERCALL_SETUP();
ctx = (ResourceGroupStatusContext *) funcctx->user_fctx;
if (funcctx->call_cntr < funcctx->max_calls)
{
......@@ -539,37 +763,41 @@ pg_resgroup_get_status_kv(PG_FUNCTION_ARGS)
HeapTuple tuple;
Oid groupId;
char statVal[MAXDATELEN + 1];
ResourceGroupStatusRow *row = &ctx->rows[funcctx->call_cntr];
MemSet(values, 0, sizeof(values));
MemSet(nulls, 0, sizeof(nulls));
MemSet(statVal, 0, sizeof(statVal));
values[0] = ((Datum *) funcctx->user_fctx)[funcctx->call_cntr];
values[0] = row->oid;
values[1] = CStringGetTextDatum(prop);
groupId = DatumGetObjectId(values[0]);
/* Fill with dummy values */
if (!strcmp(prop, "num_running"))
ResGroupGetStat(groupId, RES_GROUP_STAT_NRUNNING, statVal, sizeof(statVal));
else if (!strcmp(prop, "num_queueing"))
ResGroupGetStat(groupId, RES_GROUP_STAT_NQUEUEING, statVal, sizeof(statVal));
else if (!strcmp(prop, "cpu_usage"))
snprintf(statVal, sizeof(statVal), "%.2f", 0.0);
else if (!strcmp(prop, "memory_usage"))
snprintf(statVal, sizeof(statVal), "%.2f", 0.0);
else if (!strcmp(prop, "total_queue_duration"))
ResGroupGetStat(groupId, RES_GROUP_STAT_TOTAL_QUEUE_TIME, statVal, sizeof(statVal));
else if (!strcmp(prop, "num_queued"))
ResGroupGetStat(groupId, RES_GROUP_STAT_TOTAL_QUEUED, statVal, sizeof(statVal));
else if (!strcmp(prop, "num_executed"))
ResGroupGetStat(groupId, RES_GROUP_STAT_TOTAL_EXECUTED, statVal, sizeof(statVal));
else
/* unknown property name */
nulls[2] = true;
switch (ctx->type)
{
default:
case RES_GROUP_STAT_NRUNNING:
case RES_GROUP_STAT_NQUEUEING:
case RES_GROUP_STAT_TOTAL_EXECUTED:
case RES_GROUP_STAT_TOTAL_QUEUED:
case RES_GROUP_STAT_TOTAL_QUEUE_TIME:
ResGroupGetStat(groupId, ctx->type, statVal, sizeof(statVal), prop);
values[2] = CStringGetTextDatum(statVal);
break;
case RES_GROUP_STAT_CPU_USAGE:
snprintf(statVal, sizeof(statVal), "%.2lf%%",
row->cpuAvgUsage);
values[2] = CStringGetTextDatum(statVal);
break;
if (!nulls[2])
values[2] = CStringGetTextDatum(statVal);
case RES_GROUP_STAT_MEM_USAGE:
/* not supported yet, fill with dummy value */
snprintf(statVal, sizeof(statVal), "%d", 0);
values[2] = CStringGetTextDatum(statVal);
break;
}
tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
......@@ -712,6 +940,9 @@ createResGroupAbortCallback(ResourceReleasePhase phase,
* after LWLockReleaseAll in AbortTransaction, it is safe here
*/
FreeResGroupEntry(groupId);
/* remove the os dependent part for this resource group */
ResGroupOps_DestroyGroup(groupId);
}
UnregisterResourceReleaseCallback(createResGroupAbortCallback, arg);
......@@ -736,6 +967,13 @@ dropResGroupAbortCallback(ResourceReleasePhase phase,
groupId = *(Oid *)arg;
ResGroupDropCheckForWakeup(groupId, isCommit);
if (isCommit)
{
/* remove the os dependent part for this resource group */
ResGroupOps_DestroyGroup(groupId);
}
UnregisterResourceReleaseCallback(dropResGroupAbortCallback, arg);
}
......@@ -764,6 +1002,13 @@ alterResGroupCommitCallback(ResourceReleasePhase phase,
/* wake up */
ResGroupAlterCheckForWakeup(groupId);
}
else
{
groupId = *(Oid *)arg;
/* remove the os dependent part for this resource group */
ResGroupOps_DestroyGroup(groupId);
}
UnregisterResourceReleaseCallback(alterResGroupCommitCallback, arg);
}
......@@ -1034,7 +1279,7 @@ getResgroupCapabilityEntry(int groupId, int type, char **value, char **proposed)
}
}
/*
/*
* Delete capability entries of one resource group.
*/
static void
......@@ -1142,16 +1387,14 @@ GetResGroupNameForId(Oid oid, LOCKMODE lockmode)
}
/*
* Convert a text to a float value.
* Convert a C str to a float value.
*
* @param text the text
* @param str the C str
* @param prop the property name
*/
static float
text2Float(const text *text, const char *prop)
str2Float(const char *str, const char *prop)
{
char *str = DatumGetCString(DirectFunctionCall1(textout,
PointerGetDatum(text)));
char *end = NULL;
double val = strtod(str, &end);
......@@ -1164,3 +1407,18 @@ text2Float(const text *text, const char *prop)
return (float) val;
}
/*
* Convert a text to a float value.
*
* @param text the text
* @param prop the property name
*/
static float
text2Float(const text *text, const char *prop)
{
char *str = DatumGetCString(DirectFunctionCall1(textout,
PointerGetDatum(text)));
return str2Float(str, prop);
}
......@@ -94,6 +94,7 @@
#include <pthread.h>
#include "utils/resscheduler.h"
#include "utils/resgroup.h"
#include "utils/resgroup-ops.h"
#include "pgstat.h"
#include "executor/nodeFunctionscan.h"
#include "cdb/cdbfilerep.h"
......@@ -4637,9 +4638,15 @@ PostgresMain(int argc, char *argv[],
* Initialize resource scheduler hash structure.
*/
if (IsResQueueEnabled() && Gp_role == GP_ROLE_DISPATCH && !am_walsender)
{
InitResQueues();
}
else if (IsResGroupEnabled() && (Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE) && !am_walsender)
{
InitResGroups();
AssignResGroup();
ResGroupOps_AdjustGUCs();
}
/*
* Now all GUC states are fully set up. Report them to client if
......
......@@ -42,6 +42,7 @@
#include "storage/procarray.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/resgroup.h"
#include "utils/resscheduler.h"
#include "utils/syscache.h"
......@@ -689,6 +690,11 @@ SetCurrentRoleId(Oid roleid, bool is_superuser)
SetResQueueId();
}
if ((Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE) && IsResGroupEnabled())
{
AssignResGroup();
}
SetConfigOption("is_superuser",
is_superuser ? "on" : "off",
PGC_INTERNAL, PGC_S_OVERRIDE);
......
......@@ -358,6 +358,7 @@ int gp_resqueue_priority_grouping_timeout;
double gp_resqueue_priority_cpucores_per_segment;
char *gp_resqueue_priority_default_value;
bool gp_debug_resqueue_priority = false;
double gp_resource_group_cpu_limit;
/* Perfmon segment GUCs */
int gp_perfmon_segment_interval;
......@@ -4757,6 +4758,15 @@ struct config_real ConfigureNamesReal_gp[] =
4.0, 0.1, 512.0, NULL, NULL
},
{
{"gp_resource_group_cpu_limit", PGC_POSTMASTER, RESOURCES,
gettext_noop("Maximum percentage of CPU resources assigned to a cluster."),
NULL
},
&gp_resource_group_cpu_limit,
0.9, 0.1, 1.0, NULL, NULL
},
{
{"gp_simex_rand", PGC_USERSET, GP_ERROR_HANDLING,
gettext_noop("Propability of injecting an Exceptional Situation in SimEx."),
......
......@@ -14,4 +14,10 @@ include $(top_builddir)/src/Makefile.global
OBJS = resgroup.o
ifeq ($(PORTNAME),linux)
OBJS += resgroup-ops-cgroup.o
else
OBJS += resgroup-ops-dummy.o
endif
include $(top_srcdir)/src/backend/common.mk
/*-------------------------------------------------------------------------
*
* resgroup-ops-cgroup.c
* OS dependent resource group operations - cgroup implementation
*
*
* Copyright (c) 2017, Pivotal Software Inc.
*
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "cdb/cdbvars.h"
#include "postmaster/backoff.h"
#include "utils/resgroup-ops.h"
#ifndef __linux__
#error cgroup is only available on linux
#endif
#include <unistd.h>
#include <sched.h>
#include <sys/file.h>
#include <sys/stat.h>
/*
* Interfaces for OS dependent operations.
*
* Resource group relies on OS dependent group implementation to manage
* resources like cpu usage, such as cgroup on Linux system.
* We call it OS group in below function description.
*
* So far these operations are mainly for CPU rate limitation and accounting.
*/
#define CGROUP_ERROR_PREFIX "cgroup is not properly configured: "
#define CGROUP_ERROR(...) do { \
elog(ERROR, CGROUP_ERROR_PREFIX __VA_ARGS__); \
} while (false)
static char * buildPath(Oid group, const char *comp, const char *prop, char *path, size_t pathsize);
static int lockDir(const char *path, bool block);
static void unassignGroup(Oid group, const char *comp, int fddir);
static bool createDir(Oid group, const char *comp);
static bool removeDir(Oid group, const char *comp, bool unassign);
static int getCpuCores(void);
static size_t readData(Oid group, const char *comp, const char *prop, char *data, size_t datasize);
static void writeData(Oid group, const char *comp, const char *prop, char *data, size_t datasize);
static int64 readInt64(Oid group, const char *comp, const char *prop);
static void writeInt64(Oid group, const char *comp, const char *prop, int64 x);
static bool checkPermission(Oid group, bool report);
static int cpucores = 0;
/*
* Build path string with parameters.
*
* - if group is 0 then the path is for the gpdb toplevel cgroup;
* - if prop is "" then the path is for the cgroup dir;
*/
static char *
buildPath(Oid group,
const char *comp,
const char *prop,
char *path,
size_t pathsize)
{
if (group)
snprintf(path, pathsize, "/sys/fs/cgroup/%s/gpdb/%d/%s", comp, group, prop);
else
snprintf(path, pathsize, "/sys/fs/cgroup/%s/gpdb/%s", comp, prop);
return path;
}
/*
* Unassign all the processes from group.
*
* These processes will be moved to the gpdb toplevel cgroup.
*
* This function must be called with the gpdb toplevel dir locked,
* fddir is the fd for this lock, on any failure fddir will be closed
* (and unlocked implicitly) then an error is raised.
*/
static void
unassignGroup(Oid group, const char *comp, int fddir)
{
char path[128];
size_t pathsize = sizeof(path);
char *buf;
size_t bufsize;
const size_t bufdeltasize = 512;
size_t buflen = -1;
int fdr = -1;
int fdw = -1;
/*
* Check an operation result on path.
*
* Operation can be open(), close(), read(), write(), etc., which must
* set the errno on error.
*
* - condition describes the expected result of the operation;
* - action is the cleanup action on failure, such as closing the fd,
* multiple actions can be specified by putting them in brackets,
* such as (op1, op2);
* - message describes what's failed;
*/
#define __CHECK(condition, action, message) do { \
if (!(condition)) \
{ \
/* save errno in case it's changed in actions */ \
int err = errno; \
action; \
CGROUP_ERROR(message ": %s: %s", path, strerror(err)); \
} \
} while (0)
buildPath(group, comp, "cgroup.procs", path, pathsize);
fdr = open(path, O_RDONLY);
__CHECK(fdr >= 0, ( close(fddir) ), "can't open file for read");
buflen = 0;
bufsize = bufdeltasize;
buf = palloc(bufsize);
while (1)
{
int n = read(fdr, buf + buflen, bufdeltasize);
__CHECK(n >= 0, ( close(fdr), close(fddir) ), "can't read from file");
buflen += n;
if (n < bufdeltasize)
break;
bufsize += bufdeltasize;
buf = repalloc(buf, bufsize);
}
close(fdr);
buildPath(0, comp, "cgroup.procs", path, pathsize);
fdw = open(path, O_WRONLY);
__CHECK(fdw >= 0, ( close(fddir) ), "can't open file for write");
char *ptr = buf;
char *end = NULL;
long pid;
/*
* as required by cgroup, only one pid can be migrated in each single
* write() call, so we have to parse the pids from the buffer first,
* then write them one by one.
*/
while (1)
{
pid = strtol(ptr, &end, 10);
if (ptr == end)
break;
int n = write(fdw, ptr, end - ptr);
__CHECK(n >= 0, ( close(fddir) ), "can't write to file");
__CHECK(n == end - ptr, ( close(fddir) ), "can't write to file");
ptr = end;
}
close(fdw);
#undef __CHECK
}
/*
* Lock the dir specified by path.
*
* - path must be a dir path;
* - if block is true then lock in block mode, otherwise will give up if
* the dir is already locked;
*/
static int
lockDir(const char *path, bool block)
{
int fddir;
fddir = open(path, O_RDONLY | O_DIRECTORY);
if (fddir < 0)
{
if (errno == ENOENT)
{
/* the dir doesn't exist, nothing to do */
return -1;
}
CGROUP_ERROR("can't open dir to lock: %s: %s",
path, strerror(errno));
}
int flags = LOCK_EX;
if (block)
flags |= LOCK_NB;
while (flock(fddir, flags))
{
/*
* EAGAIN is not described in flock(2),
* however it does appear in practice.
*/
if (errno == EAGAIN)
continue;
int err = errno;
close(fddir);
/*
* In block mode all errors should be reported;
* In non block mode only report errors != EWOULDBLOCK.
*/
if (block || err != EWOULDBLOCK)
CGROUP_ERROR("can't lock dir: %s: %s", path, strerror(err));
return -1;
}
/*
* Even if we accquired the lock the dir may still been removed by other
* processes, e.g.:
*
* 1: open()
* 1: flock() -- process 1 accquired the lock
*
* 2: open()
* 2: flock() -- blocked by process 1
*
* 1: rmdir()
* 1: close() -- process 1 released the lock
*
* 2:flock() will now return w/o error as process 2 still has a valid
* fd (reference) on the target dir, and process 2 does accquired the lock
* successfully. However as the dir is already removed so process 2
* shouldn't make any further operation (rmdir(), etc.) on the dir.
*
* So we check for the existence of the dir again and give up if it's
* already removed.
*/
if (access(path, F_OK))
{
/* the dir is already removed by other process, nothing to do */
close(fddir);
return -1;
}
return fddir;
}
/*
* Create the cgroup dir for group.
*/
static bool
createDir(Oid group, const char *comp)
{
char path[MAXPGPATH];
size_t pathsize = sizeof(path);
buildPath(group, comp, "", path, pathsize);
if (access(path, F_OK))
{
/* the dir is not created yet, create it */
if (mkdir(path, 0755) && errno != EEXIST)
return false;
}
return true;
}
/*
* Remove the cgroup dir for group.
*
* - if unassign is true then unassign all the processes first before removal;
*/
static bool
removeDir(Oid group, const char *comp, bool unassign)
{
char path[128];
size_t pathsize = sizeof(path);
int fddir;
buildPath(group, comp, "", path, pathsize);
/*
* To prevent race condition between multiple processes we require a dir
* to be removed with the lock accquired first.
*/
fddir = lockDir(path, true);
if (fddir < 0)
{
/* the dir is already removed */
return true;
}
if (unassign)
unassignGroup(group, comp, fddir);
if (rmdir(path))
{
int err = errno;
close(fddir);
/*
* we don't check for ENOENT again as we already accquired the lock
* on this dir and the dir still exist at that time, so if then
* it's removed by other processes then it's a bug.
*/
CGROUP_ERROR("can't remove dir: %s: %s", path, strerror(err));
}
/* close() also releases the lock */
close(fddir);
return true;
}
/*
* Get the cpu cores assigned for current system or container.
*
* Suppose a physical machine has 8 cpu cores, 2 of them assigned to
* a container, then the return value is:
* - 8 if running directly on the machine;
* - 2 if running in the container;
*/
static int
getCpuCores(void)
{
if (cpucores == 0)
{
/*
* cpuset ops requires _GNU_SOURCE to be defined,
* and _GNU_SOURCE is forced on in src/template/linux,
* so we assume these ops are always available on linux.
*/
cpu_set_t cpuset;
int i;
if (sched_getaffinity(0, sizeof(cpuset), &cpuset) < 0)
CGROUP_ERROR("can't get cpu cores: %s", strerror(errno));
for (i = 0; i < CPU_SETSIZE; i++)
{
if (CPU_ISSET(i, &cpuset))
cpucores++;
}
}
if (cpucores == 0)
CGROUP_ERROR("can't get cpu cores");
return cpucores;
}
/*
* Read at most datasize bytes from a cgroup interface file.
*/
static size_t
readData(Oid group, const char *comp, const char *prop, char *data, size_t datasize)
{
char path[128];
size_t pathsize = sizeof(path);
buildPath(group, comp, prop, path, pathsize);
int fd = open(path, O_RDONLY);
if (fd < 0)
CGROUP_ERROR("can't open file '%s': %s", path, strerror(errno));
size_t ret = read(fd, data, datasize);
/* save errno before close() */
int err = errno;
close(fd);
if (ret < 0)
CGROUP_ERROR("can't read data from file '%s': %s", path, strerror(err));
return ret;
}
/*
* Write datasize bytes to a cgroup interface file.
*/
static void
writeData(Oid group, const char *comp, const char *prop, char *data, size_t datasize)
{
char path[128];
size_t pathsize = sizeof(path);
buildPath(group, comp, prop, path, pathsize);
int fd = open(path, O_WRONLY);
if (fd < 0)
CGROUP_ERROR("can't open file '%s': %s", path, strerror(errno));
size_t ret = write(fd, data, datasize);
/* save errno before close */
int err = errno;
close(fd);
if (ret < 0)
CGROUP_ERROR("can't write data to file '%s': %s", path, strerror(err));
if (ret != datasize)
CGROUP_ERROR("can't write all data to file '%s'", path);
}
/*
* Read an int64 value from a cgroup interface file.
*/
static int64
readInt64(Oid group, const char *comp, const char *prop)
{
int64 x;
char data[64];
size_t datasize = sizeof(data);
readData(group, comp, prop, data, datasize);
if (sscanf(data, "%lld", (long long *) &x) != 1)
CGROUP_ERROR("invalid number '%s'", data);
return x;
}
/*
* Write an int64 value to a cgroup interface file.
*/
static void
writeInt64(Oid group, const char *comp, const char *prop, int64 x)
{
char data[64];
size_t datasize = sizeof(data);
snprintf(data, datasize, "%lld", (long long) x);
writeData(group, comp, prop, data, strlen(data));
}
/*
* Check permissions on group's cgroup dir & interface files.
*
* - if report is true then raise an error on and bad permission,
* otherwise only return false;
*/
static bool
checkPermission(Oid group, bool report)
{
char path[128];
size_t pathsize = sizeof(path);
const char *comp;
#define __CHECK(prop, perm) do { \
buildPath(group, comp, prop, path, pathsize); \
if (access(path, perm)) \
{ \
if (report) \
{ \
CGROUP_ERROR("can't access %s '%s': %s", \
prop[0] ? "file" : "directory", \
path, \
strerror(errno)); \
} \
return false; \
} \
} while (0)
/*
* These checks should keep in sync with
* gpMgmt/bin/gpcheckresgroupimpl
*/
comp = "cpu";
__CHECK("", R_OK | W_OK | X_OK);
__CHECK("cgroup.procs", R_OK | W_OK);
__CHECK("cpu.cfs_period_us", R_OK | W_OK);
__CHECK("cpu.cfs_quota_us", R_OK | W_OK);
__CHECK("cpu.shares", R_OK | W_OK);
comp = "cpuacct";
__CHECK("", R_OK | W_OK | X_OK);
__CHECK("cgroup.procs", R_OK | W_OK);
__CHECK("cpuacct.usage", R_OK);
__CHECK("cpuacct.stat", R_OK);
#undef __CHECK
return true;
}
/* Return the name for the OS group implementation */
const char *
ResGroupOps_Name(void)
{
return "cgroup";
}
/* Check whether the OS group implementation is available and useable */
void
ResGroupOps_Bless(void)
{
checkPermission(0, true);
}
/* Initialize the OS group */
void
ResGroupOps_Init(void)
{
/* cfs_quota_us := cfs_period_us * ncores * gp_resource_group_cpu_limit */
/* shares := 1024 * 256 (max possible value) */
int64 cfs_period_us;
int ncores = getCpuCores();
const char *comp = "cpu";
cfs_period_us = readInt64(0, comp, "cpu.cfs_period_us");
writeInt64(0, comp, "cpu.cfs_quota_us",
cfs_period_us * ncores * gp_resource_group_cpu_limit);
writeInt64(0, comp, "cpu.shares", 1024 * 256);
}
/* Adjust GUCs for this OS group implementation */
void
ResGroupOps_AdjustGUCs(void)
{
/*
* cgroup cpu limitation works best when all processes have equal
* priorities, so we force all the segments and postmaster to
* work with nice=0.
*
* this function should be called before GUCs are dispatched to segments.
*/
/* TODO: when cgroup is enabled we should move postmaster and maybe
* also other processes to a separate group or gpdb toplevel */
if (gp_segworker_relative_priority != 0)
{
/* TODO: produce a warning */
gp_segworker_relative_priority = 0;
}
}
/*
* Create the OS group for group.
*/
void
ResGroupOps_CreateGroup(Oid group)
{
int retry = 0;
if (!createDir(group, "cpu") || !createDir(group, "cpuacct"))
{
CGROUP_ERROR("can't create cgroup for resgroup '%d': %s",
group, strerror(errno));
}
/*
* although the group dir is created the interface files may not be
* created yet, so we check them repeatedly until everything is ready.
*/
while (++retry <= 10 && !checkPermission(group, false))
pg_usleep(1000);
if (retry > 10)
{
/*
* still not ready after 10 retries, might be a real error,
* raise the error.
*/
checkPermission(group, true);
}
}
/*
* Destroy the OS group for group.
*
* Fail if any process is running under it.
*/
void
ResGroupOps_DestroyGroup(Oid group)
{
if (!removeDir(group, "cpu", true) || !removeDir(group, "cpuacct", true))
{
CGROUP_ERROR("can't remove cgroup for resgroup '%d': %s",
group, strerror(errno));
}
}
/*
* Assign a process to the OS group. A process can only be assigned to one
* OS group, if it's already running under other OS group then it'll be moved
* out that OS group.
*
* pid is the process id.
*/
void
ResGroupOps_AssignGroup(Oid group, int pid)
{
writeInt64(group, "cpu", "cgroup.procs", pid);
writeInt64(group, "cpuacct", "cgroup.procs", pid);
}
/*
* Set the cpu rate limit for the OS group.
*
* cpu_rate_limit should be within (0.0, 1.0].
*/
void
ResGroupOps_SetCpuRateLimit(Oid group, float cpu_rate_limit)
{
const char *comp = "cpu";
/* SUB/shares := TOP/shares * cpu_rate_limit */
int64 shares = readInt64(0, comp, "cpu.shares");
writeInt64(group, comp, "cpu.shares", shares * cpu_rate_limit);
}
/*
* Get the cpu usage of the OS group, that is the total cpu time obtained
* by this OS group, in nano seconds.
*/
int64
ResGroupOps_GetCpuUsage(Oid group)
{
const char *comp = "cpuacct";
return readInt64(group, comp, "cpuacct.usage");
}
/*
* Get the count of cpu cores on the system.
*/
int
ResGroupOps_GetCpuCores(void)
{
return getCpuCores();
}
/*-------------------------------------------------------------------------
*
* resgroup-ops-dummy.c
* OS dependent resource group operations - dummy implementation
*
*
* Copyright (c) 2017, Pivotal Software Inc.
*
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "utils/resgroup-ops.h"
/*
* Interfaces for OS dependent operations.
*
* Resource group relies on OS dependent group implementation to manage
* resources like cpu usage, such as cgroup on Linux system.
* We call it OS group in below function description.
*
* So far these operations are mainly for CPU rate limitation and accounting.
*/
#define unsupported_system() \
elog(WARNING, "cpu rate limitation for resource group is unsupported on this system")
/* Return the name for the OS group implementation */
const char *
ResGroupOps_Name(void)
{
return "unsupported";
}
/* Check whether the OS group implementation is available and useable */
void
ResGroupOps_Bless(void)
{
unsupported_system();
}
/* Initialize the OS group */
void
ResGroupOps_Init(void)
{
unsupported_system();
}
/* Adjust GUCs for this OS group implementation */
void
ResGroupOps_AdjustGUCs(void)
{
unsupported_system();
}
/*
* Create the OS group for group.
*/
void
ResGroupOps_CreateGroup(Oid group)
{
unsupported_system();
}
/*
* Destroy the OS group for group.
*
* Fail if any process is running under it.
*/
void
ResGroupOps_DestroyGroup(Oid group)
{
unsupported_system();
}
/*
* Assign a process to the OS group. A process can only be assigned to one
* OS group, if it's already running under other OS group then it'll be moved
* out that OS group.
*
* pid is the process id.
*/
void
ResGroupOps_AssignGroup(Oid group, int pid)
{
unsupported_system();
}
/*
* Set the cpu rate limit for the OS group.
*
* cpu_rate_limit should be within (0.0, 1.0].
*/
void
ResGroupOps_SetCpuRateLimit(Oid group, float cpu_rate_limit)
{
unsupported_system();
}
/*
* Get the cpu usage of the OS group, that is the total cpu time obtained
* by this OS group, in nano seconds.
*/
int64
ResGroupOps_GetCpuUsage(Oid group)
{
unsupported_system();
return 0;
}
/*
* Get the count of cpu cores on the system.
*/
int
ResGroupOps_GetCpuCores(void)
{
unsupported_system();
return 1;
}
......@@ -17,6 +17,7 @@
#include "cdb/cdbvars.h"
#include "cdb/memquota.h"
#include "commands/resgroupcmds.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/ipc.h"
......@@ -29,6 +30,7 @@
#include "utils/fmgroids.h"
#include "utils/memutils.h"
#include "utils/resgroup.h"
#include "utils/resgroup-ops.h"
#include "utils/resowner.h"
#include "utils/resource_manager.h"
......@@ -196,17 +198,24 @@ InitResGroups(void)
if (pResGroupControl->loaded)
goto exit;
ResGroupOps_Init();
numGroups = 0;
sscan = systable_beginscan(relResGroup, InvalidOid, false, SnapshotNow, 0, NULL);
while (HeapTupleIsValid(tuple = systable_getnext(sscan)))
{
bool groupOK = ResGroupCreate(HeapTupleGetOid(tuple));
Oid groupId = HeapTupleGetOid(tuple);
bool groupOK = ResGroupCreate(groupId);
float cpu_rate_limit = GetCpuRateLimitForResGroup(groupId);
if (!groupOK)
ereport(PANIC,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("not enough shared memory for resource groups")));
ResGroupOps_CreateGroup(groupId);
ResGroupOps_SetCpuRateLimit(groupId, cpu_rate_limit);
numGroups++;
Assert(numGroups <= MaxResourceGroups);
}
......@@ -380,7 +389,7 @@ void ResGroupAlterCheckForWakeup(Oid groupId)
* Retrieve statistic information of type from resource group
*/
void
ResGroupGetStat(Oid groupId, ResGroupStatType type, char *retStr, int retStrLen)
ResGroupGetStat(Oid groupId, ResGroupStatType type, char *retStr, int retStrLen, const char *prop)
{
ResGroup group;
......@@ -423,7 +432,7 @@ ResGroupGetStat(Oid groupId, ResGroupStatType type, char *retStr, int retStrLen)
default:
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Invalid stat type %d", type)));
errmsg("Invalid stat type %s", prop)));
}
LWLockRelease(ResGroupLock);
......@@ -642,6 +651,13 @@ ResGroupSlotRelease(void)
CurrentResGroupId = InvalidOid;
}
void
AssignResGroup(void)
{
Oid groupId = GetResGroupIdForRole(GetUserId());
ResGroupOps_AssignGroup(groupId, MyProcPid);
}
/*
* Wait on the queue of resource group
*/
......
......@@ -19,3 +19,17 @@
*/
bool ResourceScheduler = false; /* Is scheduling enabled? */
ResourceManagerPolicy Gp_resource_manager_policy;
bool
IsResQueueEnabled(void)
{
return ResourceScheduler &&
Gp_resource_manager_policy == RESOURCE_MANAGER_POLICY_QUEUE;
}
bool
IsResGroupEnabled(void)
{
return ResourceScheduler &&
Gp_resource_manager_policy == RESOURCE_MANAGER_POLICY_GROUP;
}
......@@ -23,6 +23,7 @@ extern void AlterResourceGroup(AlterResourceGroupStmt *stmt);
extern Oid GetResGroupIdForName(char *name, LOCKMODE lockmode);
extern char *GetResGroupNameForId(Oid oid, LOCKMODE lockmode);
extern void GetConcurrencyForResGroup(int groupId, int *value, int *proposed);
extern float GetCpuRateLimitForResGroup(int groupId);
extern Oid GetResGroupIdForRole(Oid roleid);
#endif /* RESGROUPCMDS_H */
......@@ -92,6 +92,9 @@ extern void ProcessInterrupts(const char* filename, int lineno);
extern void BackoffBackendTick(void);
extern bool gp_enable_resqueue_priority;
/* in utils/resource_manager.h */
extern bool IsResQueueEnabled(void);
/*
* We don't want to include the entire vmem_tracker.h, and so,
* declare the only function we use from vmem_tracker.h.
......@@ -124,7 +127,7 @@ do { \
}\
if (InterruptPending) \
ProcessInterrupts(__FILE__, __LINE__); \
if (gp_enable_resqueue_priority) \
if (IsResQueueEnabled() && gp_enable_resqueue_priority) \
BackoffBackendTick(); \
ReportOOMConsumption(); \
RedZoneHandler_DetectRunawaySession();\
......@@ -134,7 +137,7 @@ do { \
do { \
if (InterruptPending) \
ProcessInterrupts(__FILE__, __LINE__); \
if (gp_enable_resqueue_priority) \
if (IsResQueueEnabled() && gp_enable_resqueue_priority) \
BackoffBackendTick(); \
ReportOOMConsumption(); \
RedZoneHandler_DetectRunawaySession();\
......
......@@ -19,6 +19,7 @@ extern int gp_resqueue_priority_inactivity_timeout;
extern int gp_resqueue_priority_grouping_timeout;
extern double gp_resqueue_priority_cpucores_per_segment;
extern char* gp_resqueue_priority_default_value;
extern double gp_resource_group_cpu_limit;
extern void BackoffBackendEntryInit(int sessionid, int commandcount, int weight);
extern void BackoffBackendEntryExit(void);
......
/*-------------------------------------------------------------------------
*
* resgroup.h
* GPDB resource group definitions.
*
*
* Copyright (c) 2017, Pivotal Software Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef RES_GROUP_OPS_H
#define RES_GROUP_OPS_H
/*
* Interfaces for OS dependent operations
*/
extern const char * ResGroupOps_Name(void);
extern void ResGroupOps_Bless(void);
extern void ResGroupOps_Init(void);
extern void ResGroupOps_AdjustGUCs(void);
extern void ResGroupOps_CreateGroup(Oid group);
extern void ResGroupOps_DestroyGroup(Oid group);
extern void ResGroupOps_AssignGroup(Oid group, int pid);
extern void ResGroupOps_SetCpuRateLimit(Oid group, float cpu_rate_limit);
extern int64 ResGroupOps_GetCpuUsage(Oid group);
extern int ResGroupOps_GetCpuCores(void);
#endif /* RES_GROUP_OPS_H */
......@@ -48,11 +48,15 @@ typedef struct ResGroupControl
/* Type of statistic infomation */
typedef enum
{
RES_GROUP_STAT_UNKNOWN = -1,
RES_GROUP_STAT_NRUNNING = 0,
RES_GROUP_STAT_NQUEUEING,
RES_GROUP_STAT_TOTAL_EXECUTED,
RES_GROUP_STAT_TOTAL_QUEUED,
RES_GROUP_STAT_TOTAL_QUEUE_TIME,
RES_GROUP_STAT_CPU_USAGE,
RES_GROUP_STAT_MEM_USAGE,
} ResGroupStatType;
/*
......@@ -72,8 +76,11 @@ extern void FreeResGroupEntry(Oid groupId);
extern void ResGroupSlotAcquire(void);
extern void ResGroupSlotRelease(void);
/* Assign current process to the associated resource group */
extern void AssignResGroup(void);
/* Retrieve statistic information of type from resource group */
extern void ResGroupGetStat(Oid groupId, ResGroupStatType type, char *retStr, int retStrLen);
extern void ResGroupGetStat(Oid groupId, ResGroupStatType type, char *retStr, int retStrLen, const char *prop);
extern void ResGroupAlterCheckForWakeup(Oid groupId);
extern void ResGroupDropCheckForWakeup(Oid groupId, bool isCommit);
......
......@@ -26,10 +26,7 @@ typedef enum
extern bool ResourceScheduler;
extern ResourceManagerPolicy Gp_resource_manager_policy;
#define IsResQueueEnabled() \
(bool)(ResourceScheduler && Gp_resource_manager_policy == RESOURCE_MANAGER_POLICY_QUEUE)
#define IsResGroupEnabled() \
(bool)(ResourceScheduler && Gp_resource_manager_policy == RESOURCE_MANAGER_POLICY_GROUP)
extern bool IsResQueueEnabled(void);
extern bool IsResGroupEnabled(void);
#endif /* RESOURCEMANAGER_H */
......@@ -34,8 +34,8 @@ atmsort.pm:
explain.pm:
rm -f $@ && $(LN_S) $(top_builddir)/src/test/regress/explain.pm
pg_isolation2_regress$(X): isolation2_main.o pg_regress.o
$(CC) $(CFLAGS) $^ $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@
pg_isolation2_regress$(X): isolation2_main.o pg_regress.o submake-libpq submake-libpgport
$(CC) $(CFLAGS) $(filter %.o,$^) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@
clean distclean:
rm -f pg_isolation2_regress$(X) $(OBJS) isolation2_main.o
......@@ -45,3 +45,6 @@ clean distclean:
installcheck: all gpdiff.pl gpstringsubs.pl
./pg_isolation2_regress --init-file=$(top_builddir)/src/test/regress/init_file --psqldir='$(PSQLDIR)' --inputdir=$(srcdir) --ao-dir=uao --schedule=$(srcdir)/isolation2_schedule
installcheck-resgroup: all gpdiff.pl gpstringsubs.pl
./pg_isolation2_regress --init-file=$(top_builddir)/src/test/regress/init_file --init-file=./init_file_resgroup --psqldir='$(PSQLDIR)' --inputdir=$(srcdir) --ao-dir=uao --dbname=isolation2resgrouptest --schedule=$(srcdir)/isolation2_resgroup_schedule
-- reset the GUC and restart cluster.
-- start_ignore
! gpconfig -r gp_resource_manager;
! gpstop -rai;
-- end_ignore
SHOW gp_resource_manager;
gp_resource_manager
-------------------
queue
(1 row)
1: SET ROLE TO r1;
SET
0: ALTER ROLE r1 RESOURCE GROUP none;
ALTER
0: DROP RESOURCE GROUP g1;
DROP
2: SET ROLE TO r2;
SET
0: ALTER ROLE r2 RESOURCE GROUP none;
ALTER
0: DROP RESOURCE GROUP g2;
DROP
! ls -d /sys/fs/cgroup/cpu/gpdb/*/;
/sys/fs/cgroup/cpu/gpdb/6437/
/sys/fs/cgroup/cpu/gpdb/6438/
! ls -d /sys/fs/cgroup/cpuacct/gpdb/*/;
/sys/fs/cgroup/cpuacct/gpdb/6437/
/sys/fs/cgroup/cpuacct/gpdb/6438/
-- start_ignore
! rmdir /sys/fs/cgroup/cpu/gpdb;
! rmdir /sys/fs/cgroup/cpuacct/gpdb;
! mkdir /sys/fs/cgroup/cpu/gpdb;
! mkdir /sys/fs/cgroup/cpuacct/gpdb;
! chmod 755 /sys/fs/cgroup/cpu/gpdb;
! chmod 755 /sys/fs/cgroup/cpuacct/gpdb;
-- end_ignore
-- enable resource group and restart cluster.
-- start_ignore
! gpconfig -c gp_resource_manager -v group;
20170502:01:28:12:000367 gpconfig:sdw6:gpadmin-[WARNING]:-Managing queries with resource groups is an experimental feature. A work-in-progress version is enabled.
20170502:01:28:13:000367 gpconfig:sdw6:gpadmin-[INFO]:-completed successfully
! gpstop -rai;
-- end_ignore
SHOW gp_resource_manager;
gp_resource_manager
-------------------
group
(1 row)
-- start_ignore
! rmdir /sys/fs/cgroup/cpu/gpdb/*/;
! rmdir /sys/fs/cgroup/cpuacct/gpdb/*/;
! rmdir /sys/fs/cgroup/cpu/gpdb;
! rmdir /sys/fs/cgroup/cpuacct/gpdb;
-- end_ignore
-- gpdb top group is not created
! gpconfig -c gp_resource_manager -v group;
20170517:11:54:17:011348 gpconfig:nyu-vm-centos:gpadmin-[WARNING]:-Managing queries with resource groups is an experimental feature. A work-in-progress version is enabled.
20170517:11:54:18:011348 gpconfig:nyu-vm-centos:gpadmin-[CRITICAL]:-new GUC value failed validation: [nyu-vm-centos:cgroup is not properly configured: directory '/sys/fs/cgroup/cpu/gpdb/' does not exist
]
new GUC value failed validation: [nyu-vm-centos:cgroup is not properly configured: directory '/sys/fs/cgroup/cpu/gpdb/' does not exist
]
-- start_ignore
! mkdir /sys/fs/cgroup/cpu/gpdb;
! mkdir /sys/fs/cgroup/cpuacct/gpdb;
! chmod 644 /sys/fs/cgroup/cpu/gpdb;
-- end_ignore
-- gpdb directory should have rwx permission
! gpconfig -c gp_resource_manager -v group;
20170517:11:54:18:011409 gpconfig:nyu-vm-centos:gpadmin-[WARNING]:-Managing queries with resource groups is an experimental feature. A work-in-progress version is enabled.
20170517:11:54:18:011409 gpconfig:nyu-vm-centos:gpadmin-[CRITICAL]:-new GUC value failed validation: [nyu-vm-centos:cgroup is not properly configured: directory '/sys/fs/cgroup/cpu/gpdb/' permission denied: require permission 'rwx'
]
new GUC value failed validation: [nyu-vm-centos:cgroup is not properly configured: directory '/sys/fs/cgroup/cpu/gpdb/' permission denied: require permission 'rwx'
]
-- start_ignore
! chmod 755 /sys/fs/cgroup/cpu/gpdb;
! chmod 444 /sys/fs/cgroup/cpu/gpdb/cgroup.procs;
! chmod 444 /sys/fs/cgroup/cpu/gpdb/cpu.cfs_quota_us;
! chmod 244 /sys/fs/cgroup/cpu/gpdb/cpu.cfs_period_us;
! chmod 244 /sys/fs/cgroup/cpuacct/gpdb/cpuacct.usage;
-- end_ignore
-- cgroup.procs should have writepermission
-- cpu.cfs_quota_us should have write permission
-- cpu.cfs_period_us should have read permission
-- cpuacct.usage should have read permission
! gpconfig -c gp_resource_manager -v group;
20170517:11:54:18:011466 gpconfig:nyu-vm-centos:gpadmin-[WARNING]:-Managing queries with resource groups is an experimental feature. A work-in-progress version is enabled.
20170517:11:54:18:011466 gpconfig:nyu-vm-centos:gpadmin-[CRITICAL]:-new GUC value failed validation: [nyu-vm-centos:cgroup is not properly configured: file '/sys/fs/cgroup/cpu/gpdb/cgroup.procs' permission denied: require permission 'rw'
]
new GUC value failed validation: [nyu-vm-centos:cgroup is not properly configured: file '/sys/fs/cgroup/cpu/gpdb/cgroup.procs' permission denied: require permission 'rw'
]
......@@ -4,15 +4,9 @@ CREATE
0:SELECT r.rsgname, num_running, num_queueing, num_queued, num_executed FROM gp_toolkit.gp_resgroup_status s, pg_resgroup r WHERE s.groupid=r.oid AND r.rsgname='rg_concurrency_test';
rsgname |num_running|num_queueing|num_queued|num_executed
-------------------+-----------+------------+----------+------------
rg_concurrency_test| | | |
rg_concurrency_test|0 |0 |0 |0
(1 row)
-- enable resource group and restart cluster.
-- start_ignore
! gpconfig -c gp_resource_manager -v group;
! gpstop -rai;
-- end_ignore
-- test1: test gp_toolkit.gp_resgroup_status and pg_stat_activity
-- no query has been assigned to the this group
1:SELECT r.rsgname, num_running, num_queueing, num_queued, num_executed FROM gp_toolkit.gp_resgroup_status s, pg_resgroup r WHERE s.groupid=r.oid AND r.rsgname='rg_concurrency_test';
......@@ -295,9 +289,3 @@ rsgname|num_running|num_queueing|num_queued|num_executed
-------+-----------+------------+----------+------------
(0 rows)
-- reset the GUC and restart cluster.
-- start_ignore
! gpconfig -r gp_resource_manager;
! gpstop -rai;
-- end_ignore
-- start_ignore
DROP VIEW IF EXISTS cpu_status;
DROP
DROP VIEW IF EXISTS busy;
DROP
DROP VIEW IF EXISTS cancel_all;
DROP
DROP FUNCTION IF EXISTS round_percentage(text);
DROP
DROP TABLE IF EXISTS bigtable;
DROP
DROP ROLE IF EXISTS r1;
DROP
DROP ROLE IF EXISTS r2;
DROP
DROP RESOURCE GROUP g1;
DROP
DROP RESOURCE GROUP g2;
DROP
-- end_ignore
--
-- helper functions, tables and views
--
CREATE TABLE bigtable AS SELECT i AS c1, 'abc' AS c2 FROM generate_series(1,100000) i;
CREATE 100000
-- the cpu usage limitation has an error rate about +-7.5%,
-- and also we want to satisfy the 0.1:0.2 rate under 90% overall limitation
-- so we round the cpu rate by 15%
CREATE FUNCTION round_percentage(text) RETURNS text AS $$ SELECT (round(rtrim($1, '%') :: double precision / 15) * 15) :: text || '%' $$ LANGUAGE sql;
CREATE
CREATE VIEW cpu_status AS SELECT g.rsgname, round_percentage(s.cpu_usage) FROM gp_toolkit.gp_resgroup_status s, pg_resgroup g WHERE s.groupid=g.oid ORDER BY g.oid;
CREATE
CREATE VIEW busy AS SELECT count(*) FROM bigtable t1, bigtable t2, bigtable t3, bigtable t4, bigtable t5 WHERE 0 = (t1.c1 % 2 + 10000)! AND 0 = (t2.c1 % 2 + 10000)! AND 0 = (t3.c1 % 2 + 10000)! AND 0 = (t4.c1 % 2 + 10000)! AND 0 = (t5.c1 % 2 + 10000)! ;
CREATE
CREATE VIEW cancel_all AS SELECT pg_cancel_backend(procpid) FROM pg_stat_activity WHERE current_query LIKE 'SELECT * FROM busy%';
CREATE
--
-- check gpdb cgroup configuration
--
-- cfs_quota_us := cfs_period_us * ncores * gp_resource_group_cpu_limit
-- shares := 1024 * ncores
--
! python -c "print $(cat /sys/fs/cgroup/cpu/gpdb/cpu.cfs_quota_us) == int($(cat /sys/fs/cgroup/cpu/gpdb/cpu.cfs_period_us) * $(nproc) * $(psql -d isolation2resgrouptest -Aqtc "SHOW gp_resource_group_cpu_limit"))";
True
! python -c "print $(cat /sys/fs/cgroup/cpu/gpdb/cpu.shares) == 1024 * 256";
True
--
-- check default groups configuration
--
-- SUB/shares := TOP/shares * cpu_rate_limit
--
! python -c "print $(cat /sys/fs/cgroup/cpu/gpdb/$(psql -d isolation2resgrouptest -Aqtc "SELECT oid FROM pg_resgroup WHERE rsgname='default_group'")/cpu.shares) == int($(cat /sys/fs/cgroup/cpu/gpdb/cpu.shares) * $(psql -d isolation2resgrouptest -Aqtc "SELECT value FROM pg_resgroupcapability c, pg_resgroup g WHERE c.resgroupid=g.oid AND reslimittype=2 AND g.rsgname='default_group'"))";
True
! python -c "print $(cat /sys/fs/cgroup/cpu/gpdb/$(psql -d isolation2resgrouptest -Aqtc "SELECT oid FROM pg_resgroup WHERE rsgname='admin_group'")/cpu.shares) == int($(cat /sys/fs/cgroup/cpu/gpdb/cpu.shares) * $(psql -d isolation2resgrouptest -Aqtc "SELECT value FROM pg_resgroupcapability c, pg_resgroup g WHERE c.resgroupid=g.oid AND reslimittype=2 AND g.rsgname='admin_group'"))";
True
-- create two resource groups
CREATE RESOURCE GROUP g1 WITH (cpu_rate_limit=0.1, memory_limit=0.1);
CREATE
CREATE RESOURCE GROUP g2 WITH (cpu_rate_limit=0.2, memory_limit=0.2);
CREATE
-- check g1 configuration
! python -c "print $(cat /sys/fs/cgroup/cpu/gpdb/$(psql -d isolation2resgrouptest -Aqtc "SELECT oid FROM pg_resgroup WHERE rsgname='g1'")/cpu.shares) == int($(cat /sys/fs/cgroup/cpu/gpdb/cpu.shares) * 0.1)";
True
-- check g2 configuration
! python -c "print $(cat /sys/fs/cgroup/cpu/gpdb/$(psql -d isolation2resgrouptest -Aqtc "SELECT oid FROM pg_resgroup WHERE rsgname='g2'")/cpu.shares) == int($(cat /sys/fs/cgroup/cpu/gpdb/cpu.shares) * 0.2)";
True
-- create two roles and assign them to above groups
CREATE ROLE r1 RESOURCE GROUP g1;
CREATE
CREATE ROLE r2 RESOURCE GROUP g2;
CREATE
GRANT ALL ON busy TO r1;
GRANT
GRANT ALL ON busy TO r2;
GRANT
-- prepare parallel queries in the two groups
10: SET ROLE TO r1;
SET
11: SET ROLE TO r1;
SET
12: SET ROLE TO r1;
SET
13: SET ROLE TO r1;
SET
14: SET ROLE TO r1;
SET
20: SET ROLE TO r2;
SET
21: SET ROLE TO r2;
SET
22: SET ROLE TO r2;
SET
23: SET ROLE TO r2;
SET
24: SET ROLE TO r2;
SET
--
-- now we get prepared.
--
-- on empty load the cpu usage shall be 0%
--
SELECT * FROM cpu_status;
rsgname |round_percentage
-------------+----------------
default_group|0%
admin_group |0%
g1 |0%
g2 |0%
(4 rows)
--
-- a group should burst to use all the cpu usage
-- when it's the only one with running queries.
--
-- however the overall cpu usage is controlled by a GUC
-- gp_resource_group_cpu_limit which is 90% by default.
--
-- so the cpu usage shall be 90%
--
10&: SELECT * FROM busy; <waiting ...>
11&: SELECT * FROM busy; <waiting ...>
12&: SELECT * FROM busy; <waiting ...>
13&: SELECT * FROM busy; <waiting ...>
14&: SELECT * FROM busy; <waiting ...>
SELECT pg_sleep(20);
pg_sleep
--------
(1 row)
SELECT * FROM cpu_status;
rsgname |round_percentage
-------------+----------------
default_group|0%
admin_group |0%
g1 |90%
g2 |0%
(4 rows)
-- start_ignore
SELECT * FROM cancel_all;
pg_cancel_backend
-----------------
t
t
t
t
t
(5 rows)
10<: <... completed>
ERROR: canceling statement due to user request
11<: <... completed>
ERROR: canceling statement due to user request
12<: <... completed>
ERROR: canceling statement due to user request
13<: <... completed>
ERROR: canceling statement due to user request
14<: <... completed>
ERROR: canceling statement due to user request
-- end_ignore
--
-- when there are multiple groups with parallel queries,
-- they should share the cpu usage by their cpu_usage settings,
--
-- g1:g2 is 0.1:0.2 => 1:2, so:
--
-- - g1 gets 90% * 1/3 => 30%;
-- - g2 gets 90% * 2/3 => 60%;
--
10&: SELECT * FROM busy; <waiting ...>
11&: SELECT * FROM busy; <waiting ...>
12&: SELECT * FROM busy; <waiting ...>
13&: SELECT * FROM busy; <waiting ...>
14&: SELECT * FROM busy; <waiting ...>
20&: SELECT * FROM busy; <waiting ...>
21&: SELECT * FROM busy; <waiting ...>
22&: SELECT * FROM busy; <waiting ...>
23&: SELECT * FROM busy; <waiting ...>
24&: SELECT * FROM busy; <waiting ...>
SELECT pg_sleep(20);
pg_sleep
--------
(1 row)
SELECT * FROM cpu_status;
rsgname |round_percentage
-------------+----------------
default_group|0%
admin_group |0%
g1 |30%
g2 |60%
(4 rows)
-- start_ignore
SELECT * FROM cancel_all;
pg_cancel_backend
-----------------
t
t
t
t
t
t
t
t
t
t
(10 rows)
10<: <... completed>
ERROR: canceling statement due to user request
11<: <... completed>
ERROR: canceling statement due to user request
12<: <... completed>
ERROR: canceling statement due to user request
13<: <... completed>
ERROR: canceling statement due to user request
14<: <... completed>
ERROR: canceling statement due to user request
20<: <... completed>
ERROR: canceling statement due to user request
21<: <... completed>
ERROR: canceling statement due to user request
22<: <... completed>
ERROR: canceling statement due to user request
23<: <... completed>
ERROR: canceling statement due to user request
24<: <... completed>
ERROR: canceling statement due to user request
-- end_ignore
-- start_matchsubs
m/^[0-9:]+\sgpconfig:[^:]+:/
s/^[0-9:]+\sgpconfig:[^:]+:/TIMESTAMP gpconfig:SEGMENT:/
m/\[\S+:cgroup is not properly configured:/
s/\[\S+:cgroup is not properly configured:/\[SEGMENT:cgroup is not properly configured:/
m/^ERROR: Resource group [0-9]+ was concurrently dropped$/
s/group [0-9]+ was/group OID was/
-- end_matchsubs
test: resgroup/enable_resgroup_validate
test: resgroup/enable_resgroup
test: resgroup/resgroup_cpu_rate_limit
test: resgroup/resgroup_concurrency
test: resgroup/drop_resgroup
test: resgroup/disable_resgroup
-- reset the GUC and restart cluster.
-- start_ignore
! gpconfig -r gp_resource_manager;
! gpstop -rai;
-- end_ignore
SHOW gp_resource_manager;
1: SET ROLE TO r1;
0: ALTER ROLE r1 RESOURCE GROUP none;
0: DROP RESOURCE GROUP g1;
2: SET ROLE TO r2;
0: ALTER ROLE r2 RESOURCE GROUP none;
0: DROP RESOURCE GROUP g2;
! ls -d /sys/fs/cgroup/cpu/gpdb/*/;
! ls -d /sys/fs/cgroup/cpuacct/gpdb/*/;
-- start_ignore
! rmdir /sys/fs/cgroup/cpu/gpdb;
! rmdir /sys/fs/cgroup/cpuacct/gpdb;
! mkdir /sys/fs/cgroup/cpu/gpdb;
! mkdir /sys/fs/cgroup/cpuacct/gpdb;
-- end_ignore
-- enable resource group and restart cluster.
-- start_ignore
! gpconfig -c gp_resource_manager -v group;
! gpstop -rai;
-- end_ignore
SHOW gp_resource_manager;
-- start_ignore
! rmdir /sys/fs/cgroup/cpu/gpdb/*/;
! rmdir /sys/fs/cgroup/cpuacct/gpdb/*/;
! rmdir /sys/fs/cgroup/cpu/gpdb;
! rmdir /sys/fs/cgroup/cpuacct/gpdb;
-- end_ignore
-- gpdb top group is not created
! gpconfig -c gp_resource_manager -v group;
-- start_ignore
! mkdir /sys/fs/cgroup/cpu/gpdb;
! mkdir /sys/fs/cgroup/cpuacct/gpdb;
! chmod 644 /sys/fs/cgroup/cpu/gpdb;
-- end_ignore
-- gpdb directory should have rwx permission
! gpconfig -c gp_resource_manager -v group;
-- start_ignore
! chmod 755 /sys/fs/cgroup/cpu/gpdb;
! chmod 444 /sys/fs/cgroup/cpu/gpdb/cgroup.procs;
! chmod 444 /sys/fs/cgroup/cpu/gpdb/cpu.cfs_quota_us;
! chmod 244 /sys/fs/cgroup/cpu/gpdb/cpu.cfs_period_us;
! chmod 244 /sys/fs/cgroup/cpuacct/gpdb/cpuacct.usage;
-- end_ignore
-- cgroup.procs should have writepermission
-- cpu.cfs_quota_us should have write permission
-- cpu.cfs_period_us should have read permission
-- cpuacct.usage should have read permission
! gpconfig -c gp_resource_manager -v group;
......@@ -2,12 +2,6 @@
0:CREATE RESOURCE GROUP rg_concurrency_test WITH (concurrency=2, cpu_rate_limit=.02, memory_limit=.02);
0:SELECT r.rsgname, num_running, num_queueing, num_queued, num_executed FROM gp_toolkit.gp_resgroup_status s, pg_resgroup r WHERE s.groupid=r.oid AND r.rsgname='rg_concurrency_test';
-- enable resource group and restart cluster.
-- start_ignore
! gpconfig -c gp_resource_manager -v group;
! gpstop -rai;
-- end_ignore
-- test1: test gp_toolkit.gp_resgroup_status and pg_stat_activity
-- no query has been assigned to the this group
1:SELECT r.rsgname, num_running, num_queueing, num_queued, num_executed FROM gp_toolkit.gp_resgroup_status s, pg_resgroup r WHERE s.groupid=r.oid AND r.rsgname='rg_concurrency_test';
......@@ -66,7 +60,7 @@
25&:BEGIN;
21:SELECT r.rsgname, num_running, num_queueing, num_queued, num_executed FROM gp_toolkit.gp_resgroup_status s, pg_resgroup r WHERE s.groupid=r.oid AND r.rsgname='rg_concurrency_test';
21:SELECT concurrency,proposed_concurrency FROM gp_toolkit.gp_resgroup_config WHERE groupname='rg_concurrency_test';
-- Alter concurrency 3->2, the 'proposed' of pg_resgroupcapability will be set to 2.
-- Alter concurrency 3->2, the 'proposed' of pg_resgroupcapability will be set to 2.
21:ALTER RESOURCE GROUP rg_concurrency_test SET CONCURRENCY 2;
21:SELECT concurrency,proposed_concurrency FROM gp_toolkit.gp_resgroup_config WHERE groupname='rg_concurrency_test';
-- When one transaction is finished, queueing transaction won't be woken up. There're 2 running transactions and 1 queueing transaction.
......@@ -78,7 +72,7 @@
-- Finish another transaction, one queueing transaction will be woken up, there're 2 running transactions and 1 queueing transaction.
22:END;
21:SELECT r.rsgname, num_running, num_queueing, num_queued, num_executed FROM gp_toolkit.gp_resgroup_status s, pg_resgroup r WHERE s.groupid=r.oid AND r.rsgname='rg_concurrency_test';
-- Alter concurrency 2->2, the 'value' and 'proposed' of pg_resgroupcapability will be set to 2.
-- Alter concurrency 2->2, the 'value' and 'proposed' of pg_resgroupcapability will be set to 2.
21:ALTER RESOURCE GROUP rg_concurrency_test SET CONCURRENCY 2;
21:SELECT concurrency,proposed_concurrency FROM gp_toolkit.gp_resgroup_config WHERE groupname='rg_concurrency_test';
-- Finish another transaction, one queueing transaction will be woken up, there're 2 running transactions and 0 queueing transaction.
......@@ -120,7 +114,7 @@
32:END;
32:RESET ROLE;
-- DROP is committed
-- DROP is committed
31:BEGIN;
31:DROP ROLE role_concurrency_test;
31:DROP RESOURCE GROUP rg_concurrency_test;
......@@ -132,9 +126,3 @@
32<:
33:SELECT r.rsgname, num_running, num_queueing, num_queued, num_executed FROM gp_toolkit.gp_resgroup_status s, pg_resgroup r WHERE s.groupid=r.oid AND r.rsgname='rg_concurrency_test';
-- reset the GUC and restart cluster.
-- start_ignore
! gpconfig -r gp_resource_manager;
! gpstop -rai;
-- end_ignore
-- start_ignore
DROP VIEW IF EXISTS cpu_status;
DROP VIEW IF EXISTS busy;
DROP VIEW IF EXISTS cancel_all;
DROP FUNCTION IF EXISTS round_percentage(text);
DROP TABLE IF EXISTS bigtable;
DROP ROLE IF EXISTS r1;
DROP ROLE IF EXISTS r2;
DROP RESOURCE GROUP g1;
DROP RESOURCE GROUP g2;
-- end_ignore
--
-- helper functions, tables and views
--
CREATE TABLE bigtable AS
SELECT i AS c1, 'abc' AS c2
FROM generate_series(1,100000) i;
-- the cpu usage limitation has an error rate about +-7.5%,
-- and also we want to satisfy the 0.1:0.2 rate under 90% overall limitation
-- so we round the cpu rate by 15%
CREATE FUNCTION round_percentage(text) RETURNS text AS $$
SELECT (round(rtrim($1, '%') :: double precision / 15) * 15) :: text || '%'
$$ LANGUAGE sql;
CREATE VIEW cpu_status AS
SELECT g.rsgname, round_percentage(s.cpu_usage)
FROM gp_toolkit.gp_resgroup_status s, pg_resgroup g
WHERE s.groupid=g.oid
ORDER BY g.oid;
CREATE VIEW busy AS
SELECT count(*)
FROM
bigtable t1,
bigtable t2,
bigtable t3,
bigtable t4,
bigtable t5
WHERE 0 = (t1.c1 % 2 + 10000)!
AND 0 = (t2.c1 % 2 + 10000)!
AND 0 = (t3.c1 % 2 + 10000)!
AND 0 = (t4.c1 % 2 + 10000)!
AND 0 = (t5.c1 % 2 + 10000)!
;
CREATE VIEW cancel_all AS
SELECT pg_cancel_backend(procpid)
FROM pg_stat_activity
WHERE current_query LIKE 'SELECT * FROM busy%';
--
-- check gpdb cgroup configuration
--
-- cfs_quota_us := cfs_period_us * ncores * gp_resource_group_cpu_limit
-- shares := 1024 * ncores
--
! python -c "print $(cat /sys/fs/cgroup/cpu/gpdb/cpu.cfs_quota_us) == int($(cat /sys/fs/cgroup/cpu/gpdb/cpu.cfs_period_us) * $(nproc) * $(psql -d isolation2resgrouptest -Aqtc "SHOW gp_resource_group_cpu_limit"))";
! python -c "print $(cat /sys/fs/cgroup/cpu/gpdb/cpu.shares) == 1024 * 256";
--
-- check default groups configuration
--
-- SUB/shares := TOP/shares * cpu_rate_limit
--
! python -c "print $(cat /sys/fs/cgroup/cpu/gpdb/$(psql -d isolation2resgrouptest -Aqtc "SELECT oid FROM pg_resgroup WHERE rsgname='default_group'")/cpu.shares) == int($(cat /sys/fs/cgroup/cpu/gpdb/cpu.shares) * $(psql -d isolation2resgrouptest -Aqtc "SELECT value FROM pg_resgroupcapability c, pg_resgroup g WHERE c.resgroupid=g.oid AND reslimittype=2 AND g.rsgname='default_group'"))";
! python -c "print $(cat /sys/fs/cgroup/cpu/gpdb/$(psql -d isolation2resgrouptest -Aqtc "SELECT oid FROM pg_resgroup WHERE rsgname='admin_group'")/cpu.shares) == int($(cat /sys/fs/cgroup/cpu/gpdb/cpu.shares) * $(psql -d isolation2resgrouptest -Aqtc "SELECT value FROM pg_resgroupcapability c, pg_resgroup g WHERE c.resgroupid=g.oid AND reslimittype=2 AND g.rsgname='admin_group'"))";
-- create two resource groups
CREATE RESOURCE GROUP g1 WITH (cpu_rate_limit=0.1, memory_limit=0.1);
CREATE RESOURCE GROUP g2 WITH (cpu_rate_limit=0.2, memory_limit=0.2);
-- check g1 configuration
! python -c "print $(cat /sys/fs/cgroup/cpu/gpdb/$(psql -d isolation2resgrouptest -Aqtc "SELECT oid FROM pg_resgroup WHERE rsgname='g1'")/cpu.shares) == int($(cat /sys/fs/cgroup/cpu/gpdb/cpu.shares) * 0.1)";
-- check g2 configuration
! python -c "print $(cat /sys/fs/cgroup/cpu/gpdb/$(psql -d isolation2resgrouptest -Aqtc "SELECT oid FROM pg_resgroup WHERE rsgname='g2'")/cpu.shares) == int($(cat /sys/fs/cgroup/cpu/gpdb/cpu.shares) * 0.2)";
-- create two roles and assign them to above groups
CREATE ROLE r1 RESOURCE GROUP g1;
CREATE ROLE r2 RESOURCE GROUP g2;
GRANT ALL ON busy TO r1;
GRANT ALL ON busy TO r2;
-- prepare parallel queries in the two groups
10: SET ROLE TO r1;
11: SET ROLE TO r1;
12: SET ROLE TO r1;
13: SET ROLE TO r1;
14: SET ROLE TO r1;
20: SET ROLE TO r2;
21: SET ROLE TO r2;
22: SET ROLE TO r2;
23: SET ROLE TO r2;
24: SET ROLE TO r2;
--
-- now we get prepared.
--
-- on empty load the cpu usage shall be 0%
--
SELECT * FROM cpu_status;
--
-- a group should burst to use all the cpu usage
-- when it's the only one with running queries.
--
-- however the overall cpu usage is controlled by a GUC
-- gp_resource_group_cpu_limit which is 90% by default.
--
-- so the cpu usage shall be 90%
--
10&: SELECT * FROM busy;
11&: SELECT * FROM busy;
12&: SELECT * FROM busy;
13&: SELECT * FROM busy;
14&: SELECT * FROM busy;
SELECT pg_sleep(20);
SELECT * FROM cpu_status;
-- start_ignore
SELECT * FROM cancel_all;
10<:
11<:
12<:
13<:
14<:
-- end_ignore
--
-- when there are multiple groups with parallel queries,
-- they should share the cpu usage by their cpu_usage settings,
--
-- g1:g2 is 0.1:0.2 => 1:2, so:
--
-- - g1 gets 90% * 1/3 => 30%;
-- - g2 gets 90% * 2/3 => 60%;
--
10&: SELECT * FROM busy;
11&: SELECT * FROM busy;
12&: SELECT * FROM busy;
13&: SELECT * FROM busy;
14&: SELECT * FROM busy;
20&: SELECT * FROM busy;
21&: SELECT * FROM busy;
22&: SELECT * FROM busy;
23&: SELECT * FROM busy;
24&: SELECT * FROM busy;
SELECT pg_sleep(20);
SELECT * FROM cpu_status;
-- start_ignore
SELECT * FROM cancel_all;
10<:
11<:
12<:
13<:
14<:
20<:
21<:
22<:
23<:
24<:
-- end_ignore
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册