提交 ea2c9c9f 编写于 作者: A Al Viro 提交者: Mike Marshall

orangefs: bufmap rewrite

new waiting-for-slot logics:
	* make request for slot wait for bufmap to be set up if it
comes before it's installed *OR* while it's running down
	* make closing control device wait for all slots to be freed
	* waiting itself rewritten to (open-coded) analogues of wait_event_...
primitives - we would need wait_event_locked() and, pardon an obscenely
long name, wait_event_interruptible_exclusive_timeout_locked().
	* we never wait for more than slot_timeout_secs in total and,
if during the wait the daemon goes away, we only allow
ORANGEFS_BUFMAP_WAIT_TIMEOUT_SECS for it to come back.
	* (cosmetical) bitmap is used instead of an array of zeroes and ones
	* old (and only reached if we are about to corrupt memory) waiting
for daemon restart in service_operation() removed.

[Martin's fixes folded]
Signed-off-by: NAl Viro <viro@zeniv.linux.org.uk>
Signed-off-by: NMike Marshall <hubcap@omnibond.com>
上级 17804184
......@@ -508,8 +508,7 @@ static int orangefs_devreq_release(struct inode *inode, struct file *file)
__func__);
mutex_lock(&devreq_mutex);
if (orangefs_get_bufmap_init())
orangefs_bufmap_finalize();
orangefs_bufmap_finalize();
open_access_count = -1;
......@@ -527,6 +526,9 @@ static int orangefs_devreq_release(struct inode *inode, struct file *file)
* them as purged and wake them up
*/
purge_inprogress_ops();
orangefs_bufmap_run_down();
gossip_debug(GOSSIP_DEV_DEBUG,
"pvfs2-client-core: device close complete\n");
open_access_count = 0;
......@@ -607,13 +609,8 @@ static long dispatch_ioctl_command(unsigned int command, unsigned long arg)
(struct ORANGEFS_dev_map_desc __user *)
arg,
sizeof(struct ORANGEFS_dev_map_desc));
if (orangefs_get_bufmap_init()) {
return -EINVAL;
} else {
return ret ?
-EIO :
orangefs_bufmap_initialize(&user_desc);
}
/* WTF -EIO and not -EFAULT? */
return ret ? -EIO : orangefs_bufmap_initialize(&user_desc);
case ORANGEFS_DEV_REMOUNT_ALL:
gossip_debug(GOSSIP_DEV_DEBUG,
"%s: got ORANGEFS_DEV_REMOUNT_ALL\n",
......
......@@ -7,7 +7,133 @@
#include "orangefs-kernel.h"
#include "orangefs-bufmap.h"
DECLARE_WAIT_QUEUE_HEAD(orangefs_bufmap_init_waitq);
struct slot_map {
int c;
wait_queue_head_t q;
int count;
unsigned long *map;
};
static struct slot_map rw_map = {
.c = -1,
.q = __WAIT_QUEUE_HEAD_INITIALIZER(rw_map.q)
};
static struct slot_map readdir_map = {
.c = -1,
.q = __WAIT_QUEUE_HEAD_INITIALIZER(readdir_map.q)
};
static void install(struct slot_map *m, int count, unsigned long *map)
{
spin_lock(&m->q.lock);
m->c = m->count = count;
m->map = map;
wake_up_all_locked(&m->q);
spin_unlock(&m->q.lock);
}
static void mark_killed(struct slot_map *m)
{
spin_lock(&m->q.lock);
m->c -= m->count + 1;
spin_unlock(&m->q.lock);
}
static void run_down(struct slot_map *m)
{
DEFINE_WAIT(wait);
spin_lock(&m->q.lock);
if (m->c != -1) {
for (;;) {
if (likely(list_empty(&wait.task_list)))
__add_wait_queue_tail(&m->q, &wait);
set_current_state(TASK_UNINTERRUPTIBLE);
if (m->c == -1)
break;
spin_unlock(&m->q.lock);
schedule();
spin_lock(&m->q.lock);
}
__remove_wait_queue(&m->q, &wait);
__set_current_state(TASK_RUNNING);
}
m->map = NULL;
spin_unlock(&m->q.lock);
}
static void put(struct slot_map *m, int slot)
{
int v;
spin_lock(&m->q.lock);
__clear_bit(slot, m->map);
v = ++m->c;
if (unlikely(v == 1)) /* no free slots -> one free slot */
wake_up_locked(&m->q);
else if (unlikely(v == -1)) /* finished dying */
wake_up_all_locked(&m->q);
spin_unlock(&m->q.lock);
}
static int wait_for_free(struct slot_map *m)
{
long left = slot_timeout_secs * HZ;
DEFINE_WAIT(wait);
do {
long n = left, t;
if (likely(list_empty(&wait.task_list)))
__add_wait_queue_tail_exclusive(&m->q, &wait);
set_current_state(TASK_INTERRUPTIBLE);
if (m->c > 0)
break;
if (m->c < 0) {
/* we are waiting for map to be installed */
/* it would better be there soon, or we go away */
if (n > ORANGEFS_BUFMAP_WAIT_TIMEOUT_SECS * HZ)
n = ORANGEFS_BUFMAP_WAIT_TIMEOUT_SECS * HZ;
}
spin_unlock(&m->q.lock);
t = schedule_timeout(n);
spin_lock(&m->q.lock);
if (unlikely(!t) && n != left && m->c < 0)
left = t;
else
left = t + (left - n);
if (unlikely(signal_pending(current)))
left = -EINTR;
} while (left > 0);
if (!list_empty(&wait.task_list))
list_del(&wait.task_list);
else if (left <= 0 && waitqueue_active(&m->q))
__wake_up_locked_key(&m->q, TASK_INTERRUPTIBLE, NULL);
__set_current_state(TASK_RUNNING);
if (likely(left > 0))
return 0;
return left < 0 ? -EINTR : -ETIMEDOUT;
}
static int get(struct slot_map *m)
{
int res = 0;
spin_lock(&m->q.lock);
if (unlikely(m->c <= 0))
res = wait_for_free(m);
if (likely(!res)) {
m->c--;
res = find_first_zero_bit(m->map, m->count);
__set_bit(res, m->map);
}
spin_unlock(&m->q.lock);
return res;
}
/* used to describe mapped buffers */
struct orangefs_bufmap_desc {
......@@ -18,8 +144,6 @@ struct orangefs_bufmap_desc {
};
static struct orangefs_bufmap {
atomic_t refcnt;
int desc_size;
int desc_shift;
int desc_count;
......@@ -30,12 +154,12 @@ static struct orangefs_bufmap {
struct orangefs_bufmap_desc *desc_array;
/* array to track usage of buffer descriptors */
int *buffer_index_array;
spinlock_t buffer_index_lock;
unsigned long *buffer_index_array;
/* array to track usage of buffer descriptors for readdir */
int readdir_index_array[ORANGEFS_READDIR_DEFAULT_DESC_COUNT];
spinlock_t readdir_index_lock;
#define N DIV_ROUND_UP(ORANGEFS_READDIR_DEFAULT_DESC_COUNT, BITS_PER_LONG)
unsigned long readdir_index_array[N];
#undef N
} *__orangefs_bufmap;
static DEFINE_SPINLOCK(orangefs_bufmap_lock);
......@@ -58,30 +182,6 @@ orangefs_bufmap_free(struct orangefs_bufmap *bufmap)
kfree(bufmap);
}
static struct orangefs_bufmap *orangefs_bufmap_ref(void)
{
struct orangefs_bufmap *bufmap = NULL;
spin_lock(&orangefs_bufmap_lock);
if (__orangefs_bufmap) {
bufmap = __orangefs_bufmap;
atomic_inc(&bufmap->refcnt);
}
spin_unlock(&orangefs_bufmap_lock);
return bufmap;
}
static void orangefs_bufmap_unref(struct orangefs_bufmap *bufmap)
{
if (atomic_dec_and_lock(&bufmap->refcnt, &orangefs_bufmap_lock)) {
__orangefs_bufmap = NULL;
spin_unlock(&orangefs_bufmap_lock);
orangefs_bufmap_unmap(bufmap);
orangefs_bufmap_free(bufmap);
}
}
/*
* XXX: Can the size and shift change while the caller gives up the
* XXX: lock between calling this and doing something useful?
......@@ -137,21 +237,18 @@ orangefs_bufmap_alloc(struct ORANGEFS_dev_map_desc *user_desc)
if (!bufmap)
goto out;
atomic_set(&bufmap->refcnt, 1);
bufmap->total_size = user_desc->total_size;
bufmap->desc_count = user_desc->count;
bufmap->desc_size = user_desc->size;
bufmap->desc_shift = ilog2(bufmap->desc_size);
spin_lock_init(&bufmap->buffer_index_lock);
bufmap->buffer_index_array =
kcalloc(bufmap->desc_count, sizeof(int), GFP_KERNEL);
kzalloc(DIV_ROUND_UP(bufmap->desc_count, BITS_PER_LONG), GFP_KERNEL);
if (!bufmap->buffer_index_array) {
gossip_err("orangefs: could not allocate %d buffer indices\n",
bufmap->desc_count);
goto out_free_bufmap;
}
spin_lock_init(&bufmap->readdir_index_lock);
bufmap->desc_array =
kcalloc(bufmap->desc_count, sizeof(struct orangefs_bufmap_desc),
......@@ -294,24 +391,18 @@ int orangefs_bufmap_initialize(struct ORANGEFS_dev_map_desc *user_desc)
if (__orangefs_bufmap) {
spin_unlock(&orangefs_bufmap_lock);
gossip_err("orangefs: error: bufmap already initialized.\n");
ret = -EALREADY;
ret = -EINVAL;
goto out_unmap_bufmap;
}
__orangefs_bufmap = bufmap;
install(&rw_map,
bufmap->desc_count,
bufmap->buffer_index_array);
install(&readdir_map,
ORANGEFS_READDIR_DEFAULT_DESC_COUNT,
bufmap->readdir_index_array);
spin_unlock(&orangefs_bufmap_lock);
/*
* If there are operations in orangefs_bufmap_init_waitq, wake them up.
* This scenario occurs when the client-core is restarted and I/O
* requests in the in-progress or waiting tables are restarted. I/O
* requests cannot be restarted until the shared memory system is
* completely re-initialized, so we put the I/O requests in this
* waitq until initialization has completed. NOTE: the I/O requests
* are also on a timer, so they don't wait forever just in case the
* client-core doesn't come back up.
*/
wake_up_interruptible(&orangefs_bufmap_init_waitq);
gossip_debug(GOSSIP_BUFMAP_DEBUG,
"orangefs_bufmap_initialize: exiting normally\n");
return 0;
......@@ -334,91 +425,28 @@ int orangefs_bufmap_initialize(struct ORANGEFS_dev_map_desc *user_desc)
*/
void orangefs_bufmap_finalize(void)
{
struct orangefs_bufmap *bufmap = __orangefs_bufmap;
if (!bufmap)
return;
gossip_debug(GOSSIP_BUFMAP_DEBUG, "orangefs_bufmap_finalize: called\n");
BUG_ON(!__orangefs_bufmap);
orangefs_bufmap_unref(__orangefs_bufmap);
mark_killed(&rw_map);
mark_killed(&readdir_map);
gossip_debug(GOSSIP_BUFMAP_DEBUG,
"orangefs_bufmap_finalize: exiting normally\n");
}
struct slot_args {
int slot_count;
int *slot_array;
spinlock_t *slot_lock;
wait_queue_head_t *slot_wq;
};
static int wait_for_a_slot(struct slot_args *slargs, int *buffer_index)
void orangefs_bufmap_run_down(void)
{
int ret = -1;
int i = 0;
DEFINE_WAIT(wait_entry);
while (1) {
/*
* check for available desc, slot_lock is the appropriate
* index_lock
*/
spin_lock(slargs->slot_lock);
prepare_to_wait_exclusive(slargs->slot_wq,
&wait_entry,
TASK_INTERRUPTIBLE);
for (i = 0; i < slargs->slot_count; i++)
if (slargs->slot_array[i] == 0) {
slargs->slot_array[i] = 1;
*buffer_index = i;
ret = 0;
break;
}
spin_unlock(slargs->slot_lock);
/* if we acquired a buffer, then break out of while */
if (ret == 0)
break;
if (!signal_pending(current)) {
gossip_debug(GOSSIP_BUFMAP_DEBUG,
"[BUFMAP]: waiting %d "
"seconds for a slot\n",
slot_timeout_secs);
if (!schedule_timeout(slot_timeout_secs * HZ)) {
gossip_debug(GOSSIP_BUFMAP_DEBUG,
"*** wait_for_a_slot timed out\n");
ret = -ETIMEDOUT;
break;
}
gossip_debug(GOSSIP_BUFMAP_DEBUG,
"[BUFMAP]: woken up by a slot becoming available.\n");
continue;
}
gossip_debug(GOSSIP_BUFMAP_DEBUG, "orangefs: %s interrupted.\n",
__func__);
ret = -EINTR;
break;
}
spin_lock(slargs->slot_lock);
finish_wait(slargs->slot_wq, &wait_entry);
spin_unlock(slargs->slot_lock);
return ret;
}
static void put_back_slot(struct slot_args *slargs, int buffer_index)
{
/* slot_lock is the appropriate index_lock */
spin_lock(slargs->slot_lock);
if (buffer_index < 0 || buffer_index >= slargs->slot_count) {
spin_unlock(slargs->slot_lock);
struct orangefs_bufmap *bufmap = __orangefs_bufmap;
if (!bufmap)
return;
}
/* put the desc back on the queue */
slargs->slot_array[buffer_index] = 0;
spin_unlock(slargs->slot_lock);
/* wake up anyone who may be sleeping on the queue */
wake_up_interruptible(slargs->slot_wq);
run_down(&rw_map);
run_down(&readdir_map);
spin_lock(&orangefs_bufmap_lock);
__orangefs_bufmap = NULL;
spin_unlock(&orangefs_bufmap_lock);
orangefs_bufmap_unmap(bufmap);
orangefs_bufmap_free(bufmap);
}
/*
......@@ -431,23 +459,12 @@ static void put_back_slot(struct slot_args *slargs, int buffer_index)
*/
int orangefs_bufmap_get(struct orangefs_bufmap **mapp, int *buffer_index)
{
struct orangefs_bufmap *bufmap = orangefs_bufmap_ref();
struct slot_args slargs;
int ret;
if (!bufmap) {
gossip_err("orangefs: please confirm that pvfs2-client daemon is running.\n");
return -EIO;
int ret = get(&rw_map);
if (ret >= 0) {
*mapp = __orangefs_bufmap;
*buffer_index = ret;
ret = 0;
}
slargs.slot_count = bufmap->desc_count;
slargs.slot_array = bufmap->buffer_index_array;
slargs.slot_lock = &bufmap->buffer_index_lock;
slargs.slot_wq = &bufmap_waitq;
ret = wait_for_a_slot(&slargs, buffer_index);
if (ret)
orangefs_bufmap_unref(bufmap);
*mapp = bufmap;
return ret;
}
......@@ -460,15 +477,7 @@ int orangefs_bufmap_get(struct orangefs_bufmap **mapp, int *buffer_index)
*/
void orangefs_bufmap_put(int buffer_index)
{
struct slot_args slargs;
struct orangefs_bufmap *bufmap = __orangefs_bufmap;
slargs.slot_count = bufmap->desc_count;
slargs.slot_array = bufmap->buffer_index_array;
slargs.slot_lock = &bufmap->buffer_index_lock;
slargs.slot_wq = &bufmap_waitq;
put_back_slot(&slargs, buffer_index);
orangefs_bufmap_unref(bufmap);
put(&rw_map, buffer_index);
}
/*
......@@ -484,36 +493,18 @@ void orangefs_bufmap_put(int buffer_index)
*/
int orangefs_readdir_index_get(struct orangefs_bufmap **mapp, int *buffer_index)
{
struct orangefs_bufmap *bufmap = orangefs_bufmap_ref();
struct slot_args slargs;
int ret;
if (!bufmap) {
gossip_err("orangefs: please confirm that pvfs2-client daemon is running.\n");
return -EIO;
int ret = get(&readdir_map);
if (ret >= 0) {
*mapp = __orangefs_bufmap;
*buffer_index = ret;
ret = 0;
}
slargs.slot_count = ORANGEFS_READDIR_DEFAULT_DESC_COUNT;
slargs.slot_array = bufmap->readdir_index_array;
slargs.slot_lock = &bufmap->readdir_index_lock;
slargs.slot_wq = &readdir_waitq;
ret = wait_for_a_slot(&slargs, buffer_index);
if (ret)
orangefs_bufmap_unref(bufmap);
*mapp = bufmap;
return ret;
}
void orangefs_readdir_index_put(struct orangefs_bufmap *bufmap, int buffer_index)
{
struct slot_args slargs;
slargs.slot_count = ORANGEFS_READDIR_DEFAULT_DESC_COUNT;
slargs.slot_array = bufmap->readdir_index_array;
slargs.slot_lock = &bufmap->readdir_index_lock;
slargs.slot_wq = &readdir_waitq;
put_back_slot(&slargs, buffer_index);
orangefs_bufmap_unref(bufmap);
put(&readdir_map, buffer_index);
}
/*
......
......@@ -15,10 +15,10 @@ int orangefs_bufmap_shift_query(void);
int orangefs_bufmap_initialize(struct ORANGEFS_dev_map_desc *user_desc);
int orangefs_get_bufmap_init(void);
void orangefs_bufmap_finalize(void);
void orangefs_bufmap_run_down(void);
int orangefs_bufmap_get(struct orangefs_bufmap **mapp, int *buffer_index);
void orangefs_bufmap_put(int buffer_index);
......
......@@ -155,67 +155,6 @@ int service_operation(struct orangefs_kernel_op_s *op,
* system
*/
goto retry_servicing;
/* op uses shared memory */
if (orangefs_get_bufmap_init() == 0) {
WARN_ON(1);
/*
* This operation uses the shared memory system AND
* the system is not yet ready. This situation occurs
* when the client-core is restarted AND there were
* operations waiting to be processed or were already
* in process.
*/
gossip_debug(GOSSIP_WAIT_DEBUG,
"uses_shared_memory is true.\n");
gossip_debug(GOSSIP_WAIT_DEBUG,
"Client core in-service status(%d).\n",
is_daemon_in_service());
gossip_debug(GOSSIP_WAIT_DEBUG, "bufmap_init:%d.\n",
orangefs_get_bufmap_init());
gossip_debug(GOSSIP_WAIT_DEBUG,
"operation's status is 0x%0x.\n",
op->op_state);
/*
* let process sleep for a few seconds so shared
* memory system can be initialized.
*/
prepare_to_wait(&orangefs_bufmap_init_waitq,
&wait_entry,
TASK_INTERRUPTIBLE);
/*
* Wait for orangefs_bufmap_initialize() to wake me up
* within the allotted time.
*/
ret = schedule_timeout(
ORANGEFS_BUFMAP_WAIT_TIMEOUT_SECS * HZ);
gossip_debug(GOSSIP_WAIT_DEBUG,
"Value returned from schedule_timeout:"
"%d.\n",
ret);
gossip_debug(GOSSIP_WAIT_DEBUG,
"Is shared memory available? (%d).\n",
orangefs_get_bufmap_init());
finish_wait(&orangefs_bufmap_init_waitq, &wait_entry);
if (orangefs_get_bufmap_init() == 0) {
gossip_err("%s:The shared memory system has not started in %d seconds after the client core restarted. Aborting user's request(%s).\n",
__func__,
ORANGEFS_BUFMAP_WAIT_TIMEOUT_SECS,
get_opname_string(op));
return -EIO;
}
/*
* Return to the calling function and re-populate a
* shared memory buffer.
*/
return -EAGAIN;
}
}
out:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册