提交 548ed102 编写于 作者: L Linus Torvalds

Merge tag 'dlm-3.6' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm

Pull dlm updatesfrom David Teigland:
 "This set includes a major redesign of recording the master node for
  resources.  The old dir hash table, which just held the master node
  for each resource, has been removed.  The rsb hash table has always
  duplicated the master node value from the dir, and is now the single
  record of it.

  Having two full hash tables of all resources has always been a waste,
  especially since one just duplicated a single value from the other.
  Local requests will now often require one instead of two lengthy hash
  table searches.

  The other substantial change is made possible by the dirtbl removal,
  and fixes a long standing race between resource removal and lookup by
  reworking how removal is done.  At the same time it improves the
  efficiency of removal by avoiding repeated searches through a hash
  bucket.

  The other commits include minor fixes and changes."

* tag 'dlm-3.6' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm:
  dlm: fix missing dir remove
  dlm: fix conversion deadlock from recovery
  dlm: use wait_event_timeout
  dlm: fix race between remove and lookup
  dlm: use idr instead of list for recovered rsbs
  dlm: use rsbtbl as resource directory
......@@ -96,7 +96,6 @@ struct dlm_cluster {
unsigned int cl_tcp_port;
unsigned int cl_buffer_size;
unsigned int cl_rsbtbl_size;
unsigned int cl_dirtbl_size;
unsigned int cl_recover_timer;
unsigned int cl_toss_secs;
unsigned int cl_scan_secs;
......@@ -113,7 +112,6 @@ enum {
CLUSTER_ATTR_TCP_PORT = 0,
CLUSTER_ATTR_BUFFER_SIZE,
CLUSTER_ATTR_RSBTBL_SIZE,
CLUSTER_ATTR_DIRTBL_SIZE,
CLUSTER_ATTR_RECOVER_TIMER,
CLUSTER_ATTR_TOSS_SECS,
CLUSTER_ATTR_SCAN_SECS,
......@@ -189,7 +187,6 @@ __CONFIGFS_ATTR(name, 0644, name##_read, name##_write)
CLUSTER_ATTR(tcp_port, 1);
CLUSTER_ATTR(buffer_size, 1);
CLUSTER_ATTR(rsbtbl_size, 1);
CLUSTER_ATTR(dirtbl_size, 1);
CLUSTER_ATTR(recover_timer, 1);
CLUSTER_ATTR(toss_secs, 1);
CLUSTER_ATTR(scan_secs, 1);
......@@ -204,7 +201,6 @@ static struct configfs_attribute *cluster_attrs[] = {
[CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port.attr,
[CLUSTER_ATTR_BUFFER_SIZE] = &cluster_attr_buffer_size.attr,
[CLUSTER_ATTR_RSBTBL_SIZE] = &cluster_attr_rsbtbl_size.attr,
[CLUSTER_ATTR_DIRTBL_SIZE] = &cluster_attr_dirtbl_size.attr,
[CLUSTER_ATTR_RECOVER_TIMER] = &cluster_attr_recover_timer.attr,
[CLUSTER_ATTR_TOSS_SECS] = &cluster_attr_toss_secs.attr,
[CLUSTER_ATTR_SCAN_SECS] = &cluster_attr_scan_secs.attr,
......@@ -478,7 +474,6 @@ static struct config_group *make_cluster(struct config_group *g,
cl->cl_tcp_port = dlm_config.ci_tcp_port;
cl->cl_buffer_size = dlm_config.ci_buffer_size;
cl->cl_rsbtbl_size = dlm_config.ci_rsbtbl_size;
cl->cl_dirtbl_size = dlm_config.ci_dirtbl_size;
cl->cl_recover_timer = dlm_config.ci_recover_timer;
cl->cl_toss_secs = dlm_config.ci_toss_secs;
cl->cl_scan_secs = dlm_config.ci_scan_secs;
......@@ -1050,7 +1045,6 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num)
#define DEFAULT_TCP_PORT 21064
#define DEFAULT_BUFFER_SIZE 4096
#define DEFAULT_RSBTBL_SIZE 1024
#define DEFAULT_DIRTBL_SIZE 1024
#define DEFAULT_RECOVER_TIMER 5
#define DEFAULT_TOSS_SECS 10
#define DEFAULT_SCAN_SECS 5
......@@ -1066,7 +1060,6 @@ struct dlm_config_info dlm_config = {
.ci_tcp_port = DEFAULT_TCP_PORT,
.ci_buffer_size = DEFAULT_BUFFER_SIZE,
.ci_rsbtbl_size = DEFAULT_RSBTBL_SIZE,
.ci_dirtbl_size = DEFAULT_DIRTBL_SIZE,
.ci_recover_timer = DEFAULT_RECOVER_TIMER,
.ci_toss_secs = DEFAULT_TOSS_SECS,
.ci_scan_secs = DEFAULT_SCAN_SECS,
......
......@@ -27,7 +27,6 @@ struct dlm_config_info {
int ci_tcp_port;
int ci_buffer_size;
int ci_rsbtbl_size;
int ci_dirtbl_size;
int ci_recover_timer;
int ci_toss_secs;
int ci_scan_secs;
......
......@@ -344,6 +344,45 @@ static int print_format3(struct dlm_rsb *r, struct seq_file *s)
return rv;
}
static int print_format4(struct dlm_rsb *r, struct seq_file *s)
{
int our_nodeid = dlm_our_nodeid();
int print_name = 1;
int i, rv;
lock_rsb(r);
rv = seq_printf(s, "rsb %p %d %d %d %d %lu %lx %d ",
r,
r->res_nodeid,
r->res_master_nodeid,
r->res_dir_nodeid,
our_nodeid,
r->res_toss_time,
r->res_flags,
r->res_length);
if (rv)
goto out;
for (i = 0; i < r->res_length; i++) {
if (!isascii(r->res_name[i]) || !isprint(r->res_name[i]))
print_name = 0;
}
seq_printf(s, "%s", print_name ? "str " : "hex");
for (i = 0; i < r->res_length; i++) {
if (print_name)
seq_printf(s, "%c", r->res_name[i]);
else
seq_printf(s, " %02x", (unsigned char)r->res_name[i]);
}
rv = seq_printf(s, "\n");
out:
unlock_rsb(r);
return rv;
}
struct rsbtbl_iter {
struct dlm_rsb *rsb;
unsigned bucket;
......@@ -382,6 +421,13 @@ static int table_seq_show(struct seq_file *seq, void *iter_ptr)
}
rv = print_format3(ri->rsb, seq);
break;
case 4:
if (ri->header) {
seq_printf(seq, "version 4 rsb 2\n");
ri->header = 0;
}
rv = print_format4(ri->rsb, seq);
break;
}
return rv;
......@@ -390,15 +436,18 @@ static int table_seq_show(struct seq_file *seq, void *iter_ptr)
static const struct seq_operations format1_seq_ops;
static const struct seq_operations format2_seq_ops;
static const struct seq_operations format3_seq_ops;
static const struct seq_operations format4_seq_ops;
static void *table_seq_start(struct seq_file *seq, loff_t *pos)
{
struct rb_root *tree;
struct rb_node *node;
struct dlm_ls *ls = seq->private;
struct rsbtbl_iter *ri;
struct dlm_rsb *r;
loff_t n = *pos;
unsigned bucket, entry;
int toss = (seq->op == &format4_seq_ops);
bucket = n >> 32;
entry = n & ((1LL << 32) - 1);
......@@ -417,11 +466,14 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
ri->format = 2;
if (seq->op == &format3_seq_ops)
ri->format = 3;
if (seq->op == &format4_seq_ops)
ri->format = 4;
tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
spin_lock(&ls->ls_rsbtbl[bucket].lock);
if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) {
for (node = rb_first(&ls->ls_rsbtbl[bucket].keep); node;
node = rb_next(node)) {
if (!RB_EMPTY_ROOT(tree)) {
for (node = rb_first(tree); node; node = rb_next(node)) {
r = rb_entry(node, struct dlm_rsb, res_hashnode);
if (!entry--) {
dlm_hold_rsb(r);
......@@ -449,10 +501,11 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
kfree(ri);
return NULL;
}
tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
spin_lock(&ls->ls_rsbtbl[bucket].lock);
if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) {
node = rb_first(&ls->ls_rsbtbl[bucket].keep);
if (!RB_EMPTY_ROOT(tree)) {
node = rb_first(tree);
r = rb_entry(node, struct dlm_rsb, res_hashnode);
dlm_hold_rsb(r);
ri->rsb = r;
......@@ -469,10 +522,12 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
{
struct dlm_ls *ls = seq->private;
struct rsbtbl_iter *ri = iter_ptr;
struct rb_root *tree;
struct rb_node *next;
struct dlm_rsb *r, *rp;
loff_t n = *pos;
unsigned bucket;
int toss = (seq->op == &format4_seq_ops);
bucket = n >> 32;
......@@ -511,10 +566,11 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
kfree(ri);
return NULL;
}
tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
spin_lock(&ls->ls_rsbtbl[bucket].lock);
if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) {
next = rb_first(&ls->ls_rsbtbl[bucket].keep);
if (!RB_EMPTY_ROOT(tree)) {
next = rb_first(tree);
r = rb_entry(next, struct dlm_rsb, res_hashnode);
dlm_hold_rsb(r);
ri->rsb = r;
......@@ -558,9 +614,17 @@ static const struct seq_operations format3_seq_ops = {
.show = table_seq_show,
};
static const struct seq_operations format4_seq_ops = {
.start = table_seq_start,
.next = table_seq_next,
.stop = table_seq_stop,
.show = table_seq_show,
};
static const struct file_operations format1_fops;
static const struct file_operations format2_fops;
static const struct file_operations format3_fops;
static const struct file_operations format4_fops;
static int table_open(struct inode *inode, struct file *file)
{
......@@ -573,6 +637,8 @@ static int table_open(struct inode *inode, struct file *file)
ret = seq_open(file, &format2_seq_ops);
else if (file->f_op == &format3_fops)
ret = seq_open(file, &format3_seq_ops);
else if (file->f_op == &format4_fops)
ret = seq_open(file, &format4_seq_ops);
if (ret)
return ret;
......@@ -606,6 +672,14 @@ static const struct file_operations format3_fops = {
.release = seq_release
};
static const struct file_operations format4_fops = {
.owner = THIS_MODULE,
.open = table_open,
.read = seq_read,
.llseek = seq_lseek,
.release = seq_release
};
/*
* dump lkb's on the ls_waiters list
*/
......@@ -652,6 +726,8 @@ void dlm_delete_debug_file(struct dlm_ls *ls)
debugfs_remove(ls->ls_debug_locks_dentry);
if (ls->ls_debug_all_dentry)
debugfs_remove(ls->ls_debug_all_dentry);
if (ls->ls_debug_toss_dentry)
debugfs_remove(ls->ls_debug_toss_dentry);
}
int dlm_create_debug_file(struct dlm_ls *ls)
......@@ -694,6 +770,19 @@ int dlm_create_debug_file(struct dlm_ls *ls)
if (!ls->ls_debug_all_dentry)
goto fail;
/* format 4 */
memset(name, 0, sizeof(name));
snprintf(name, DLM_LOCKSPACE_LEN+8, "%s_toss", ls->ls_name);
ls->ls_debug_toss_dentry = debugfs_create_file(name,
S_IFREG | S_IRUGO,
dlm_root,
ls,
&format4_fops);
if (!ls->ls_debug_toss_dentry)
goto fail;
memset(name, 0, sizeof(name));
snprintf(name, DLM_LOCKSPACE_LEN+8, "%s_waiters", ls->ls_name);
......
......@@ -23,50 +23,6 @@
#include "lock.h"
#include "dir.h"
static void put_free_de(struct dlm_ls *ls, struct dlm_direntry *de)
{
spin_lock(&ls->ls_recover_list_lock);
list_add(&de->list, &ls->ls_recover_list);
spin_unlock(&ls->ls_recover_list_lock);
}
static struct dlm_direntry *get_free_de(struct dlm_ls *ls, int len)
{
int found = 0;
struct dlm_direntry *de;
spin_lock(&ls->ls_recover_list_lock);
list_for_each_entry(de, &ls->ls_recover_list, list) {
if (de->length == len) {
list_del(&de->list);
de->master_nodeid = 0;
memset(de->name, 0, len);
found = 1;
break;
}
}
spin_unlock(&ls->ls_recover_list_lock);
if (!found)
de = kzalloc(sizeof(struct dlm_direntry) + len, GFP_NOFS);
return de;
}
void dlm_clear_free_entries(struct dlm_ls *ls)
{
struct dlm_direntry *de;
spin_lock(&ls->ls_recover_list_lock);
while (!list_empty(&ls->ls_recover_list)) {
de = list_entry(ls->ls_recover_list.next, struct dlm_direntry,
list);
list_del(&de->list);
kfree(de);
}
spin_unlock(&ls->ls_recover_list_lock);
}
/*
* We use the upper 16 bits of the hash value to select the directory node.
* Low bits are used for distribution of rsb's among hash buckets on each node.
......@@ -78,144 +34,53 @@ void dlm_clear_free_entries(struct dlm_ls *ls)
int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
{
struct list_head *tmp;
struct dlm_member *memb = NULL;
uint32_t node, n = 0;
int nodeid;
if (ls->ls_num_nodes == 1) {
nodeid = dlm_our_nodeid();
goto out;
}
uint32_t node;
if (ls->ls_node_array) {
if (ls->ls_num_nodes == 1)
return dlm_our_nodeid();
else {
node = (hash >> 16) % ls->ls_total_weight;
nodeid = ls->ls_node_array[node];
goto out;
}
/* make_member_array() failed to kmalloc ls_node_array... */
node = (hash >> 16) % ls->ls_num_nodes;
list_for_each(tmp, &ls->ls_nodes) {
if (n++ != node)
continue;
memb = list_entry(tmp, struct dlm_member, list);
break;
return ls->ls_node_array[node];
}
DLM_ASSERT(memb , printk("num_nodes=%u n=%u node=%u\n",
ls->ls_num_nodes, n, node););
nodeid = memb->nodeid;
out:
return nodeid;
}
int dlm_dir_nodeid(struct dlm_rsb *r)
{
return dlm_hash2nodeid(r->res_ls, r->res_hash);
}
static inline uint32_t dir_hash(struct dlm_ls *ls, char *name, int len)
{
uint32_t val;
val = jhash(name, len, 0);
val &= (ls->ls_dirtbl_size - 1);
return val;
}
static void add_entry_to_hash(struct dlm_ls *ls, struct dlm_direntry *de)
{
uint32_t bucket;
bucket = dir_hash(ls, de->name, de->length);
list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
return r->res_dir_nodeid;
}
static struct dlm_direntry *search_bucket(struct dlm_ls *ls, char *name,
int namelen, uint32_t bucket)
void dlm_recover_dir_nodeid(struct dlm_ls *ls)
{
struct dlm_direntry *de;
list_for_each_entry(de, &ls->ls_dirtbl[bucket].list, list) {
if (de->length == namelen && !memcmp(name, de->name, namelen))
goto out;
}
de = NULL;
out:
return de;
}
void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int namelen)
{
struct dlm_direntry *de;
uint32_t bucket;
bucket = dir_hash(ls, name, namelen);
spin_lock(&ls->ls_dirtbl[bucket].lock);
de = search_bucket(ls, name, namelen, bucket);
if (!de) {
log_error(ls, "remove fr %u none", nodeid);
goto out;
}
if (de->master_nodeid != nodeid) {
log_error(ls, "remove fr %u ID %u", nodeid, de->master_nodeid);
goto out;
}
list_del(&de->list);
kfree(de);
out:
spin_unlock(&ls->ls_dirtbl[bucket].lock);
}
struct dlm_rsb *r;
void dlm_dir_clear(struct dlm_ls *ls)
{
struct list_head *head;
struct dlm_direntry *de;
int i;
DLM_ASSERT(list_empty(&ls->ls_recover_list), );
for (i = 0; i < ls->ls_dirtbl_size; i++) {
spin_lock(&ls->ls_dirtbl[i].lock);
head = &ls->ls_dirtbl[i].list;
while (!list_empty(head)) {
de = list_entry(head->next, struct dlm_direntry, list);
list_del(&de->list);
put_free_de(ls, de);
}
spin_unlock(&ls->ls_dirtbl[i].lock);
down_read(&ls->ls_root_sem);
list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
}
up_read(&ls->ls_root_sem);
}
int dlm_recover_directory(struct dlm_ls *ls)
{
struct dlm_member *memb;
struct dlm_direntry *de;
char *b, *last_name = NULL;
int error = -ENOMEM, last_len, count = 0;
int error = -ENOMEM, last_len, nodeid, result;
uint16_t namelen;
unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0;
log_debug(ls, "dlm_recover_directory");
if (dlm_no_directory(ls))
goto out_status;
dlm_dir_clear(ls);
last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
if (!last_name)
goto out;
list_for_each_entry(memb, &ls->ls_nodes, list) {
if (memb->nodeid == dlm_our_nodeid())
continue;
memset(last_name, 0, DLM_RESNAME_MAXLEN);
last_len = 0;
......@@ -230,7 +95,7 @@ int dlm_recover_directory(struct dlm_ls *ls)
if (error)
goto out_free;
schedule();
cond_resched();
/*
* pick namelen/name pairs out of received buffer
......@@ -267,87 +132,71 @@ int dlm_recover_directory(struct dlm_ls *ls)
if (namelen > DLM_RESNAME_MAXLEN)
goto out_free;
error = -ENOMEM;
de = get_free_de(ls, namelen);
if (!de)
error = dlm_master_lookup(ls, memb->nodeid,
b, namelen,
DLM_LU_RECOVER_DIR,
&nodeid, &result);
if (error) {
log_error(ls, "recover_dir lookup %d",
error);
goto out_free;
}
/* The name was found in rsbtbl, but the
* master nodeid is different from
* memb->nodeid which says it is the master.
* This should not happen. */
if (result == DLM_LU_MATCH &&
nodeid != memb->nodeid) {
count_bad++;
log_error(ls, "recover_dir lookup %d "
"nodeid %d memb %d bad %u",
result, nodeid, memb->nodeid,
count_bad);
print_hex_dump_bytes("dlm_recover_dir ",
DUMP_PREFIX_NONE,
b, namelen);
}
/* The name was found in rsbtbl, and the
* master nodeid matches memb->nodeid. */
if (result == DLM_LU_MATCH &&
nodeid == memb->nodeid) {
count_match++;
}
/* The name was not found in rsbtbl and was
* added with memb->nodeid as the master. */
if (result == DLM_LU_ADD) {
count_add++;
}
de->master_nodeid = memb->nodeid;
de->length = namelen;
last_len = namelen;
memcpy(de->name, b, namelen);
memcpy(last_name, b, namelen);
b += namelen;
left -= namelen;
add_entry_to_hash(ls, de);
count++;
}
}
done:
done:
;
}
out_status:
error = 0;
log_debug(ls, "dlm_recover_directory %d entries", count);
dlm_set_recover_status(ls, DLM_RS_DIR);
log_debug(ls, "dlm_recover_directory %u in %u new",
count, count_add);
out_free:
kfree(last_name);
out:
dlm_clear_free_entries(ls);
return error;
}
static int get_entry(struct dlm_ls *ls, int nodeid, char *name,
int namelen, int *r_nodeid)
{
struct dlm_direntry *de, *tmp;
uint32_t bucket;
bucket = dir_hash(ls, name, namelen);
spin_lock(&ls->ls_dirtbl[bucket].lock);
de = search_bucket(ls, name, namelen, bucket);
if (de) {
*r_nodeid = de->master_nodeid;
spin_unlock(&ls->ls_dirtbl[bucket].lock);
if (*r_nodeid == nodeid)
return -EEXIST;
return 0;
}
spin_unlock(&ls->ls_dirtbl[bucket].lock);
if (namelen > DLM_RESNAME_MAXLEN)
return -EINVAL;
de = kzalloc(sizeof(struct dlm_direntry) + namelen, GFP_NOFS);
if (!de)
return -ENOMEM;
de->master_nodeid = nodeid;
de->length = namelen;
memcpy(de->name, name, namelen);
spin_lock(&ls->ls_dirtbl[bucket].lock);
tmp = search_bucket(ls, name, namelen, bucket);
if (tmp) {
kfree(de);
de = tmp;
} else {
list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
}
*r_nodeid = de->master_nodeid;
spin_unlock(&ls->ls_dirtbl[bucket].lock);
return 0;
}
int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen,
int *r_nodeid)
{
return get_entry(ls, nodeid, name, namelen, r_nodeid);
}
static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
{
struct dlm_rsb *r;
......@@ -358,10 +207,10 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
bucket = hash & (ls->ls_rsbtbl_size - 1);
spin_lock(&ls->ls_rsbtbl[bucket].lock);
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, 0, &r);
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
if (rv)
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
name, len, 0, &r);
name, len, &r);
spin_unlock(&ls->ls_rsbtbl[bucket].lock);
if (!rv)
......@@ -371,7 +220,7 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
if (len == r->res_length && !memcmp(name, r->res_name, len)) {
up_read(&ls->ls_root_sem);
log_error(ls, "find_rsb_root revert to root_list %s",
log_debug(ls, "find_rsb_root revert to root_list %s",
r->res_name);
return r;
}
......@@ -429,6 +278,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
be_namelen = cpu_to_be16(0);
memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
offset += sizeof(__be16);
ls->ls_recover_dir_sent_msg++;
goto out;
}
......@@ -437,6 +287,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
offset += sizeof(__be16);
memcpy(outbuf + offset, r->res_name, r->res_length);
offset += r->res_length;
ls->ls_recover_dir_sent_res++;
}
/*
......@@ -449,8 +300,8 @@ void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
be_namelen = cpu_to_be16(0xFFFF);
memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
offset += sizeof(__be16);
ls->ls_recover_dir_sent_msg++;
}
out:
up_read(&ls->ls_root_sem);
}
......
......@@ -14,15 +14,10 @@
#ifndef __DIR_DOT_H__
#define __DIR_DOT_H__
int dlm_dir_nodeid(struct dlm_rsb *rsb);
int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash);
void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int len);
void dlm_dir_clear(struct dlm_ls *ls);
void dlm_clear_free_entries(struct dlm_ls *ls);
void dlm_recover_dir_nodeid(struct dlm_ls *ls);
int dlm_recover_directory(struct dlm_ls *ls);
int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen,
int *r_nodeid);
void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
char *outbuf, int outlen, int nodeid);
......
......@@ -55,8 +55,6 @@ struct dlm_lkb;
struct dlm_rsb;
struct dlm_member;
struct dlm_rsbtable;
struct dlm_dirtable;
struct dlm_direntry;
struct dlm_recover;
struct dlm_header;
struct dlm_message;
......@@ -98,18 +96,6 @@ do { \
}
struct dlm_direntry {
struct list_head list;
uint32_t master_nodeid;
uint16_t length;
char name[1];
};
struct dlm_dirtable {
struct list_head list;
spinlock_t lock;
};
struct dlm_rsbtable {
struct rb_root keep;
struct rb_root toss;
......@@ -283,6 +269,15 @@ struct dlm_lkb {
};
};
/*
* res_master_nodeid is "normal": 0 is unset/invalid, non-zero is the real
* nodeid, even when nodeid is our_nodeid.
*
* res_nodeid is "odd": -1 is unset/invalid, zero means our_nodeid,
* greater than zero when another nodeid.
*
* (TODO: remove res_nodeid and only use res_master_nodeid)
*/
struct dlm_rsb {
struct dlm_ls *res_ls; /* the lockspace */
......@@ -291,6 +286,9 @@ struct dlm_rsb {
unsigned long res_flags;
int res_length; /* length of rsb name */
int res_nodeid;
int res_master_nodeid;
int res_dir_nodeid;
int res_id; /* for ls_recover_idr */
uint32_t res_lvbseq;
uint32_t res_hash;
uint32_t res_bucket; /* rsbtbl */
......@@ -313,10 +311,21 @@ struct dlm_rsb {
char res_name[DLM_RESNAME_MAXLEN+1];
};
/* dlm_master_lookup() flags */
#define DLM_LU_RECOVER_DIR 1
#define DLM_LU_RECOVER_MASTER 2
/* dlm_master_lookup() results */
#define DLM_LU_MATCH 1
#define DLM_LU_ADD 2
/* find_rsb() flags */
#define R_MASTER 1 /* only return rsb if it's a master */
#define R_CREATE 2 /* create/add rsb if not found */
#define R_REQUEST 0x00000001
#define R_RECEIVE_REQUEST 0x00000002
#define R_RECEIVE_RECOVER 0x00000004
/* rsb_flags */
......@@ -489,6 +498,13 @@ struct rcom_lock {
char rl_lvb[0];
};
/*
* The max number of resources per rsbtbl bucket that shrink will attempt
* to remove in each iteration.
*/
#define DLM_REMOVE_NAMES_MAX 8
struct dlm_ls {
struct list_head ls_list; /* list of lockspaces */
dlm_lockspace_t *ls_local_handle;
......@@ -509,9 +525,6 @@ struct dlm_ls {
struct dlm_rsbtable *ls_rsbtbl;
uint32_t ls_rsbtbl_size;
struct dlm_dirtable *ls_dirtbl;
uint32_t ls_dirtbl_size;
struct mutex ls_waiters_mutex;
struct list_head ls_waiters; /* lkbs needing a reply */
......@@ -525,6 +538,12 @@ struct dlm_ls {
int ls_new_rsb_count;
struct list_head ls_new_rsb; /* new rsb structs */
spinlock_t ls_remove_spin;
char ls_remove_name[DLM_RESNAME_MAXLEN+1];
char *ls_remove_names[DLM_REMOVE_NAMES_MAX];
int ls_remove_len;
int ls_remove_lens[DLM_REMOVE_NAMES_MAX];
struct list_head ls_nodes; /* current nodes in ls */
struct list_head ls_nodes_gone; /* dead node list, recovery */
int ls_num_nodes; /* number of nodes in ls */
......@@ -545,6 +564,7 @@ struct dlm_ls {
struct dentry *ls_debug_waiters_dentry; /* debugfs */
struct dentry *ls_debug_locks_dentry; /* debugfs */
struct dentry *ls_debug_all_dentry; /* debugfs */
struct dentry *ls_debug_toss_dentry; /* debugfs */
wait_queue_head_t ls_uevent_wait; /* user part of join/leave */
int ls_uevent_result;
......@@ -573,12 +593,16 @@ struct dlm_ls {
struct mutex ls_requestqueue_mutex;
struct dlm_rcom *ls_recover_buf;
int ls_recover_nodeid; /* for debugging */
unsigned int ls_recover_dir_sent_res; /* for log info */
unsigned int ls_recover_dir_sent_msg; /* for log info */
unsigned int ls_recover_locks_in; /* for log info */
uint64_t ls_rcom_seq;
spinlock_t ls_rcom_spin;
struct list_head ls_recover_list;
spinlock_t ls_recover_list_lock;
int ls_recover_list_count;
struct idr ls_recover_idr;
spinlock_t ls_recover_idr_lock;
wait_queue_head_t ls_wait_general;
struct mutex ls_clear_proc_locks;
......
......@@ -90,6 +90,7 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
static int receive_extralen(struct dlm_message *ms);
static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
static void del_timeout(struct dlm_lkb *lkb);
static void toss_rsb(struct kref *kref);
/*
* Lock compatibilty matrix - thanks Steve
......@@ -170,9 +171,11 @@ void dlm_print_lkb(struct dlm_lkb *lkb)
static void dlm_print_rsb(struct dlm_rsb *r)
{
printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
r->res_nodeid, r->res_flags, r->res_first_lkid,
r->res_recover_locks_count, r->res_name);
printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
"rlc %d name %s\n",
r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
r->res_name);
}
void dlm_dump_rsb(struct dlm_rsb *r)
......@@ -327,6 +330,37 @@ static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
* Basic operations on rsb's and lkb's
*/
/* This is only called to add a reference when the code already holds
a valid reference to the rsb, so there's no need for locking. */
static inline void hold_rsb(struct dlm_rsb *r)
{
kref_get(&r->res_ref);
}
void dlm_hold_rsb(struct dlm_rsb *r)
{
hold_rsb(r);
}
/* When all references to the rsb are gone it's transferred to
the tossed list for later disposal. */
static void put_rsb(struct dlm_rsb *r)
{
struct dlm_ls *ls = r->res_ls;
uint32_t bucket = r->res_bucket;
spin_lock(&ls->ls_rsbtbl[bucket].lock);
kref_put(&r->res_ref, toss_rsb);
spin_unlock(&ls->ls_rsbtbl[bucket].lock);
}
void dlm_put_rsb(struct dlm_rsb *r)
{
put_rsb(r);
}
static int pre_rsb_struct(struct dlm_ls *ls)
{
struct dlm_rsb *r1, *r2;
......@@ -411,11 +445,10 @@ static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
}
int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
unsigned int flags, struct dlm_rsb **r_ret)
struct dlm_rsb **r_ret)
{
struct rb_node *node = tree->rb_node;
struct dlm_rsb *r;
int error = 0;
int rc;
while (node) {
......@@ -432,10 +465,8 @@ int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
return -EBADR;
found:
if (r->res_nodeid && (flags & R_MASTER))
error = -ENOTBLK;
*r_ret = r;
return error;
return 0;
}
static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
......@@ -467,124 +498,587 @@ static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
return 0;
}
static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
unsigned int flags, struct dlm_rsb **r_ret)
/*
* Find rsb in rsbtbl and potentially create/add one
*
* Delaying the release of rsb's has a similar benefit to applications keeping
* NL locks on an rsb, but without the guarantee that the cached master value
* will still be valid when the rsb is reused. Apps aren't always smart enough
* to keep NL locks on an rsb that they may lock again shortly; this can lead
* to excessive master lookups and removals if we don't delay the release.
*
* Searching for an rsb means looking through both the normal list and toss
* list. When found on the toss list the rsb is moved to the normal list with
* ref count of 1; when found on normal list the ref count is incremented.
*
* rsb's on the keep list are being used locally and refcounted.
* rsb's on the toss list are not being used locally, and are not refcounted.
*
* The toss list rsb's were either
* - previously used locally but not any more (were on keep list, then
* moved to toss list when last refcount dropped)
* - created and put on toss list as a directory record for a lookup
* (we are the dir node for the res, but are not using the res right now,
* but some other node is)
*
* The purpose of find_rsb() is to return a refcounted rsb for local use.
* So, if the given rsb is on the toss list, it is moved to the keep list
* before being returned.
*
* toss_rsb() happens when all local usage of the rsb is done, i.e. no
* more refcounts exist, so the rsb is moved from the keep list to the
* toss list.
*
* rsb's on both keep and toss lists are used for doing a name to master
* lookups. rsb's that are in use locally (and being refcounted) are on
* the keep list, rsb's that are not in use locally (not refcounted) and
* only exist for name/master lookups are on the toss list.
*
* rsb's on the toss list who's dir_nodeid is not local can have stale
* name/master mappings. So, remote requests on such rsb's can potentially
* return with an error, which means the mapping is stale and needs to
* be updated with a new lookup. (The idea behind MASTER UNCERTAIN and
* first_lkid is to keep only a single outstanding request on an rsb
* while that rsb has a potentially stale master.)
*/
static int find_rsb_dir(struct dlm_ls *ls, char *name, int len,
uint32_t hash, uint32_t b,
int dir_nodeid, int from_nodeid,
unsigned int flags, struct dlm_rsb **r_ret)
{
struct dlm_rsb *r;
struct dlm_rsb *r = NULL;
int our_nodeid = dlm_our_nodeid();
int from_local = 0;
int from_other = 0;
int from_dir = 0;
int create = 0;
int error;
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, flags, &r);
if (!error) {
kref_get(&r->res_ref);
goto out;
if (flags & R_RECEIVE_REQUEST) {
if (from_nodeid == dir_nodeid)
from_dir = 1;
else
from_other = 1;
} else if (flags & R_REQUEST) {
from_local = 1;
}
/*
* flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
* from_nodeid has sent us a lock in dlm_recover_locks, believing
* we're the new master. Our local recovery may not have set
* res_master_nodeid to our_nodeid yet, so allow either. Don't
* create the rsb; dlm_recover_process_copy() will handle EBADR
* by resending.
*
* If someone sends us a request, we are the dir node, and we do
* not find the rsb anywhere, then recreate it. This happens if
* someone sends us a request after we have removed/freed an rsb
* from our toss list. (They sent a request instead of lookup
* because they are using an rsb from their toss list.)
*/
if (from_local || from_dir ||
(from_other && (dir_nodeid == our_nodeid))) {
create = 1;
}
if (error == -ENOTBLK)
goto out;
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
retry:
if (create) {
error = pre_rsb_struct(ls);
if (error < 0)
goto out;
}
spin_lock(&ls->ls_rsbtbl[b].lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
if (error)
goto out;
goto do_toss;
/*
* rsb is active, so we can't check master_nodeid without lock_rsb.
*/
rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
kref_get(&r->res_ref);
error = 0;
goto out_unlock;
do_toss:
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
if (error)
return error;
goto do_new;
if (dlm_no_directory(ls))
goto out;
/*
* rsb found inactive (master_nodeid may be out of date unless
* we are the dir_nodeid or were the master) No other thread
* is using this rsb because it's on the toss list, so we can
* look at or update res_master_nodeid without lock_rsb.
*/
if (r->res_nodeid == -1) {
if ((r->res_master_nodeid != our_nodeid) && from_other) {
/* our rsb was not master, and another node (not the dir node)
has sent us a request */
log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
from_nodeid, r->res_master_nodeid, dir_nodeid,
r->res_name);
error = -ENOTBLK;
goto out_unlock;
}
if ((r->res_master_nodeid != our_nodeid) && from_dir) {
/* don't think this should ever happen */
log_error(ls, "find_rsb toss from_dir %d master %d",
from_nodeid, r->res_master_nodeid);
dlm_print_rsb(r);
/* fix it and go on */
r->res_master_nodeid = our_nodeid;
r->res_nodeid = 0;
rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
r->res_first_lkid = 0;
} else if (r->res_nodeid > 0) {
}
if (from_local && (r->res_master_nodeid != our_nodeid)) {
/* Because we have held no locks on this rsb,
res_master_nodeid could have become stale. */
rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
r->res_first_lkid = 0;
}
rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
goto out_unlock;
do_new:
/*
* rsb not found
*/
if (error == -EBADR && !create)
goto out_unlock;
error = get_rsb_struct(ls, name, len, &r);
if (error == -EAGAIN) {
spin_unlock(&ls->ls_rsbtbl[b].lock);
goto retry;
}
if (error)
goto out_unlock;
r->res_hash = hash;
r->res_bucket = b;
r->res_dir_nodeid = dir_nodeid;
kref_init(&r->res_ref);
if (from_dir) {
/* want to see how often this happens */
log_debug(ls, "find_rsb new from_dir %d recreate %s",
from_nodeid, r->res_name);
r->res_master_nodeid = our_nodeid;
r->res_nodeid = 0;
goto out_add;
}
if (from_other && (dir_nodeid != our_nodeid)) {
/* should never happen */
log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
from_nodeid, dir_nodeid, our_nodeid, r->res_name);
dlm_free_rsb(r);
error = -ENOTBLK;
goto out_unlock;
}
if (from_other) {
log_debug(ls, "find_rsb new from_other %d dir %d %s",
from_nodeid, dir_nodeid, r->res_name);
}
if (dir_nodeid == our_nodeid) {
/* When we are the dir nodeid, we can set the master
node immediately */
r->res_master_nodeid = our_nodeid;
r->res_nodeid = 0;
} else {
DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
/* set_master will send_lookup to dir_nodeid */
r->res_master_nodeid = 0;
r->res_nodeid = -1;
}
out_add:
error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
out_unlock:
spin_unlock(&ls->ls_rsbtbl[b].lock);
out:
*r_ret = r;
return error;
}
/* During recovery, other nodes can send us new MSTCPY locks (from
dlm_recover_locks) before we've made ourself master (in
dlm_recover_masters). */
static int find_rsb_nodir(struct dlm_ls *ls, char *name, int len,
uint32_t hash, uint32_t b,
int dir_nodeid, int from_nodeid,
unsigned int flags, struct dlm_rsb **r_ret)
{
struct dlm_rsb *r = NULL;
int our_nodeid = dlm_our_nodeid();
int recover = (flags & R_RECEIVE_RECOVER);
int error;
retry:
error = pre_rsb_struct(ls);
if (error < 0)
goto out;
spin_lock(&ls->ls_rsbtbl[b].lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
if (error)
goto do_toss;
/*
* rsb is active, so we can't check master_nodeid without lock_rsb.
*/
kref_get(&r->res_ref);
goto out_unlock;
do_toss:
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
if (error)
goto do_new;
/*
* rsb found inactive. No other thread is using this rsb because
* it's on the toss list, so we can look at or update
* res_master_nodeid without lock_rsb.
*/
if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
/* our rsb is not master, and another node has sent us a
request; this should never happen */
log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
from_nodeid, r->res_master_nodeid, dir_nodeid);
dlm_print_rsb(r);
error = -ENOTBLK;
goto out_unlock;
}
if (!recover && (r->res_master_nodeid != our_nodeid) &&
(dir_nodeid == our_nodeid)) {
/* our rsb is not master, and we are dir; may as well fix it;
this should never happen */
log_error(ls, "find_rsb toss our %d master %d dir %d",
our_nodeid, r->res_master_nodeid, dir_nodeid);
dlm_print_rsb(r);
r->res_master_nodeid = our_nodeid;
r->res_nodeid = 0;
}
rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
goto out_unlock;
do_new:
/*
* rsb not found
*/
error = get_rsb_struct(ls, name, len, &r);
if (error == -EAGAIN) {
spin_unlock(&ls->ls_rsbtbl[b].lock);
goto retry;
}
if (error)
goto out_unlock;
r->res_hash = hash;
r->res_bucket = b;
r->res_dir_nodeid = dir_nodeid;
r->res_master_nodeid = dir_nodeid;
r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
kref_init(&r->res_ref);
error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
out_unlock:
spin_unlock(&ls->ls_rsbtbl[b].lock);
out:
*r_ret = r;
return error;
}
static int find_rsb(struct dlm_ls *ls, char *name, int len, int from_nodeid,
unsigned int flags, struct dlm_rsb **r_ret)
{
uint32_t hash, b;
int dir_nodeid;
if (len > DLM_RESNAME_MAXLEN)
return -EINVAL;
hash = jhash(name, len, 0);
b = hash & (ls->ls_rsbtbl_size - 1);
dir_nodeid = dlm_hash2nodeid(ls, hash);
if (dlm_no_directory(ls))
return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
from_nodeid, flags, r_ret);
else
return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
from_nodeid, flags, r_ret);
}
/* we have received a request and found that res_master_nodeid != our_nodeid,
so we need to return an error or make ourself the master */
static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
int from_nodeid)
{
if (dlm_no_directory(ls)) {
log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
from_nodeid, r->res_master_nodeid,
r->res_dir_nodeid);
dlm_print_rsb(r);
return -ENOTBLK;
}
if (from_nodeid != r->res_dir_nodeid) {
/* our rsb is not master, and another node (not the dir node)
has sent us a request. this is much more common when our
master_nodeid is zero, so limit debug to non-zero. */
if (r->res_master_nodeid) {
log_debug(ls, "validate master from_other %d master %d "
"dir %d first %x %s", from_nodeid,
r->res_master_nodeid, r->res_dir_nodeid,
r->res_first_lkid, r->res_name);
}
return -ENOTBLK;
} else {
/* our rsb is not master, but the dir nodeid has sent us a
request; this could happen with master 0 / res_nodeid -1 */
if (r->res_master_nodeid) {
log_error(ls, "validate master from_dir %d master %d "
"first %x %s",
from_nodeid, r->res_master_nodeid,
r->res_first_lkid, r->res_name);
}
r->res_master_nodeid = dlm_our_nodeid();
r->res_nodeid = 0;
return 0;
}
}
/*
* Find rsb in rsbtbl and potentially create/add one
* We're the dir node for this res and another node wants to know the
* master nodeid. During normal operation (non recovery) this is only
* called from receive_lookup(); master lookups when the local node is
* the dir node are done by find_rsb().
*
* Delaying the release of rsb's has a similar benefit to applications keeping
* NL locks on an rsb, but without the guarantee that the cached master value
* will still be valid when the rsb is reused. Apps aren't always smart enough
* to keep NL locks on an rsb that they may lock again shortly; this can lead
* to excessive master lookups and removals if we don't delay the release.
* normal operation, we are the dir node for a resource
* . _request_lock
* . set_master
* . send_lookup
* . receive_lookup
* . dlm_master_lookup flags 0
*
* Searching for an rsb means looking through both the normal list and toss
* list. When found on the toss list the rsb is moved to the normal list with
* ref count of 1; when found on normal list the ref count is incremented.
* recover directory, we are rebuilding dir for all resources
* . dlm_recover_directory
* . dlm_rcom_names
* remote node sends back the rsb names it is master of and we are dir of
* . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
* we either create new rsb setting remote node as master, or find existing
* rsb and set master to be the remote node.
*
* recover masters, we are finding the new master for resources
* . dlm_recover_masters
* . recover_master
* . dlm_send_rcom_lookup
* . receive_rcom_lookup
* . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
*/
static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
unsigned int flags, struct dlm_rsb **r_ret)
int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len,
unsigned int flags, int *r_nodeid, int *result)
{
struct dlm_rsb *r = NULL;
uint32_t hash, bucket;
int error;
uint32_t hash, b;
int from_master = (flags & DLM_LU_RECOVER_DIR);
int fix_master = (flags & DLM_LU_RECOVER_MASTER);
int our_nodeid = dlm_our_nodeid();
int dir_nodeid, error, toss_list = 0;
if (namelen > DLM_RESNAME_MAXLEN) {
error = -EINVAL;
goto out;
if (len > DLM_RESNAME_MAXLEN)
return -EINVAL;
if (from_nodeid == our_nodeid) {
log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
our_nodeid, flags);
return -EINVAL;
}
if (dlm_no_directory(ls))
flags |= R_CREATE;
hash = jhash(name, len, 0);
b = hash & (ls->ls_rsbtbl_size - 1);
hash = jhash(name, namelen, 0);
bucket = hash & (ls->ls_rsbtbl_size - 1);
dir_nodeid = dlm_hash2nodeid(ls, hash);
if (dir_nodeid != our_nodeid) {
log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
from_nodeid, dir_nodeid, our_nodeid, hash,
ls->ls_num_nodes);
*r_nodeid = -1;
return -EINVAL;
}
retry:
if (flags & R_CREATE) {
error = pre_rsb_struct(ls);
if (error < 0)
goto out;
error = pre_rsb_struct(ls);
if (error < 0)
return error;
spin_lock(&ls->ls_rsbtbl[b].lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
if (!error) {
/* because the rsb is active, we need to lock_rsb before
checking/changing re_master_nodeid */
hold_rsb(r);
spin_unlock(&ls->ls_rsbtbl[b].lock);
lock_rsb(r);
goto found;
}
spin_lock(&ls->ls_rsbtbl[bucket].lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
if (error)
goto not_found;
error = _search_rsb(ls, name, namelen, bucket, flags, &r);
if (!error)
goto out_unlock;
/* because the rsb is inactive (on toss list), it's not refcounted
and lock_rsb is not used, but is protected by the rsbtbl lock */
if (error == -EBADR && !(flags & R_CREATE))
goto out_unlock;
toss_list = 1;
found:
if (r->res_dir_nodeid != our_nodeid) {
/* should not happen, but may as well fix it and carry on */
log_error(ls, "dlm_master_lookup res_dir %d our %d %s",
r->res_dir_nodeid, our_nodeid, r->res_name);
r->res_dir_nodeid = our_nodeid;
}
if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
/* Recovery uses this function to set a new master when
the previous master failed. Setting NEW_MASTER will
force dlm_recover_masters to call recover_master on this
rsb even though the res_nodeid is no longer removed. */
r->res_master_nodeid = from_nodeid;
r->res_nodeid = from_nodeid;
rsb_set_flag(r, RSB_NEW_MASTER);
if (toss_list) {
/* I don't think we should ever find it on toss list. */
log_error(ls, "dlm_master_lookup fix_master on toss");
dlm_dump_rsb(r);
}
}
/* the rsb was found but wasn't a master copy */
if (error == -ENOTBLK)
goto out_unlock;
if (from_master && (r->res_master_nodeid != from_nodeid)) {
/* this will happen if from_nodeid became master during
a previous recovery cycle, and we aborted the previous
cycle before recovering this master value */
log_limit(ls, "dlm_master_lookup from_master %d "
"master_nodeid %d res_nodeid %d first %x %s",
from_nodeid, r->res_master_nodeid, r->res_nodeid,
r->res_first_lkid, r->res_name);
if (r->res_master_nodeid == our_nodeid) {
log_error(ls, "from_master %d our_master", from_nodeid);
dlm_dump_rsb(r);
dlm_send_rcom_lookup_dump(r, from_nodeid);
goto out_found;
}
r->res_master_nodeid = from_nodeid;
r->res_nodeid = from_nodeid;
rsb_set_flag(r, RSB_NEW_MASTER);
}
if (!r->res_master_nodeid) {
/* this will happen if recovery happens while we're looking
up the master for this rsb */
log_debug(ls, "dlm_master_lookup master 0 to %d first %x %s",
from_nodeid, r->res_first_lkid, r->res_name);
r->res_master_nodeid = from_nodeid;
r->res_nodeid = from_nodeid;
}
if (!from_master && !fix_master &&
(r->res_master_nodeid == from_nodeid)) {
/* this can happen when the master sends remove, the dir node
finds the rsb on the keep list and ignores the remove,
and the former master sends a lookup */
log_limit(ls, "dlm_master_lookup from master %d flags %x "
"first %x %s", from_nodeid, flags,
r->res_first_lkid, r->res_name);
}
out_found:
*r_nodeid = r->res_master_nodeid;
if (result)
*result = DLM_LU_MATCH;
if (toss_list) {
r->res_toss_time = jiffies;
/* the rsb was inactive (on toss list) */
spin_unlock(&ls->ls_rsbtbl[b].lock);
} else {
/* the rsb was active */
unlock_rsb(r);
put_rsb(r);
}
return 0;
error = get_rsb_struct(ls, name, namelen, &r);
not_found:
error = get_rsb_struct(ls, name, len, &r);
if (error == -EAGAIN) {
spin_unlock(&ls->ls_rsbtbl[bucket].lock);
spin_unlock(&ls->ls_rsbtbl[b].lock);
goto retry;
}
if (error)
goto out_unlock;
r->res_hash = hash;
r->res_bucket = bucket;
r->res_nodeid = -1;
r->res_bucket = b;
r->res_dir_nodeid = our_nodeid;
r->res_master_nodeid = from_nodeid;
r->res_nodeid = from_nodeid;
kref_init(&r->res_ref);
r->res_toss_time = jiffies;
/* With no directory, the master can be set immediately */
if (dlm_no_directory(ls)) {
int nodeid = dlm_dir_nodeid(r);
if (nodeid == dlm_our_nodeid())
nodeid = 0;
r->res_nodeid = nodeid;
error = rsb_insert(r, &ls->ls_rsbtbl[b].toss);
if (error) {
/* should never happen */
dlm_free_rsb(r);
spin_unlock(&ls->ls_rsbtbl[b].lock);
goto retry;
}
error = rsb_insert(r, &ls->ls_rsbtbl[bucket].keep);
if (result)
*result = DLM_LU_ADD;
*r_nodeid = from_nodeid;
error = 0;
out_unlock:
spin_unlock(&ls->ls_rsbtbl[bucket].lock);
out:
*r_ret = r;
spin_unlock(&ls->ls_rsbtbl[b].lock);
return error;
}
......@@ -605,17 +1099,27 @@ static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
}
}
/* This is only called to add a reference when the code already holds
a valid reference to the rsb, so there's no need for locking. */
static inline void hold_rsb(struct dlm_rsb *r)
void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len)
{
kref_get(&r->res_ref);
}
struct dlm_rsb *r = NULL;
uint32_t hash, b;
int error;
void dlm_hold_rsb(struct dlm_rsb *r)
{
hold_rsb(r);
hash = jhash(name, len, 0);
b = hash & (ls->ls_rsbtbl_size - 1);
spin_lock(&ls->ls_rsbtbl[b].lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
if (!error)
goto out_dump;
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
if (error)
goto out;
out_dump:
dlm_dump_rsb(r);
out:
spin_unlock(&ls->ls_rsbtbl[b].lock);
}
static void toss_rsb(struct kref *kref)
......@@ -634,24 +1138,6 @@ static void toss_rsb(struct kref *kref)
}
}
/* When all references to the rsb are gone it's transferred to
the tossed list for later disposal. */
static void put_rsb(struct dlm_rsb *r)
{
struct dlm_ls *ls = r->res_ls;
uint32_t bucket = r->res_bucket;
spin_lock(&ls->ls_rsbtbl[bucket].lock);
kref_put(&r->res_ref, toss_rsb);
spin_unlock(&ls->ls_rsbtbl[bucket].lock);
}
void dlm_put_rsb(struct dlm_rsb *r)
{
put_rsb(r);
}
/* See comment for unhold_lkb */
static void unhold_rsb(struct dlm_rsb *r)
......@@ -1138,61 +1624,170 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
return error;
}
static void dir_remove(struct dlm_rsb *r)
{
int to_nodeid;
if (dlm_no_directory(r->res_ls))
return;
/* If there's an rsb for the same resource being removed, ensure
that the remove message is sent before the new lookup message.
It should be rare to need a delay here, but if not, then it may
be worthwhile to add a proper wait mechanism rather than a delay. */
to_nodeid = dlm_dir_nodeid(r);
if (to_nodeid != dlm_our_nodeid())
send_remove(r);
else
dlm_dir_remove_entry(r->res_ls, to_nodeid,
r->res_name, r->res_length);
static void wait_pending_remove(struct dlm_rsb *r)
{
struct dlm_ls *ls = r->res_ls;
restart:
spin_lock(&ls->ls_remove_spin);
if (ls->ls_remove_len &&
!rsb_cmp(r, ls->ls_remove_name, ls->ls_remove_len)) {
log_debug(ls, "delay lookup for remove dir %d %s",
r->res_dir_nodeid, r->res_name);
spin_unlock(&ls->ls_remove_spin);
msleep(1);
goto restart;
}
spin_unlock(&ls->ls_remove_spin);
}
/* FIXME: make this more efficient */
/*
* ls_remove_spin protects ls_remove_name and ls_remove_len which are
* read by other threads in wait_pending_remove. ls_remove_names
* and ls_remove_lens are only used by the scan thread, so they do
* not need protection.
*/
static int shrink_bucket(struct dlm_ls *ls, int b)
static void shrink_bucket(struct dlm_ls *ls, int b)
{
struct rb_node *n;
struct rb_node *n, *next;
struct dlm_rsb *r;
int count = 0, found;
char *name;
int our_nodeid = dlm_our_nodeid();
int remote_count = 0;
int i, len, rv;
memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
spin_lock(&ls->ls_rsbtbl[b].lock);
for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
next = rb_next(n);
r = rb_entry(n, struct dlm_rsb, res_hashnode);
/* If we're the directory record for this rsb, and
we're not the master of it, then we need to wait
for the master node to send us a dir remove for
before removing the dir record. */
if (!dlm_no_directory(ls) &&
(r->res_master_nodeid != our_nodeid) &&
(dlm_dir_nodeid(r) == our_nodeid)) {
continue;
}
if (!time_after_eq(jiffies, r->res_toss_time +
dlm_config.ci_toss_secs * HZ)) {
continue;
}
if (!dlm_no_directory(ls) &&
(r->res_master_nodeid == our_nodeid) &&
(dlm_dir_nodeid(r) != our_nodeid)) {
/* We're the master of this rsb but we're not
the directory record, so we need to tell the
dir node to remove the dir record. */
ls->ls_remove_lens[remote_count] = r->res_length;
memcpy(ls->ls_remove_names[remote_count], r->res_name,
DLM_RESNAME_MAXLEN);
remote_count++;
if (remote_count >= DLM_REMOVE_NAMES_MAX)
break;
continue;
}
if (!kref_put(&r->res_ref, kill_rsb)) {
log_error(ls, "tossed rsb in use %s", r->res_name);
continue;
}
rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
dlm_free_rsb(r);
}
spin_unlock(&ls->ls_rsbtbl[b].lock);
/*
* While searching for rsb's to free, we found some that require
* remote removal. We leave them in place and find them again here
* so there is a very small gap between removing them from the toss
* list and sending the removal. Keeping this gap small is
* important to keep us (the master node) from being out of sync
* with the remote dir node for very long.
*
* From the time the rsb is removed from toss until just after
* send_remove, the rsb name is saved in ls_remove_name. A new
* lookup checks this to ensure that a new lookup message for the
* same resource name is not sent just before the remove message.
*/
for (i = 0; i < remote_count; i++) {
name = ls->ls_remove_names[i];
len = ls->ls_remove_lens[i];
for (;;) {
found = 0;
spin_lock(&ls->ls_rsbtbl[b].lock);
for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = rb_next(n)) {
r = rb_entry(n, struct dlm_rsb, res_hashnode);
if (!time_after_eq(jiffies, r->res_toss_time +
dlm_config.ci_toss_secs * HZ))
continue;
found = 1;
break;
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
if (rv) {
spin_unlock(&ls->ls_rsbtbl[b].lock);
log_debug(ls, "remove_name not toss %s", name);
continue;
}
if (!found) {
if (r->res_master_nodeid != our_nodeid) {
spin_unlock(&ls->ls_rsbtbl[b].lock);
break;
log_debug(ls, "remove_name master %d dir %d our %d %s",
r->res_master_nodeid, r->res_dir_nodeid,
our_nodeid, name);
continue;
}
if (kref_put(&r->res_ref, kill_rsb)) {
rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
if (r->res_dir_nodeid == our_nodeid) {
/* should never happen */
spin_unlock(&ls->ls_rsbtbl[b].lock);
log_error(ls, "remove_name dir %d master %d our %d %s",
r->res_dir_nodeid, r->res_master_nodeid,
our_nodeid, name);
continue;
}
if (is_master(r))
dir_remove(r);
dlm_free_rsb(r);
count++;
} else {
if (!time_after_eq(jiffies, r->res_toss_time +
dlm_config.ci_toss_secs * HZ)) {
spin_unlock(&ls->ls_rsbtbl[b].lock);
log_error(ls, "tossed rsb in use %s", r->res_name);
log_debug(ls, "remove_name toss_time %lu now %lu %s",
r->res_toss_time, jiffies, name);
continue;
}
}
return count;
if (!kref_put(&r->res_ref, kill_rsb)) {
spin_unlock(&ls->ls_rsbtbl[b].lock);
log_error(ls, "remove_name in use %s", name);
continue;
}
rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
/* block lookup of same name until we've sent remove */
spin_lock(&ls->ls_remove_spin);
ls->ls_remove_len = len;
memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
spin_unlock(&ls->ls_remove_spin);
spin_unlock(&ls->ls_rsbtbl[b].lock);
send_remove(r);
/* allow lookup of name again */
spin_lock(&ls->ls_remove_spin);
ls->ls_remove_len = 0;
memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
spin_unlock(&ls->ls_remove_spin);
dlm_free_rsb(r);
}
}
void dlm_scan_rsbs(struct dlm_ls *ls)
......@@ -1684,10 +2279,14 @@ static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
* immediate request, it is 0 if called later, after the lock has been
* queued.
*
* recover is 1 if dlm_recover_grant() is trying to grant conversions
* after recovery.
*
* References are from chapter 6 of "VAXcluster Principles" by Roy Davis
*/
static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
int recover)
{
int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
......@@ -1719,7 +2318,7 @@ static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
*/
if (queue_conflict(&r->res_grantqueue, lkb))
goto out;
return 0;
/*
* 6-3: By default, a conversion request is immediately granted if the
......@@ -1728,7 +2327,24 @@ static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
*/
if (queue_conflict(&r->res_convertqueue, lkb))
goto out;
return 0;
/*
* The RECOVER_GRANT flag means dlm_recover_grant() is granting
* locks for a recovered rsb, on which lkb's have been rebuilt.
* The lkb's may have been rebuilt on the queues in a different
* order than they were in on the previous master. So, granting
* queued conversions in order after recovery doesn't make sense
* since the order hasn't been preserved anyway. The new order
* could also have created a new "in place" conversion deadlock.
* (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
* After recovery, there would be no granted locks, and possibly
* NL->EX, PR->EX, an in-place conversion deadlock.) So, after
* recovery, grant conversions without considering order.
*/
if (conv && recover)
return 1;
/*
* 6-5: But the default algorithm for deciding whether to grant or
......@@ -1765,7 +2381,7 @@ static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
if (list_empty(&r->res_convertqueue))
return 1;
else
goto out;
return 0;
}
/*
......@@ -1811,12 +2427,12 @@ static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
if (!now && !conv && list_empty(&r->res_convertqueue) &&
first_in_list(lkb, &r->res_waitqueue))
return 1;
out:
return 0;
}
static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
int *err)
int recover, int *err)
{
int rv;
int8_t alt = 0, rqmode = lkb->lkb_rqmode;
......@@ -1825,7 +2441,7 @@ static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
if (err)
*err = 0;
rv = _can_be_granted(r, lkb, now);
rv = _can_be_granted(r, lkb, now, recover);
if (rv)
goto out;
......@@ -1866,7 +2482,7 @@ static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
if (alt) {
lkb->lkb_rqmode = alt;
rv = _can_be_granted(r, lkb, now);
rv = _can_be_granted(r, lkb, now, 0);
if (rv)
lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
else
......@@ -1890,6 +2506,7 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
unsigned int *count)
{
struct dlm_lkb *lkb, *s;
int recover = rsb_flag(r, RSB_RECOVER_GRANT);
int hi, demoted, quit, grant_restart, demote_restart;
int deadlk;
......@@ -1903,7 +2520,7 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
demoted = is_demoted(lkb);
deadlk = 0;
if (can_be_granted(r, lkb, 0, &deadlk)) {
if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
grant_lock_pending(r, lkb);
grant_restart = 1;
if (count)
......@@ -1947,7 +2564,7 @@ static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
struct dlm_lkb *lkb, *s;
list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
if (can_be_granted(r, lkb, 0, NULL)) {
if (can_be_granted(r, lkb, 0, 0, NULL)) {
grant_lock_pending(r, lkb);
if (count)
(*count)++;
......@@ -2078,8 +2695,7 @@ static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
struct dlm_ls *ls = r->res_ls;
int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
int our_nodeid = dlm_our_nodeid();
if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
......@@ -2093,53 +2709,37 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
return 1;
}
if (r->res_nodeid == 0) {
if (r->res_master_nodeid == our_nodeid) {
lkb->lkb_nodeid = 0;
return 0;
}
if (r->res_nodeid > 0) {
lkb->lkb_nodeid = r->res_nodeid;
if (r->res_master_nodeid) {
lkb->lkb_nodeid = r->res_master_nodeid;
return 0;
}
DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
dir_nodeid = dlm_dir_nodeid(r);
if (dir_nodeid != our_nodeid) {
r->res_first_lkid = lkb->lkb_id;
send_lookup(r, lkb);
return 1;
}
for (i = 0; i < 2; i++) {
/* It's possible for dlm_scand to remove an old rsb for
this same resource from the toss list, us to create
a new one, look up the master locally, and find it
already exists just before dlm_scand does the
dir_remove() on the previous rsb. */
error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
r->res_length, &ret_nodeid);
if (!error)
break;
log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
schedule();
}
if (error && error != -EEXIST)
return error;
if (ret_nodeid == our_nodeid) {
r->res_first_lkid = 0;
if (dlm_dir_nodeid(r) == our_nodeid) {
/* This is a somewhat unusual case; find_rsb will usually
have set res_master_nodeid when dir nodeid is local, but
there are cases where we become the dir node after we've
past find_rsb and go through _request_lock again.
confirm_master() or process_lookup_list() needs to be
called after this. */
log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
r->res_name);
r->res_master_nodeid = our_nodeid;
r->res_nodeid = 0;
lkb->lkb_nodeid = 0;
} else {
r->res_first_lkid = lkb->lkb_id;
r->res_nodeid = ret_nodeid;
lkb->lkb_nodeid = ret_nodeid;
return 0;
}
return 0;
wait_pending_remove(r);
r->res_first_lkid = lkb->lkb_id;
send_lookup(r, lkb);
return 1;
}
static void process_lookup_list(struct dlm_rsb *r)
......@@ -2464,7 +3064,7 @@ static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
int error = 0;
if (can_be_granted(r, lkb, 1, NULL)) {
if (can_be_granted(r, lkb, 1, 0, NULL)) {
grant_lock(r, lkb);
queue_cast(r, lkb, 0);
goto out;
......@@ -2504,7 +3104,7 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
/* changing an existing lock may allow others to be granted */
if (can_be_granted(r, lkb, 1, &deadlk)) {
if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
grant_lock(r, lkb);
queue_cast(r, lkb, 0);
goto out;
......@@ -2530,7 +3130,7 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
if (is_demoted(lkb)) {
grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
if (_can_be_granted(r, lkb, 1)) {
if (_can_be_granted(r, lkb, 1, 0)) {
grant_lock(r, lkb);
queue_cast(r, lkb, 0);
goto out;
......@@ -2584,7 +3184,7 @@ static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
}
/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
int error;
......@@ -2708,11 +3308,11 @@ static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
error = validate_lock_args(ls, lkb, args);
if (error)
goto out;
return error;
error = find_rsb(ls, name, len, R_CREATE, &r);
error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
if (error)
goto out;
return error;
lock_rsb(r);
......@@ -2723,8 +3323,6 @@ static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
unlock_rsb(r);
put_rsb(r);
out:
return error;
}
......@@ -3402,11 +4000,72 @@ static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
return error;
}
static void send_repeat_remove(struct dlm_ls *ls, char *ms_name, int len)
{
char name[DLM_RESNAME_MAXLEN + 1];
struct dlm_message *ms;
struct dlm_mhandle *mh;
struct dlm_rsb *r;
uint32_t hash, b;
int rv, dir_nodeid;
memset(name, 0, sizeof(name));
memcpy(name, ms_name, len);
hash = jhash(name, len, 0);
b = hash & (ls->ls_rsbtbl_size - 1);
dir_nodeid = dlm_hash2nodeid(ls, hash);
log_error(ls, "send_repeat_remove dir %d %s", dir_nodeid, name);
spin_lock(&ls->ls_rsbtbl[b].lock);
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
if (!rv) {
spin_unlock(&ls->ls_rsbtbl[b].lock);
log_error(ls, "repeat_remove on keep %s", name);
return;
}
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
if (!rv) {
spin_unlock(&ls->ls_rsbtbl[b].lock);
log_error(ls, "repeat_remove on toss %s", name);
return;
}
/* use ls->remove_name2 to avoid conflict with shrink? */
spin_lock(&ls->ls_remove_spin);
ls->ls_remove_len = len;
memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
spin_unlock(&ls->ls_remove_spin);
spin_unlock(&ls->ls_rsbtbl[b].lock);
rv = _create_message(ls, sizeof(struct dlm_message) + len,
dir_nodeid, DLM_MSG_REMOVE, &ms, &mh);
if (rv)
return;
memcpy(ms->m_extra, name, len);
ms->m_hash = hash;
send_message(mh, ms);
spin_lock(&ls->ls_remove_spin);
ls->ls_remove_len = 0;
memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
spin_unlock(&ls->ls_remove_spin);
}
static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
{
struct dlm_lkb *lkb;
struct dlm_rsb *r;
int error, namelen;
int from_nodeid;
int error, namelen = 0;
from_nodeid = ms->m_header.h_nodeid;
error = create_lkb(ls, &lkb);
if (error)
......@@ -3420,9 +4079,16 @@ static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
goto fail;
}
/* The dir node is the authority on whether we are the master
for this rsb or not, so if the master sends us a request, we should
recreate the rsb if we've destroyed it. This race happens when we
send a remove message to the dir node at the same time that the dir
node sends us a request for the rsb. */
namelen = receive_extralen(ms);
error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
R_RECEIVE_REQUEST, &r);
if (error) {
__put_lkb(ls, lkb);
goto fail;
......@@ -3430,6 +4096,16 @@ static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
lock_rsb(r);
if (r->res_master_nodeid != dlm_our_nodeid()) {
error = validate_master_nodeid(ls, r, from_nodeid);
if (error) {
unlock_rsb(r);
put_rsb(r);
__put_lkb(ls, lkb);
goto fail;
}
}
attach_lkb(r, lkb);
error = do_request(r, lkb);
send_request_reply(r, lkb, error);
......@@ -3445,6 +4121,31 @@ static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
return 0;
fail:
/* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
and do this receive_request again from process_lookup_list once
we get the lookup reply. This would avoid a many repeated
ENOTBLK request failures when the lookup reply designating us
as master is delayed. */
/* We could repeatedly return -EBADR here if our send_remove() is
delayed in being sent/arriving/being processed on the dir node.
Another node would repeatedly lookup up the master, and the dir
node would continue returning our nodeid until our send_remove
took effect.
We send another remove message in case our previous send_remove
was lost/ignored/missed somehow. */
if (error != -ENOTBLK) {
log_limit(ls, "receive_request %x from %d %d",
ms->m_lkid, from_nodeid, error);
}
if (namelen && error == -EBADR) {
send_repeat_remove(ls, ms->m_extra, namelen);
msleep(1000);
}
setup_stub_lkb(ls, ms);
send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
return error;
......@@ -3651,49 +4352,110 @@ static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
{
int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
int len, error, ret_nodeid, from_nodeid, our_nodeid;
from_nodeid = ms->m_header.h_nodeid;
our_nodeid = dlm_our_nodeid();
len = receive_extralen(ms);
dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
if (dir_nodeid != our_nodeid) {
log_error(ls, "lookup dir_nodeid %d from %d",
dir_nodeid, from_nodeid);
error = -EINVAL;
ret_nodeid = -1;
goto out;
}
error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
&ret_nodeid, NULL);
/* Optimization: we're master so treat lookup as a request */
if (!error && ret_nodeid == our_nodeid) {
receive_request(ls, ms);
return;
}
out:
send_lookup_reply(ls, ms, ret_nodeid, error);
}
static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
{
int len, dir_nodeid, from_nodeid;
char name[DLM_RESNAME_MAXLEN+1];
struct dlm_rsb *r;
uint32_t hash, b;
int rv, len, dir_nodeid, from_nodeid;
from_nodeid = ms->m_header.h_nodeid;
len = receive_extralen(ms);
if (len > DLM_RESNAME_MAXLEN) {
log_error(ls, "receive_remove from %d bad len %d",
from_nodeid, len);
return;
}
dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
if (dir_nodeid != dlm_our_nodeid()) {
log_error(ls, "remove dir entry dir_nodeid %d from %d",
dir_nodeid, from_nodeid);
log_error(ls, "receive_remove from %d bad nodeid %d",
from_nodeid, dir_nodeid);
return;
}
/* Look for name on rsbtbl.toss, if it's there, kill it.
If it's on rsbtbl.keep, it's being used, and we should ignore this
message. This is an expected race between the dir node sending a
request to the master node at the same time as the master node sends
a remove to the dir node. The resolution to that race is for the
dir node to ignore the remove message, and the master node to
recreate the master rsb when it gets a request from the dir node for
an rsb it doesn't have. */
memset(name, 0, sizeof(name));
memcpy(name, ms->m_extra, len);
hash = jhash(name, len, 0);
b = hash & (ls->ls_rsbtbl_size - 1);
spin_lock(&ls->ls_rsbtbl[b].lock);
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
if (rv) {
/* verify the rsb is on keep list per comment above */
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
if (rv) {
/* should not happen */
log_error(ls, "receive_remove from %d not found %s",
from_nodeid, name);
spin_unlock(&ls->ls_rsbtbl[b].lock);
return;
}
if (r->res_master_nodeid != from_nodeid) {
/* should not happen */
log_error(ls, "receive_remove keep from %d master %d",
from_nodeid, r->res_master_nodeid);
dlm_print_rsb(r);
spin_unlock(&ls->ls_rsbtbl[b].lock);
return;
}
log_debug(ls, "receive_remove from %d master %d first %x %s",
from_nodeid, r->res_master_nodeid, r->res_first_lkid,
name);
spin_unlock(&ls->ls_rsbtbl[b].lock);
return;
}
if (r->res_master_nodeid != from_nodeid) {
log_error(ls, "receive_remove toss from %d master %d",
from_nodeid, r->res_master_nodeid);
dlm_print_rsb(r);
spin_unlock(&ls->ls_rsbtbl[b].lock);
return;
}
dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
if (kref_put(&r->res_ref, kill_rsb)) {
rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
spin_unlock(&ls->ls_rsbtbl[b].lock);
dlm_free_rsb(r);
} else {
log_error(ls, "receive_remove from %d rsb ref error",
from_nodeid);
dlm_print_rsb(r);
spin_unlock(&ls->ls_rsbtbl[b].lock);
}
}
static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
......@@ -3706,6 +4468,7 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
struct dlm_lkb *lkb;
struct dlm_rsb *r;
int error, mstype, result;
int from_nodeid = ms->m_header.h_nodeid;
error = find_lkb(ls, ms->m_remid, &lkb);
if (error)
......@@ -3723,8 +4486,7 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
if (error) {
log_error(ls, "receive_request_reply %x remote %d %x result %d",
lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
ms->m_result);
lkb->lkb_id, from_nodeid, ms->m_lkid, ms->m_result);
dlm_dump_rsb(r);
goto out;
}
......@@ -3732,8 +4494,9 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
/* Optimization: the dir node was also the master, so it took our
lookup as a request and sent request reply instead of lookup reply */
if (mstype == DLM_MSG_LOOKUP) {
r->res_nodeid = ms->m_header.h_nodeid;
lkb->lkb_nodeid = r->res_nodeid;
r->res_master_nodeid = from_nodeid;
r->res_nodeid = from_nodeid;
lkb->lkb_nodeid = from_nodeid;
}
/* this is the value returned from do_request() on the master */
......@@ -3767,18 +4530,30 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
case -EBADR:
case -ENOTBLK:
/* find_rsb failed to find rsb or rsb wasn't master */
log_debug(ls, "receive_request_reply %x %x master diff %d %d",
lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
r->res_nodeid = -1;
lkb->lkb_nodeid = -1;
log_limit(ls, "receive_request_reply %x from %d %d "
"master %d dir %d first %x %s", lkb->lkb_id,
from_nodeid, result, r->res_master_nodeid,
r->res_dir_nodeid, r->res_first_lkid, r->res_name);
if (r->res_dir_nodeid != dlm_our_nodeid() &&
r->res_master_nodeid != dlm_our_nodeid()) {
/* cause _request_lock->set_master->send_lookup */
r->res_master_nodeid = 0;
r->res_nodeid = -1;
lkb->lkb_nodeid = -1;
}
if (is_overlap(lkb)) {
/* we'll ignore error in cancel/unlock reply */
queue_cast_overlap(r, lkb);
confirm_master(r, result);
unhold_lkb(lkb); /* undoes create_lkb() */
} else
} else {
_request_lock(r, lkb);
if (r->res_master_nodeid == dlm_our_nodeid())
confirm_master(r, 0);
}
break;
default:
......@@ -3994,6 +4769,7 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
struct dlm_lkb *lkb;
struct dlm_rsb *r;
int error, ret_nodeid;
int do_lookup_list = 0;
error = find_lkb(ls, ms->m_lkid, &lkb);
if (error) {
......@@ -4001,7 +4777,7 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
return;
}
/* ms->m_result is the value returned by dlm_dir_lookup on dir node
/* ms->m_result is the value returned by dlm_master_lookup on dir node
FIXME: will a non-zero error ever be returned? */
r = lkb->lkb_resource;
......@@ -4013,12 +4789,37 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
goto out;
ret_nodeid = ms->m_nodeid;
/* We sometimes receive a request from the dir node for this
rsb before we've received the dir node's loookup_reply for it.
The request from the dir node implies we're the master, so we set
ourself as master in receive_request_reply, and verify here that
we are indeed the master. */
if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
/* This should never happen */
log_error(ls, "receive_lookup_reply %x from %d ret %d "
"master %d dir %d our %d first %x %s",
lkb->lkb_id, ms->m_header.h_nodeid, ret_nodeid,
r->res_master_nodeid, r->res_dir_nodeid,
dlm_our_nodeid(), r->res_first_lkid, r->res_name);
}
if (ret_nodeid == dlm_our_nodeid()) {
r->res_master_nodeid = ret_nodeid;
r->res_nodeid = 0;
ret_nodeid = 0;
do_lookup_list = 1;
r->res_first_lkid = 0;
} else if (ret_nodeid == -1) {
/* the remote node doesn't believe it's the dir node */
log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
lkb->lkb_id, ms->m_header.h_nodeid);
r->res_master_nodeid = 0;
r->res_nodeid = -1;
lkb->lkb_nodeid = -1;
} else {
/* set_master() will copy res_nodeid to lkb_nodeid */
/* set_master() will set lkb_nodeid from r */
r->res_master_nodeid = ret_nodeid;
r->res_nodeid = ret_nodeid;
}
......@@ -4033,7 +4834,7 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
_request_lock(r, lkb);
out_list:
if (!ret_nodeid)
if (do_lookup_list)
process_lookup_list(r);
out:
unlock_rsb(r);
......@@ -4047,7 +4848,7 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
int error = 0, noent = 0;
if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
log_debug(ls, "ignore non-member message %d from %d %x %x %d",
log_limit(ls, "receive %d from non-member %d %x %x %d",
ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
ms->m_remid, ms->m_result);
return;
......@@ -4174,6 +4975,15 @@ static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
int nodeid)
{
if (dlm_locking_stopped(ls)) {
/* If we were a member of this lockspace, left, and rejoined,
other nodes may still be sending us messages from the
lockspace generation before we left. */
if (!ls->ls_generation) {
log_limit(ls, "receive %d from %d ignore old gen",
ms->m_type, nodeid);
return;
}
dlm_add_requestqueue(ls, nodeid, ms);
} else {
dlm_wait_requestqueue(ls);
......@@ -4651,9 +5461,10 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
if (!rsb_flag(r, RSB_RECOVER_GRANT))
continue;
rsb_clear_flag(r, RSB_RECOVER_GRANT);
if (!is_master(r))
if (!is_master(r)) {
rsb_clear_flag(r, RSB_RECOVER_GRANT);
continue;
}
hold_rsb(r);
spin_unlock(&ls->ls_rsbtbl[bucket].lock);
return r;
......@@ -4698,7 +5509,9 @@ void dlm_recover_grant(struct dlm_ls *ls)
rsb_count++;
count = 0;
lock_rsb(r);
/* the RECOVER_GRANT flag is checked in the grant path */
grant_pending_locks(r, &count);
rsb_clear_flag(r, RSB_RECOVER_GRANT);
lkb_count += count;
confirm_master(r, 0);
unlock_rsb(r);
......@@ -4798,6 +5611,7 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
struct dlm_rsb *r;
struct dlm_lkb *lkb;
uint32_t remid = 0;
int from_nodeid = rc->rc_header.h_nodeid;
int error;
if (rl->rl_parent_lkid) {
......@@ -4815,21 +5629,21 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
we make ourselves master, dlm_recover_masters() won't touch the
MSTCPY locks we've received early. */
error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen), 0, &r);
error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
from_nodeid, R_RECEIVE_RECOVER, &r);
if (error)
goto out;
lock_rsb(r);
if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
rc->rc_header.h_nodeid, remid);
from_nodeid, remid);
error = -EBADR;
put_rsb(r);
goto out;
goto out_unlock;
}
lock_rsb(r);
lkb = search_remid(r, rc->rc_header.h_nodeid, remid);
lkb = search_remid(r, from_nodeid, remid);
if (lkb) {
error = -EEXIST;
goto out_remid;
......@@ -4866,7 +5680,7 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
out:
if (error && error != -EEXIST)
log_debug(ls, "dlm_recover_master_copy remote %d %x error %d",
rc->rc_header.h_nodeid, remid, error);
from_nodeid, remid, error);
rl->rl_result = cpu_to_le32(error);
return error;
}
......
......@@ -14,6 +14,7 @@
#define __LOCK_DOT_H__
void dlm_dump_rsb(struct dlm_rsb *r);
void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len);
void dlm_print_lkb(struct dlm_lkb *lkb);
void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
uint32_t saved_seq);
......@@ -28,9 +29,11 @@ void dlm_unlock_recovery(struct dlm_ls *ls);
void dlm_scan_waiters(struct dlm_ls *ls);
void dlm_scan_timeout(struct dlm_ls *ls);
void dlm_adjust_timeouts(struct dlm_ls *ls);
int dlm_master_lookup(struct dlm_ls *ls, int nodeid, char *name, int len,
unsigned int flags, int *r_nodeid, int *result);
int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
unsigned int flags, struct dlm_rsb **r_ret);
struct dlm_rsb **r_ret);
void dlm_recover_purge(struct dlm_ls *ls);
void dlm_purge_mstcpy_locks(struct dlm_rsb *r);
......
......@@ -506,20 +506,18 @@ static int new_lockspace(const char *name, const char *cluster,
spin_lock_init(&ls->ls_rsbtbl[i].lock);
}
idr_init(&ls->ls_lkbidr);
spin_lock_init(&ls->ls_lkbidr_spin);
spin_lock_init(&ls->ls_remove_spin);
size = dlm_config.ci_dirtbl_size;
ls->ls_dirtbl_size = size;
ls->ls_dirtbl = vmalloc(sizeof(struct dlm_dirtable) * size);
if (!ls->ls_dirtbl)
goto out_lkbfree;
for (i = 0; i < size; i++) {
INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
spin_lock_init(&ls->ls_dirtbl[i].lock);
for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
GFP_KERNEL);
if (!ls->ls_remove_names[i])
goto out_rsbtbl;
}
idr_init(&ls->ls_lkbidr);
spin_lock_init(&ls->ls_lkbidr_spin);
INIT_LIST_HEAD(&ls->ls_waiters);
mutex_init(&ls->ls_waiters_mutex);
INIT_LIST_HEAD(&ls->ls_orphans);
......@@ -567,7 +565,7 @@ static int new_lockspace(const char *name, const char *cluster,
ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
if (!ls->ls_recover_buf)
goto out_dirfree;
goto out_lkbidr;
ls->ls_slot = 0;
ls->ls_num_slots = 0;
......@@ -576,6 +574,8 @@ static int new_lockspace(const char *name, const char *cluster,
INIT_LIST_HEAD(&ls->ls_recover_list);
spin_lock_init(&ls->ls_recover_list_lock);
idr_init(&ls->ls_recover_idr);
spin_lock_init(&ls->ls_recover_idr_lock);
ls->ls_recover_list_count = 0;
ls->ls_local_handle = ls;
init_waitqueue_head(&ls->ls_wait_general);
......@@ -647,11 +647,15 @@ static int new_lockspace(const char *name, const char *cluster,
spin_lock(&lslist_lock);
list_del(&ls->ls_list);
spin_unlock(&lslist_lock);
idr_destroy(&ls->ls_recover_idr);
kfree(ls->ls_recover_buf);
out_dirfree:
vfree(ls->ls_dirtbl);
out_lkbfree:
out_lkbidr:
idr_destroy(&ls->ls_lkbidr);
for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
if (ls->ls_remove_names[i])
kfree(ls->ls_remove_names[i]);
}
out_rsbtbl:
vfree(ls->ls_rsbtbl);
out_lsfree:
if (do_unreg)
......@@ -778,13 +782,6 @@ static int release_lockspace(struct dlm_ls *ls, int force)
kfree(ls->ls_recover_buf);
/*
* Free direntry structs.
*/
dlm_dir_clear(ls);
vfree(ls->ls_dirtbl);
/*
* Free all lkb's in idr
*/
......@@ -813,6 +810,9 @@ static int release_lockspace(struct dlm_ls *ls, int force)
vfree(ls->ls_rsbtbl);
for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
kfree(ls->ls_remove_names[i]);
while (!list_empty(&ls->ls_new_rsb)) {
rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
res_hashchain);
......@@ -826,7 +826,6 @@ static int release_lockspace(struct dlm_ls *ls, int force)
dlm_purge_requestqueue(ls);
kfree(ls->ls_recover_args);
dlm_clear_free_entries(ls);
dlm_clear_members(ls);
dlm_clear_members_gone(ls);
kfree(ls->ls_node_array);
......
......@@ -23,8 +23,6 @@
#include "memory.h"
#include "lock.h"
#include "util.h"
#include "member.h"
static int rcom_response(struct dlm_ls *ls)
{
......@@ -275,19 +273,9 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
struct dlm_rcom *rc;
struct dlm_mhandle *mh;
int error = 0;
int max_size = dlm_config.ci_buffer_size - sizeof(struct dlm_rcom);
ls->ls_recover_nodeid = nodeid;
if (nodeid == dlm_our_nodeid()) {
ls->ls_recover_buf->rc_header.h_length =
dlm_config.ci_buffer_size;
dlm_copy_master_names(ls, last_name, last_len,
ls->ls_recover_buf->rc_buf,
max_size, nodeid);
goto out;
}
error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, &rc, &mh);
if (error)
goto out;
......@@ -337,7 +325,26 @@ int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
if (error)
goto out;
memcpy(rc->rc_buf, r->res_name, r->res_length);
rc->rc_id = (unsigned long) r;
rc->rc_id = (unsigned long) r->res_id;
send_rcom(ls, mh, rc);
out:
return error;
}
int dlm_send_rcom_lookup_dump(struct dlm_rsb *r, int to_nodeid)
{
struct dlm_rcom *rc;
struct dlm_mhandle *mh;
struct dlm_ls *ls = r->res_ls;
int error;
error = create_rcom(ls, to_nodeid, DLM_RCOM_LOOKUP, r->res_length,
&rc, &mh);
if (error)
goto out;
memcpy(rc->rc_buf, r->res_name, r->res_length);
rc->rc_id = 0xFFFFFFFF;
send_rcom(ls, mh, rc);
out:
......@@ -355,7 +362,14 @@ static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
if (error)
return;
error = dlm_dir_lookup(ls, nodeid, rc_in->rc_buf, len, &ret_nodeid);
if (rc_in->rc_id == 0xFFFFFFFF) {
log_error(ls, "receive_rcom_lookup dump from %d", nodeid);
dlm_dump_rsb_name(ls, rc_in->rc_buf, len);
return;
}
error = dlm_master_lookup(ls, nodeid, rc_in->rc_buf, len,
DLM_LU_RECOVER_MASTER, &ret_nodeid, NULL);
if (error)
ret_nodeid = error;
rc->rc_result = ret_nodeid;
......@@ -486,17 +500,76 @@ int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
return 0;
}
/*
* Ignore messages for stage Y before we set
* recover_status bit for stage X:
*
* recover_status = 0
*
* dlm_recover_members()
* - send nothing
* - recv nothing
* - ignore NAMES, NAMES_REPLY
* - ignore LOOKUP, LOOKUP_REPLY
* - ignore LOCK, LOCK_REPLY
*
* recover_status |= NODES
*
* dlm_recover_members_wait()
*
* dlm_recover_directory()
* - send NAMES
* - recv NAMES_REPLY
* - ignore LOOKUP, LOOKUP_REPLY
* - ignore LOCK, LOCK_REPLY
*
* recover_status |= DIR
*
* dlm_recover_directory_wait()
*
* dlm_recover_masters()
* - send LOOKUP
* - recv LOOKUP_REPLY
*
* dlm_recover_locks()
* - send LOCKS
* - recv LOCKS_REPLY
*
* recover_status |= LOCKS
*
* dlm_recover_locks_wait()
*
* recover_status |= DONE
*/
/* Called by dlm_recv; corresponds to dlm_receive_message() but special
recovery-only comms are sent through here. */
void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
{
int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
int stop, reply = 0, lock = 0;
int stop, reply = 0, names = 0, lookup = 0, lock = 0;
uint32_t status;
uint64_t seq;
switch (rc->rc_type) {
case DLM_RCOM_STATUS_REPLY:
reply = 1;
break;
case DLM_RCOM_NAMES:
names = 1;
break;
case DLM_RCOM_NAMES_REPLY:
names = 1;
reply = 1;
break;
case DLM_RCOM_LOOKUP:
lookup = 1;
break;
case DLM_RCOM_LOOKUP_REPLY:
lookup = 1;
reply = 1;
break;
case DLM_RCOM_LOCK:
lock = 1;
break;
......@@ -504,10 +577,6 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
lock = 1;
reply = 1;
break;
case DLM_RCOM_STATUS_REPLY:
case DLM_RCOM_NAMES_REPLY:
case DLM_RCOM_LOOKUP_REPLY:
reply = 1;
};
spin_lock(&ls->ls_recover_lock);
......@@ -516,19 +585,17 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
seq = ls->ls_recover_seq;
spin_unlock(&ls->ls_recover_lock);
if ((stop && (rc->rc_type != DLM_RCOM_STATUS)) ||
(reply && (rc->rc_seq_reply != seq)) ||
(lock && !(status & DLM_RS_DIR))) {
log_limit(ls, "dlm_receive_rcom ignore msg %d "
"from %d %llu %llu recover seq %llu sts %x gen %u",
rc->rc_type,
nodeid,
(unsigned long long)rc->rc_seq,
(unsigned long long)rc->rc_seq_reply,
(unsigned long long)seq,
status, ls->ls_generation);
goto out;
}
if (stop && (rc->rc_type != DLM_RCOM_STATUS))
goto ignore;
if (reply && (rc->rc_seq_reply != seq))
goto ignore;
if (!(status & DLM_RS_NODES) && (names || lookup || lock))
goto ignore;
if (!(status & DLM_RS_DIR) && (lookup || lock))
goto ignore;
switch (rc->rc_type) {
case DLM_RCOM_STATUS:
......@@ -570,10 +637,20 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
default:
log_error(ls, "receive_rcom bad type %d", rc->rc_type);
}
out:
return;
ignore:
log_limit(ls, "dlm_receive_rcom ignore msg %d "
"from %d %llu %llu recover seq %llu sts %x gen %u",
rc->rc_type,
nodeid,
(unsigned long long)rc->rc_seq,
(unsigned long long)rc->rc_seq_reply,
(unsigned long long)seq,
status, ls->ls_generation);
return;
Eshort:
log_error(ls, "recovery message %x from %d is too short",
rc->rc_type, nodeid);
log_error(ls, "recovery message %d from %d is too short",
rc->rc_type, nodeid);
}
......@@ -17,6 +17,7 @@
int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags);
int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,int last_len);
int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid);
int dlm_send_rcom_lookup_dump(struct dlm_rsb *r, int to_nodeid);
int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid);
int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in);
......
......@@ -36,30 +36,23 @@
* (LS_RECOVERY_STOP set due to failure of a node in ls_nodes). When another
* function thinks it could have completed the waited-on task, they should wake
* up ls_wait_general to get an immediate response rather than waiting for the
* timer to detect the result. A timer wakes us up periodically while waiting
* to see if we should abort due to a node failure. This should only be called
* by the dlm_recoverd thread.
* timeout. This uses a timeout so it can check periodically if the wait
* should abort due to node failure (which doesn't cause a wake_up).
* This should only be called by the dlm_recoverd thread.
*/
static void dlm_wait_timer_fn(unsigned long data)
{
struct dlm_ls *ls = (struct dlm_ls *) data;
mod_timer(&ls->ls_timer, jiffies + (dlm_config.ci_recover_timer * HZ));
wake_up(&ls->ls_wait_general);
}
int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls))
{
int error = 0;
int rv;
init_timer(&ls->ls_timer);
ls->ls_timer.function = dlm_wait_timer_fn;
ls->ls_timer.data = (long) ls;
ls->ls_timer.expires = jiffies + (dlm_config.ci_recover_timer * HZ);
add_timer(&ls->ls_timer);
wait_event(ls->ls_wait_general, testfn(ls) || dlm_recovery_stopped(ls));
del_timer_sync(&ls->ls_timer);
while (1) {
rv = wait_event_timeout(ls->ls_wait_general,
testfn(ls) || dlm_recovery_stopped(ls),
dlm_config.ci_recover_timer * HZ);
if (rv)
break;
}
if (dlm_recovery_stopped(ls)) {
log_debug(ls, "dlm_wait_function aborted");
......@@ -277,22 +270,6 @@ static void recover_list_del(struct dlm_rsb *r)
dlm_put_rsb(r);
}
static struct dlm_rsb *recover_list_find(struct dlm_ls *ls, uint64_t id)
{
struct dlm_rsb *r = NULL;
spin_lock(&ls->ls_recover_list_lock);
list_for_each_entry(r, &ls->ls_recover_list, res_recover_list) {
if (id == (unsigned long) r)
goto out;
}
r = NULL;
out:
spin_unlock(&ls->ls_recover_list_lock);
return r;
}
static void recover_list_clear(struct dlm_ls *ls)
{
struct dlm_rsb *r, *s;
......@@ -313,6 +290,94 @@ static void recover_list_clear(struct dlm_ls *ls)
spin_unlock(&ls->ls_recover_list_lock);
}
static int recover_idr_empty(struct dlm_ls *ls)
{
int empty = 1;
spin_lock(&ls->ls_recover_idr_lock);
if (ls->ls_recover_list_count)
empty = 0;
spin_unlock(&ls->ls_recover_idr_lock);
return empty;
}
static int recover_idr_add(struct dlm_rsb *r)
{
struct dlm_ls *ls = r->res_ls;
int rv, id;
rv = idr_pre_get(&ls->ls_recover_idr, GFP_NOFS);
if (!rv)
return -ENOMEM;
spin_lock(&ls->ls_recover_idr_lock);
if (r->res_id) {
spin_unlock(&ls->ls_recover_idr_lock);
return -1;
}
rv = idr_get_new_above(&ls->ls_recover_idr, r, 1, &id);
if (rv) {
spin_unlock(&ls->ls_recover_idr_lock);
return rv;
}
r->res_id = id;
ls->ls_recover_list_count++;
dlm_hold_rsb(r);
spin_unlock(&ls->ls_recover_idr_lock);
return 0;
}
static void recover_idr_del(struct dlm_rsb *r)
{
struct dlm_ls *ls = r->res_ls;
spin_lock(&ls->ls_recover_idr_lock);
idr_remove(&ls->ls_recover_idr, r->res_id);
r->res_id = 0;
ls->ls_recover_list_count--;
spin_unlock(&ls->ls_recover_idr_lock);
dlm_put_rsb(r);
}
static struct dlm_rsb *recover_idr_find(struct dlm_ls *ls, uint64_t id)
{
struct dlm_rsb *r;
spin_lock(&ls->ls_recover_idr_lock);
r = idr_find(&ls->ls_recover_idr, (int)id);
spin_unlock(&ls->ls_recover_idr_lock);
return r;
}
static int recover_idr_clear_rsb(int id, void *p, void *data)
{
struct dlm_ls *ls = data;
struct dlm_rsb *r = p;
r->res_id = 0;
r->res_recover_locks_count = 0;
ls->ls_recover_list_count--;
dlm_put_rsb(r);
return 0;
}
static void recover_idr_clear(struct dlm_ls *ls)
{
spin_lock(&ls->ls_recover_idr_lock);
idr_for_each(&ls->ls_recover_idr, recover_idr_clear_rsb, ls);
idr_remove_all(&ls->ls_recover_idr);
if (ls->ls_recover_list_count != 0) {
log_error(ls, "warning: recover_list_count %d",
ls->ls_recover_list_count);
ls->ls_recover_list_count = 0;
}
spin_unlock(&ls->ls_recover_idr_lock);
}
/* Master recovery: find new master node for rsb's that were
mastered on nodes that have been removed.
......@@ -361,9 +426,8 @@ static void set_master_lkbs(struct dlm_rsb *r)
* rsb's to consider.
*/
static void set_new_master(struct dlm_rsb *r, int nodeid)
static void set_new_master(struct dlm_rsb *r)
{
r->res_nodeid = nodeid;
set_master_lkbs(r);
rsb_set_flag(r, RSB_NEW_MASTER);
rsb_set_flag(r, RSB_NEW_MASTER2);
......@@ -372,31 +436,48 @@ static void set_new_master(struct dlm_rsb *r, int nodeid)
/*
* We do async lookups on rsb's that need new masters. The rsb's
* waiting for a lookup reply are kept on the recover_list.
*
* Another node recovering the master may have sent us a rcom lookup,
* and our dlm_master_lookup() set it as the new master, along with
* NEW_MASTER so that we'll recover it here (this implies dir_nodeid
* equals our_nodeid below).
*/
static int recover_master(struct dlm_rsb *r)
static int recover_master(struct dlm_rsb *r, unsigned int *count)
{
struct dlm_ls *ls = r->res_ls;
int error, ret_nodeid;
int our_nodeid = dlm_our_nodeid();
int dir_nodeid = dlm_dir_nodeid(r);
int our_nodeid, dir_nodeid;
int is_removed = 0;
int error;
if (is_master(r))
return 0;
is_removed = dlm_is_removed(ls, r->res_nodeid);
if (!is_removed && !rsb_flag(r, RSB_NEW_MASTER))
return 0;
our_nodeid = dlm_our_nodeid();
dir_nodeid = dlm_dir_nodeid(r);
if (dir_nodeid == our_nodeid) {
error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
r->res_length, &ret_nodeid);
if (error)
log_error(ls, "recover dir lookup error %d", error);
if (is_removed) {
r->res_master_nodeid = our_nodeid;
r->res_nodeid = 0;
}
if (ret_nodeid == our_nodeid)
ret_nodeid = 0;
lock_rsb(r);
set_new_master(r, ret_nodeid);
unlock_rsb(r);
/* set master of lkbs to ourself when is_removed, or to
another new master which we set along with NEW_MASTER
in dlm_master_lookup */
set_new_master(r);
error = 0;
} else {
recover_list_add(r);
recover_idr_add(r);
error = dlm_send_rcom_lookup(r, dir_nodeid);
}
(*count)++;
return error;
}
......@@ -415,7 +496,7 @@ static int recover_master(struct dlm_rsb *r)
* resent.
*/
static int recover_master_static(struct dlm_rsb *r)
static int recover_master_static(struct dlm_rsb *r, unsigned int *count)
{
int dir_nodeid = dlm_dir_nodeid(r);
int new_master = dir_nodeid;
......@@ -423,11 +504,12 @@ static int recover_master_static(struct dlm_rsb *r)
if (dir_nodeid == dlm_our_nodeid())
new_master = 0;
lock_rsb(r);
dlm_purge_mstcpy_locks(r);
set_new_master(r, new_master);
unlock_rsb(r);
return 1;
r->res_master_nodeid = dir_nodeid;
r->res_nodeid = new_master;
set_new_master(r);
(*count)++;
return 0;
}
/*
......@@ -443,7 +525,10 @@ static int recover_master_static(struct dlm_rsb *r)
int dlm_recover_masters(struct dlm_ls *ls)
{
struct dlm_rsb *r;
int error = 0, count = 0;
unsigned int total = 0;
unsigned int count = 0;
int nodir = dlm_no_directory(ls);
int error;
log_debug(ls, "dlm_recover_masters");
......@@ -455,50 +540,58 @@ int dlm_recover_masters(struct dlm_ls *ls)
goto out;
}
if (dlm_no_directory(ls))
count += recover_master_static(r);
else if (!is_master(r) &&
(dlm_is_removed(ls, r->res_nodeid) ||
rsb_flag(r, RSB_NEW_MASTER))) {
recover_master(r);
count++;
}
lock_rsb(r);
if (nodir)
error = recover_master_static(r, &count);
else
error = recover_master(r, &count);
unlock_rsb(r);
cond_resched();
total++;
schedule();
if (error) {
up_read(&ls->ls_root_sem);
goto out;
}
}
up_read(&ls->ls_root_sem);
log_debug(ls, "dlm_recover_masters %d resources", count);
log_debug(ls, "dlm_recover_masters %u of %u", count, total);
error = dlm_wait_function(ls, &recover_list_empty);
error = dlm_wait_function(ls, &recover_idr_empty);
out:
if (error)
recover_list_clear(ls);
recover_idr_clear(ls);
return error;
}
int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
{
struct dlm_rsb *r;
int nodeid;
int ret_nodeid, new_master;
r = recover_list_find(ls, rc->rc_id);
r = recover_idr_find(ls, rc->rc_id);
if (!r) {
log_error(ls, "dlm_recover_master_reply no id %llx",
(unsigned long long)rc->rc_id);
goto out;
}
nodeid = rc->rc_result;
if (nodeid == dlm_our_nodeid())
nodeid = 0;
ret_nodeid = rc->rc_result;
if (ret_nodeid == dlm_our_nodeid())
new_master = 0;
else
new_master = ret_nodeid;
lock_rsb(r);
set_new_master(r, nodeid);
r->res_master_nodeid = ret_nodeid;
r->res_nodeid = new_master;
set_new_master(r);
unlock_rsb(r);
recover_list_del(r);
recover_idr_del(r);
if (recover_list_empty(ls))
if (recover_idr_empty(ls))
wake_up(&ls->ls_wait_general);
out:
return 0;
......@@ -711,6 +804,7 @@ static void recover_lvb(struct dlm_rsb *r)
static void recover_conversion(struct dlm_rsb *r)
{
struct dlm_ls *ls = r->res_ls;
struct dlm_lkb *lkb;
int grmode = -1;
......@@ -725,10 +819,15 @@ static void recover_conversion(struct dlm_rsb *r)
list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) {
if (lkb->lkb_grmode != DLM_LOCK_IV)
continue;
if (grmode == -1)
if (grmode == -1) {
log_debug(ls, "recover_conversion %x set gr to rq %d",
lkb->lkb_id, lkb->lkb_rqmode);
lkb->lkb_grmode = lkb->lkb_rqmode;
else
} else {
log_debug(ls, "recover_conversion %x set gr %d",
lkb->lkb_id, grmode);
lkb->lkb_grmode = grmode;
}
}
}
......@@ -791,20 +890,8 @@ int dlm_create_root_list(struct dlm_ls *ls)
dlm_hold_rsb(r);
}
/* If we're using a directory, add tossed rsbs to the root
list; they'll have entries created in the new directory,
but no other recovery steps should do anything with them. */
if (dlm_no_directory(ls)) {
spin_unlock(&ls->ls_rsbtbl[i].lock);
continue;
}
for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = rb_next(n)) {
r = rb_entry(n, struct dlm_rsb, res_hashnode);
list_add(&r->res_root_list, &ls->ls_root_list);
dlm_hold_rsb(r);
}
if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss))
log_error(ls, "dlm_create_root_list toss not empty");
spin_unlock(&ls->ls_rsbtbl[i].lock);
}
out:
......@@ -824,28 +911,26 @@ void dlm_release_root_list(struct dlm_ls *ls)
up_write(&ls->ls_root_sem);
}
/* If not using a directory, clear the entire toss list, there's no benefit to
caching the master value since it's fixed. If we are using a dir, keep the
rsb's we're the master of. Recovery will add them to the root list and from
there they'll be entered in the rebuilt directory. */
void dlm_clear_toss_list(struct dlm_ls *ls)
void dlm_clear_toss(struct dlm_ls *ls)
{
struct rb_node *n, *next;
struct dlm_rsb *rsb;
struct dlm_rsb *r;
unsigned int count = 0;
int i;
for (i = 0; i < ls->ls_rsbtbl_size; i++) {
spin_lock(&ls->ls_rsbtbl[i].lock);
for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
next = rb_next(n);;
rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
if (dlm_no_directory(ls) || !is_master(rsb)) {
rb_erase(n, &ls->ls_rsbtbl[i].toss);
dlm_free_rsb(rsb);
}
next = rb_next(n);
r = rb_entry(n, struct dlm_rsb, res_hashnode);
rb_erase(n, &ls->ls_rsbtbl[i].toss);
dlm_free_rsb(r);
count++;
}
spin_unlock(&ls->ls_rsbtbl[i].lock);
}
if (count)
log_debug(ls, "dlm_clear_toss %u done", count);
}
......@@ -27,7 +27,7 @@ int dlm_recover_locks(struct dlm_ls *ls);
void dlm_recovered_lock(struct dlm_rsb *r);
int dlm_create_root_list(struct dlm_ls *ls);
void dlm_release_root_list(struct dlm_ls *ls);
void dlm_clear_toss_list(struct dlm_ls *ls);
void dlm_clear_toss(struct dlm_ls *ls);
void dlm_recover_rsbs(struct dlm_ls *ls);
#endif /* __RECOVER_DOT_H__ */
......
......@@ -60,12 +60,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
dlm_callback_suspend(ls);
/*
* Free non-master tossed rsb's. Master rsb's are kept on toss
* list and put on root list to be included in resdir recovery.
*/
dlm_clear_toss_list(ls);
dlm_clear_toss(ls);
/*
* This list of root rsb's will be the basis of most of the recovery
......@@ -84,6 +79,10 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
goto fail;
}
dlm_recover_dir_nodeid(ls);
ls->ls_recover_dir_sent_res = 0;
ls->ls_recover_dir_sent_msg = 0;
ls->ls_recover_locks_in = 0;
dlm_set_recover_status(ls, DLM_RS_NODES);
......@@ -115,6 +114,9 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
goto fail;
}
log_debug(ls, "dlm_recover_directory %u out %u messages",
ls->ls_recover_dir_sent_res, ls->ls_recover_dir_sent_msg);
/*
* We may have outstanding operations that are waiting for a reply from
* a failed node. Mark these to be resent after recovery. Unlock and
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册