提交 2baba250 编写于 作者: Y Yehuda Sadeh 提交者: Sage Weil

ceph: writeback congestion control

Set bdi congestion bit when amount of write data in flight exceeds adjustable
threshold.
Signed-off-by: NYehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: NSage Weil <sage@newdream.net>
上级 dbd646a8
...@@ -47,6 +47,12 @@ ...@@ -47,6 +47,12 @@
* accounting is preserved. * accounting is preserved.
*/ */
#define CONGESTION_ON_THRESH(congestion_kb) (congestion_kb >> (PAGE_SHIFT-10))
#define CONGESTION_OFF_THRESH(congestion_kb) \
(CONGESTION_ON_THRESH(congestion_kb) - \
(CONGESTION_ON_THRESH(congestion_kb) >> 2))
/* /*
* Dirty a page. Optimistically adjust accounting, on the assumption * Dirty a page. Optimistically adjust accounting, on the assumption
...@@ -377,6 +383,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) ...@@ -377,6 +383,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
{ {
struct inode *inode; struct inode *inode;
struct ceph_inode_info *ci; struct ceph_inode_info *ci;
struct ceph_client *client;
struct ceph_osd_client *osdc; struct ceph_osd_client *osdc;
loff_t page_off = page->index << PAGE_CACHE_SHIFT; loff_t page_off = page->index << PAGE_CACHE_SHIFT;
int len = PAGE_CACHE_SIZE; int len = PAGE_CACHE_SIZE;
...@@ -384,6 +391,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) ...@@ -384,6 +391,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
int err = 0; int err = 0;
struct ceph_snap_context *snapc; struct ceph_snap_context *snapc;
u64 snap_size = 0; u64 snap_size = 0;
long writeback_stat;
dout("writepage %p idx %lu\n", page, page->index); dout("writepage %p idx %lu\n", page, page->index);
...@@ -393,7 +401,8 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) ...@@ -393,7 +401,8 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
} }
inode = page->mapping->host; inode = page->mapping->host;
ci = ceph_inode(inode); ci = ceph_inode(inode);
osdc = &ceph_inode_to_client(inode)->osdc; client = ceph_inode_to_client(inode);
osdc = &client->osdc;
/* verify this is a writeable snap context */ /* verify this is a writeable snap context */
snapc = (void *)page->private; snapc = (void *)page->private;
...@@ -420,6 +429,11 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) ...@@ -420,6 +429,11 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
dout("writepage %p page %p index %lu on %llu~%u\n", dout("writepage %p page %p index %lu on %llu~%u\n",
inode, page, page->index, page_off, len); inode, page, page->index, page_off, len);
writeback_stat = atomic_long_inc_return(&client->writeback_count);
if (writeback_stat >
CONGESTION_ON_THRESH(client->mount_args->congestion_kb))
set_bdi_congested(&client->backing_dev_info, BLK_RW_ASYNC);
set_page_writeback(page); set_page_writeback(page);
err = ceph_osdc_writepages(osdc, ceph_vino(inode), err = ceph_osdc_writepages(osdc, ceph_vino(inode),
&ci->i_layout, snapc, &ci->i_layout, snapc,
...@@ -499,6 +513,8 @@ static void writepages_finish(struct ceph_osd_request *req, ...@@ -499,6 +513,8 @@ static void writepages_finish(struct ceph_osd_request *req,
struct writeback_control *wbc = req->r_wbc; struct writeback_control *wbc = req->r_wbc;
__s32 rc = -EIO; __s32 rc = -EIO;
u64 bytes = 0; u64 bytes = 0;
struct ceph_client *client = ceph_inode_to_client(inode);
long writeback_stat;
/* parse reply */ /* parse reply */
replyhead = msg->front.iov_base; replyhead = msg->front.iov_base;
...@@ -524,6 +540,13 @@ static void writepages_finish(struct ceph_osd_request *req, ...@@ -524,6 +540,13 @@ static void writepages_finish(struct ceph_osd_request *req,
BUG_ON(!page); BUG_ON(!page);
WARN_ON(!PageUptodate(page)); WARN_ON(!PageUptodate(page));
writeback_stat =
atomic_long_dec_return(&client->writeback_count);
if (writeback_stat <
CONGESTION_OFF_THRESH(client->mount_args->congestion_kb))
clear_bdi_congested(&client->backing_dev_info,
BLK_RW_ASYNC);
if (i >= wrote) { if (i >= wrote) {
dout("inode %p skipping page %p\n", inode, page); dout("inode %p skipping page %p\n", inode, page);
wbc->pages_skipped++; wbc->pages_skipped++;
...@@ -666,6 +689,7 @@ static int ceph_writepages_start(struct address_space *mapping, ...@@ -666,6 +689,7 @@ static int ceph_writepages_start(struct address_space *mapping,
u64 offset, len; u64 offset, len;
struct ceph_osd_request_head *reqhead; struct ceph_osd_request_head *reqhead;
struct ceph_osd_op *op; struct ceph_osd_op *op;
long writeback_stat;
next = 0; next = 0;
locked_pages = 0; locked_pages = 0;
...@@ -773,6 +797,12 @@ static int ceph_writepages_start(struct address_space *mapping, ...@@ -773,6 +797,12 @@ static int ceph_writepages_start(struct address_space *mapping,
first = i; first = i;
dout("%p will write page %p idx %lu\n", dout("%p will write page %p idx %lu\n",
inode, page, page->index); inode, page, page->index);
writeback_stat = atomic_long_inc_return(&client->writeback_count);
if (writeback_stat > CONGESTION_ON_THRESH(client->mount_args->congestion_kb)) {
set_bdi_congested(&client->backing_dev_info, BLK_RW_ASYNC);
}
set_page_writeback(page); set_page_writeback(page);
req->r_pages[locked_pages] = page; req->r_pages[locked_pages] = page;
locked_pages++; locked_pages++;
...@@ -998,7 +1028,8 @@ static int ceph_write_end(struct file *file, struct address_space *mapping, ...@@ -998,7 +1028,8 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,
struct page *page, void *fsdata) struct page *page, void *fsdata)
{ {
struct inode *inode = file->f_dentry->d_inode; struct inode *inode = file->f_dentry->d_inode;
struct ceph_mds_client *mdsc = &ceph_inode_to_client(inode)->mdsc; struct ceph_client *client = ceph_inode_to_client(inode);
struct ceph_mds_client *mdsc = &client->mdsc;
unsigned from = pos & (PAGE_CACHE_SIZE - 1); unsigned from = pos & (PAGE_CACHE_SIZE - 1);
int check_cap = 0; int check_cap = 0;
......
...@@ -320,6 +320,30 @@ DEFINE_SHOW_FUNC(osdc_show) ...@@ -320,6 +320,30 @@ DEFINE_SHOW_FUNC(osdc_show)
DEFINE_SHOW_FUNC(dentry_lru_show) DEFINE_SHOW_FUNC(dentry_lru_show)
DEFINE_SHOW_FUNC(caps_show) DEFINE_SHOW_FUNC(caps_show)
static int congestion_kb_set(void *data, u64 val)
{
struct ceph_client *client = (struct ceph_client *)data;
if (client)
client->mount_args->congestion_kb = (int)val;
return 0;
}
static int congestion_kb_get(void *data, u64 *val)
{
struct ceph_client *client = (struct ceph_client *)data;
if (client)
*val = (u64)client->mount_args->congestion_kb;
return 0;
}
DEFINE_SIMPLE_ATTRIBUTE(congestion_kb_fops, congestion_kb_get,
congestion_kb_set, "%llu\n");
int __init ceph_debugfs_init(void) int __init ceph_debugfs_init(void)
{ {
ceph_debugfs_dir = debugfs_create_dir("ceph", NULL); ceph_debugfs_dir = debugfs_create_dir("ceph", NULL);
...@@ -409,6 +433,14 @@ int ceph_debugfs_client_init(struct ceph_client *client) ...@@ -409,6 +433,14 @@ int ceph_debugfs_client_init(struct ceph_client *client)
if (!client->debugfs_caps) if (!client->debugfs_caps)
goto out; goto out;
client->debugfs_congestion_kb = debugfs_create_file("writeback_congestion_kb",
0600,
client->debugfs_dir,
client,
&congestion_kb_fops);
if (!client->debugfs_congestion_kb)
goto out;
sprintf(name, "../../bdi/%s", dev_name(client->sb->s_bdi->dev)); sprintf(name, "../../bdi/%s", dev_name(client->sb->s_bdi->dev));
client->debugfs_bdi = debugfs_create_symlink("bdi", client->debugfs_dir, client->debugfs_bdi = debugfs_create_symlink("bdi", client->debugfs_dir,
name); name);
...@@ -431,6 +463,7 @@ void ceph_debugfs_client_cleanup(struct ceph_client *client) ...@@ -431,6 +463,7 @@ void ceph_debugfs_client_cleanup(struct ceph_client *client)
debugfs_remove(client->osdc.debugfs_file); debugfs_remove(client->osdc.debugfs_file);
debugfs_remove(client->mdsc.debugfs_file); debugfs_remove(client->mdsc.debugfs_file);
debugfs_remove(client->monc.debugfs_file); debugfs_remove(client->monc.debugfs_file);
debugfs_remove(client->debugfs_congestion_kb);
debugfs_remove(client->debugfs_dir); debugfs_remove(client->debugfs_dir);
} }
......
...@@ -150,6 +150,35 @@ static void ceph_inode_init_once(void *foo) ...@@ -150,6 +150,35 @@ static void ceph_inode_init_once(void *foo)
inode_init_once(&ci->vfs_inode); inode_init_once(&ci->vfs_inode);
} }
static int default_congestion_kb(void)
{
int congestion_kb;
/*
* Copied from NFS
*
* congestion size, scale with available memory.
*
* 64MB: 8192k
* 128MB: 11585k
* 256MB: 16384k
* 512MB: 23170k
* 1GB: 32768k
* 2GB: 46340k
* 4GB: 65536k
* 8GB: 92681k
* 16GB: 131072k
*
* This allows larger machines to have larger/more transfers.
* Limit the default to 256M
*/
congestion_kb = (16*int_sqrt(totalram_pages)) << (PAGE_SHIFT-10);
if (congestion_kb > 256*1024)
congestion_kb = 256*1024;
return congestion_kb;
}
static int __init init_caches(void) static int __init init_caches(void)
{ {
ceph_inode_cachep = kmem_cache_create("ceph_inode_info", ceph_inode_cachep = kmem_cache_create("ceph_inode_info",
...@@ -267,6 +296,7 @@ enum { ...@@ -267,6 +296,7 @@ enum {
Opt_caps_wanted_delay_min, Opt_caps_wanted_delay_min,
Opt_caps_wanted_delay_max, Opt_caps_wanted_delay_max,
Opt_readdir_max_entries, Opt_readdir_max_entries,
Opt_congestion_kb,
Opt_last_int, Opt_last_int,
/* int args above */ /* int args above */
Opt_snapdirname, Opt_snapdirname,
...@@ -295,6 +325,7 @@ static match_table_t arg_tokens = { ...@@ -295,6 +325,7 @@ static match_table_t arg_tokens = {
{Opt_caps_wanted_delay_min, "caps_wanted_delay_min=%d"}, {Opt_caps_wanted_delay_min, "caps_wanted_delay_min=%d"},
{Opt_caps_wanted_delay_max, "caps_wanted_delay_max=%d"}, {Opt_caps_wanted_delay_max, "caps_wanted_delay_max=%d"},
{Opt_readdir_max_entries, "readdir_max_entries=%d"}, {Opt_readdir_max_entries, "readdir_max_entries=%d"},
{Opt_congestion_kb, "write_congestion_kb=%d"},
/* int args above */ /* int args above */
{Opt_snapdirname, "snapdirname=%s"}, {Opt_snapdirname, "snapdirname=%s"},
{Opt_name, "name=%s"}, {Opt_name, "name=%s"},
...@@ -342,6 +373,7 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options, ...@@ -342,6 +373,7 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options,
args->snapdir_name = kstrdup(CEPH_SNAPDIRNAME_DEFAULT, GFP_KERNEL); args->snapdir_name = kstrdup(CEPH_SNAPDIRNAME_DEFAULT, GFP_KERNEL);
args->cap_release_safety = CEPH_CAPS_PER_RELEASE * 4; args->cap_release_safety = CEPH_CAPS_PER_RELEASE * 4;
args->max_readdir = 1024; args->max_readdir = 1024;
args->congestion_kb = default_congestion_kb();
/* ip1[:port1][,ip2[:port2]...]:/subdir/in/fs */ /* ip1[:port1][,ip2[:port2]...]:/subdir/in/fs */
err = -EINVAL; err = -EINVAL;
...@@ -445,6 +477,9 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options, ...@@ -445,6 +477,9 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options,
case Opt_readdir_max_entries: case Opt_readdir_max_entries:
args->max_readdir = intval; args->max_readdir = intval;
break; break;
case Opt_congestion_kb:
args->congestion_kb = intval;
break;
case Opt_noshare: case Opt_noshare:
args->flags |= CEPH_OPT_NOSHARE; args->flags |= CEPH_OPT_NOSHARE;
...@@ -516,6 +551,7 @@ static struct ceph_client *ceph_create_client(struct ceph_mount_args *args) ...@@ -516,6 +551,7 @@ static struct ceph_client *ceph_create_client(struct ceph_mount_args *args)
client->msgr = NULL; client->msgr = NULL;
client->mount_err = 0; client->mount_err = 0;
atomic_long_set(&client->writeback_count, 0);
err = bdi_init(&client->backing_dev_info); err = bdi_init(&client->backing_dev_info);
if (err < 0) if (err < 0)
......
...@@ -59,6 +59,7 @@ struct ceph_mount_args { ...@@ -59,6 +59,7 @@ struct ceph_mount_args {
int wsize; int wsize;
int rsize; /* max readahead */ int rsize; /* max readahead */
int max_readdir; /* max readdir size */ int max_readdir; /* max readdir size */
int congestion_kb; /* max readdir size */
int osd_timeout; int osd_timeout;
char *snapdir_name; /* default ".snap" */ char *snapdir_name; /* default ".snap" */
char *name; char *name;
...@@ -136,6 +137,7 @@ struct ceph_client { ...@@ -136,6 +137,7 @@ struct ceph_client {
struct workqueue_struct *wb_wq; struct workqueue_struct *wb_wq;
struct workqueue_struct *pg_inv_wq; struct workqueue_struct *pg_inv_wq;
struct workqueue_struct *trunc_wq; struct workqueue_struct *trunc_wq;
atomic_long_t writeback_count;
struct backing_dev_info backing_dev_info; struct backing_dev_info backing_dev_info;
...@@ -143,6 +145,7 @@ struct ceph_client { ...@@ -143,6 +145,7 @@ struct ceph_client {
struct dentry *debugfs_monmap; struct dentry *debugfs_monmap;
struct dentry *debugfs_mdsmap, *debugfs_osdmap; struct dentry *debugfs_mdsmap, *debugfs_osdmap;
struct dentry *debugfs_dir, *debugfs_dentry_lru, *debugfs_caps; struct dentry *debugfs_dir, *debugfs_dentry_lru, *debugfs_caps;
struct dentry *debugfs_congestion_kb;
struct dentry *debugfs_bdi; struct dentry *debugfs_bdi;
#endif #endif
}; };
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册