提交 eca281aa 编写于 作者: L Linus Torvalds

Merge branch 'upstream-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/jlbec/ocfs2

* 'upstream-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/jlbec/ocfs2: (36 commits)
  Ocfs2: Move ocfs2 ioctl definitions from ocfs2_fs.h to newly added ocfs2_ioctl.h
  ocfs2: send SIGXFSZ if new filesize exceeds limit -v2
  ocfs2/userdlm: Add tracing in userdlm
  ocfs2: Use a separate masklog for AST and BASTs
  dlm: allow dlm do recovery during shutdown
  ocfs2: Only bug out in direct io write for reflinked extent.
  ocfs2: fix warning in ocfs2_file_aio_write()
  ocfs2_dlmfs: Enable the use of user cluster stacks.
  ocfs2_dlmfs: Use the stackglue.
  ocfs2_dlmfs: Don't honor truncate.  The size of a dlmfs file is LVB_LEN
  ocfs2: Pass the locking protocol into ocfs2_cluster_connect().
  ocfs2: Remove the ast pointers from ocfs2_stack_plugins
  ocfs2: Hang the locking proto on the cluster conn and use it in asts.
  ocfs2: Attach the connection to the lksb
  ocfs2: Pass lksbs back from stackglue ast/bast functions.
  ocfs2_dlmfs: Move to its own directory
  ocfs2_dlmfs: Use poll() to signify BASTs.
  ocfs2_dlmfs: Add capabilities parameter.
  ocfs2: Handle errors while setting external xattr values.
  ocfs2: Set inline xattr entries with ocfs2_xa_set()
  ...
