提交 b6dd1a89 编写于 作者: L Lars Ellenberg 提交者: Philipp Reisner

drbd: remove struct drbd_tl_epoch objects (barrier works)

cherry-picked and adapted from drbd 9 devel branch

DRBD requests (struct drbd_request) are already on the per resource
transfer log list, and carry their epoch number. We do not need to
additionally link them on other ring lists in other structs.

The drbd sender thread can recognize itself when to send a P_BARRIER,
by tracking the currently processed epoch, and how many writes
have been processed for that epoch.

If the epoch of the request to be processed does not match the currently
processed epoch, any writes have been processed in it, a P_BARRIER for
this last processed epoch is send out first.
The new epoch then becomes the currently processed epoch.

To not get stuck in drbd_al_begin_io() waiting for P_BARRIER_ACK,
the sender thread also needs to handle the case when the current
epoch was closed already, but no new requests are queued yet,
and send out P_BARRIER as soon as possible.

This is done by comparing the per resource "current transfer log epoch"
(tconn->current_tle_nr) with the per connection "currently processed
epoch number" (tconn->send.current_epoch_nr), while waiting for
new requests to be processed in wait_for_work().
Signed-off-by: NPhilipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: NLars Ellenberg <lars.ellenberg@linbit.com>
上级 d5b27b01
...@@ -562,12 +562,16 @@ struct drbd_request { ...@@ -562,12 +562,16 @@ struct drbd_request {
struct bio *private_bio; struct bio *private_bio;
struct drbd_interval i; struct drbd_interval i;
unsigned int epoch; /* barrier_nr */
/* barrier_nr: used to check on "completion" whether this req was in /* epoch: used to check on "completion" whether this req was in
* the current epoch, and we therefore have to close it, * the current epoch, and we therefore have to close it,
* starting a new epoch... * causing a p_barrier packet to be send, starting a new epoch.
*
* This corresponds to "barrier" in struct p_barrier[_ack],
* and to "barrier_nr" in struct drbd_epoch (and various
* comments/function parameters/local variable names).
*/ */
unsigned int epoch;
struct list_head tl_requests; /* ring list in the transfer log */ struct list_head tl_requests; /* ring list in the transfer log */
struct bio *master_bio; /* master bio pointer */ struct bio *master_bio; /* master bio pointer */
...@@ -575,14 +579,6 @@ struct drbd_request { ...@@ -575,14 +579,6 @@ struct drbd_request {
unsigned long start_time; unsigned long start_time;
}; };
struct drbd_tl_epoch {
struct drbd_work w;
struct list_head requests; /* requests before */
struct drbd_tl_epoch *next; /* pointer to the next barrier */
unsigned int br_number; /* the barriers identifier. */
int n_writes; /* number of requests attached before this barrier */
};
struct drbd_epoch { struct drbd_epoch {
struct drbd_tconn *tconn; struct drbd_tconn *tconn;
struct list_head list; struct list_head list;
...@@ -845,11 +841,8 @@ struct drbd_tconn { /* is a resource from the config file */ ...@@ -845,11 +841,8 @@ struct drbd_tconn { /* is a resource from the config file */
unsigned int ko_count; unsigned int ko_count;
spinlock_t req_lock; spinlock_t req_lock;
struct drbd_tl_epoch *unused_spare_tle; /* for pre-allocation */
struct drbd_tl_epoch *newest_tle; struct list_head transfer_log; /* all requests not yet fully processed */
struct drbd_tl_epoch *oldest_tle;
struct list_head out_of_sequence_requests;
struct list_head barrier_acked_requests;
struct crypto_hash *cram_hmac_tfm; struct crypto_hash *cram_hmac_tfm;
struct crypto_hash *integrity_tfm; /* checksums we compute, updates protected by tconn->data->mutex */ struct crypto_hash *integrity_tfm; /* checksums we compute, updates protected by tconn->data->mutex */
...@@ -859,18 +852,36 @@ struct drbd_tconn { /* is a resource from the config file */ ...@@ -859,18 +852,36 @@ struct drbd_tconn { /* is a resource from the config file */
void *int_dig_in; void *int_dig_in;
void *int_dig_vv; void *int_dig_vv;
/* receiver side */
struct drbd_epoch *current_epoch; struct drbd_epoch *current_epoch;
spinlock_t epoch_lock; spinlock_t epoch_lock;
unsigned int epochs; unsigned int epochs;
enum write_ordering_e write_ordering; enum write_ordering_e write_ordering;
atomic_t current_tle_nr; /* transfer log epoch number */ atomic_t current_tle_nr; /* transfer log epoch number */
unsigned current_tle_writes; /* writes seen within this tl epoch */
unsigned long last_reconnect_jif; unsigned long last_reconnect_jif;
struct drbd_thread receiver; struct drbd_thread receiver;
struct drbd_thread worker; struct drbd_thread worker;
struct drbd_thread asender; struct drbd_thread asender;
cpumask_var_t cpu_mask; cpumask_var_t cpu_mask;
/* sender side */
struct drbd_work_queue sender_work; struct drbd_work_queue sender_work;
struct {
/* whether this sender thread
* has processed a single write yet. */
bool seen_any_write_yet;
/* Which barrier number to send with the next P_BARRIER */
int current_epoch_nr;
/* how many write requests have been sent
* with req->epoch == current_epoch_nr.
* If none, no P_BARRIER will be sent. */
unsigned current_epoch_writes;
} send;
}; };
struct drbd_conf { struct drbd_conf {
...@@ -1054,7 +1065,6 @@ extern void drbd_calc_cpu_mask(struct drbd_tconn *tconn); ...@@ -1054,7 +1065,6 @@ extern void drbd_calc_cpu_mask(struct drbd_tconn *tconn);
extern void tl_release(struct drbd_tconn *, unsigned int barrier_nr, extern void tl_release(struct drbd_tconn *, unsigned int barrier_nr,
unsigned int set_size); unsigned int set_size);
extern void tl_clear(struct drbd_tconn *); extern void tl_clear(struct drbd_tconn *);
extern void _tl_add_barrier(struct drbd_tconn *, struct drbd_tl_epoch *);
extern void drbd_free_sock(struct drbd_tconn *tconn); extern void drbd_free_sock(struct drbd_tconn *tconn);
extern int drbd_send(struct drbd_tconn *tconn, struct socket *sock, extern int drbd_send(struct drbd_tconn *tconn, struct socket *sock,
void *buf, size_t size, unsigned msg_flags); void *buf, size_t size, unsigned msg_flags);
...@@ -1460,7 +1470,6 @@ extern int w_resync_timer(struct drbd_work *, int); ...@@ -1460,7 +1470,6 @@ extern int w_resync_timer(struct drbd_work *, int);
extern int w_send_write_hint(struct drbd_work *, int); extern int w_send_write_hint(struct drbd_work *, int);
extern int w_make_resync_request(struct drbd_work *, int); extern int w_make_resync_request(struct drbd_work *, int);
extern int w_send_dblock(struct drbd_work *, int); extern int w_send_dblock(struct drbd_work *, int);
extern int w_send_barrier(struct drbd_work *, int);
extern int w_send_read_req(struct drbd_work *, int); extern int w_send_read_req(struct drbd_work *, int);
extern int w_prev_work_done(struct drbd_work *, int); extern int w_prev_work_done(struct drbd_work *, int);
extern int w_e_reissue(struct drbd_work *, int); extern int w_e_reissue(struct drbd_work *, int);
......
...@@ -188,147 +188,75 @@ int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins) ...@@ -188,147 +188,75 @@ int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
#endif #endif
/** /**
* DOC: The transfer log * tl_release() - mark as BARRIER_ACKED all requests in the corresponding transfer log epoch
* * @tconn: DRBD connection.
* The transfer log is a single linked list of &struct drbd_tl_epoch objects.
* mdev->tconn->newest_tle points to the head, mdev->tconn->oldest_tle points to the tail
* of the list. There is always at least one &struct drbd_tl_epoch object.
*
* Each &struct drbd_tl_epoch has a circular double linked list of requests
* attached.
*/
static int tl_init(struct drbd_tconn *tconn)
{
struct drbd_tl_epoch *b;
/* during device minor initialization, we may well use GFP_KERNEL */
b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
if (!b)
return 0;
INIT_LIST_HEAD(&b->requests);
INIT_LIST_HEAD(&b->w.list);
b->next = NULL;
b->br_number = atomic_inc_return(&tconn->current_tle_nr);
b->n_writes = 0;
b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
tconn->oldest_tle = b;
tconn->newest_tle = b;
INIT_LIST_HEAD(&tconn->out_of_sequence_requests);
INIT_LIST_HEAD(&tconn->barrier_acked_requests);
return 1;
}
static void tl_cleanup(struct drbd_tconn *tconn)
{
if (tconn->oldest_tle != tconn->newest_tle)
conn_err(tconn, "ASSERT FAILED: oldest_tle == newest_tle\n");
if (!list_empty(&tconn->out_of_sequence_requests))
conn_err(tconn, "ASSERT FAILED: list_empty(out_of_sequence_requests)\n");
kfree(tconn->oldest_tle);
tconn->oldest_tle = NULL;
kfree(tconn->unused_spare_tle);
tconn->unused_spare_tle = NULL;
}
/**
* _tl_add_barrier() - Adds a barrier to the transfer log
* @mdev: DRBD device.
* @new: Barrier to be added before the current head of the TL.
*
* The caller must hold the req_lock.
*/
void _tl_add_barrier(struct drbd_tconn *tconn, struct drbd_tl_epoch *new)
{
INIT_LIST_HEAD(&new->requests);
INIT_LIST_HEAD(&new->w.list);
new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
new->next = NULL;
new->n_writes = 0;
new->br_number = atomic_inc_return(&tconn->current_tle_nr);
if (tconn->newest_tle != new) {
tconn->newest_tle->next = new;
tconn->newest_tle = new;
}
}
/**
* tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
* @mdev: DRBD device.
* @barrier_nr: Expected identifier of the DRBD write barrier packet. * @barrier_nr: Expected identifier of the DRBD write barrier packet.
* @set_size: Expected number of requests before that barrier. * @set_size: Expected number of requests before that barrier.
* *
* In case the passed barrier_nr or set_size does not match the oldest * In case the passed barrier_nr or set_size does not match the oldest
* &struct drbd_tl_epoch objects this function will cause a termination * epoch of not yet barrier-acked requests, this function will cause a
* of the connection. * termination of the connection.
*/ */
void tl_release(struct drbd_tconn *tconn, unsigned int barrier_nr, void tl_release(struct drbd_tconn *tconn, unsigned int barrier_nr,
unsigned int set_size) unsigned int set_size)
{ {
struct drbd_conf *mdev;
struct drbd_tl_epoch *b, *nob; /* next old barrier */
struct list_head *le, *tle;
struct drbd_request *r; struct drbd_request *r;
struct drbd_request *req = NULL;
int expect_epoch = 0;
int expect_size = 0;
spin_lock_irq(&tconn->req_lock); spin_lock_irq(&tconn->req_lock);
b = tconn->oldest_tle; /* find latest not yet barrier-acked write request,
* count writes in its epoch. */
list_for_each_entry(r, &tconn->transfer_log, tl_requests) {
const unsigned long s = r->rq_state;
if (!req) {
if (!(s & RQ_WRITE))
continue;
if (!(s & RQ_NET_MASK))
continue;
if (s & RQ_NET_DONE)
continue;
req = r;
expect_epoch = req->epoch;
expect_size ++;
} else {
if (r->epoch != expect_epoch)
break;
if (!(s & RQ_WRITE))
continue;
/* if (s & RQ_DONE): not expected */
/* if (!(s & RQ_NET_MASK)): not expected */
expect_size++;
}
}
/* first some paranoia code */ /* first some paranoia code */
if (b == NULL) { if (req == NULL) {
conn_err(tconn, "BAD! BarrierAck #%u received, but no epoch in tl!?\n", conn_err(tconn, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
barrier_nr); barrier_nr);
goto bail; goto bail;
} }
if (b->br_number != barrier_nr) { if (expect_epoch != barrier_nr) {
conn_err(tconn, "BAD! BarrierAck #%u received, expected #%u!\n", conn_err(tconn, "BAD! BarrierAck #%u received, expected #%u!\n",
barrier_nr, b->br_number); barrier_nr, expect_epoch);
goto bail; goto bail;
} }
if (b->n_writes != set_size) {
if (expect_size != set_size) {
conn_err(tconn, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n", conn_err(tconn, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
barrier_nr, set_size, b->n_writes); barrier_nr, set_size, expect_size);
goto bail; goto bail;
} }
/* Clean up list of requests processed during current epoch */ /* Clean up list of requests processed during current epoch */
list_for_each_safe(le, tle, &b->requests) { list_for_each_entry_safe(req, r, &tconn->transfer_log, tl_requests) {
r = list_entry(le, struct drbd_request, tl_requests); if (req->epoch != expect_epoch)
_req_mod(r, BARRIER_ACKED); break;
} _req_mod(req, BARRIER_ACKED);
/* There could be requests on the list waiting for completion
of the write to the local disk. To avoid corruptions of
slab's data structures we have to remove the lists head.
Also there could have been a barrier ack out of sequence, overtaking
the write acks - which would be a bug and violating write ordering.
To not deadlock in case we lose connection while such requests are
still pending, we need some way to find them for the
_req_mode(CONNECTION_LOST_WHILE_PENDING).
These have been list_move'd to the out_of_sequence_requests list in
_req_mod(, BARRIER_ACKED) above.
*/
list_splice_init(&b->requests, &tconn->barrier_acked_requests);
mdev = b->w.mdev;
nob = b->next;
if (test_and_clear_bit(CREATE_BARRIER, &tconn->flags)) {
_tl_add_barrier(tconn, b);
if (nob)
tconn->oldest_tle = nob;
/* if nob == NULL b was the only barrier, and becomes the new
barrier. Therefore tconn->oldest_tle points already to b */
} else {
D_ASSERT(nob != NULL);
tconn->oldest_tle = nob;
kfree(b);
} }
spin_unlock_irq(&tconn->req_lock); spin_unlock_irq(&tconn->req_lock);
dec_ap_pending(mdev);
return; return;
...@@ -346,91 +274,20 @@ void tl_release(struct drbd_tconn *tconn, unsigned int barrier_nr, ...@@ -346,91 +274,20 @@ void tl_release(struct drbd_tconn *tconn, unsigned int barrier_nr,
* @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO, * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
* RESTART_FROZEN_DISK_IO. * RESTART_FROZEN_DISK_IO.
*/ */
/* must hold resource->req_lock */
void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what) void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
{ {
struct drbd_tl_epoch *b, *tmp, **pn; struct drbd_request *req, *r;
struct list_head *le, *tle, carry_reads;
struct drbd_request *req; list_for_each_entry_safe(req, r, &tconn->transfer_log, tl_requests)
int rv, n_writes, n_reads; _req_mod(req, what);
}
b = tconn->oldest_tle;
pn = &tconn->oldest_tle; void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
while (b) { {
n_writes = 0; spin_lock_irq(&tconn->req_lock);
n_reads = 0; _tl_restart(tconn, what);
INIT_LIST_HEAD(&carry_reads); spin_unlock_irq(&tconn->req_lock);
list_for_each_safe(le, tle, &b->requests) {
req = list_entry(le, struct drbd_request, tl_requests);
rv = _req_mod(req, what);
if (rv & MR_WRITE)
n_writes++;
if (rv & MR_READ)
n_reads++;
}
tmp = b->next;
if (n_writes) {
if (what == RESEND) {
b->n_writes = n_writes;
if (b->w.cb == NULL) {
b->w.cb = w_send_barrier;
inc_ap_pending(b->w.mdev);
set_bit(CREATE_BARRIER, &tconn->flags);
}
drbd_queue_work(&tconn->sender_work, &b->w);
}
pn = &b->next;
} else {
if (n_reads)
list_add(&carry_reads, &b->requests);
/* there could still be requests on that ring list,
* in case local io is still pending */
list_del(&b->requests);
/* dec_ap_pending corresponding to queue_barrier.
* the newest barrier may not have been queued yet,
* in which case w.cb is still NULL. */
if (b->w.cb != NULL)
dec_ap_pending(b->w.mdev);
if (b == tconn->newest_tle) {
/* recycle, but reinit! */
if (tmp != NULL)
conn_err(tconn, "ASSERT FAILED tmp == NULL");
INIT_LIST_HEAD(&b->requests);
list_splice(&carry_reads, &b->requests);
INIT_LIST_HEAD(&b->w.list);
b->w.cb = NULL;
b->br_number = atomic_inc_return(&tconn->current_tle_nr);
b->n_writes = 0;
*pn = b;
break;
}
*pn = tmp;
kfree(b);
}
b = tmp;
list_splice(&carry_reads, &b->requests);
}
/* Actions operating on the disk state, also want to work on
requests that got barrier acked. */
switch (what) {
case FAIL_FROZEN_DISK_IO:
case RESTART_FROZEN_DISK_IO:
list_for_each_safe(le, tle, &tconn->barrier_acked_requests) {
req = list_entry(le, struct drbd_request, tl_requests);
_req_mod(req, what);
}
case CONNECTION_LOST_WHILE_PENDING:
case RESEND:
break;
default:
conn_err(tconn, "what = %d in _tl_restart()\n", what);
}
} }
/** /**
...@@ -443,36 +300,7 @@ void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what) ...@@ -443,36 +300,7 @@ void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
*/ */
void tl_clear(struct drbd_tconn *tconn) void tl_clear(struct drbd_tconn *tconn)
{ {
struct list_head *le, *tle; tl_restart(tconn, CONNECTION_LOST_WHILE_PENDING);
struct drbd_request *r;
spin_lock_irq(&tconn->req_lock);
_tl_restart(tconn, CONNECTION_LOST_WHILE_PENDING);
/* we expect this list to be empty. */
if (!list_empty(&tconn->out_of_sequence_requests))
conn_err(tconn, "ASSERT FAILED list_empty(&out_of_sequence_requests)\n");
/* but just in case, clean it up anyways! */
list_for_each_safe(le, tle, &tconn->out_of_sequence_requests) {
r = list_entry(le, struct drbd_request, tl_requests);
/* It would be nice to complete outside of spinlock.
* But this is easier for now. */
_req_mod(r, CONNECTION_LOST_WHILE_PENDING);
}
/* ensure bit indicating barrier is required is clear */
clear_bit(CREATE_BARRIER, &tconn->flags);
spin_unlock_irq(&tconn->req_lock);
}
void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
{
spin_lock_irq(&tconn->req_lock);
_tl_restart(tconn, what);
spin_unlock_irq(&tconn->req_lock);
} }
/** /**
...@@ -482,31 +310,16 @@ void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what) ...@@ -482,31 +310,16 @@ void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
void tl_abort_disk_io(struct drbd_conf *mdev) void tl_abort_disk_io(struct drbd_conf *mdev)
{ {
struct drbd_tconn *tconn = mdev->tconn; struct drbd_tconn *tconn = mdev->tconn;
struct drbd_tl_epoch *b; struct drbd_request *req, *r;
struct list_head *le, *tle;
struct drbd_request *req;
spin_lock_irq(&tconn->req_lock); spin_lock_irq(&tconn->req_lock);
b = tconn->oldest_tle; list_for_each_entry_safe(req, r, &tconn->transfer_log, tl_requests) {
while (b) {
list_for_each_safe(le, tle, &b->requests) {
req = list_entry(le, struct drbd_request, tl_requests);
if (!(req->rq_state & RQ_LOCAL_PENDING))
continue;
if (req->w.mdev == mdev)
_req_mod(req, ABORT_DISK_IO);
}
b = b->next;
}
list_for_each_safe(le, tle, &tconn->barrier_acked_requests) {
req = list_entry(le, struct drbd_request, tl_requests);
if (!(req->rq_state & RQ_LOCAL_PENDING)) if (!(req->rq_state & RQ_LOCAL_PENDING))
continue; continue;
if (req->w.mdev == mdev) if (req->w.mdev != mdev)
_req_mod(req, ABORT_DISK_IO); continue;
_req_mod(req, ABORT_DISK_IO);
} }
spin_unlock_irq(&tconn->req_lock); spin_unlock_irq(&tconn->req_lock);
} }
...@@ -2680,17 +2493,21 @@ struct drbd_tconn *conn_create(const char *name, struct res_opts *res_opts) ...@@ -2680,17 +2493,21 @@ struct drbd_tconn *conn_create(const char *name, struct res_opts *res_opts)
if (set_resource_options(tconn, res_opts)) if (set_resource_options(tconn, res_opts))
goto fail; goto fail;
if (!tl_init(tconn))
goto fail;
tconn->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL); tconn->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
if (!tconn->current_epoch) if (!tconn->current_epoch)
goto fail; goto fail;
INIT_LIST_HEAD(&tconn->transfer_log);
INIT_LIST_HEAD(&tconn->current_epoch->list); INIT_LIST_HEAD(&tconn->current_epoch->list);
tconn->epochs = 1; tconn->epochs = 1;
spin_lock_init(&tconn->epoch_lock); spin_lock_init(&tconn->epoch_lock);
tconn->write_ordering = WO_bdev_flush; tconn->write_ordering = WO_bdev_flush;
tconn->send.seen_any_write_yet = false;
tconn->send.current_epoch_nr = 0;
tconn->send.current_epoch_writes = 0;
tconn->cstate = C_STANDALONE; tconn->cstate = C_STANDALONE;
mutex_init(&tconn->cstate_mutex); mutex_init(&tconn->cstate_mutex);
spin_lock_init(&tconn->req_lock); spin_lock_init(&tconn->req_lock);
...@@ -2713,7 +2530,6 @@ struct drbd_tconn *conn_create(const char *name, struct res_opts *res_opts) ...@@ -2713,7 +2530,6 @@ struct drbd_tconn *conn_create(const char *name, struct res_opts *res_opts)
fail: fail:
kfree(tconn->current_epoch); kfree(tconn->current_epoch);
tl_cleanup(tconn);
free_cpumask_var(tconn->cpu_mask); free_cpumask_var(tconn->cpu_mask);
drbd_free_socket(&tconn->meta); drbd_free_socket(&tconn->meta);
drbd_free_socket(&tconn->data); drbd_free_socket(&tconn->data);
......
...@@ -622,6 +622,8 @@ drbd_set_role(struct drbd_conf *mdev, enum drbd_role new_role, int force) ...@@ -622,6 +622,8 @@ drbd_set_role(struct drbd_conf *mdev, enum drbd_role new_role, int force)
/* Wait until nothing is on the fly :) */ /* Wait until nothing is on the fly :) */
wait_event(mdev->misc_wait, atomic_read(&mdev->ap_pending_cnt) == 0); wait_event(mdev->misc_wait, atomic_read(&mdev->ap_pending_cnt) == 0);
/* FIXME also wait for all pending P_BARRIER_ACK? */
if (new_role == R_SECONDARY) { if (new_role == R_SECONDARY) {
set_disk_ro(mdev->vdisk, true); set_disk_ro(mdev->vdisk, true);
if (get_ldev(mdev)) { if (get_ldev(mdev)) {
...@@ -1436,6 +1438,12 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info) ...@@ -1436,6 +1438,12 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info)
drbd_suspend_io(mdev); drbd_suspend_io(mdev);
/* also wait for the last barrier ack. */ /* also wait for the last barrier ack. */
/* FIXME see also https://daiquiri.linbit/cgi-bin/bugzilla/show_bug.cgi?id=171
* We need a way to either ignore barrier acks for barriers sent before a device
* was attached, or a way to wait for all pending barrier acks to come in.
* As barriers are counted per resource,
* we'd need to suspend io on all devices of a resource.
*/
wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_pending_cnt) || drbd_suspended(mdev)); wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_pending_cnt) || drbd_suspended(mdev));
/* and for any other previously queued work */ /* and for any other previously queued work */
drbd_flush_workqueue(mdev); drbd_flush_workqueue(mdev);
......
...@@ -4451,6 +4451,7 @@ static void conn_disconnect(struct drbd_tconn *tconn) ...@@ -4451,6 +4451,7 @@ static void conn_disconnect(struct drbd_tconn *tconn)
conn_err(tconn, "ASSERTION FAILED: tconn->current_epoch->list not empty\n"); conn_err(tconn, "ASSERTION FAILED: tconn->current_epoch->list not empty\n");
/* ok, no more ee's on the fly, it is safe to reset the epoch_size */ /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
atomic_set(&tconn->current_epoch->epoch_size, 0); atomic_set(&tconn->current_epoch->epoch_size, 0);
tconn->send.seen_any_write_yet = false;
conn_info(tconn, "Connection closed\n"); conn_info(tconn, "Connection closed\n");
......
...@@ -149,46 +149,16 @@ static void _req_is_done(struct drbd_conf *mdev, struct drbd_request *req, const ...@@ -149,46 +149,16 @@ static void _req_is_done(struct drbd_conf *mdev, struct drbd_request *req, const
drbd_req_free(req); drbd_req_free(req);
} }
static void queue_barrier(struct drbd_conf *mdev) static void wake_all_senders(struct drbd_tconn *tconn) {
{ wake_up(&tconn->sender_work.q_wait);
struct drbd_tl_epoch *b;
struct drbd_tconn *tconn = mdev->tconn;
/* We are within the req_lock. Once we queued the barrier for sending,
* we set the CREATE_BARRIER bit. It is cleared as soon as a new
* barrier/epoch object is added. This is the only place this bit is
* set. It indicates that the barrier for this epoch is already queued,
* and no new epoch has been created yet. */
if (test_bit(CREATE_BARRIER, &tconn->flags))
return;
b = tconn->newest_tle;
b->w.cb = w_send_barrier;
b->w.mdev = mdev;
/* inc_ap_pending done here, so we won't
* get imbalanced on connection loss.
* dec_ap_pending will be done in got_BarrierAck
* or (on connection loss) in tl_clear. */
inc_ap_pending(mdev);
drbd_queue_work(&tconn->sender_work, &b->w);
set_bit(CREATE_BARRIER, &tconn->flags);
} }
static void _about_to_complete_local_write(struct drbd_conf *mdev, /* must hold resource->req_lock */
struct drbd_request *req) static void start_new_tl_epoch(struct drbd_tconn *tconn)
{ {
const unsigned long s = req->rq_state; tconn->current_tle_writes = 0;
atomic_inc(&tconn->current_tle_nr);
/* Before we can signal completion to the upper layers, wake_all_senders(tconn);
* we may need to close the current epoch.
* We can skip this, if this request has not even been sent, because we
* did not have a fully established connection yet/anymore, during
* bitmap exchange, or while we are C_AHEAD due to congestion policy.
*/
if (mdev->state.conn >= C_CONNECTED &&
(s & RQ_NET_SENT) != 0 &&
req->epoch == atomic_read(&mdev->tconn->current_tle_nr))
queue_barrier(mdev);
} }
void complete_master_bio(struct drbd_conf *mdev, void complete_master_bio(struct drbd_conf *mdev,
...@@ -320,9 +290,16 @@ void req_may_be_completed(struct drbd_request *req, struct bio_and_error *m) ...@@ -320,9 +290,16 @@ void req_may_be_completed(struct drbd_request *req, struct bio_and_error *m)
} else if (!(s & RQ_POSTPONED)) } else if (!(s & RQ_POSTPONED))
D_ASSERT((s & (RQ_NET_MASK & ~RQ_NET_DONE)) == 0); D_ASSERT((s & (RQ_NET_MASK & ~RQ_NET_DONE)) == 0);
/* for writes we need to do some extra housekeeping */ /* Before we can signal completion to the upper layers,
if (rw == WRITE) * we may need to close the current transfer log epoch.
_about_to_complete_local_write(mdev, req); * We are within the request lock, so we can simply compare
* the request epoch number with the current transfer log
* epoch number. If they match, increase the current_tle_nr,
* and reset the transfer log epoch write_cnt.
*/
if (rw == WRITE &&
req->epoch == atomic_read(&mdev->tconn->current_tle_nr))
start_new_tl_epoch(mdev->tconn);
/* Update disk stats */ /* Update disk stats */
_drbd_end_io_acct(mdev, req); _drbd_end_io_acct(mdev, req);
...@@ -514,15 +491,6 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what, ...@@ -514,15 +491,6 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
* hurting performance. */ * hurting performance. */
set_bit(UNPLUG_REMOTE, &mdev->flags); set_bit(UNPLUG_REMOTE, &mdev->flags);
/* see __drbd_make_request,
* just after it grabs the req_lock */
D_ASSERT(test_bit(CREATE_BARRIER, &mdev->tconn->flags) == 0);
req->epoch = atomic_read(&mdev->tconn->current_tle_nr);
/* increment size of current epoch */
mdev->tconn->newest_tle->n_writes++;
/* queue work item to send data */ /* queue work item to send data */
D_ASSERT(req->rq_state & RQ_NET_PENDING); D_ASSERT(req->rq_state & RQ_NET_PENDING);
req->rq_state |= RQ_NET_QUEUED; req->rq_state |= RQ_NET_QUEUED;
...@@ -534,8 +502,8 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what, ...@@ -534,8 +502,8 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
nc = rcu_dereference(mdev->tconn->net_conf); nc = rcu_dereference(mdev->tconn->net_conf);
p = nc->max_epoch_size; p = nc->max_epoch_size;
rcu_read_unlock(); rcu_read_unlock();
if (mdev->tconn->newest_tle->n_writes >= p) if (mdev->tconn->current_tle_writes >= p)
queue_barrier(mdev); start_new_tl_epoch(mdev->tconn);
break; break;
...@@ -692,6 +660,7 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what, ...@@ -692,6 +660,7 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
During connection handshake, we ensure that the peer was not rebooted. */ During connection handshake, we ensure that the peer was not rebooted. */
if (!(req->rq_state & RQ_NET_OK)) { if (!(req->rq_state & RQ_NET_OK)) {
if (req->w.cb) { if (req->w.cb) {
/* w.cb expected to be w_send_dblock, or w_send_read_req */
drbd_queue_work(&mdev->tconn->sender_work, &req->w); drbd_queue_work(&mdev->tconn->sender_work, &req->w);
rv = req->rq_state & RQ_WRITE ? MR_WRITE : MR_READ; rv = req->rq_state & RQ_WRITE ? MR_WRITE : MR_READ;
} }
...@@ -708,7 +677,6 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what, ...@@ -708,7 +677,6 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
* this is bad, because if the connection is lost now, * this is bad, because if the connection is lost now,
* we won't be able to clean them up... */ * we won't be able to clean them up... */
dev_err(DEV, "FIXME (BARRIER_ACKED but pending)\n"); dev_err(DEV, "FIXME (BARRIER_ACKED but pending)\n");
list_move(&req->tl_requests, &mdev->tconn->out_of_sequence_requests);
} }
if ((req->rq_state & RQ_NET_MASK) != 0) { if ((req->rq_state & RQ_NET_MASK) != 0) {
req->rq_state |= RQ_NET_DONE; req->rq_state |= RQ_NET_DONE;
...@@ -835,7 +803,6 @@ int __drbd_make_request(struct drbd_conf *mdev, struct bio *bio, unsigned long s ...@@ -835,7 +803,6 @@ int __drbd_make_request(struct drbd_conf *mdev, struct bio *bio, unsigned long s
const int rw = bio_rw(bio); const int rw = bio_rw(bio);
const int size = bio->bi_size; const int size = bio->bi_size;
const sector_t sector = bio->bi_sector; const sector_t sector = bio->bi_sector;
struct drbd_tl_epoch *b = NULL;
struct drbd_request *req; struct drbd_request *req;
struct net_conf *nc; struct net_conf *nc;
int local, remote, send_oos = 0; int local, remote, send_oos = 0;
...@@ -916,24 +883,6 @@ int __drbd_make_request(struct drbd_conf *mdev, struct bio *bio, unsigned long s ...@@ -916,24 +883,6 @@ int __drbd_make_request(struct drbd_conf *mdev, struct bio *bio, unsigned long s
goto fail_free_complete; goto fail_free_complete;
} }
/* For WRITE request, we have to make sure that we have an
* unused_spare_tle, in case we need to start a new epoch.
* I try to be smart and avoid to pre-allocate always "just in case",
* but there is a race between testing the bit and pointer outside the
* spinlock, and grabbing the spinlock.
* if we lost that race, we retry. */
if (rw == WRITE && (remote || send_oos) &&
mdev->tconn->unused_spare_tle == NULL &&
test_bit(CREATE_BARRIER, &mdev->tconn->flags)) {
allocate_barrier:
b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_NOIO);
if (!b) {
dev_err(DEV, "Failed to alloc barrier.\n");
err = -ENOMEM;
goto fail_free_complete;
}
}
/* GOOD, everything prepared, grab the spin_lock */ /* GOOD, everything prepared, grab the spin_lock */
spin_lock_irq(&mdev->tconn->req_lock); spin_lock_irq(&mdev->tconn->req_lock);
...@@ -969,42 +918,9 @@ int __drbd_make_request(struct drbd_conf *mdev, struct bio *bio, unsigned long s ...@@ -969,42 +918,9 @@ int __drbd_make_request(struct drbd_conf *mdev, struct bio *bio, unsigned long s
} }
} }
if (b && mdev->tconn->unused_spare_tle == NULL) {
mdev->tconn->unused_spare_tle = b;
b = NULL;
}
if (rw == WRITE && (remote || send_oos) &&
mdev->tconn->unused_spare_tle == NULL &&
test_bit(CREATE_BARRIER, &mdev->tconn->flags)) {
/* someone closed the current epoch
* while we were grabbing the spinlock */
spin_unlock_irq(&mdev->tconn->req_lock);
goto allocate_barrier;
}
/* Update disk stats */ /* Update disk stats */
_drbd_start_io_acct(mdev, req, bio); _drbd_start_io_acct(mdev, req, bio);
/* _maybe_start_new_epoch(mdev);
* If we need to generate a write barrier packet, we have to add the
* new epoch (barrier) object, and queue the barrier packet for sending,
* and queue the req's data after it _within the same lock_, otherwise
* we have race conditions were the reorder domains could be mixed up.
*
* Even read requests may start a new epoch and queue the corresponding
* barrier packet. To get the write ordering right, we only have to
* make sure that, if this is a write request and it triggered a
* barrier packet, this request is queued within the same spinlock. */
if ((remote || send_oos) && mdev->tconn->unused_spare_tle &&
test_and_clear_bit(CREATE_BARRIER, &mdev->tconn->flags)) {
_tl_add_barrier(mdev->tconn, mdev->tconn->unused_spare_tle);
mdev->tconn->unused_spare_tle = NULL;
} else {
D_ASSERT(!(remote && rw == WRITE &&
test_bit(CREATE_BARRIER, &mdev->tconn->flags)));
}
/* NOTE /* NOTE
* Actually, 'local' may be wrong here already, since we may have failed * Actually, 'local' may be wrong here already, since we may have failed
* to write to the meta data, and may become wrong anytime because of * to write to the meta data, and may become wrong anytime because of
...@@ -1025,7 +941,12 @@ int __drbd_make_request(struct drbd_conf *mdev, struct bio *bio, unsigned long s ...@@ -1025,7 +941,12 @@ int __drbd_make_request(struct drbd_conf *mdev, struct bio *bio, unsigned long s
if (local) if (local)
_req_mod(req, TO_BE_SUBMITTED); _req_mod(req, TO_BE_SUBMITTED);
list_add_tail(&req->tl_requests, &mdev->tconn->newest_tle->requests); /* which transfer log epoch does this belong to? */
req->epoch = atomic_read(&mdev->tconn->current_tle_nr);
if (rw == WRITE)
mdev->tconn->current_tle_writes++;
list_add_tail(&req->tl_requests, &mdev->tconn->transfer_log);
/* NOTE remote first: to get the concurrent write detection right, /* NOTE remote first: to get the concurrent write detection right,
* we must register the request before start of local IO. */ * we must register the request before start of local IO. */
...@@ -1059,7 +980,9 @@ int __drbd_make_request(struct drbd_conf *mdev, struct bio *bio, unsigned long s ...@@ -1059,7 +980,9 @@ int __drbd_make_request(struct drbd_conf *mdev, struct bio *bio, unsigned long s
} }
if (congested) { if (congested) {
queue_barrier(mdev); /* last barrier, after mirrored writes */ if (mdev->tconn->current_tle_writes)
/* start a new epoch for non-mirrored writes */
start_new_tl_epoch(mdev->tconn);
if (nc->on_congestion == OC_PULL_AHEAD) if (nc->on_congestion == OC_PULL_AHEAD)
_drbd_set_state(_NS(mdev, conn, C_AHEAD), 0, NULL); _drbd_set_state(_NS(mdev, conn, C_AHEAD), 0, NULL);
...@@ -1070,7 +993,6 @@ int __drbd_make_request(struct drbd_conf *mdev, struct bio *bio, unsigned long s ...@@ -1070,7 +993,6 @@ int __drbd_make_request(struct drbd_conf *mdev, struct bio *bio, unsigned long s
rcu_read_unlock(); rcu_read_unlock();
spin_unlock_irq(&mdev->tconn->req_lock); spin_unlock_irq(&mdev->tconn->req_lock);
kfree(b); /* if someone else has beaten us to it... */
if (local) { if (local) {
req->private_bio->bi_bdev = mdev->ldev->backing_bdev; req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
...@@ -1108,7 +1030,6 @@ int __drbd_make_request(struct drbd_conf *mdev, struct bio *bio, unsigned long s ...@@ -1108,7 +1030,6 @@ int __drbd_make_request(struct drbd_conf *mdev, struct bio *bio, unsigned long s
drbd_req_free(req); drbd_req_free(req);
dec_ap_bio(mdev); dec_ap_bio(mdev);
kfree(b);
return ret; return ret;
} }
...@@ -1164,12 +1085,23 @@ int drbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bvm, struct ...@@ -1164,12 +1085,23 @@ int drbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bvm, struct
return limit; return limit;
} }
struct drbd_request *find_oldest_request(struct drbd_tconn *tconn)
{
/* Walk the transfer log,
* and find the oldest not yet completed request */
struct drbd_request *r;
list_for_each_entry(r, &tconn->transfer_log, tl_requests) {
if (r->rq_state & (RQ_NET_PENDING|RQ_LOCAL_PENDING))
return r;
}
return NULL;
}
void request_timer_fn(unsigned long data) void request_timer_fn(unsigned long data)
{ {
struct drbd_conf *mdev = (struct drbd_conf *) data; struct drbd_conf *mdev = (struct drbd_conf *) data;
struct drbd_tconn *tconn = mdev->tconn; struct drbd_tconn *tconn = mdev->tconn;
struct drbd_request *req; /* oldest request */ struct drbd_request *req; /* oldest request */
struct list_head *le;
struct net_conf *nc; struct net_conf *nc;
unsigned long ent = 0, dt = 0, et, nt; /* effective timeout = ko_count * timeout */ unsigned long ent = 0, dt = 0, et, nt; /* effective timeout = ko_count * timeout */
unsigned long now; unsigned long now;
...@@ -1193,16 +1125,13 @@ void request_timer_fn(unsigned long data) ...@@ -1193,16 +1125,13 @@ void request_timer_fn(unsigned long data)
now = jiffies; now = jiffies;
spin_lock_irq(&tconn->req_lock); spin_lock_irq(&tconn->req_lock);
le = &tconn->oldest_tle->requests; req = find_oldest_request(tconn);
if (list_empty(le)) { if (!req) {
spin_unlock_irq(&tconn->req_lock); spin_unlock_irq(&tconn->req_lock);
mod_timer(&mdev->request_timer, now + et); mod_timer(&mdev->request_timer, now + et);
return; return;
} }
le = le->prev;
req = list_entry(le, struct drbd_request, tl_requests);
/* The request is considered timed out, if /* The request is considered timed out, if
* - we have some effective timeout from the configuration, * - we have some effective timeout from the configuration,
* with above state restrictions applied, * with above state restrictions applied,
......
...@@ -1210,34 +1210,25 @@ int w_prev_work_done(struct drbd_work *w, int cancel) ...@@ -1210,34 +1210,25 @@ int w_prev_work_done(struct drbd_work *w, int cancel)
return 0; return 0;
} }
int w_send_barrier(struct drbd_work *w, int cancel) /* FIXME
* We need to track the number of pending barrier acks,
* and to be able to wait for them.
* See also comment in drbd_adm_attach before drbd_suspend_io.
*/
int drbd_send_barrier(struct drbd_tconn *tconn)
{ {
struct drbd_socket *sock;
struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
struct drbd_conf *mdev = w->mdev;
struct p_barrier *p; struct p_barrier *p;
struct drbd_socket *sock;
/* really avoid racing with tl_clear. w.cb may have been referenced sock = &tconn->data;
* just before it was reassigned and re-queued, so double check that. p = conn_prepare_command(tconn, sock);
* actually, this race was harmless, since we only try to send the
* barrier packet here, and otherwise do nothing with the object.
* but compare with the head of w_clear_epoch */
spin_lock_irq(&mdev->tconn->req_lock);
if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
cancel = 1;
spin_unlock_irq(&mdev->tconn->req_lock);
if (cancel)
return 0;
sock = &mdev->tconn->data;
p = drbd_prepare_command(mdev, sock);
if (!p) if (!p)
return -EIO; return -EIO;
p->barrier = b->br_number; p->barrier = tconn->send.current_epoch_nr;
/* inc_ap_pending was done where this was queued. p->pad = 0;
* dec_ap_pending will be done in got_BarrierAck tconn->send.current_epoch_writes = 0;
* or (on connection loss) in w_clear_epoch. */
return drbd_send_command(mdev, sock, P_BARRIER, sizeof(*p), NULL, 0); return conn_send_command(tconn, sock, P_BARRIER, sizeof(*p), NULL, 0);
} }
int w_send_write_hint(struct drbd_work *w, int cancel) int w_send_write_hint(struct drbd_work *w, int cancel)
...@@ -1257,6 +1248,7 @@ int w_send_out_of_sync(struct drbd_work *w, int cancel) ...@@ -1257,6 +1248,7 @@ int w_send_out_of_sync(struct drbd_work *w, int cancel)
{ {
struct drbd_request *req = container_of(w, struct drbd_request, w); struct drbd_request *req = container_of(w, struct drbd_request, w);
struct drbd_conf *mdev = w->mdev; struct drbd_conf *mdev = w->mdev;
struct drbd_tconn *tconn = mdev->tconn;
int err; int err;
if (unlikely(cancel)) { if (unlikely(cancel)) {
...@@ -1264,6 +1256,20 @@ int w_send_out_of_sync(struct drbd_work *w, int cancel) ...@@ -1264,6 +1256,20 @@ int w_send_out_of_sync(struct drbd_work *w, int cancel)
return 0; return 0;
} }
if (!tconn->send.seen_any_write_yet) {
tconn->send.seen_any_write_yet = true;
tconn->send.current_epoch_nr = req->epoch;
}
if (tconn->send.current_epoch_nr != req->epoch) {
if (tconn->send.current_epoch_writes)
drbd_send_barrier(tconn);
tconn->send.current_epoch_nr = req->epoch;
}
/* this time, no tconn->send.current_epoch_writes++;
* If it was sent, it was the closing barrier for the last
* replicated epoch, before we went into AHEAD mode.
* No more barriers will be sent, until we leave AHEAD mode again. */
err = drbd_send_out_of_sync(mdev, req); err = drbd_send_out_of_sync(mdev, req);
req_mod(req, OOS_HANDED_TO_NETWORK); req_mod(req, OOS_HANDED_TO_NETWORK);
...@@ -1280,6 +1286,7 @@ int w_send_dblock(struct drbd_work *w, int cancel) ...@@ -1280,6 +1286,7 @@ int w_send_dblock(struct drbd_work *w, int cancel)
{ {
struct drbd_request *req = container_of(w, struct drbd_request, w); struct drbd_request *req = container_of(w, struct drbd_request, w);
struct drbd_conf *mdev = w->mdev; struct drbd_conf *mdev = w->mdev;
struct drbd_tconn *tconn = mdev->tconn;
int err; int err;
if (unlikely(cancel)) { if (unlikely(cancel)) {
...@@ -1287,6 +1294,17 @@ int w_send_dblock(struct drbd_work *w, int cancel) ...@@ -1287,6 +1294,17 @@ int w_send_dblock(struct drbd_work *w, int cancel)
return 0; return 0;
} }
if (!tconn->send.seen_any_write_yet) {
tconn->send.seen_any_write_yet = true;
tconn->send.current_epoch_nr = req->epoch;
}
if (tconn->send.current_epoch_nr != req->epoch) {
if (tconn->send.current_epoch_writes)
drbd_send_barrier(tconn);
tconn->send.current_epoch_nr = req->epoch;
}
tconn->send.current_epoch_writes++;
err = drbd_send_dblock(mdev, req); err = drbd_send_dblock(mdev, req);
req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK); req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
...@@ -1303,6 +1321,7 @@ int w_send_read_req(struct drbd_work *w, int cancel) ...@@ -1303,6 +1321,7 @@ int w_send_read_req(struct drbd_work *w, int cancel)
{ {
struct drbd_request *req = container_of(w, struct drbd_request, w); struct drbd_request *req = container_of(w, struct drbd_request, w);
struct drbd_conf *mdev = w->mdev; struct drbd_conf *mdev = w->mdev;
struct drbd_tconn *tconn = mdev->tconn;
int err; int err;
if (unlikely(cancel)) { if (unlikely(cancel)) {
...@@ -1310,6 +1329,15 @@ int w_send_read_req(struct drbd_work *w, int cancel) ...@@ -1310,6 +1329,15 @@ int w_send_read_req(struct drbd_work *w, int cancel)
return 0; return 0;
} }
/* Even read requests may close a write epoch,
* if there was any yet. */
if (tconn->send.seen_any_write_yet &&
tconn->send.current_epoch_nr != req->epoch) {
if (tconn->send.current_epoch_writes)
drbd_send_barrier(tconn);
tconn->send.current_epoch_nr = req->epoch;
}
err = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size, err = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
(unsigned long)req); (unsigned long)req);
...@@ -1673,6 +1701,34 @@ void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side) ...@@ -1673,6 +1701,34 @@ void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
mutex_unlock(mdev->state_mutex); mutex_unlock(mdev->state_mutex);
} }
/* If the resource already closed the current epoch, but we did not
* (because we have not yet seen new requests), we should send the
* corresponding barrier now. Must be checked within the same spinlock
* that is used to check for new requests. */
bool need_to_send_barrier(struct drbd_tconn *connection)
{
if (!connection->send.seen_any_write_yet)
return false;
/* Skip barriers that do not contain any writes.
* This may happen during AHEAD mode. */
if (!connection->send.current_epoch_writes)
return false;
/* ->req_lock is held when requests are queued on
* connection->sender_work, and put into ->transfer_log.
* It is also held when ->current_tle_nr is increased.
* So either there are already new requests queued,
* and corresponding barriers will be send there.
* Or nothing new is queued yet, so the difference will be 1.
*/
if (atomic_read(&connection->current_tle_nr) !=
connection->send.current_epoch_nr + 1)
return false;
return true;
}
bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list) bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
{ {
spin_lock_irq(&queue->q_lock); spin_lock_irq(&queue->q_lock);
...@@ -1690,15 +1746,79 @@ bool dequeue_work_item(struct drbd_work_queue *queue, struct list_head *work_lis ...@@ -1690,15 +1746,79 @@ bool dequeue_work_item(struct drbd_work_queue *queue, struct list_head *work_lis
return !list_empty(work_list); return !list_empty(work_list);
} }
void wait_for_work(struct drbd_tconn *connection, struct list_head *work_list)
{
DEFINE_WAIT(wait);
struct net_conf *nc;
int uncork, cork;
dequeue_work_item(&connection->sender_work, work_list);
if (!list_empty(work_list))
return;
/* Still nothing to do?
* Maybe we still need to close the current epoch,
* even if no new requests are queued yet.
*
* Also, poke TCP, just in case.
* Then wait for new work (or signal). */
rcu_read_lock();
nc = rcu_dereference(connection->net_conf);
uncork = nc ? nc->tcp_cork : 0;
rcu_read_unlock();
if (uncork) {
mutex_lock(&connection->data.mutex);
if (connection->data.socket)
drbd_tcp_uncork(connection->data.socket);
mutex_unlock(&connection->data.mutex);
}
for (;;) {
int send_barrier;
prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
spin_lock_irq(&connection->req_lock);
spin_lock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
list_splice_init(&connection->sender_work.q, work_list);
spin_unlock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
if (!list_empty(work_list) || signal_pending(current)) {
spin_unlock_irq(&connection->req_lock);
break;
}
send_barrier = need_to_send_barrier(connection);
spin_unlock_irq(&connection->req_lock);
if (send_barrier) {
drbd_send_barrier(connection);
connection->send.current_epoch_nr++;
}
schedule();
/* may be woken up for other things but new work, too,
* e.g. if the current epoch got closed.
* In which case we send the barrier above. */
}
finish_wait(&connection->sender_work.q_wait, &wait);
/* someone may have changed the config while we have been waiting above. */
rcu_read_lock();
nc = rcu_dereference(connection->net_conf);
cork = nc ? nc->tcp_cork : 0;
rcu_read_unlock();
mutex_lock(&connection->data.mutex);
if (connection->data.socket) {
if (cork)
drbd_tcp_cork(connection->data.socket);
else if (!uncork)
drbd_tcp_uncork(connection->data.socket);
}
mutex_unlock(&connection->data.mutex);
}
int drbd_worker(struct drbd_thread *thi) int drbd_worker(struct drbd_thread *thi)
{ {
struct drbd_tconn *tconn = thi->tconn; struct drbd_tconn *tconn = thi->tconn;
struct drbd_work *w = NULL; struct drbd_work *w = NULL;
struct drbd_conf *mdev; struct drbd_conf *mdev;
struct net_conf *nc;
LIST_HEAD(work_list); LIST_HEAD(work_list);
int vnr; int vnr;
int cork;
while (get_t_state(thi) == RUNNING) { while (get_t_state(thi) == RUNNING) {
drbd_thread_current_set_cpu(thi); drbd_thread_current_set_cpu(thi);
...@@ -1706,29 +1826,7 @@ int drbd_worker(struct drbd_thread *thi) ...@@ -1706,29 +1826,7 @@ int drbd_worker(struct drbd_thread *thi)
/* as long as we use drbd_queue_work_front(), /* as long as we use drbd_queue_work_front(),
* we may only dequeue single work items here, not batches. */ * we may only dequeue single work items here, not batches. */
if (list_empty(&work_list)) if (list_empty(&work_list))
dequeue_work_item(&tconn->sender_work, &work_list); wait_for_work(tconn, &work_list);
/* Still nothing to do? Poke TCP, just in case,
* then wait for new work (or signal). */
if (list_empty(&work_list)) {
mutex_lock(&tconn->data.mutex);
rcu_read_lock();
nc = rcu_dereference(tconn->net_conf);
cork = nc ? nc->tcp_cork : 0;
rcu_read_unlock();
if (tconn->data.socket && cork)
drbd_tcp_uncork(tconn->data.socket);
mutex_unlock(&tconn->data.mutex);
wait_event_interruptible(tconn->sender_work.q_wait,
dequeue_work_item(&tconn->sender_work, &work_list));
mutex_lock(&tconn->data.mutex);
if (tconn->data.socket && cork)
drbd_tcp_cork(tconn->data.socket);
mutex_unlock(&tconn->data.mutex);
}
if (signal_pending(current)) { if (signal_pending(current)) {
flush_signals(current); flush_signals(current);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册