......@@ -46,6 +46,7 @@ ocfs2_stackglue-objs := stackglue.o
ocfs2_stack_o2cb-objs := stack_o2cb.o
ocfs2_stack_user-objs := stack_user.o
obj-$(CONFIG_OCFS2_FS) += dlmfs/
# cluster/ is always needed when OCFS2_FS for masklog support
obj-$(CONFIG_OCFS2_FS) += cluster/
obj-$(CONFIG_OCFS2_FS_O2CB) += dlm/
......@@ -1050,7 +1050,8 @@ static int ocfs2_create_new_meta_bhs(handle_t *handle,
strcpy(eb->h_signature, OCFS2_EXTENT_BLOCK_SIGNATURE);
eb->h_blkno = cpu_to_le64(first_blkno);
eb->h_fs_generation = cpu_to_le32(osb->fs_generation);
eb->h_suballoc_slot = cpu_to_le16(osb->slot_num);
eb->h_suballoc_slot =
cpu_to_le16(meta_ac->ac_alloc_slot);
eb->h_suballoc_bit = cpu_to_le16(suballoc_bit_start);
eb->h_list.l_count =
cpu_to_le16(ocfs2_extent_recs_per_eb(osb->sb));
......@@ -6037,7 +6038,7 @@ static void ocfs2_truncate_log_worker(struct work_struct *work)
if (status < 0)
mlog_errno(status);
else
ocfs2_init_inode_steal_slot(osb);
ocfs2_init_steal_slots(osb);
mlog_exit(status);
}
......
......@@ -577,8 +577,9 @@ static int ocfs2_direct_IO_get_blocks(struct inode *inode, sector_t iblock,
goto bail;
}
/* We should already CoW the refcounted extent. */
BUG_ON(ext_flags & OCFS2_EXT_REFCOUNTED);
/* We should already CoW the refcounted extent in case of create. */
BUG_ON(create && (ext_flags & OCFS2_EXT_REFCOUNTED));
/*
* get_more_blocks() expects us to describe a hole by clearing
* the mapped bit on bh_result().
......
......@@ -112,6 +112,7 @@ static struct mlog_attribute mlog_attrs[MLOG_MAX_BITS] = {
define_mask(XATTR),
define_mask(QUOTA),
define_mask(REFCOUNT),
define_mask(BASTS),
define_mask(ERROR),
define_mask(NOTICE),
define_mask(KTHREAD),
......
......@@ -114,6 +114,7 @@
#define ML_XATTR 0x0000000020000000ULL /* ocfs2 extended attributes */
#define ML_QUOTA 0x0000000040000000ULL /* ocfs2 quota operations */
#define ML_REFCOUNT 0x0000000080000000ULL /* refcount tree operations */
#define ML_BASTS 0x0000001000000000ULL /* dlmglue asts and basts */
/* bits that are infrequently given and frequently matched in the high word */
#define ML_ERROR 0x0000000100000000ULL /* sent to KERN_ERR */
#define ML_NOTICE 0x0000000200000000ULL /* setn to KERN_NOTICE */
......@@ -194,9 +195,9 @@ extern struct mlog_bits mlog_and_bits, mlog_not_bits;
* previous token if args expands to nothing.
*/
#define __mlog_printk(level, fmt, args...) \
printk(level "(%u,%lu):%s:%d " fmt, task_pid_nr(current), \
__mlog_cpu_guess, __PRETTY_FUNCTION__, __LINE__ , \
##args)
printk(level "(%s,%u,%lu):%s:%d " fmt, current->comm, \
task_pid_nr(current), __mlog_cpu_guess, \
__PRETTY_FUNCTION__, __LINE__ , ##args)
#define mlog(mask, fmt, args...) do { \
u64 __m = MLOG_MASK_PREFIX | (mask); \
......
......@@ -2439,7 +2439,7 @@ static int ocfs2_dx_dir_attach_index(struct ocfs2_super *osb,
dx_root = (struct ocfs2_dx_root_block *)dx_root_bh->b_data;
memset(dx_root, 0, osb->sb->s_blocksize);
strcpy(dx_root->dr_signature, OCFS2_DX_ROOT_SIGNATURE);
dx_root->dr_suballoc_slot = cpu_to_le16(osb->slot_num);
dx_root->dr_suballoc_slot = cpu_to_le16(meta_ac->ac_alloc_slot);
dx_root->dr_suballoc_bit = cpu_to_le16(dr_suballoc_bit);
dx_root->dr_fs_generation = cpu_to_le32(osb->fs_generation);
dx_root->dr_blkno = cpu_to_le64(dr_blkno);
......
EXTRA_CFLAGS += -Ifs/ocfs2
obj-$(CONFIG_OCFS2_FS_O2CB) += ocfs2_dlm.o ocfs2_dlmfs.o
obj-$(CONFIG_OCFS2_FS_O2CB) += ocfs2_dlm.o
ocfs2_dlm-objs := dlmdomain.o dlmdebug.o dlmthread.o dlmrecovery.o \
dlmmaster.o dlmast.o dlmconvert.o dlmlock.o dlmunlock.o dlmver.o
ocfs2_dlmfs-objs := userdlm.o dlmfs.o dlmfsver.o
......@@ -310,7 +310,7 @@ static int dlm_recovery_thread(void *data)
mlog(0, "dlm thread running for %s...\n", dlm->name);
while (!kthread_should_stop()) {
if (dlm_joined(dlm)) {
if (dlm_domain_fully_joined(dlm)) {
status = dlm_do_recovery(dlm);
if (status == -EAGAIN) {
/* do not sleep, recheck immediately. */
......
EXTRA_CFLAGS += -Ifs/ocfs2
obj-$(CONFIG_OCFS2_FS) += ocfs2_dlmfs.o
ocfs2_dlmfs-objs := userdlm.o dlmfs.o dlmfsver.o
......@@ -43,24 +43,17 @@
#include <linux/init.h>
#include <linux/string.h>
#include <linux/backing-dev.h>
#include <linux/poll.h>
#include <asm/uaccess.h>
#include "cluster/nodemanager.h"
#include "cluster/heartbeat.h"
#include "cluster/tcp.h"
#include "dlmapi.h"
#include "stackglue.h"
#include "userdlm.h"
#include "dlmfsver.h"
#define MLOG_MASK_PREFIX ML_DLMFS
#include "cluster/masklog.h"
#include "ocfs2_lockingver.h"
static const struct super_operations dlmfs_ops;
static const struct file_operations dlmfs_file_operations;
......@@ -71,15 +64,46 @@ static struct kmem_cache *dlmfs_inode_cache;
struct workqueue_struct *user_dlm_worker;
/*
* This is the userdlmfs locking protocol version.
* These are the ABI capabilities of dlmfs.
*
* Over time, dlmfs has added some features that were not part of the
* initial ABI. Unfortunately, some of these features are not detectable
* via standard usage. For example, Linux's default poll always returns
* POLLIN, so there is no way for a caller of poll(2) to know when dlmfs
* added poll support. Instead, we provide this list of new capabilities.
*
* Capabilities is a read-only attribute. We do it as a module parameter
* so we can discover it whether dlmfs is built in, loaded, or even not
* loaded.
*
* See fs/ocfs2/dlmglue.c for more details on locking versions.
* The ABI features are local to this machine's dlmfs mount. This is
* distinct from the locking protocol, which is concerned with inter-node
* interaction.
*
* Capabilities:
* - bast : POLLIN against the file descriptor of a held lock
* signifies a bast fired on the lock.
*/
static const struct dlm_protocol_version user_locking_protocol = {
.pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
.pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
};
#define DLMFS_CAPABILITIES "bast stackglue"
extern int param_set_dlmfs_capabilities(const char *val,
struct kernel_param *kp)
{
printk(KERN_ERR "%s: readonly parameter\n", kp->name);
return -EINVAL;
}
static int param_get_dlmfs_capabilities(char *buffer,
struct kernel_param *kp)
{
return strlcpy(buffer, DLMFS_CAPABILITIES,
strlen(DLMFS_CAPABILITIES) + 1);
}
module_param_call(capabilities, param_set_dlmfs_capabilities,
param_get_dlmfs_capabilities, NULL, 0444);
MODULE_PARM_DESC(capabilities, DLMFS_CAPABILITIES);
/*
* decodes a set of open flags into a valid lock level and a set of flags.
......@@ -179,13 +203,46 @@ static int dlmfs_file_release(struct inode *inode,
return 0;
}
/*
* We do ->setattr() just to override size changes. Our size is the size
* of the LVB and nothing else.
*/
static int dlmfs_file_setattr(struct dentry *dentry, struct iattr *attr)
{
int error;
struct inode *inode = dentry->d_inode;
attr->ia_valid &= ~ATTR_SIZE;
error = inode_change_ok(inode, attr);
if (!error)
error = inode_setattr(inode, attr);
return error;
}
static unsigned int dlmfs_file_poll(struct file *file, poll_table *wait)
{
int event = 0;
struct inode *inode = file->f_path.dentry->d_inode;
struct dlmfs_inode_private *ip = DLMFS_I(inode);
poll_wait(file, &ip->ip_lockres.l_event, wait);
spin_lock(&ip->ip_lockres.l_lock);
if (ip->ip_lockres.l_flags & USER_LOCK_BLOCKED)
event = POLLIN | POLLRDNORM;
spin_unlock(&ip->ip_lockres.l_lock);
return event;
}
static ssize_t dlmfs_file_read(struct file *filp,
char __user *buf,
size_t count,
loff_t *ppos)
{
int bytes_left;
ssize_t readlen;
ssize_t readlen, got;
char *lvb_buf;
struct inode *inode = filp->f_path.dentry->d_inode;
......@@ -211,9 +268,13 @@ static ssize_t dlmfs_file_read(struct file *filp,
if (!lvb_buf)
return -ENOMEM;
user_dlm_read_lvb(inode, lvb_buf, readlen);
bytes_left = __copy_to_user(buf, lvb_buf, readlen);
readlen -= bytes_left;
got = user_dlm_read_lvb(inode, lvb_buf, readlen);
if (got) {
BUG_ON(got != readlen);
bytes_left = __copy_to_user(buf, lvb_buf, readlen);
readlen -= bytes_left;
} else
readlen = 0;
kfree(lvb_buf);
......@@ -272,7 +333,7 @@ static void dlmfs_init_once(void *foo)
struct dlmfs_inode_private *ip =
(struct dlmfs_inode_private *) foo;
ip->ip_dlm = NULL;
ip->ip_conn = NULL;
ip->ip_parent = NULL;
inode_init_once(&ip->ip_vfs_inode);
......@@ -314,14 +375,14 @@ static void dlmfs_clear_inode(struct inode *inode)
goto clear_fields;
}
mlog(0, "we're a directory, ip->ip_dlm = 0x%p\n", ip->ip_dlm);
mlog(0, "we're a directory, ip->ip_conn = 0x%p\n", ip->ip_conn);
/* we must be a directory. If required, lets unregister the
* dlm context now. */
if (ip->ip_dlm)
user_dlm_unregister_context(ip->ip_dlm);
if (ip->ip_conn)
user_dlm_unregister(ip->ip_conn);
clear_fields:
ip->ip_parent = NULL;
ip->ip_dlm = NULL;
ip->ip_conn = NULL;
}
static struct backing_dev_info dlmfs_backing_dev_info = {
......@@ -371,7 +432,7 @@ static struct inode *dlmfs_get_inode(struct inode *parent,
inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME;
ip = DLMFS_I(inode);
ip->ip_dlm = DLMFS_I(parent)->ip_dlm;
ip->ip_conn = DLMFS_I(parent)->ip_conn;
switch (mode & S_IFMT) {
default:
......@@ -425,13 +486,12 @@ static int dlmfs_mkdir(struct inode * dir,
struct inode *inode = NULL;
struct qstr *domain = &dentry->d_name;
struct dlmfs_inode_private *ip;
struct dlm_ctxt *dlm;
struct dlm_protocol_version proto = user_locking_protocol;
struct ocfs2_cluster_connection *conn;
mlog(0, "mkdir %.*s\n", domain->len, domain->name);
/* verify that we have a proper domain */
if (domain->len >= O2NM_MAX_NAME_LEN) {
if (domain->len >= GROUP_NAME_MAX) {
status = -EINVAL;
mlog(ML_ERROR, "invalid domain name for directory.\n");
goto bail;
......@@ -446,14 +506,14 @@ static int dlmfs_mkdir(struct inode * dir,
ip = DLMFS_I(inode);
dlm = user_dlm_register_context(domain, &proto);
if (IS_ERR(dlm)) {
status = PTR_ERR(dlm);
conn = user_dlm_register(domain);
if (IS_ERR(conn)) {
status = PTR_ERR(conn);
mlog(ML_ERROR, "Error %d could not register domain \"%.*s\"\n",
status, domain->len, domain->name);
goto bail;
}
ip->ip_dlm = dlm;
ip->ip_conn = conn;
inc_nlink(dir);
d_instantiate(dentry, inode);
......@@ -549,6 +609,7 @@ static int dlmfs_fill_super(struct super_block * sb,
static const struct file_operations dlmfs_file_operations = {
.open = dlmfs_file_open,
.release = dlmfs_file_release,
.poll = dlmfs_file_poll,
.read = dlmfs_file_read,
.write = dlmfs_file_write,
};
......@@ -576,6 +637,7 @@ static const struct super_operations dlmfs_ops = {
static const struct inode_operations dlmfs_file_inode_operations = {
.getattr = simple_getattr,
.setattr = dlmfs_file_setattr,
};
static int dlmfs_get_sb(struct file_system_type *fs_type,
......@@ -620,6 +682,7 @@ static int __init init_dlmfs_fs(void)
}
cleanup_worker = 1;
user_dlm_set_locking_protocol();
status = register_filesystem(&dlmfs_fs_type);
bail:
if (status) {
......
......@@ -34,18 +34,19 @@
#include <linux/types.h>
#include <linux/crc32.h>
#include "cluster/nodemanager.h"
#include "cluster/heartbeat.h"
#include "cluster/tcp.h"
#include "dlmapi.h"
#include "ocfs2_lockingver.h"
#include "stackglue.h"
#include "userdlm.h"
#define MLOG_MASK_PREFIX ML_DLMFS
#include "cluster/masklog.h"
static inline struct user_lock_res *user_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb)
{
return container_of(lksb, struct user_lock_res, l_lksb);
}
static inline int user_check_wait_flag(struct user_lock_res *lockres,
int flag)
{
......@@ -73,15 +74,15 @@ static inline void user_wait_on_blocked_lock(struct user_lock_res *lockres)
}
/* I heart container_of... */
static inline struct dlm_ctxt *
dlm_ctxt_from_user_lockres(struct user_lock_res *lockres)
static inline struct ocfs2_cluster_connection *
cluster_connection_from_user_lockres(struct user_lock_res *lockres)
{
struct dlmfs_inode_private *ip;
ip = container_of(lockres,
struct dlmfs_inode_private,
ip_lockres);
return ip->ip_dlm;
return ip->ip_conn;
}
static struct inode *
......@@ -103,9 +104,9 @@ static inline void user_recover_from_dlm_error(struct user_lock_res *lockres)
}
#define user_log_dlm_error(_func, _stat, _lockres) do { \
mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on " \
"resource %.*s: %s\n", dlm_errname(_stat), _func, \
_lockres->l_namelen, _lockres->l_name, dlm_errmsg(_stat)); \
mlog(ML_ERROR, "Dlm error %d while calling %s on " \
"resource %.*s\n", _stat, _func, \
_lockres->l_namelen, _lockres->l_name); \
} while (0)
/* WARNING: This function lives in a world where the only three lock
......@@ -113,34 +114,35 @@ static inline void user_recover_from_dlm_error(struct user_lock_res *lockres)
* lock types are added. */
static inline int user_highest_compat_lock_level(int level)
{
int new_level = LKM_EXMODE;
int new_level = DLM_LOCK_EX;
if (level == LKM_EXMODE)
new_level = LKM_NLMODE;
else if (level == LKM_PRMODE)
new_level = LKM_PRMODE;
if (level == DLM_LOCK_EX)
new_level = DLM_LOCK_NL;
else if (level == DLM_LOCK_PR)
new_level = DLM_LOCK_PR;
return new_level;
}
static void user_ast(void *opaque)
static void user_ast(struct ocfs2_dlm_lksb *lksb)
{
struct user_lock_res *lockres = opaque;
struct dlm_lockstatus *lksb;
struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);
int status;
mlog(0, "AST fired for lockres %.*s\n", lockres->l_namelen,
lockres->l_name);
mlog(ML_BASTS, "AST fired for lockres %.*s, level %d => %d\n",
lockres->l_namelen, lockres->l_name, lockres->l_level,
lockres->l_requested);
spin_lock(&lockres->l_lock);
lksb = &(lockres->l_lksb);
if (lksb->status != DLM_NORMAL) {
status = ocfs2_dlm_lock_status(&lockres->l_lksb);
if (status) {
mlog(ML_ERROR, "lksb status value of %u on lockres %.*s\n",
lksb->status, lockres->l_namelen, lockres->l_name);
status, lockres->l_namelen, lockres->l_name);
spin_unlock(&lockres->l_lock);
return;
}
mlog_bug_on_msg(lockres->l_requested == LKM_IVMODE,
mlog_bug_on_msg(lockres->l_requested == DLM_LOCK_IV,
"Lockres %.*s, requested ivmode. flags 0x%x\n",
lockres->l_namelen, lockres->l_name, lockres->l_flags);
......@@ -148,13 +150,13 @@ static void user_ast(void *opaque)
if (lockres->l_requested < lockres->l_level) {
if (lockres->l_requested <=
user_highest_compat_lock_level(lockres->l_blocking)) {
lockres->l_blocking = LKM_NLMODE;
lockres->l_blocking = DLM_LOCK_NL;
lockres->l_flags &= ~USER_LOCK_BLOCKED;
}
}
lockres->l_level = lockres->l_requested;
lockres->l_requested = LKM_IVMODE;
lockres->l_requested = DLM_LOCK_IV;
lockres->l_flags |= USER_LOCK_ATTACHED;
lockres->l_flags &= ~USER_LOCK_BUSY;
......@@ -193,11 +195,11 @@ static void __user_dlm_cond_queue_lockres(struct user_lock_res *lockres)
return;
switch (lockres->l_blocking) {
case LKM_EXMODE:
case DLM_LOCK_EX:
if (!lockres->l_ex_holders && !lockres->l_ro_holders)
queue = 1;
break;
case LKM_PRMODE:
case DLM_LOCK_PR:
if (!lockres->l_ex_holders)
queue = 1;
break;
......@@ -209,12 +211,12 @@ static void __user_dlm_cond_queue_lockres(struct user_lock_res *lockres)
__user_dlm_queue_lockres(lockres);
}
static void user_bast(void *opaque, int level)
static void user_bast(struct ocfs2_dlm_lksb *lksb, int level)
{
struct user_lock_res *lockres = opaque;
struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);
mlog(0, "Blocking AST fired for lockres %.*s. Blocking level %d\n",
lockres->l_namelen, lockres->l_name, level);
mlog(ML_BASTS, "BAST fired for lockres %.*s, blocking %d, level %d\n",
lockres->l_namelen, lockres->l_name, level, lockres->l_level);
spin_lock(&lockres->l_lock);
lockres->l_flags |= USER_LOCK_BLOCKED;
......@@ -227,15 +229,15 @@ static void user_bast(void *opaque, int level)
wake_up(&lockres->l_event);
}
static void user_unlock_ast(void *opaque, enum dlm_status status)
static void user_unlock_ast(struct ocfs2_dlm_lksb *lksb, int status)
{
struct user_lock_res *lockres = opaque;
struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);
mlog(0, "UNLOCK AST called on lock %.*s\n", lockres->l_namelen,
lockres->l_name);
mlog(ML_BASTS, "UNLOCK AST fired for lockres %.*s, flags 0x%x\n",
lockres->l_namelen, lockres->l_name, lockres->l_flags);
if (status != DLM_NORMAL && status != DLM_CANCELGRANT)
mlog(ML_ERROR, "Dlm returns status %d\n", status);
if (status)
mlog(ML_ERROR, "dlm returns status %d\n", status);
spin_lock(&lockres->l_lock);
/* The teardown flag gets set early during the unlock process,
......@@ -243,7 +245,7 @@ static void user_unlock_ast(void *opaque, enum dlm_status status)
* for a concurrent cancel. */
if (lockres->l_flags & USER_LOCK_IN_TEARDOWN
&& !(lockres->l_flags & USER_LOCK_IN_CANCEL)) {
lockres->l_level = LKM_IVMODE;
lockres->l_level = DLM_LOCK_IV;
} else if (status == DLM_CANCELGRANT) {
/* We tried to cancel a convert request, but it was
* already granted. Don't clear the busy flag - the
......@@ -254,7 +256,7 @@ static void user_unlock_ast(void *opaque, enum dlm_status status)
} else {
BUG_ON(!(lockres->l_flags & USER_LOCK_IN_CANCEL));
/* Cancel succeeded, we want to re-queue */
lockres->l_requested = LKM_IVMODE; /* cancel an
lockres->l_requested = DLM_LOCK_IV; /* cancel an
* upconvert
* request. */
lockres->l_flags &= ~USER_LOCK_IN_CANCEL;
......@@ -271,6 +273,21 @@ out_noclear:
wake_up(&lockres->l_event);
}
/*
* This is the userdlmfs locking protocol version.
*
* See fs/ocfs2/dlmglue.c for more details on locking versions.
*/
static struct ocfs2_locking_protocol user_dlm_lproto = {
.lp_max_version = {
.pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
.pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
},
.lp_lock_ast = user_ast,
.lp_blocking_ast = user_bast,
.lp_unlock_ast = user_unlock_ast,
};
static inline void user_dlm_drop_inode_ref(struct user_lock_res *lockres)
{
struct inode *inode;
......@@ -283,10 +300,10 @@ static void user_dlm_unblock_lock(struct work_struct *work)
int new_level, status;
struct user_lock_res *lockres =
container_of(work, struct user_lock_res, l_work);
struct dlm_ctxt *dlm = dlm_ctxt_from_user_lockres(lockres);
struct ocfs2_cluster_connection *conn =
cluster_connection_from_user_lockres(lockres);
mlog(0, "processing lockres %.*s\n", lockres->l_namelen,
lockres->l_name);
mlog(0, "lockres %.*s\n", lockres->l_namelen, lockres->l_name);
spin_lock(&lockres->l_lock);
......@@ -304,17 +321,23 @@ static void user_dlm_unblock_lock(struct work_struct *work)
* flag, and finally we might get another bast which re-queues
* us before our ast for the downconvert is called. */
if (!(lockres->l_flags & USER_LOCK_BLOCKED)) {
mlog(ML_BASTS, "lockres %.*s USER_LOCK_BLOCKED\n",
lockres->l_namelen, lockres->l_name);
spin_unlock(&lockres->l_lock);
goto drop_ref;
}
if (lockres->l_flags & USER_LOCK_IN_TEARDOWN) {
mlog(ML_BASTS, "lockres %.*s USER_LOCK_IN_TEARDOWN\n",
lockres->l_namelen, lockres->l_name);
spin_unlock(&lockres->l_lock);
goto drop_ref;
}
if (lockres->l_flags & USER_LOCK_BUSY) {
if (lockres->l_flags & USER_LOCK_IN_CANCEL) {
mlog(ML_BASTS, "lockres %.*s USER_LOCK_IN_CANCEL\n",
lockres->l_namelen, lockres->l_name);
spin_unlock(&lockres->l_lock);
goto drop_ref;
}
......@@ -322,32 +345,31 @@ static void user_dlm_unblock_lock(struct work_struct *work)
lockres->l_flags |= USER_LOCK_IN_CANCEL;
spin_unlock(&lockres->l_lock);
status = dlmunlock(dlm,
&lockres->l_lksb,
LKM_CANCEL,
user_unlock_ast,
lockres);
if (status != DLM_NORMAL)
user_log_dlm_error("dlmunlock", status, lockres);
status = ocfs2_dlm_unlock(conn, &lockres->l_lksb,
DLM_LKF_CANCEL);
if (status)
user_log_dlm_error("ocfs2_dlm_unlock", status, lockres);
goto drop_ref;
}
/* If there are still incompat holders, we can exit safely
* without worrying about re-queueing this lock as that will
* happen on the last call to user_cluster_unlock. */
if ((lockres->l_blocking == LKM_EXMODE)
if ((lockres->l_blocking == DLM_LOCK_EX)
&& (lockres->l_ex_holders || lockres->l_ro_holders)) {
spin_unlock(&lockres->l_lock);
mlog(0, "can't downconvert for ex: ro = %u, ex = %u\n",
lockres->l_ro_holders, lockres->l_ex_holders);
mlog(ML_BASTS, "lockres %.*s, EX/PR Holders %u,%u\n",
lockres->l_namelen, lockres->l_name,
lockres->l_ex_holders, lockres->l_ro_holders);
goto drop_ref;
}
if ((lockres->l_blocking == LKM_PRMODE)
if ((lockres->l_blocking == DLM_LOCK_PR)
&& lockres->l_ex_holders) {
spin_unlock(&lockres->l_lock);
mlog(0, "can't downconvert for pr: ex = %u\n",
lockres->l_ex_holders);
mlog(ML_BASTS, "lockres %.*s, EX Holders %u\n",
lockres->l_namelen, lockres->l_name,
lockres->l_ex_holders);
goto drop_ref;
}
......@@ -355,22 +377,17 @@ static void user_dlm_unblock_lock(struct work_struct *work)
new_level = user_highest_compat_lock_level(lockres->l_blocking);
lockres->l_requested = new_level;
lockres->l_flags |= USER_LOCK_BUSY;
mlog(0, "Downconvert lock from %d to %d\n",
lockres->l_level, new_level);
mlog(ML_BASTS, "lockres %.*s, downconvert %d => %d\n",
lockres->l_namelen, lockres->l_name, lockres->l_level, new_level);
spin_unlock(&lockres->l_lock);
/* need lock downconvert request now... */
status = dlmlock(dlm,
new_level,
&lockres->l_lksb,
LKM_CONVERT|LKM_VALBLK,
lockres->l_name,
lockres->l_namelen,
user_ast,
lockres,
user_bast);
if (status != DLM_NORMAL) {
user_log_dlm_error("dlmlock", status, lockres);
status = ocfs2_dlm_lock(conn, new_level, &lockres->l_lksb,
DLM_LKF_CONVERT|DLM_LKF_VALBLK,
lockres->l_name,
lockres->l_namelen);
if (status) {
user_log_dlm_error("ocfs2_dlm_lock", status, lockres);
user_recover_from_dlm_error(lockres);
}
......@@ -382,10 +399,10 @@ static inline void user_dlm_inc_holders(struct user_lock_res *lockres,
int level)
{
switch(level) {
case LKM_EXMODE:
case DLM_LOCK_EX:
lockres->l_ex_holders++;
break;
case LKM_PRMODE:
case DLM_LOCK_PR:
lockres->l_ro_holders++;
break;
default:
......@@ -410,20 +427,19 @@ int user_dlm_cluster_lock(struct user_lock_res *lockres,
int lkm_flags)
{
int status, local_flags;
struct dlm_ctxt *dlm = dlm_ctxt_from_user_lockres(lockres);
struct ocfs2_cluster_connection *conn =
cluster_connection_from_user_lockres(lockres);
if (level != LKM_EXMODE &&
level != LKM_PRMODE) {
if (level != DLM_LOCK_EX &&
level != DLM_LOCK_PR) {
mlog(ML_ERROR, "lockres %.*s: invalid request!\n",
lockres->l_namelen, lockres->l_name);
status = -EINVAL;
goto bail;
}
mlog(0, "lockres %.*s: asking for %s lock, passed flags = 0x%x\n",
lockres->l_namelen, lockres->l_name,
(level == LKM_EXMODE) ? "LKM_EXMODE" : "LKM_PRMODE",
lkm_flags);
mlog(ML_BASTS, "lockres %.*s, level %d, flags = 0x%x\n",
lockres->l_namelen, lockres->l_name, level, lkm_flags);
again:
if (signal_pending(current)) {
......@@ -457,35 +473,26 @@ again:
}
if (level > lockres->l_level) {
local_flags = lkm_flags | LKM_VALBLK;
if (lockres->l_level != LKM_IVMODE)
local_flags |= LKM_CONVERT;
local_flags = lkm_flags | DLM_LKF_VALBLK;
if (lockres->l_level != DLM_LOCK_IV)
local_flags |= DLM_LKF_CONVERT;
lockres->l_requested = level;
lockres->l_flags |= USER_LOCK_BUSY;
spin_unlock(&lockres->l_lock);
BUG_ON(level == LKM_IVMODE);
BUG_ON(level == LKM_NLMODE);
BUG_ON(level == DLM_LOCK_IV);
BUG_ON(level == DLM_LOCK_NL);
/* call dlm_lock to upgrade lock now */
status = dlmlock(dlm,
level,
&lockres->l_lksb,
local_flags,
lockres->l_name,
lockres->l_namelen,
user_ast,
lockres,
user_bast);
if (status != DLM_NORMAL) {
if ((lkm_flags & LKM_NOQUEUE) &&
(status == DLM_NOTQUEUED))
status = -EAGAIN;
else {
user_log_dlm_error("dlmlock", status, lockres);
status = -EINVAL;
}
status = ocfs2_dlm_lock(conn, level, &lockres->l_lksb,
local_flags, lockres->l_name,
lockres->l_namelen);
if (status) {
if ((lkm_flags & DLM_LKF_NOQUEUE) &&
(status != -EAGAIN))
user_log_dlm_error("ocfs2_dlm_lock",
status, lockres);
user_recover_from_dlm_error(lockres);
goto bail;
}
......@@ -506,11 +513,11 @@ static inline void user_dlm_dec_holders(struct user_lock_res *lockres,
int level)
{
switch(level) {
case LKM_EXMODE:
case DLM_LOCK_EX:
BUG_ON(!lockres->l_ex_holders);
lockres->l_ex_holders--;
break;
case LKM_PRMODE:
case DLM_LOCK_PR:
BUG_ON(!lockres->l_ro_holders);
lockres->l_ro_holders--;
break;
......@@ -522,8 +529,8 @@ static inline void user_dlm_dec_holders(struct user_lock_res *lockres,
void user_dlm_cluster_unlock(struct user_lock_res *lockres,
int level)
{
if (level != LKM_EXMODE &&
level != LKM_PRMODE) {
if (level != DLM_LOCK_EX &&
level != DLM_LOCK_PR) {
mlog(ML_ERROR, "lockres %.*s: invalid request!\n",
lockres->l_namelen, lockres->l_name);
return;
......@@ -540,33 +547,40 @@ void user_dlm_write_lvb(struct inode *inode,
unsigned int len)
{
struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres;
char *lvb = lockres->l_lksb.lvb;
char *lvb;
BUG_ON(len > DLM_LVB_LEN);
spin_lock(&lockres->l_lock);
BUG_ON(lockres->l_level < LKM_EXMODE);
BUG_ON(lockres->l_level < DLM_LOCK_EX);
lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
memcpy(lvb, val, len);
spin_unlock(&lockres->l_lock);
}
void user_dlm_read_lvb(struct inode *inode,
char *val,
unsigned int len)
ssize_t user_dlm_read_lvb(struct inode *inode,
char *val,
unsigned int len)
{
struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres;
char *lvb = lockres->l_lksb.lvb;
char *lvb;
ssize_t ret = len;
BUG_ON(len > DLM_LVB_LEN);
spin_lock(&lockres->l_lock);
BUG_ON(lockres->l_level < LKM_PRMODE);
memcpy(val, lvb, len);
BUG_ON(lockres->l_level < DLM_LOCK_PR);
if (ocfs2_dlm_lvb_valid(&lockres->l_lksb)) {
lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
memcpy(val, lvb, len);
} else
ret = 0;
spin_unlock(&lockres->l_lock);
return ret;
}
void user_dlm_lock_res_init(struct user_lock_res *lockres,
......@@ -576,9 +590,9 @@ void user_dlm_lock_res_init(struct user_lock_res *lockres,
spin_lock_init(&lockres->l_lock);
init_waitqueue_head(&lockres->l_event);
lockres->l_level = LKM_IVMODE;
lockres->l_requested = LKM_IVMODE;
lockres->l_blocking = LKM_IVMODE;
lockres->l_level = DLM_LOCK_IV;
lockres->l_requested = DLM_LOCK_IV;
lockres->l_blocking = DLM_LOCK_IV;
/* should have been checked before getting here. */
BUG_ON(dentry->d_name.len >= USER_DLM_LOCK_ID_MAX_LEN);
......@@ -592,9 +606,10 @@ void user_dlm_lock_res_init(struct user_lock_res *lockres,
int user_dlm_destroy_lock(struct user_lock_res *lockres)
{
int status = -EBUSY;
struct dlm_ctxt *dlm = dlm_ctxt_from_user_lockres(lockres);
struct ocfs2_cluster_connection *conn =
cluster_connection_from_user_lockres(lockres);
mlog(0, "asked to destroy %.*s\n", lockres->l_namelen, lockres->l_name);
mlog(ML_BASTS, "lockres %.*s\n", lockres->l_namelen, lockres->l_name);
spin_lock(&lockres->l_lock);
if (lockres->l_flags & USER_LOCK_IN_TEARDOWN) {
......@@ -627,14 +642,9 @@ int user_dlm_destroy_lock(struct user_lock_res *lockres)
lockres->l_flags |= USER_LOCK_BUSY;
spin_unlock(&lockres->l_lock);
status = dlmunlock(dlm,
&lockres->l_lksb,
LKM_VALBLK,
user_unlock_ast,
lockres);
if (status != DLM_NORMAL) {
user_log_dlm_error("dlmunlock", status, lockres);
status = -EINVAL;
status = ocfs2_dlm_unlock(conn, &lockres->l_lksb, DLM_LKF_VALBLK);
if (status) {
user_log_dlm_error("ocfs2_dlm_unlock", status, lockres);
goto bail;
}
......@@ -645,32 +655,34 @@ bail:
return status;
}
struct dlm_ctxt *user_dlm_register_context(struct qstr *name,
struct dlm_protocol_version *proto)
static void user_dlm_recovery_handler_noop(int node_num,
void *recovery_data)
{
struct dlm_ctxt *dlm;
u32 dlm_key;
char *domain;
domain = kmalloc(name->len + 1, GFP_NOFS);
if (!domain) {
mlog_errno(-ENOMEM);
return ERR_PTR(-ENOMEM);
}
/* We ignore recovery events */
return;
}
dlm_key = crc32_le(0, name->name, name->len);
void user_dlm_set_locking_protocol(void)
{
ocfs2_stack_glue_set_max_proto_version(&user_dlm_lproto.lp_max_version);
}
snprintf(domain, name->len + 1, "%.*s", name->len, name->name);
struct ocfs2_cluster_connection *user_dlm_register(struct qstr *name)
{
int rc;
struct ocfs2_cluster_connection *conn;
dlm = dlm_register_domain(domain, dlm_key, proto);
if (IS_ERR(dlm))
mlog_errno(PTR_ERR(dlm));
rc = ocfs2_cluster_connect_agnostic(name->name, name->len,
&user_dlm_lproto,
user_dlm_recovery_handler_noop,
NULL, &conn);
if (rc)
mlog_errno(rc);
kfree(domain);
return dlm;
return rc ? ERR_PTR(rc) : conn;
}
void user_dlm_unregister_context(struct dlm_ctxt *dlm)
void user_dlm_unregister(struct ocfs2_cluster_connection *conn)
{
dlm_unregister_domain(dlm);
ocfs2_cluster_disconnect(conn, 0);
}
......@@ -57,7 +57,7 @@ struct user_lock_res {
int l_level;
unsigned int l_ro_holders;
unsigned int l_ex_holders;
struct dlm_lockstatus l_lksb;
struct ocfs2_dlm_lksb l_lksb;
int l_requested;
int l_blocking;
......@@ -80,15 +80,15 @@ void user_dlm_cluster_unlock(struct user_lock_res *lockres,
void user_dlm_write_lvb(struct inode *inode,
const char *val,
unsigned int len);
void user_dlm_read_lvb(struct inode *inode,
char *val,
unsigned int len);
struct dlm_ctxt *user_dlm_register_context(struct qstr *name,
struct dlm_protocol_version *proto);
void user_dlm_unregister_context(struct dlm_ctxt *dlm);
ssize_t user_dlm_read_lvb(struct inode *inode,
char *val,
unsigned int len);
struct ocfs2_cluster_connection *user_dlm_register(struct qstr *name);
void user_dlm_unregister(struct ocfs2_cluster_connection *conn);
void user_dlm_set_locking_protocol(void);
struct dlmfs_inode_private {
struct dlm_ctxt *ip_dlm;
struct ocfs2_cluster_connection *ip_conn;
struct user_lock_res ip_lockres; /* unused for directories. */
struct inode *ip_parent;
......
......@@ -297,6 +297,11 @@ static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
lockres->l_type == OCFS2_LOCK_TYPE_OPEN;
}
static inline struct ocfs2_lock_res *ocfs2_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb)
{
return container_of(lksb, struct ocfs2_lock_res, l_lksb);
}
static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
{
BUG_ON(!ocfs2_is_inode_lock(lockres));
......@@ -927,6 +932,10 @@ static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
lockres->l_blocking = level;
}
mlog(ML_BASTS, "lockres %s, block %d, level %d, l_block %d, dwn %d\n",
lockres->l_name, level, lockres->l_level, lockres->l_blocking,
needs_downconvert);
if (needs_downconvert)
lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
......@@ -1040,18 +1049,17 @@ static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
return lockres->l_pending_gen;
}
static void ocfs2_blocking_ast(void *opaque, int level)
static void ocfs2_blocking_ast(struct ocfs2_dlm_lksb *lksb, int level)
{
struct ocfs2_lock_res *lockres = opaque;
struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
int needs_downconvert;
unsigned long flags;
BUG_ON(level <= DLM_LOCK_NL);
mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n",
lockres->l_name, level, lockres->l_level,
mlog(ML_BASTS, "BAST fired for lockres %s, blocking %d, level %d, "
"type %s\n", lockres->l_name, level, lockres->l_level,
ocfs2_lock_type_string(lockres->l_type));
/*
......@@ -1072,9 +1080,9 @@ static void ocfs2_blocking_ast(void *opaque, int level)
ocfs2_wake_downconvert_thread(osb);
}
static void ocfs2_locking_ast(void *opaque)
static void ocfs2_locking_ast(struct ocfs2_dlm_lksb *lksb)
{
struct ocfs2_lock_res *lockres = opaque;
struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
unsigned long flags;
int status;
......@@ -1095,6 +1103,10 @@ static void ocfs2_locking_ast(void *opaque)
return;
}
mlog(ML_BASTS, "AST fired for lockres %s, action %d, unlock %d, "
"level %d => %d\n", lockres->l_name, lockres->l_action,
lockres->l_unlock_action, lockres->l_level, lockres->l_requested);
switch(lockres->l_action) {
case OCFS2_AST_ATTACH:
ocfs2_generic_handle_attach_action(lockres);
......@@ -1107,8 +1119,8 @@ static void ocfs2_locking_ast(void *opaque)
ocfs2_generic_handle_downconvert_action(lockres);
break;
default:
mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u "
"lockres flags = 0x%lx, unlock action: %u\n",
mlog(ML_ERROR, "lockres %s: AST fired with invalid action: %u, "
"flags 0x%lx, unlock: %u\n",
lockres->l_name, lockres->l_action, lockres->l_flags,
lockres->l_unlock_action);
BUG();
......@@ -1134,6 +1146,88 @@ out:
spin_unlock_irqrestore(&lockres->l_lock, flags);
}
static void ocfs2_unlock_ast(struct ocfs2_dlm_lksb *lksb, int error)
{
struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
unsigned long flags;
mlog_entry_void();
mlog(ML_BASTS, "UNLOCK AST fired for lockres %s, action = %d\n",
lockres->l_name, lockres->l_unlock_action);
spin_lock_irqsave(&lockres->l_lock, flags);
if (error) {
mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
"unlock_action %d\n", error, lockres->l_name,
lockres->l_unlock_action);
spin_unlock_irqrestore(&lockres->l_lock, flags);
mlog_exit_void();
return;
}
switch(lockres->l_unlock_action) {
case OCFS2_UNLOCK_CANCEL_CONVERT:
mlog(0, "Cancel convert success for %s\n", lockres->l_name);
lockres->l_action = OCFS2_AST_INVALID;
/* Downconvert thread may have requeued this lock, we
* need to wake it. */
if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres));
break;
case OCFS2_UNLOCK_DROP_LOCK:
lockres->l_level = DLM_LOCK_IV;
break;
default:
BUG();
}
lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
wake_up(&lockres->l_event);
spin_unlock_irqrestore(&lockres->l_lock, flags);
mlog_exit_void();
}
/*
* This is the filesystem locking protocol. It provides the lock handling
* hooks for the underlying DLM. It has a maximum version number.
* The version number allows interoperability with systems running at
* the same major number and an equal or smaller minor number.
*
* Whenever the filesystem does new things with locks (adds or removes a
* lock, orders them differently, does different things underneath a lock),
* the version must be changed. The protocol is negotiated when joining
* the dlm domain. A node may join the domain if its major version is
* identical to all other nodes and its minor version is greater than
* or equal to all other nodes. When its minor version is greater than
* the other nodes, it will run at the minor version specified by the
* other nodes.
*
* If a locking change is made that will not be compatible with older
* versions, the major number must be increased and the minor version set
* to zero. If a change merely adds a behavior that can be disabled when
* speaking to older versions, the minor version must be increased. If a
* change adds a fully backwards compatible change (eg, LVB changes that
* are just ignored by older versions), the version does not need to be
* updated.
*/
static struct ocfs2_locking_protocol lproto = {
.lp_max_version = {
.pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
.pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
},
.lp_lock_ast = ocfs2_locking_ast,
.lp_blocking_ast = ocfs2_blocking_ast,
.lp_unlock_ast = ocfs2_unlock_ast,
};
void ocfs2_set_locking_protocol(void)
{
ocfs2_stack_glue_set_max_proto_version(&lproto.lp_max_version);
}
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
int convert)
{
......@@ -1189,8 +1283,7 @@ static int ocfs2_lock_create(struct ocfs2_super *osb,
&lockres->l_lksb,
dlm_flags,
lockres->l_name,
OCFS2_LOCK_ID_MAX_LEN - 1,
lockres);
OCFS2_LOCK_ID_MAX_LEN - 1);
lockres_clear_pending(lockres, gen, osb);
if (ret) {
ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
......@@ -1412,7 +1505,7 @@ again:
BUG_ON(level == DLM_LOCK_IV);
BUG_ON(level == DLM_LOCK_NL);
mlog(0, "lock %s, convert from %d to level = %d\n",
mlog(ML_BASTS, "lockres %s, convert from %d to %d\n",
lockres->l_name, lockres->l_level, level);
/* call dlm_lock to upgrade lock now */
......@@ -1421,8 +1514,7 @@ again:
&lockres->l_lksb,
lkm_flags,
lockres->l_name,
OCFS2_LOCK_ID_MAX_LEN - 1,
lockres);
OCFS2_LOCK_ID_MAX_LEN - 1);
lockres_clear_pending(lockres, gen, osb);
if (ret) {
if (!(lkm_flags & DLM_LKF_NOQUEUE) ||
......@@ -1859,8 +1951,7 @@ int ocfs2_file_lock(struct file *file, int ex, int trylock)
spin_unlock_irqrestore(&lockres->l_lock, flags);
ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags,
lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1,
lockres);
lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1);
if (ret) {
if (!trylock || (ret != -EAGAIN)) {
ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
......@@ -2989,7 +3080,7 @@ int ocfs2_dlm_init(struct ocfs2_super *osb)
status = ocfs2_cluster_connect(osb->osb_cluster_stack,
osb->uuid_str,
strlen(osb->uuid_str),
ocfs2_do_node_down, osb,
&lproto, ocfs2_do_node_down, osb,
&conn);
if (status) {
mlog_errno(status);
......@@ -3056,50 +3147,6 @@ void ocfs2_dlm_shutdown(struct ocfs2_super *osb,
mlog_exit_void();
}
static void ocfs2_unlock_ast(void *opaque, int error)
{
struct ocfs2_lock_res *lockres = opaque;
unsigned long flags;
mlog_entry_void();
mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name,
lockres->l_unlock_action);
spin_lock_irqsave(&lockres->l_lock, flags);
if (error) {
mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
"unlock_action %d\n", error, lockres->l_name,
lockres->l_unlock_action);
spin_unlock_irqrestore(&lockres->l_lock, flags);
mlog_exit_void();
return;
}
switch(lockres->l_unlock_action) {
case OCFS2_UNLOCK_CANCEL_CONVERT:
mlog(0, "Cancel convert success for %s\n", lockres->l_name);
lockres->l_action = OCFS2_AST_INVALID;
/* Downconvert thread may have requeued this lock, we
* need to wake it. */
if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres));
break;
case OCFS2_UNLOCK_DROP_LOCK:
lockres->l_level = DLM_LOCK_IV;
break;
default:
BUG();
}
lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
wake_up(&lockres->l_event);
spin_unlock_irqrestore(&lockres->l_lock, flags);
mlog_exit_void();
}
static int ocfs2_drop_lock(struct ocfs2_super *osb,
struct ocfs2_lock_res *lockres)
{
......@@ -3167,8 +3214,7 @@ static int ocfs2_drop_lock(struct ocfs2_super *osb,
mlog(0, "lock %s\n", lockres->l_name);
ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags,
lockres);
ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags);
if (ret) {
ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
......@@ -3276,13 +3322,20 @@ static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);
if (lockres->l_level <= new_level) {
mlog(ML_ERROR, "lockres->l_level (%d) <= new_level (%d)\n",
lockres->l_level, new_level);
mlog(ML_ERROR, "lockres %s, lvl %d <= %d, blcklst %d, mask %d, "
"type %d, flags 0x%lx, hold %d %d, act %d %d, req %d, "
"block %d, pgen %d\n", lockres->l_name, lockres->l_level,
new_level, list_empty(&lockres->l_blocked_list),
list_empty(&lockres->l_mask_waiters), lockres->l_type,
lockres->l_flags, lockres->l_ro_holders,
lockres->l_ex_holders, lockres->l_action,
lockres->l_unlock_action, lockres->l_requested,
lockres->l_blocking, lockres->l_pending_gen);
BUG();
}
mlog(0, "lock %s, new_level = %d, l_blocking = %d\n",
lockres->l_name, new_level, lockres->l_blocking);
mlog(ML_BASTS, "lockres %s, level %d => %d, blocking %d\n",
lockres->l_name, lockres->l_level, new_level, lockres->l_blocking);
lockres->l_action = OCFS2_AST_DOWNCONVERT;
lockres->l_requested = new_level;
......@@ -3301,6 +3354,9 @@ static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
mlog_entry_void();
mlog(ML_BASTS, "lockres %s, level %d => %d\n", lockres->l_name,
lockres->l_level, new_level);
if (lvb)
dlm_flags |= DLM_LKF_VALBLK;
......@@ -3309,8 +3365,7 @@ static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
&lockres->l_lksb,
dlm_flags,
lockres->l_name,
OCFS2_LOCK_ID_MAX_LEN - 1,
lockres);
OCFS2_LOCK_ID_MAX_LEN - 1);
lockres_clear_pending(lockres, generation, osb);
if (ret) {
ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
......@@ -3331,14 +3386,12 @@ static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
assert_spin_locked(&lockres->l_lock);
mlog_entry_void();
mlog(0, "lock %s\n", lockres->l_name);
if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
/* If we're already trying to cancel a lock conversion
* then just drop the spinlock and allow the caller to
* requeue this lock. */
mlog(0, "Lockres %s, skip convert\n", lockres->l_name);
mlog(ML_BASTS, "lockres %s, skip convert\n", lockres->l_name);
return 0;
}
......@@ -3353,6 +3406,8 @@ static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
"lock %s, invalid flags: 0x%lx\n",
lockres->l_name, lockres->l_flags);
mlog(ML_BASTS, "lockres %s\n", lockres->l_name);
return 1;
}
......@@ -3362,16 +3417,15 @@ static int ocfs2_cancel_convert(struct ocfs2_super *osb,
int ret;
mlog_entry_void();
mlog(0, "lock %s\n", lockres->l_name);
ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb,
DLM_LKF_CANCEL, lockres);
DLM_LKF_CANCEL);
if (ret) {
ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
ocfs2_recover_from_dlm_error(lockres, 0);
}
mlog(0, "lock %s return from ocfs2_dlm_unlock\n", lockres->l_name);
mlog(ML_BASTS, "lockres %s\n", lockres->l_name);
mlog_exit(ret);
return ret;
......@@ -3428,8 +3482,11 @@ recheck:
* at the same time they set OCFS2_DLM_BUSY. They must
* clear OCFS2_DLM_PENDING after dlm_lock() returns.
*/
if (lockres->l_flags & OCFS2_LOCK_PENDING)
if (lockres->l_flags & OCFS2_LOCK_PENDING) {
mlog(ML_BASTS, "lockres %s, ReQ: Pending\n",
lockres->l_name);
goto leave_requeue;
}
ctl->requeue = 1;
ret = ocfs2_prepare_cancel_convert(osb, lockres);
......@@ -3461,6 +3518,7 @@ recheck:
*/
if (lockres->l_level == DLM_LOCK_NL) {
BUG_ON(lockres->l_ex_holders || lockres->l_ro_holders);
mlog(ML_BASTS, "lockres %s, Aborting dc\n", lockres->l_name);
lockres->l_blocking = DLM_LOCK_NL;
lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
spin_unlock_irqrestore(&lockres->l_lock, flags);
......@@ -3470,28 +3528,41 @@ recheck:
/* if we're blocking an exclusive and we have *any* holders,
* then requeue. */
if ((lockres->l_blocking == DLM_LOCK_EX)
&& (lockres->l_ex_holders || lockres->l_ro_holders))
&& (lockres->l_ex_holders || lockres->l_ro_holders)) {
mlog(ML_BASTS, "lockres %s, ReQ: EX/PR Holders %u,%u\n",
lockres->l_name, lockres->l_ex_holders,
lockres->l_ro_holders);
goto leave_requeue;
}
/* If it's a PR we're blocking, then only
* requeue if we've got any EX holders */
if (lockres->l_blocking == DLM_LOCK_PR &&
lockres->l_ex_holders)
lockres->l_ex_holders) {
mlog(ML_BASTS, "lockres %s, ReQ: EX Holders %u\n",
lockres->l_name, lockres->l_ex_holders);
goto leave_requeue;
}
/*
* Can we get a lock in this state if the holder counts are
* zero? The meta data unblock code used to check this.
*/
if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
&& (lockres->l_flags & OCFS2_LOCK_REFRESHING))
&& (lockres->l_flags & OCFS2_LOCK_REFRESHING)) {
mlog(ML_BASTS, "lockres %s, ReQ: Lock Refreshing\n",
lockres->l_name);
goto leave_requeue;
}
new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
if (lockres->l_ops->check_downconvert
&& !lockres->l_ops->check_downconvert(lockres, new_level))
&& !lockres->l_ops->check_downconvert(lockres, new_level)) {
mlog(ML_BASTS, "lockres %s, ReQ: Checkpointing\n",
lockres->l_name);
goto leave_requeue;
}
/* If we get here, then we know that there are no more
* incompatible holders (and anyone asking for an incompatible
......@@ -3509,13 +3580,19 @@ recheck:
ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking);
if (ctl->unblock_action == UNBLOCK_STOP_POST)
if (ctl->unblock_action == UNBLOCK_STOP_POST) {
mlog(ML_BASTS, "lockres %s, UNBLOCK_STOP_POST\n",
lockres->l_name);
goto leave;
}
spin_lock_irqsave(&lockres->l_lock, flags);
if ((blocking != lockres->l_blocking) || (level != lockres->l_level)) {
/* If this changed underneath us, then we can't drop
* it just yet. */
mlog(ML_BASTS, "lockres %s, block=%d:%d, level=%d:%d, "
"Recheck\n", lockres->l_name, blocking,
lockres->l_blocking, level, lockres->l_level);
goto recheck;
}
......@@ -3910,45 +3987,6 @@ void ocfs2_refcount_unlock(struct ocfs2_refcount_tree *ref_tree, int ex)
ocfs2_cluster_unlock(osb, lockres, level);
}
/*
* This is the filesystem locking protocol. It provides the lock handling
* hooks for the underlying DLM. It has a maximum version number.
* The version number allows interoperability with systems running at
* the same major number and an equal or smaller minor number.
*
* Whenever the filesystem does new things with locks (adds or removes a
* lock, orders them differently, does different things underneath a lock),
* the version must be changed. The protocol is negotiated when joining
* the dlm domain. A node may join the domain if its major version is
* identical to all other nodes and its minor version is greater than
* or equal to all other nodes. When its minor version is greater than
* the other nodes, it will run at the minor version specified by the
* other nodes.
*
* If a locking change is made that will not be compatible with older
* versions, the major number must be increased and the minor version set
* to zero. If a change merely adds a behavior that can be disabled when
* speaking to older versions, the minor version must be increased. If a
* change adds a fully backwards compatible change (eg, LVB changes that
* are just ignored by older versions), the version does not need to be
* updated.
*/
static struct ocfs2_locking_protocol lproto = {
.lp_max_version = {
.pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
.pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
},
.lp_lock_ast = ocfs2_locking_ast,
.lp_blocking_ast = ocfs2_blocking_ast,
.lp_unlock_ast = ocfs2_unlock_ast,
};
void ocfs2_set_locking_protocol(void)
{
ocfs2_stack_glue_set_locking_protocol(&lproto);
}
static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
struct ocfs2_lock_res *lockres)
{
......@@ -3965,7 +4003,7 @@ static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
BUG_ON(!lockres);
BUG_ON(!lockres->l_ops);
mlog(0, "lockres %s blocked.\n", lockres->l_name);
mlog(ML_BASTS, "lockres %s blocked\n", lockres->l_name);
/* Detect whether a lock has been marked as going away while
* the downconvert thread was processing other things. A lock can
......@@ -3988,7 +4026,7 @@ unqueue:
} else
ocfs2_schedule_blocked_lock(osb, lockres);
mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name,
mlog(ML_BASTS, "lockres %s, requeue = %s.\n", lockres->l_name,
ctl.requeue ? "yes" : "no");
spin_unlock_irqrestore(&lockres->l_lock, flags);
......@@ -4010,7 +4048,7 @@ static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
/* Do not schedule a lock for downconvert when it's on
* the way to destruction - any nodes wanting access
* to the resource will get it soon. */
mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n",
mlog(ML_BASTS, "lockres %s won't be scheduled: flags 0x%lx\n",
lockres->l_name, lockres->l_flags);
return;
}
......
......@@ -993,10 +993,9 @@ int ocfs2_setattr(struct dentry *dentry, struct iattr *attr)
}
if (size_change && attr->ia_size != i_size_read(inode)) {
if (attr->ia_size > sb->s_maxbytes) {
status = -EFBIG;
status = inode_newsize_ok(inode, attr->ia_size);
if (status)
goto bail_unlock;
}
if (i_size_read(inode) > attr->ia_size) {
if (ocfs2_should_order_data(inode)) {
......@@ -1836,6 +1835,8 @@ static int ocfs2_prepare_inode_for_write(struct dentry *dentry,
&meta_level);
if (has_refcount)
*has_refcount = 1;
if (direct_io)
*direct_io = 0;
}
if (ret < 0) {
......@@ -1859,10 +1860,6 @@ static int ocfs2_prepare_inode_for_write(struct dentry *dentry,
break;
}
if (has_refcount && *has_refcount == 1) {
*direct_io = 0;
break;
}
/*
* Allowing concurrent direct writes means
* i_size changes wouldn't be synchronized, so
......@@ -2043,7 +2040,7 @@ out_dio:
* async dio is going to do it in the future or an end_io after an
* error has already done it.
*/
if (ret == -EIOCBQUEUED || !ocfs2_iocb_is_rw_locked(iocb)) {
if ((ret == -EIOCBQUEUED) || (!ocfs2_iocb_is_rw_locked(iocb))) {
rw_level = -1;
have_alloc_sem = 0;
}
......
......@@ -7,10 +7,10 @@
*
*/
#ifndef OCFS2_IOCTL_H
#define OCFS2_IOCTL_H
#ifndef OCFS2_IOCTL_PROTO_H
#define OCFS2_IOCTL_PROTO_H
long ocfs2_ioctl(struct file *filp, unsigned int cmd, unsigned long arg);
long ocfs2_compat_ioctl(struct file *file, unsigned cmd, unsigned long arg);
#endif /* OCFS2_IOCTL_H */
#endif /* OCFS2_IOCTL_PROTO_H */
......@@ -476,7 +476,7 @@ out_mutex:
out:
if (!status)
ocfs2_init_inode_steal_slot(osb);
ocfs2_init_steal_slots(osb);
mlog_exit(status);
return status;
}
......
......@@ -42,6 +42,7 @@
#include "ocfs2_fs.h"
#include "ocfs2_lockid.h"
#include "ocfs2_ioctl.h"
/* For struct ocfs2_blockcheck_stats */
#include "blockcheck.h"
......@@ -159,7 +160,7 @@ struct ocfs2_lock_res {
int l_level;
unsigned int l_ro_holders;
unsigned int l_ex_holders;
union ocfs2_dlm_lksb l_lksb;
struct ocfs2_dlm_lksb l_lksb;
/* used from AST/BAST funcs. */
enum ocfs2_ast_action l_action;
......@@ -305,7 +306,9 @@ struct ocfs2_super
u32 s_next_generation;
unsigned long osb_flags;
s16 s_inode_steal_slot;
s16 s_meta_steal_slot;
atomic_t s_num_inodes_stolen;
atomic_t s_num_meta_stolen;
unsigned long s_mount_opt;
unsigned int s_atime_quantum;
......@@ -760,33 +763,6 @@ static inline unsigned int ocfs2_megabytes_to_clusters(struct super_block *sb,
return megs << (20 - OCFS2_SB(sb)->s_clustersize_bits);
}
static inline void ocfs2_init_inode_steal_slot(struct ocfs2_super *osb)
{
spin_lock(&osb->osb_lock);
osb->s_inode_steal_slot = OCFS2_INVALID_SLOT;
spin_unlock(&osb->osb_lock);
atomic_set(&osb->s_num_inodes_stolen, 0);
}
static inline void ocfs2_set_inode_steal_slot(struct ocfs2_super *osb,
s16 slot)
{
spin_lock(&osb->osb_lock);
osb->s_inode_steal_slot = slot;
spin_unlock(&osb->osb_lock);
}
static inline s16 ocfs2_get_inode_steal_slot(struct ocfs2_super *osb)
{
s16 slot;
spin_lock(&osb->osb_lock);
slot = osb->s_inode_steal_slot;
spin_unlock(&osb->osb_lock);
return slot;
}
#define ocfs2_set_bit ext2_set_bit
#define ocfs2_clear_bit ext2_clear_bit
#define ocfs2_test_bit ext2_test_bit
......
......@@ -253,63 +253,6 @@
* counted in an associated
* refcount tree */
/*
* ioctl commands
*/
#define OCFS2_IOC_GETFLAGS _IOR('f', 1, long)
#define OCFS2_IOC_SETFLAGS _IOW('f', 2, long)
#define OCFS2_IOC32_GETFLAGS _IOR('f', 1, int)
#define OCFS2_IOC32_SETFLAGS _IOW('f', 2, int)
/*
* Space reservation / allocation / free ioctls and argument structure
* are designed to be compatible with XFS.
*
* ALLOCSP* and FREESP* are not and will never be supported, but are
* included here for completeness.
*/
struct ocfs2_space_resv {
__s16 l_type;
__s16 l_whence;
__s64 l_start;
__s64 l_len; /* len == 0 means until end of file */
__s32 l_sysid;
__u32 l_pid;
__s32 l_pad[4]; /* reserve area */
};
#define OCFS2_IOC_ALLOCSP _IOW ('X', 10, struct ocfs2_space_resv)
#define OCFS2_IOC_FREESP _IOW ('X', 11, struct ocfs2_space_resv)
#define OCFS2_IOC_RESVSP _IOW ('X', 40, struct ocfs2_space_resv)
#define OCFS2_IOC_UNRESVSP _IOW ('X', 41, struct ocfs2_space_resv)
#define OCFS2_IOC_ALLOCSP64 _IOW ('X', 36, struct ocfs2_space_resv)
#define OCFS2_IOC_FREESP64 _IOW ('X', 37, struct ocfs2_space_resv)
#define OCFS2_IOC_RESVSP64 _IOW ('X', 42, struct ocfs2_space_resv)
#define OCFS2_IOC_UNRESVSP64 _IOW ('X', 43, struct ocfs2_space_resv)
/* Used to pass group descriptor data when online resize is done */
struct ocfs2_new_group_input {
__u64 group; /* Group descriptor's blkno. */
__u32 clusters; /* Total number of clusters in this group */
__u32 frees; /* Total free clusters in this group */
__u16 chain; /* Chain for this group */
__u16 reserved1;
__u32 reserved2;
};
#define OCFS2_IOC_GROUP_EXTEND _IOW('o', 1, int)
#define OCFS2_IOC_GROUP_ADD _IOW('o', 2,struct ocfs2_new_group_input)
#define OCFS2_IOC_GROUP_ADD64 _IOW('o', 3,struct ocfs2_new_group_input)
/* Used to pass 2 file names to reflink. */
struct reflink_arguments {
__u64 old_path;
__u64 new_path;
__u64 preserve;
};
#define OCFS2_IOC_REFLINK _IOW('o', 4, struct reflink_arguments)
/*
* Journal Flags (ocfs2_dinode.id1.journal1.i_flags)
*/
......
/* -*- mode: c; c-basic-offset: 8; -*-
* vim: noexpandtab sw=8 ts=8 sts=0:
*
* ocfs2_ioctl.h
*
* Defines OCFS2 ioctls.
*
* Copyright (C) 2010 Oracle. All rights reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License, version 2, as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*/
#ifndef OCFS2_IOCTL_H
#define OCFS2_IOCTL_H
/*
* ioctl commands
*/
#define OCFS2_IOC_GETFLAGS _IOR('f', 1, long)
#define OCFS2_IOC_SETFLAGS _IOW('f', 2, long)
#define OCFS2_IOC32_GETFLAGS _IOR('f', 1, int)
#define OCFS2_IOC32_SETFLAGS _IOW('f', 2, int)
/*
* Space reservation / allocation / free ioctls and argument structure
* are designed to be compatible with XFS.
*
* ALLOCSP* and FREESP* are not and will never be supported, but are
* included here for completeness.
*/
struct ocfs2_space_resv {
__s16 l_type;
__s16 l_whence;
__s64 l_start;
__s64 l_len; /* len == 0 means until end of file */
__s32 l_sysid;
__u32 l_pid;
__s32 l_pad[4]; /* reserve area */
};
#define OCFS2_IOC_ALLOCSP _IOW ('X', 10, struct ocfs2_space_resv)
#define OCFS2_IOC_FREESP _IOW ('X', 11, struct ocfs2_space_resv)
#define OCFS2_IOC_RESVSP _IOW ('X', 40, struct ocfs2_space_resv)
#define OCFS2_IOC_UNRESVSP _IOW ('X', 41, struct ocfs2_space_resv)
#define OCFS2_IOC_ALLOCSP64 _IOW ('X', 36, struct ocfs2_space_resv)
#define OCFS2_IOC_FREESP64 _IOW ('X', 37, struct ocfs2_space_resv)
#define OCFS2_IOC_RESVSP64 _IOW ('X', 42, struct ocfs2_space_resv)
#define OCFS2_IOC_UNRESVSP64 _IOW ('X', 43, struct ocfs2_space_resv)
/* Used to pass group descriptor data when online resize is done */
struct ocfs2_new_group_input {
__u64 group; /* Group descriptor's blkno. */
__u32 clusters; /* Total number of clusters in this group */
__u32 frees; /* Total free clusters in this group */
__u16 chain; /* Chain for this group */
__u16 reserved1;
__u32 reserved2;
};
#define OCFS2_IOC_GROUP_EXTEND _IOW('o', 1, int)
#define OCFS2_IOC_GROUP_ADD _IOW('o', 2,struct ocfs2_new_group_input)
#define OCFS2_IOC_GROUP_ADD64 _IOW('o', 3,struct ocfs2_new_group_input)
/* Used to pass 2 file names to reflink. */
struct reflink_arguments {
__u64 old_path;
__u64 new_path;
__u64 preserve;
};
#define OCFS2_IOC_REFLINK _IOW('o', 4, struct reflink_arguments)
#endif /* OCFS2_IOCTL_H */
......@@ -23,6 +23,8 @@
/*
* The protocol version for ocfs2 cluster locking. See dlmglue.c for
* more details.
*
* 1.0 - Initial locking version from ocfs2 1.4.
*/
#define OCFS2_LOCKING_PROTOCOL_MAJOR 1
#define OCFS2_LOCKING_PROTOCOL_MINOR 0
......
......@@ -626,7 +626,7 @@ static int ocfs2_create_refcount_tree(struct inode *inode,
rb = (struct ocfs2_refcount_block *)new_bh->b_data;
memset(rb, 0, inode->i_sb->s_blocksize);
strcpy((void *)rb, OCFS2_REFCOUNT_BLOCK_SIGNATURE);
rb->rf_suballoc_slot = cpu_to_le16(osb->slot_num);
rb->rf_suballoc_slot = cpu_to_le16(meta_ac->ac_alloc_slot);
rb->rf_suballoc_bit = cpu_to_le16(suballoc_bit_start);
rb->rf_fs_generation = cpu_to_le32(osb->fs_generation);
rb->rf_blkno = cpu_to_le64(first_blkno);
......@@ -1330,7 +1330,7 @@ static int ocfs2_expand_inline_ref_root(handle_t *handle,
memcpy(new_bh->b_data, ref_root_bh->b_data, sb->s_blocksize);
new_rb = (struct ocfs2_refcount_block *)new_bh->b_data;
new_rb->rf_suballoc_slot = cpu_to_le16(OCFS2_SB(sb)->slot_num);
new_rb->rf_suballoc_slot = cpu_to_le16(meta_ac->ac_alloc_slot);
new_rb->rf_suballoc_bit = cpu_to_le16(suballoc_bit_start);
new_rb->rf_blkno = cpu_to_le64(blkno);
new_rb->rf_cpos = cpu_to_le32(0);
......@@ -1576,7 +1576,7 @@ static int ocfs2_new_leaf_refcount_block(handle_t *handle,
new_rb = (struct ocfs2_refcount_block *)new_bh->b_data;
memset(new_rb, 0, sb->s_blocksize);
strcpy((void *)new_rb, OCFS2_REFCOUNT_BLOCK_SIGNATURE);
new_rb->rf_suballoc_slot = cpu_to_le16(OCFS2_SB(sb)->slot_num);
new_rb->rf_suballoc_slot = cpu_to_le16(meta_ac->ac_alloc_slot);
new_rb->rf_suballoc_bit = cpu_to_le16(suballoc_bit_start);
new_rb->rf_fs_generation = cpu_to_le32(OCFS2_SB(sb)->fs_generation);
new_rb->rf_blkno = cpu_to_le64(blkno);
......
......@@ -161,24 +161,23 @@ static int dlm_status_to_errno(enum dlm_status status)
static void o2dlm_lock_ast_wrapper(void *astarg)
{
BUG_ON(o2cb_stack.sp_proto == NULL);
struct ocfs2_dlm_lksb *lksb = astarg;
o2cb_stack.sp_proto->lp_lock_ast(astarg);
lksb->lksb_conn->cc_proto->lp_lock_ast(lksb);
}
static void o2dlm_blocking_ast_wrapper(void *astarg, int level)
{
BUG_ON(o2cb_stack.sp_proto == NULL);
struct ocfs2_dlm_lksb *lksb = astarg;
o2cb_stack.sp_proto->lp_blocking_ast(astarg, level);
lksb->lksb_conn->cc_proto->lp_blocking_ast(lksb, level);
}
static void o2dlm_unlock_ast_wrapper(void *astarg, enum dlm_status status)
{
struct ocfs2_dlm_lksb *lksb = astarg;
int error = dlm_status_to_errno(status);
BUG_ON(o2cb_stack.sp_proto == NULL);
/*
* In o2dlm, you can get both the lock_ast() for the lock being
* granted and the unlock_ast() for the CANCEL failing. A
......@@ -193,16 +192,15 @@ static void o2dlm_unlock_ast_wrapper(void *astarg, enum dlm_status status)
if (status == DLM_CANCELGRANT)
return;
o2cb_stack.sp_proto->lp_unlock_ast(astarg, error);
lksb->lksb_conn->cc_proto->lp_unlock_ast(lksb, error);
}
static int o2cb_dlm_lock(struct ocfs2_cluster_connection *conn,
int mode,
union ocfs2_dlm_lksb *lksb,
struct ocfs2_dlm_lksb *lksb,
u32 flags,
void *name,
unsigned int namelen,
void *astarg)
unsigned int namelen)
{
enum dlm_status status;
int o2dlm_mode = mode_to_o2dlm(mode);
......@@ -211,28 +209,27 @@ static int o2cb_dlm_lock(struct ocfs2_cluster_connection *conn,
status = dlmlock(conn->cc_lockspace, o2dlm_mode, &lksb->lksb_o2dlm,
o2dlm_flags, name, namelen,
o2dlm_lock_ast_wrapper, astarg,
o2dlm_lock_ast_wrapper, lksb,
o2dlm_blocking_ast_wrapper);
ret = dlm_status_to_errno(status);
return ret;
}
static int o2cb_dlm_unlock(struct ocfs2_cluster_connection *conn,
union ocfs2_dlm_lksb *lksb,
u32 flags,
void *astarg)
struct ocfs2_dlm_lksb *lksb,
u32 flags)
{
enum dlm_status status;
int o2dlm_flags = flags_to_o2dlm(flags);
int ret;
status = dlmunlock(conn->cc_lockspace, &lksb->lksb_o2dlm,
o2dlm_flags, o2dlm_unlock_ast_wrapper, astarg);
o2dlm_flags, o2dlm_unlock_ast_wrapper, lksb);
ret = dlm_status_to_errno(status);
return ret;
}
static int o2cb_dlm_lock_status(union ocfs2_dlm_lksb *lksb)
static int o2cb_dlm_lock_status(struct ocfs2_dlm_lksb *lksb)
{
return dlm_status_to_errno(lksb->lksb_o2dlm.status);
}
......@@ -242,17 +239,17 @@ static int o2cb_dlm_lock_status(union ocfs2_dlm_lksb *lksb)
* contents, it will zero out the LVB. Thus the caller can always trust
* the contents.
*/
static int o2cb_dlm_lvb_valid(union ocfs2_dlm_lksb *lksb)
static int o2cb_dlm_lvb_valid(struct ocfs2_dlm_lksb *lksb)
{
return 1;
}
static void *o2cb_dlm_lvb(union ocfs2_dlm_lksb *lksb)
static void *o2cb_dlm_lvb(struct ocfs2_dlm_lksb *lksb)
{
return (void *)(lksb->lksb_o2dlm.lvb);
}
static void o2cb_dump_lksb(union ocfs2_dlm_lksb *lksb)
static void o2cb_dump_lksb(struct ocfs2_dlm_lksb *lksb)
{
dlm_print_one_lock(lksb->lksb_o2dlm.lockid);
}
......@@ -280,7 +277,7 @@ static int o2cb_cluster_connect(struct ocfs2_cluster_connection *conn)
struct dlm_protocol_version fs_version;
BUG_ON(conn == NULL);
BUG_ON(o2cb_stack.sp_proto == NULL);
BUG_ON(conn->cc_proto == NULL);
/* for now we only have one cluster/node, make sure we see it
* in the heartbeat universe */
......
......@@ -25,7 +25,6 @@
#include <linux/reboot.h>
#include <asm/uaccess.h>
#include "ocfs2.h" /* For struct ocfs2_lock_res */
#include "stackglue.h"
#include <linux/dlm_plock.h>
......@@ -63,8 +62,8 @@
* negotiated by the client. The client negotiates based on the maximum
* version advertised in /sys/fs/ocfs2/max_locking_protocol. The major
* number from the "SETV" message must match
* ocfs2_user_plugin.sp_proto->lp_max_version.pv_major, and the minor number
* must be less than or equal to ...->lp_max_version.pv_minor.
* ocfs2_user_plugin.sp_max_proto.pv_major, and the minor number
* must be less than or equal to ...sp_max_version.pv_minor.
*
* Once this information has been set, mounts will be allowed. From this
* point on, the "DOWN" message can be sent for node down notification.
......@@ -401,7 +400,7 @@ static int ocfs2_control_do_setversion_msg(struct file *file,
char *ptr = NULL;
struct ocfs2_control_private *p = file->private_data;
struct ocfs2_protocol_version *max =
&ocfs2_user_plugin.sp_proto->lp_max_version;
&ocfs2_user_plugin.sp_max_proto;
if (ocfs2_control_get_handshake_state(file) !=
OCFS2_CONTROL_HANDSHAKE_PROTOCOL)
......@@ -664,18 +663,10 @@ static void ocfs2_control_exit(void)
-rc);
}
static struct dlm_lksb *fsdlm_astarg_to_lksb(void *astarg)
{
struct ocfs2_lock_res *res = astarg;
return &res->l_lksb.lksb_fsdlm;
}
static void fsdlm_lock_ast_wrapper(void *astarg)
{
struct dlm_lksb *lksb = fsdlm_astarg_to_lksb(astarg);
int status = lksb->sb_status;
BUG_ON(ocfs2_user_plugin.sp_proto == NULL);
struct ocfs2_dlm_lksb *lksb = astarg;
int status = lksb->lksb_fsdlm.sb_status;
/*
* For now we're punting on the issue of other non-standard errors
......@@ -688,25 +679,24 @@ static void fsdlm_lock_ast_wrapper(void *astarg)
*/
if (status == -DLM_EUNLOCK || status == -DLM_ECANCEL)
ocfs2_user_plugin.sp_proto->lp_unlock_ast(astarg, 0);
lksb->lksb_conn->cc_proto->lp_unlock_ast(lksb, 0);
else
ocfs2_user_plugin.sp_proto->lp_lock_ast(astarg);
lksb->lksb_conn->cc_proto->lp_lock_ast(lksb);
}
static void fsdlm_blocking_ast_wrapper(void *astarg, int level)
{
BUG_ON(ocfs2_user_plugin.sp_proto == NULL);
struct ocfs2_dlm_lksb *lksb = astarg;
ocfs2_user_plugin.sp_proto->lp_blocking_ast(astarg, level);
lksb->lksb_conn->cc_proto->lp_blocking_ast(lksb, level);
}
static int user_dlm_lock(struct ocfs2_cluster_connection *conn,
int mode,
union ocfs2_dlm_lksb *lksb,
struct ocfs2_dlm_lksb *lksb,
u32 flags,
void *name,
unsigned int namelen,
void *astarg)
unsigned int namelen)
{
int ret;
......@@ -716,36 +706,35 @@ static int user_dlm_lock(struct ocfs2_cluster_connection *conn,
ret = dlm_lock(conn->cc_lockspace, mode, &lksb->lksb_fsdlm,
flags|DLM_LKF_NODLCKWT, name, namelen, 0,
fsdlm_lock_ast_wrapper, astarg,
fsdlm_lock_ast_wrapper, lksb,
fsdlm_blocking_ast_wrapper);
return ret;
}
static int user_dlm_unlock(struct ocfs2_cluster_connection *conn,
union ocfs2_dlm_lksb *lksb,
u32 flags,
void *astarg)
struct ocfs2_dlm_lksb *lksb,
u32 flags)
{
int ret;
ret = dlm_unlock(conn->cc_lockspace, lksb->lksb_fsdlm.sb_lkid,
flags, &lksb->lksb_fsdlm, astarg);
flags, &lksb->lksb_fsdlm, lksb);
return ret;
}
static int user_dlm_lock_status(union ocfs2_dlm_lksb *lksb)
static int user_dlm_lock_status(struct ocfs2_dlm_lksb *lksb)
{
return lksb->lksb_fsdlm.sb_status;
}
static int user_dlm_lvb_valid(union ocfs2_dlm_lksb *lksb)
static int user_dlm_lvb_valid(struct ocfs2_dlm_lksb *lksb)
{
int invalid = lksb->lksb_fsdlm.sb_flags & DLM_SBF_VALNOTVALID;
return !invalid;
}
static void *user_dlm_lvb(union ocfs2_dlm_lksb *lksb)
static void *user_dlm_lvb(struct ocfs2_dlm_lksb *lksb)
{
if (!lksb->lksb_fsdlm.sb_lvbptr)
lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb +
......@@ -753,7 +742,7 @@ static void *user_dlm_lvb(union ocfs2_dlm_lksb *lksb)
return (void *)(lksb->lksb_fsdlm.sb_lvbptr);
}
static void user_dlm_dump_lksb(union ocfs2_dlm_lksb *lksb)
static void user_dlm_dump_lksb(struct ocfs2_dlm_lksb *lksb)
{
}
......
......@@ -36,7 +36,7 @@
#define OCFS2_STACK_PLUGIN_USER "user"
#define OCFS2_MAX_HB_CTL_PATH 256
static struct ocfs2_locking_protocol *lproto;
static struct ocfs2_protocol_version locking_max_version;
static DEFINE_SPINLOCK(ocfs2_stack_lock);
static LIST_HEAD(ocfs2_stack_list);
static char cluster_stack_name[OCFS2_STACK_LABEL_LEN + 1];
......@@ -176,7 +176,7 @@ int ocfs2_stack_glue_register(struct ocfs2_stack_plugin *plugin)
spin_lock(&ocfs2_stack_lock);
if (!ocfs2_stack_lookup(plugin->sp_name)) {
plugin->sp_count = 0;
plugin->sp_proto = lproto;
plugin->sp_max_proto = locking_max_version;
list_add(&plugin->sp_list, &ocfs2_stack_list);
printk(KERN_INFO "ocfs2: Registered cluster interface %s\n",
plugin->sp_name);
......@@ -213,77 +213,76 @@ void ocfs2_stack_glue_unregister(struct ocfs2_stack_plugin *plugin)
}
EXPORT_SYMBOL_GPL(ocfs2_stack_glue_unregister);
void ocfs2_stack_glue_set_locking_protocol(struct ocfs2_locking_protocol *proto)
void ocfs2_stack_glue_set_max_proto_version(struct ocfs2_protocol_version *max_proto)
{
struct ocfs2_stack_plugin *p;
BUG_ON(proto == NULL);
spin_lock(&ocfs2_stack_lock);
BUG_ON(active_stack != NULL);
if (memcmp(max_proto, &locking_max_version,
sizeof(struct ocfs2_protocol_version))) {
BUG_ON(locking_max_version.pv_major != 0);
lproto = proto;
list_for_each_entry(p, &ocfs2_stack_list, sp_list) {
p->sp_proto = lproto;
locking_max_version = *max_proto;
list_for_each_entry(p, &ocfs2_stack_list, sp_list) {
p->sp_max_proto = locking_max_version;
}
}
spin_unlock(&ocfs2_stack_lock);
}
EXPORT_SYMBOL_GPL(ocfs2_stack_glue_set_locking_protocol);
EXPORT_SYMBOL_GPL(ocfs2_stack_glue_set_max_proto_version);
/*
* The ocfs2_dlm_lock() and ocfs2_dlm_unlock() functions take
* "struct ocfs2_lock_res *astarg" instead of "void *astarg" because the
* underlying stack plugins need to pilfer the lksb off of the lock_res.
* If some other structure needs to be passed as an astarg, the plugins
* will need to be given a different avenue to the lksb.
* The ocfs2_dlm_lock() and ocfs2_dlm_unlock() functions take no argument
* for the ast and bast functions. They will pass the lksb to the ast
* and bast. The caller can wrap the lksb with their own structure to
* get more information.
*/
int ocfs2_dlm_lock(struct ocfs2_cluster_connection *conn,
int mode,
union ocfs2_dlm_lksb *lksb,
struct ocfs2_dlm_lksb *lksb,
u32 flags,
void *name,
unsigned int namelen,
struct ocfs2_lock_res *astarg)
unsigned int namelen)
{
BUG_ON(lproto == NULL);
if (!lksb->lksb_conn)
lksb->lksb_conn = conn;
else
BUG_ON(lksb->lksb_conn != conn);
return active_stack->sp_ops->dlm_lock(conn, mode, lksb, flags,
name, namelen, astarg);
name, namelen);
}
EXPORT_SYMBOL_GPL(ocfs2_dlm_lock);
int ocfs2_dlm_unlock(struct ocfs2_cluster_connection *conn,
union ocfs2_dlm_lksb *lksb,
u32 flags,
struct ocfs2_lock_res *astarg)
struct ocfs2_dlm_lksb *lksb,
u32 flags)
{
BUG_ON(lproto == NULL);
BUG_ON(lksb->lksb_conn == NULL);
return active_stack->sp_ops->dlm_unlock(conn, lksb, flags, astarg);
return active_stack->sp_ops->dlm_unlock(conn, lksb, flags);
}
EXPORT_SYMBOL_GPL(ocfs2_dlm_unlock);
int ocfs2_dlm_lock_status(union ocfs2_dlm_lksb *lksb)
int ocfs2_dlm_lock_status(struct ocfs2_dlm_lksb *lksb)
{
return active_stack->sp_ops->lock_status(lksb);
}
EXPORT_SYMBOL_GPL(ocfs2_dlm_lock_status);
int ocfs2_dlm_lvb_valid(union ocfs2_dlm_lksb *lksb)
int ocfs2_dlm_lvb_valid(struct ocfs2_dlm_lksb *lksb)
{
return active_stack->sp_ops->lvb_valid(lksb);
}
EXPORT_SYMBOL_GPL(ocfs2_dlm_lvb_valid);
void *ocfs2_dlm_lvb(union ocfs2_dlm_lksb *lksb)
void *ocfs2_dlm_lvb(struct ocfs2_dlm_lksb *lksb)
{
return active_stack->sp_ops->lock_lvb(lksb);
}
EXPORT_SYMBOL_GPL(ocfs2_dlm_lvb);
void ocfs2_dlm_dump_lksb(union ocfs2_dlm_lksb *lksb)
void ocfs2_dlm_dump_lksb(struct ocfs2_dlm_lksb *lksb)
{
active_stack->sp_ops->dump_lksb(lksb);
}
......@@ -312,6 +311,7 @@ EXPORT_SYMBOL_GPL(ocfs2_plock);
int ocfs2_cluster_connect(const char *stack_name,
const char *group,
int grouplen,
struct ocfs2_locking_protocol *lproto,
void (*recovery_handler)(int node_num,
void *recovery_data),
void *recovery_data,
......@@ -329,6 +329,12 @@ int ocfs2_cluster_connect(const char *stack_name,
goto out;
}
if (memcmp(&lproto->lp_max_version, &locking_max_version,
sizeof(struct ocfs2_protocol_version))) {
rc = -EINVAL;
goto out;
}
new_conn = kzalloc(sizeof(struct ocfs2_cluster_connection),
GFP_KERNEL);
if (!new_conn) {
......@@ -341,6 +347,7 @@ int ocfs2_cluster_connect(const char *stack_name,
new_conn->cc_recovery_handler = recovery_handler;
new_conn->cc_recovery_data = recovery_data;
new_conn->cc_proto = lproto;
/* Start the new connection at our maximum compatibility level */
new_conn->cc_version = lproto->lp_max_version;
......@@ -366,6 +373,24 @@ out:
}
EXPORT_SYMBOL_GPL(ocfs2_cluster_connect);
/* The caller will ensure all nodes have the same cluster stack */
int ocfs2_cluster_connect_agnostic(const char *group,
int grouplen,
struct ocfs2_locking_protocol *lproto,
void (*recovery_handler)(int node_num,
void *recovery_data),
void *recovery_data,
struct ocfs2_cluster_connection **conn)
{
char *stack_name = NULL;
if (cluster_stack_name[0])
stack_name = cluster_stack_name;
return ocfs2_cluster_connect(stack_name, group, grouplen, lproto,
recovery_handler, recovery_data, conn);
}
EXPORT_SYMBOL_GPL(ocfs2_cluster_connect_agnostic);
/* If hangup_pending is 0, the stack driver will be dropped */
int ocfs2_cluster_disconnect(struct ocfs2_cluster_connection *conn,
int hangup_pending)
......@@ -453,10 +478,10 @@ static ssize_t ocfs2_max_locking_protocol_show(struct kobject *kobj,
ssize_t ret = 0;
spin_lock(&ocfs2_stack_lock);
if (lproto)
if (locking_max_version.pv_major)
ret = snprintf(buf, PAGE_SIZE, "%u.%u\n",
lproto->lp_max_version.pv_major,
lproto->lp_max_version.pv_minor);
locking_max_version.pv_major,
locking_max_version.pv_minor);
spin_unlock(&ocfs2_stack_lock);
return ret;
......@@ -685,7 +710,10 @@ static int __init ocfs2_stack_glue_init(void)
static void __exit ocfs2_stack_glue_exit(void)
{
lproto = NULL;
memset(&locking_max_version, 0,
sizeof(struct ocfs2_protocol_version));
locking_max_version.pv_major = 0;
locking_max_version.pv_minor = 0;
ocfs2_sysfs_exit();
if (ocfs2_table_header)
unregister_sysctl_table(ocfs2_table_header);
......
......@@ -55,17 +55,6 @@ struct ocfs2_protocol_version {
u8 pv_minor;
};
/*
* The ocfs2_locking_protocol defines the handlers called on ocfs2's behalf.
*/
struct ocfs2_locking_protocol {
struct ocfs2_protocol_version lp_max_version;
void (*lp_lock_ast)(void *astarg);
void (*lp_blocking_ast)(void *astarg, int level);
void (*lp_unlock_ast)(void *astarg, int error);
};
/*
* The dlm_lockstatus struct includes lvb space, but the dlm_lksb struct only
* has a pointer to separately allocated lvb space. This struct exists only to
......@@ -81,12 +70,27 @@ struct fsdlm_lksb_plus_lvb {
* size of the union is known. Lock status structures are embedded in
* ocfs2 inodes.
*/
union ocfs2_dlm_lksb {
struct dlm_lockstatus lksb_o2dlm;
struct dlm_lksb lksb_fsdlm;
struct fsdlm_lksb_plus_lvb padding;
struct ocfs2_cluster_connection;
struct ocfs2_dlm_lksb {
union {
struct dlm_lockstatus lksb_o2dlm;
struct dlm_lksb lksb_fsdlm;
struct fsdlm_lksb_plus_lvb padding;
};
struct ocfs2_cluster_connection *lksb_conn;
};
/*
* The ocfs2_locking_protocol defines the handlers called on ocfs2's behalf.
*/
struct ocfs2_locking_protocol {
struct ocfs2_protocol_version lp_max_version;
void (*lp_lock_ast)(struct ocfs2_dlm_lksb *lksb);
void (*lp_blocking_ast)(struct ocfs2_dlm_lksb *lksb, int level);
void (*lp_unlock_ast)(struct ocfs2_dlm_lksb *lksb, int error);
};
/*
* A cluster connection. Mostly opaque to ocfs2, the connection holds
* state for the underlying stack. ocfs2 does use cc_version to determine
......@@ -96,6 +100,7 @@ struct ocfs2_cluster_connection {
char cc_name[GROUP_NAME_MAX];
int cc_namelen;
struct ocfs2_protocol_version cc_version;
struct ocfs2_locking_protocol *cc_proto;
void (*cc_recovery_handler)(int node_num, void *recovery_data);
void *cc_recovery_data;
void *cc_lockspace;
......@@ -155,27 +160,29 @@ struct ocfs2_stack_operations {
*
* ast and bast functions are not part of the call because the
* stack will likely want to wrap ast and bast calls before passing
* them to stack->sp_proto.
* them to stack->sp_proto. There is no astarg. The lksb will
* be passed back to the ast and bast functions. The caller can
* use this to find their object.
*/
int (*dlm_lock)(struct ocfs2_cluster_connection *conn,
int mode,
union ocfs2_dlm_lksb *lksb,
struct ocfs2_dlm_lksb *lksb,
u32 flags,
void *name,
unsigned int namelen,
void *astarg);
unsigned int namelen);
/*
* Call the underlying dlm unlock function. The ->dlm_unlock()
* function should convert the flags as appropriate.
*
* The unlock ast is not passed, as the stack will want to wrap
* it before calling stack->sp_proto->lp_unlock_ast().
* it before calling stack->sp_proto->lp_unlock_ast(). There is
* no astarg. The lksb will be passed back to the unlock ast
* function. The caller can use this to find their object.
*/
int (*dlm_unlock)(struct ocfs2_cluster_connection *conn,
union ocfs2_dlm_lksb *lksb,
u32 flags,
void *astarg);
struct ocfs2_dlm_lksb *lksb,
u32 flags);
/*
* Return the status of the current lock status block. The fs
......@@ -183,17 +190,17 @@ struct ocfs2_stack_operations {
* callback pulls out the stack-specific lksb, converts the status
* to a proper errno, and returns it.
*/
int (*lock_status)(union ocfs2_dlm_lksb *lksb);
int (*lock_status)(struct ocfs2_dlm_lksb *lksb);
/*
* Return non-zero if the LVB is valid.
*/
int (*lvb_valid)(union ocfs2_dlm_lksb *lksb);
int (*lvb_valid)(struct ocfs2_dlm_lksb *lksb);
/*
* Pull the lvb pointer off of the stack-specific lksb.
*/
void *(*lock_lvb)(union ocfs2_dlm_lksb *lksb);
void *(*lock_lvb)(struct ocfs2_dlm_lksb *lksb);
/*
* Cluster-aware posix locks
......@@ -210,7 +217,7 @@ struct ocfs2_stack_operations {
* This is an optoinal debugging hook. If provided, the
* stack can dump debugging information about this lock.
*/
void (*dump_lksb)(union ocfs2_dlm_lksb *lksb);
void (*dump_lksb)(struct ocfs2_dlm_lksb *lksb);
};
/*
......@@ -226,7 +233,7 @@ struct ocfs2_stack_plugin {
/* These are managed by the stackglue code. */
struct list_head sp_list;
unsigned int sp_count;
struct ocfs2_locking_protocol *sp_proto;
struct ocfs2_protocol_version sp_max_proto;
};
......@@ -234,10 +241,22 @@ struct ocfs2_stack_plugin {
int ocfs2_cluster_connect(const char *stack_name,
const char *group,
int grouplen,
struct ocfs2_locking_protocol *lproto,
void (*recovery_handler)(int node_num,
void *recovery_data),
void *recovery_data,
struct ocfs2_cluster_connection **conn);
/*
* Used by callers that don't store their stack name. They must ensure
* all nodes have the same stack.
*/
int ocfs2_cluster_connect_agnostic(const char *group,
int grouplen,
struct ocfs2_locking_protocol *lproto,
void (*recovery_handler)(int node_num,
void *recovery_data),
void *recovery_data,
struct ocfs2_cluster_connection **conn);
int ocfs2_cluster_disconnect(struct ocfs2_cluster_connection *conn,
int hangup_pending);
void ocfs2_cluster_hangup(const char *group, int grouplen);
......@@ -246,26 +265,24 @@ int ocfs2_cluster_this_node(unsigned int *node);
struct ocfs2_lock_res;
int ocfs2_dlm_lock(struct ocfs2_cluster_connection *conn,
int mode,
union ocfs2_dlm_lksb *lksb,
struct ocfs2_dlm_lksb *lksb,
u32 flags,
void *name,
unsigned int namelen,
struct ocfs2_lock_res *astarg);
unsigned int namelen);
int ocfs2_dlm_unlock(struct ocfs2_cluster_connection *conn,
union ocfs2_dlm_lksb *lksb,
u32 flags,
struct ocfs2_lock_res *astarg);
struct ocfs2_dlm_lksb *lksb,
u32 flags);
int ocfs2_dlm_lock_status(union ocfs2_dlm_lksb *lksb);
int ocfs2_dlm_lvb_valid(union ocfs2_dlm_lksb *lksb);
void *ocfs2_dlm_lvb(union ocfs2_dlm_lksb *lksb);
void ocfs2_dlm_dump_lksb(union ocfs2_dlm_lksb *lksb);
int ocfs2_dlm_lock_status(struct ocfs2_dlm_lksb *lksb);
int ocfs2_dlm_lvb_valid(struct ocfs2_dlm_lksb *lksb);
void *ocfs2_dlm_lvb(struct ocfs2_dlm_lksb *lksb);
void ocfs2_dlm_dump_lksb(struct ocfs2_dlm_lksb *lksb);
int ocfs2_stack_supports_plocks(void);
int ocfs2_plock(struct ocfs2_cluster_connection *conn, u64 ino,
struct file *file, int cmd, struct file_lock *fl);
void ocfs2_stack_glue_set_locking_protocol(struct ocfs2_locking_protocol *proto);
void ocfs2_stack_glue_set_max_proto_version(struct ocfs2_protocol_version *max_proto);
/* Used by stack plugins */
......
......@@ -51,7 +51,7 @@
#define ALLOC_NEW_GROUP 0x1
#define ALLOC_GROUPS_FROM_GLOBAL 0x2
#define OCFS2_MAX_INODES_TO_STEAL 1024
#define OCFS2_MAX_TO_STEAL 1024
static inline void ocfs2_debug_bg(struct ocfs2_group_desc *bg);
static inline void ocfs2_debug_suballoc_inode(struct ocfs2_dinode *fe);
......@@ -637,12 +637,113 @@ bail:
return status;
}
static void ocfs2_init_inode_steal_slot(struct ocfs2_super *osb)
{
spin_lock(&osb->osb_lock);
osb->s_inode_steal_slot = OCFS2_INVALID_SLOT;
spin_unlock(&osb->osb_lock);
atomic_set(&osb->s_num_inodes_stolen, 0);
}
static void ocfs2_init_meta_steal_slot(struct ocfs2_super *osb)
{
spin_lock(&osb->osb_lock);
osb->s_meta_steal_slot = OCFS2_INVALID_SLOT;
spin_unlock(&osb->osb_lock);
atomic_set(&osb->s_num_meta_stolen, 0);
}
void ocfs2_init_steal_slots(struct ocfs2_super *osb)
{
ocfs2_init_inode_steal_slot(osb);
ocfs2_init_meta_steal_slot(osb);
}
static void __ocfs2_set_steal_slot(struct ocfs2_super *osb, int slot, int type)
{
spin_lock(&osb->osb_lock);
if (type == INODE_ALLOC_SYSTEM_INODE)
osb->s_inode_steal_slot = slot;
else if (type == EXTENT_ALLOC_SYSTEM_INODE)
osb->s_meta_steal_slot = slot;
spin_unlock(&osb->osb_lock);
}
static int __ocfs2_get_steal_slot(struct ocfs2_super *osb, int type)
{
int slot = OCFS2_INVALID_SLOT;
spin_lock(&osb->osb_lock);
if (type == INODE_ALLOC_SYSTEM_INODE)
slot = osb->s_inode_steal_slot;
else if (type == EXTENT_ALLOC_SYSTEM_INODE)
slot = osb->s_meta_steal_slot;
spin_unlock(&osb->osb_lock);
return slot;
}
static int ocfs2_get_inode_steal_slot(struct ocfs2_super *osb)
{
return __ocfs2_get_steal_slot(osb, INODE_ALLOC_SYSTEM_INODE);
}
static int ocfs2_get_meta_steal_slot(struct ocfs2_super *osb)
{
return __ocfs2_get_steal_slot(osb, EXTENT_ALLOC_SYSTEM_INODE);
}
static int ocfs2_steal_resource(struct ocfs2_super *osb,
struct ocfs2_alloc_context *ac,
int type)
{
int i, status = -ENOSPC;
int slot = __ocfs2_get_steal_slot(osb, type);
/* Start to steal resource from the first slot after ours. */
if (slot == OCFS2_INVALID_SLOT)
slot = osb->slot_num + 1;
for (i = 0; i < osb->max_slots; i++, slot++) {
if (slot == osb->max_slots)
slot = 0;
if (slot == osb->slot_num)
continue;
status = ocfs2_reserve_suballoc_bits(osb, ac,
type,
(u32)slot, NULL,
NOT_ALLOC_NEW_GROUP);
if (status >= 0) {
__ocfs2_set_steal_slot(osb, slot, type);
break;
}
ocfs2_free_ac_resource(ac);
}
return status;
}
static int ocfs2_steal_inode(struct ocfs2_super *osb,
struct ocfs2_alloc_context *ac)
{
return ocfs2_steal_resource(osb, ac, INODE_ALLOC_SYSTEM_INODE);
}
static int ocfs2_steal_meta(struct ocfs2_super *osb,
struct ocfs2_alloc_context *ac)
{
return ocfs2_steal_resource(osb, ac, EXTENT_ALLOC_SYSTEM_INODE);
}
int ocfs2_reserve_new_metadata_blocks(struct ocfs2_super *osb,
int blocks,
struct ocfs2_alloc_context **ac)
{
int status;
u32 slot;
int slot = ocfs2_get_meta_steal_slot(osb);
*ac = kzalloc(sizeof(struct ocfs2_alloc_context), GFP_KERNEL);
if (!(*ac)) {
......@@ -653,12 +754,34 @@ int ocfs2_reserve_new_metadata_blocks(struct ocfs2_super *osb,
(*ac)->ac_bits_wanted = blocks;
(*ac)->ac_which = OCFS2_AC_USE_META;
slot = osb->slot_num;
(*ac)->ac_group_search = ocfs2_block_group_search;
if (slot != OCFS2_INVALID_SLOT &&
atomic_read(&osb->s_num_meta_stolen) < OCFS2_MAX_TO_STEAL)
goto extent_steal;
atomic_set(&osb->s_num_meta_stolen, 0);
status = ocfs2_reserve_suballoc_bits(osb, (*ac),
EXTENT_ALLOC_SYSTEM_INODE,
slot, NULL, ALLOC_NEW_GROUP);
(u32)osb->slot_num, NULL,
ALLOC_NEW_GROUP);
if (status >= 0) {
status = 0;
if (slot != OCFS2_INVALID_SLOT)
ocfs2_init_meta_steal_slot(osb);
goto bail;
} else if (status < 0 && status != -ENOSPC) {
mlog_errno(status);
goto bail;
}
ocfs2_free_ac_resource(*ac);
extent_steal:
status = ocfs2_steal_meta(osb, *ac);
atomic_inc(&osb->s_num_meta_stolen);
if (status < 0) {
if (status != -ENOSPC)
mlog_errno(status);
......@@ -685,43 +808,11 @@ int ocfs2_reserve_new_metadata(struct ocfs2_super *osb,
ac);
}
static int ocfs2_steal_inode_from_other_nodes(struct ocfs2_super *osb,
struct ocfs2_alloc_context *ac)
{
int i, status = -ENOSPC;
s16 slot = ocfs2_get_inode_steal_slot(osb);
/* Start to steal inodes from the first slot after ours. */
if (slot == OCFS2_INVALID_SLOT)
slot = osb->slot_num + 1;
for (i = 0; i < osb->max_slots; i++, slot++) {
if (slot == osb->max_slots)
slot = 0;
if (slot == osb->slot_num)
continue;
status = ocfs2_reserve_suballoc_bits(osb, ac,
INODE_ALLOC_SYSTEM_INODE,
slot, NULL,
NOT_ALLOC_NEW_GROUP);
if (status >= 0) {
ocfs2_set_inode_steal_slot(osb, slot);
break;
}
ocfs2_free_ac_resource(ac);
}
return status;
}
int ocfs2_reserve_new_inode(struct ocfs2_super *osb,
struct ocfs2_alloc_context **ac)
{
int status;
s16 slot = ocfs2_get_inode_steal_slot(osb);
int slot = ocfs2_get_inode_steal_slot(osb);
u64 alloc_group;
*ac = kzalloc(sizeof(struct ocfs2_alloc_context), GFP_KERNEL);
......@@ -754,14 +845,14 @@ int ocfs2_reserve_new_inode(struct ocfs2_super *osb,
* need to check our slots to see whether there is some space for us.
*/
if (slot != OCFS2_INVALID_SLOT &&
atomic_read(&osb->s_num_inodes_stolen) < OCFS2_MAX_INODES_TO_STEAL)
atomic_read(&osb->s_num_inodes_stolen) < OCFS2_MAX_TO_STEAL)
goto inode_steal;
atomic_set(&osb->s_num_inodes_stolen, 0);
alloc_group = osb->osb_inode_alloc_group;
status = ocfs2_reserve_suballoc_bits(osb, *ac,
INODE_ALLOC_SYSTEM_INODE,
osb->slot_num,
(u32)osb->slot_num,
&alloc_group,
ALLOC_NEW_GROUP |
ALLOC_GROUPS_FROM_GLOBAL);
......@@ -789,7 +880,7 @@ int ocfs2_reserve_new_inode(struct ocfs2_super *osb,
ocfs2_free_ac_resource(*ac);
inode_steal:
status = ocfs2_steal_inode_from_other_nodes(osb, *ac);
status = ocfs2_steal_inode(osb, *ac);
atomic_inc(&osb->s_num_inodes_stolen);
if (status < 0) {
if (status != -ENOSPC)
......
......@@ -56,6 +56,7 @@ struct ocfs2_alloc_context {
is the same as ~0 - unlimited */
};
void ocfs2_init_steal_slots(struct ocfs2_super *osb);
void ocfs2_free_alloc_context(struct ocfs2_alloc_context *ac);
static inline int ocfs2_alloc_context_bits_left(struct ocfs2_alloc_context *ac)
{
......
......@@ -69,6 +69,7 @@
#include "xattr.h"
#include "quota.h"
#include "refcounttree.h"
#include "suballoc.h"
#include "buffer_head_io.h"
......@@ -301,9 +302,12 @@ static int ocfs2_osb_dump(struct ocfs2_super *osb, char *buf, int len)
spin_lock(&osb->osb_lock);
out += snprintf(buf + out, len - out,
"%10s => Slot: %d NumStolen: %d\n", "Steal",
"%10s => InodeSlot: %d StolenInodes: %d, "
"MetaSlot: %d StolenMeta: %d\n", "Steal",
osb->s_inode_steal_slot,
atomic_read(&osb->s_num_inodes_stolen));
atomic_read(&osb->s_num_inodes_stolen),
osb->s_meta_steal_slot,
atomic_read(&osb->s_num_meta_stolen));
spin_unlock(&osb->osb_lock);
out += snprintf(buf + out, len - out, "OrphanScan => ");
......@@ -1997,7 +2001,7 @@ static int ocfs2_initialize_super(struct super_block *sb,
osb->blocked_lock_count = 0;
spin_lock_init(&osb->osb_lock);
spin_lock_init(&osb->osb_xattr_lock);
ocfs2_init_inode_steal_slot(osb);
ocfs2_init_steal_slots(osb);
atomic_set(&osb->alloc_stats.moves, 0);
atomic_set(&osb->alloc_stats.local_data, 0);
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册