提交 8f7d89f3 编写于 作者: J Jan Kara 提交者: Theodore Ts'o

jbd2: transaction reservation support

In some cases we cannot start a transaction because of locking
constraints and passing started transaction into those places is not
handy either because we could block transaction commit for too long.
Transaction reservation is designed to solve these issues.  It
reserves a handle with given number of credits in the journal and the
handle can be later attached to the running transaction without
blocking on commit or checkpointing.  Reserved handles do not block
transaction commit in any way, they only reduce maximum size of the
running transaction (because we have to always be prepared to
accomodate request for attaching reserved handle).
Signed-off-by: NJan Kara <jack@suse.cz>
Signed-off-by: N"Theodore Ts'o" <tytso@mit.edu>
上级 f29fad72
......@@ -62,7 +62,7 @@ handle_t *__ext4_journal_start_sb(struct super_block *sb, unsigned int line,
ext4_abort(sb, "Detected aborted journal");
return ERR_PTR(-EROFS);
}
return jbd2__journal_start(journal, nblocks, GFP_NOFS, type, line);
return jbd2__journal_start(journal, nblocks, 0, GFP_NOFS, type, line);
}
int __ext4_journal_stop(const char *where, unsigned int line, handle_t *handle)
......
......@@ -523,6 +523,12 @@ void jbd2_journal_commit_transaction(journal_t *journal)
*/
jbd2_journal_switch_revoke_table(journal);
/*
* Reserved credits cannot be claimed anymore, free them
*/
atomic_sub(atomic_read(&journal->j_reserved_credits),
&commit_transaction->t_outstanding_credits);
trace_jbd2_commit_flushing(journal, commit_transaction);
stats.run.rs_flushing = jiffies;
stats.run.rs_locked = jbd2_time_diff(stats.run.rs_locked,
......
......@@ -1030,6 +1030,7 @@ static journal_t * journal_init_common (void)
init_waitqueue_head(&journal->j_wait_done_commit);
init_waitqueue_head(&journal->j_wait_commit);
init_waitqueue_head(&journal->j_wait_updates);
init_waitqueue_head(&journal->j_wait_reserved);
mutex_init(&journal->j_barrier);
mutex_init(&journal->j_checkpoint_mutex);
spin_lock_init(&journal->j_revoke_lock);
......@@ -1039,6 +1040,7 @@ static journal_t * journal_init_common (void)
journal->j_commit_interval = (HZ * JBD2_DEFAULT_MAX_COMMIT_AGE);
journal->j_min_batch_time = 0;
journal->j_max_batch_time = 15000; /* 15ms */
atomic_set(&journal->j_reserved_credits, 0);
/* The journal is marked for error until we succeed with recovery! */
journal->j_flags = JBD2_ABORT;
......
......@@ -89,7 +89,8 @@ jbd2_get_transaction(journal_t *journal, transaction_t *transaction)
transaction->t_expires = jiffies + journal->j_commit_interval;
spin_lock_init(&transaction->t_handle_lock);
atomic_set(&transaction->t_updates, 0);
atomic_set(&transaction->t_outstanding_credits, 0);
atomic_set(&transaction->t_outstanding_credits,
atomic_read(&journal->j_reserved_credits));
atomic_set(&transaction->t_handle_count, 0);
INIT_LIST_HEAD(&transaction->t_inode_list);
INIT_LIST_HEAD(&transaction->t_private_list);
......@@ -140,6 +141,112 @@ static inline void update_t_max_wait(transaction_t *transaction,
#endif
}
/*
* Wait until running transaction passes T_LOCKED state. Also starts the commit
* if needed. The function expects running transaction to exist and releases
* j_state_lock.
*/
static void wait_transaction_locked(journal_t *journal)
__releases(journal->j_state_lock)
{
DEFINE_WAIT(wait);
int need_to_start;
tid_t tid = journal->j_running_transaction->t_tid;
prepare_to_wait(&journal->j_wait_transaction_locked, &wait,
TASK_UNINTERRUPTIBLE);
need_to_start = !tid_geq(journal->j_commit_request, tid);
read_unlock(&journal->j_state_lock);
if (need_to_start)
jbd2_log_start_commit(journal, tid);
schedule();
finish_wait(&journal->j_wait_transaction_locked, &wait);
}
static void sub_reserved_credits(journal_t *journal, int blocks)
{
atomic_sub(blocks, &journal->j_reserved_credits);
wake_up(&journal->j_wait_reserved);
}
/*
* Wait until we can add credits for handle to the running transaction. Called
* with j_state_lock held for reading. Returns 0 if handle joined the running
* transaction. Returns 1 if we had to wait, j_state_lock is dropped, and
* caller must retry.
*/
static int add_transaction_credits(journal_t *journal, int blocks,
int rsv_blocks)
{
transaction_t *t = journal->j_running_transaction;
int needed;
int total = blocks + rsv_blocks;
/*
* If the current transaction is locked down for commit, wait
* for the lock to be released.
*/
if (t->t_state == T_LOCKED) {
wait_transaction_locked(journal);
return 1;
}
/*
* If there is not enough space left in the log to write all
* potential buffers requested by this operation, we need to
* stall pending a log checkpoint to free some more log space.
*/
needed = atomic_add_return(total, &t->t_outstanding_credits);
if (needed > journal->j_max_transaction_buffers) {
/*
* If the current transaction is already too large,
* then start to commit it: we can then go back and
* attach this handle to a new transaction.
*/
atomic_sub(total, &t->t_outstanding_credits);
wait_transaction_locked(journal);
return 1;
}
/*
* The commit code assumes that it can get enough log space
* without forcing a checkpoint. This is *critical* for
* correctness: a checkpoint of a buffer which is also
* associated with a committing transaction creates a deadlock,
* so commit simply cannot force through checkpoints.
*
* We must therefore ensure the necessary space in the journal
* *before* starting to dirty potentially checkpointed buffers
* in the new transaction.
*/
if (jbd2_log_space_left(journal) < jbd2_space_needed(journal)) {
atomic_sub(total, &t->t_outstanding_credits);
read_unlock(&journal->j_state_lock);
write_lock(&journal->j_state_lock);
if (jbd2_log_space_left(journal) < jbd2_space_needed(journal))
__jbd2_log_wait_for_space(journal);
write_unlock(&journal->j_state_lock);
return 1;
}
/* No reservation? We are done... */
if (!rsv_blocks)
return 0;
needed = atomic_add_return(rsv_blocks, &journal->j_reserved_credits);
/* We allow at most half of a transaction to be reserved */
if (needed > journal->j_max_transaction_buffers / 2) {
sub_reserved_credits(journal, rsv_blocks);
atomic_sub(total, &t->t_outstanding_credits);
read_unlock(&journal->j_state_lock);
wait_event(journal->j_wait_reserved,
atomic_read(&journal->j_reserved_credits) + rsv_blocks
<= journal->j_max_transaction_buffers / 2);
return 1;
}
return 0;
}
/*
* start_this_handle: Given a handle, deal with any locking or stalling
* needed to make sure that there is enough journal space for the handle
......@@ -151,18 +258,24 @@ static int start_this_handle(journal_t *journal, handle_t *handle,
gfp_t gfp_mask)
{
transaction_t *transaction, *new_transaction = NULL;
tid_t tid;
int needed, need_to_start;
int nblocks = handle->h_buffer_credits;
int blocks = handle->h_buffer_credits;
int rsv_blocks = 0;
unsigned long ts = jiffies;
if (nblocks > journal->j_max_transaction_buffers) {
/*
* 1/2 of transaction can be reserved so we can practically handle
* only 1/2 of maximum transaction size per operation
*/
if (WARN_ON(blocks > journal->j_max_transaction_buffers / 2)) {
printk(KERN_ERR "JBD2: %s wants too many credits (%d > %d)\n",
current->comm, nblocks,
journal->j_max_transaction_buffers);
current->comm, blocks,
journal->j_max_transaction_buffers / 2);
return -ENOSPC;
}
if (handle->h_rsv_handle)
rsv_blocks = handle->h_rsv_handle->h_buffer_credits;
alloc_transaction:
if (!journal->j_running_transaction) {
new_transaction = kmem_cache_zalloc(transaction_cache,
......@@ -199,8 +312,12 @@ static int start_this_handle(journal_t *journal, handle_t *handle,
return -EROFS;
}
/* Wait on the journal's transaction barrier if necessary */
if (journal->j_barrier_count) {
/*
* Wait on the journal's transaction barrier if necessary. Specifically
* we allow reserved handles to proceed because otherwise commit could
* deadlock on page writeback not being able to complete.
*/
if (!handle->h_reserved && journal->j_barrier_count) {
read_unlock(&journal->j_state_lock);
wait_event(journal->j_wait_transaction_locked,
journal->j_barrier_count == 0);
......@@ -213,7 +330,7 @@ static int start_this_handle(journal_t *journal, handle_t *handle,
goto alloc_transaction;
write_lock(&journal->j_state_lock);
if (!journal->j_running_transaction &&
!journal->j_barrier_count) {
(handle->h_reserved || !journal->j_barrier_count)) {
jbd2_get_transaction(journal, new_transaction);
new_transaction = NULL;
}
......@@ -223,75 +340,18 @@ static int start_this_handle(journal_t *journal, handle_t *handle,
transaction = journal->j_running_transaction;
/*
* If the current transaction is locked down for commit, wait for the
* lock to be released.
*/
if (transaction->t_state == T_LOCKED) {
DEFINE_WAIT(wait);
prepare_to_wait(&journal->j_wait_transaction_locked,
&wait, TASK_UNINTERRUPTIBLE);
read_unlock(&journal->j_state_lock);
schedule();
finish_wait(&journal->j_wait_transaction_locked, &wait);
goto repeat;
}
/*
* If there is not enough space left in the log to write all potential
* buffers requested by this operation, we need to stall pending a log
* checkpoint to free some more log space.
*/
needed = atomic_add_return(nblocks,
&transaction->t_outstanding_credits);
if (needed > journal->j_max_transaction_buffers) {
if (!handle->h_reserved) {
/* We may have dropped j_state_lock - restart in that case */
if (add_transaction_credits(journal, blocks, rsv_blocks))
goto repeat;
} else {
/*
* If the current transaction is already too large, then start
* to commit it: we can then go back and attach this handle to
* a new transaction.
* We have handle reserved so we are allowed to join T_LOCKED
* transaction and we don't have to check for transaction size
* and journal space.
*/
DEFINE_WAIT(wait);
jbd_debug(2, "Handle %p starting new commit...\n", handle);
atomic_sub(nblocks, &transaction->t_outstanding_credits);
prepare_to_wait(&journal->j_wait_transaction_locked, &wait,
TASK_UNINTERRUPTIBLE);
tid = transaction->t_tid;
need_to_start = !tid_geq(journal->j_commit_request, tid);
read_unlock(&journal->j_state_lock);
if (need_to_start)
jbd2_log_start_commit(journal, tid);
schedule();
finish_wait(&journal->j_wait_transaction_locked, &wait);
goto repeat;
}
/*
* The commit code assumes that it can get enough log space
* without forcing a checkpoint. This is *critical* for
* correctness: a checkpoint of a buffer which is also
* associated with a committing transaction creates a deadlock,
* so commit simply cannot force through checkpoints.
*
* We must therefore ensure the necessary space in the journal
* *before* starting to dirty potentially checkpointed buffers
* in the new transaction.
*
* The worst part is, any transaction currently committing can
* reduce the free space arbitrarily. Be careful to account for
* those buffers when checkpointing.
*/
if (jbd2_log_space_left(journal) < jbd2_space_needed(journal)) {
jbd_debug(2, "Handle %p waiting for checkpoint...\n", handle);
atomic_sub(nblocks, &transaction->t_outstanding_credits);
read_unlock(&journal->j_state_lock);
write_lock(&journal->j_state_lock);
if (jbd2_log_space_left(journal) < jbd2_space_needed(journal))
__jbd2_log_wait_for_space(journal);
write_unlock(&journal->j_state_lock);
goto repeat;
sub_reserved_credits(journal, blocks);
handle->h_reserved = 0;
}
/* OK, account for the buffers that this operation expects to
......@@ -299,12 +359,12 @@ static int start_this_handle(journal_t *journal, handle_t *handle,
*/
update_t_max_wait(transaction, ts);
handle->h_transaction = transaction;
handle->h_requested_credits = nblocks;
handle->h_requested_credits = blocks;
handle->h_start_jiffies = jiffies;
atomic_inc(&transaction->t_updates);
atomic_inc(&transaction->t_handle_count);
jbd_debug(4, "Handle %p given %d credits (total %d, free %d)\n",
handle, nblocks,
jbd_debug(4, "Handle %p given %d credits (total %d, free %lu)\n",
handle, blocks,
atomic_read(&transaction->t_outstanding_credits),
jbd2_log_space_left(journal));
read_unlock(&journal->j_state_lock);
......@@ -338,16 +398,21 @@ static handle_t *new_handle(int nblocks)
*
* We make sure that the transaction can guarantee at least nblocks of
* modified buffers in the log. We block until the log can guarantee
* that much space.
*
* This function is visible to journal users (like ext3fs), so is not
* called with the journal already locked.
* that much space. Additionally, if rsv_blocks > 0, we also create another
* handle with rsv_blocks reserved blocks in the journal. This handle is
* is stored in h_rsv_handle. It is not attached to any particular transaction
* and thus doesn't block transaction commit. If the caller uses this reserved
* handle, it has to set h_rsv_handle to NULL as otherwise jbd2_journal_stop()
* on the parent handle will dispose the reserved one. Reserved handle has to
* be converted to a normal handle using jbd2_journal_start_reserved() before
* it can be used.
*
* Return a pointer to a newly allocated handle, or an ERR_PTR() value
* on failure.
*/
handle_t *jbd2__journal_start(journal_t *journal, int nblocks, gfp_t gfp_mask,
unsigned int type, unsigned int line_no)
handle_t *jbd2__journal_start(journal_t *journal, int nblocks, int rsv_blocks,
gfp_t gfp_mask, unsigned int type,
unsigned int line_no)
{
handle_t *handle = journal_current_handle();
int err;
......@@ -364,11 +429,25 @@ handle_t *jbd2__journal_start(journal_t *journal, int nblocks, gfp_t gfp_mask,
handle = new_handle(nblocks);
if (!handle)
return ERR_PTR(-ENOMEM);
if (rsv_blocks) {
handle_t *rsv_handle;
rsv_handle = new_handle(rsv_blocks);
if (!rsv_handle) {
jbd2_free_handle(handle);
return ERR_PTR(-ENOMEM);
}
rsv_handle->h_reserved = 1;
rsv_handle->h_journal = journal;
handle->h_rsv_handle = rsv_handle;
}
current->journal_info = handle;
err = start_this_handle(journal, handle, gfp_mask);
if (err < 0) {
if (handle->h_rsv_handle)
jbd2_free_handle(handle->h_rsv_handle);
jbd2_free_handle(handle);
current->journal_info = NULL;
return ERR_PTR(err);
......@@ -385,10 +464,68 @@ EXPORT_SYMBOL(jbd2__journal_start);
handle_t *jbd2_journal_start(journal_t *journal, int nblocks)
{
return jbd2__journal_start(journal, nblocks, GFP_NOFS, 0, 0);
return jbd2__journal_start(journal, nblocks, 0, GFP_NOFS, 0, 0);
}
EXPORT_SYMBOL(jbd2_journal_start);
void jbd2_journal_free_reserved(handle_t *handle)
{
journal_t *journal = handle->h_journal;
WARN_ON(!handle->h_reserved);
sub_reserved_credits(journal, handle->h_buffer_credits);
jbd2_free_handle(handle);
}
EXPORT_SYMBOL(jbd2_journal_free_reserved);
/**
* int jbd2_journal_start_reserved(handle_t *handle) - start reserved handle
* @handle: handle to start
*
* Start handle that has been previously reserved with jbd2_journal_reserve().
* This attaches @handle to the running transaction (or creates one if there's
* not transaction running). Unlike jbd2_journal_start() this function cannot
* block on journal commit, checkpointing, or similar stuff. It can block on
* memory allocation or frozen journal though.
*
* Return 0 on success, non-zero on error - handle is freed in that case.
*/
int jbd2_journal_start_reserved(handle_t *handle, unsigned int type,
unsigned int line_no)
{
journal_t *journal = handle->h_journal;
int ret = -EIO;
if (WARN_ON(!handle->h_reserved)) {
/* Someone passed in normal handle? Just stop it. */
jbd2_journal_stop(handle);
return ret;
}
/*
* Usefulness of mixing of reserved and unreserved handles is
* questionable. So far nobody seems to need it so just error out.
*/
if (WARN_ON(current->journal_info)) {
jbd2_journal_free_reserved(handle);
return ret;
}
handle->h_journal = NULL;
current->journal_info = handle;
/*
* GFP_NOFS is here because callers are likely from writeback or
* similarly constrained call sites
*/
ret = start_this_handle(journal, handle, GFP_NOFS);
if (ret < 0) {
current->journal_info = NULL;
jbd2_journal_free_reserved(handle);
}
handle->h_type = type;
handle->h_line_no = line_no;
return ret;
}
EXPORT_SYMBOL(jbd2_journal_start_reserved);
/**
* int jbd2_journal_extend() - extend buffer credits.
......@@ -483,7 +620,8 @@ int jbd2_journal_extend(handle_t *handle, int nblocks)
* to a running handle, a call to jbd2_journal_restart will commit the
* handle's transaction so far and reattach the handle to a new
* transaction capabable of guaranteeing the requested number of
* credits.
* credits. We preserve reserved handle if there's any attached to the
* passed in handle.
*/
int jbd2__journal_restart(handle_t *handle, int nblocks, gfp_t gfp_mask)
{
......@@ -508,6 +646,10 @@ int jbd2__journal_restart(handle_t *handle, int nblocks, gfp_t gfp_mask)
spin_lock(&transaction->t_handle_lock);
atomic_sub(handle->h_buffer_credits,
&transaction->t_outstanding_credits);
if (handle->h_rsv_handle) {
sub_reserved_credits(journal,
handle->h_rsv_handle->h_buffer_credits);
}
if (atomic_dec_and_test(&transaction->t_updates))
wake_up(&journal->j_wait_updates);
spin_unlock(&transaction->t_handle_lock);
......@@ -550,6 +692,14 @@ void jbd2_journal_lock_updates(journal_t *journal)
write_lock(&journal->j_state_lock);
++journal->j_barrier_count;
/* Wait until there are no reserved handles */
if (atomic_read(&journal->j_reserved_credits)) {
write_unlock(&journal->j_state_lock);
wait_event(journal->j_wait_reserved,
atomic_read(&journal->j_reserved_credits) == 0);
write_lock(&journal->j_state_lock);
}
/* Wait until there are no running updates */
while (1) {
transaction_t *transaction = journal->j_running_transaction;
......@@ -1505,6 +1655,8 @@ int jbd2_journal_stop(handle_t *handle)
lock_map_release(&handle->h_lockdep_map);
if (handle->h_rsv_handle)
jbd2_journal_free_reserved(handle->h_rsv_handle);
jbd2_free_handle(handle);
return err;
}
......
......@@ -410,8 +410,15 @@ struct jbd2_revoke_table_s;
struct jbd2_journal_handle
{
/* Which compound transaction is this update a part of? */
transaction_t *h_transaction;
union {
/* Which compound transaction is this update a part of? */
transaction_t *h_transaction;
/* Which journal handle belongs to - used iff h_reserved set */
journal_t *h_journal;
};
/* Handle reserved for finishing the logical operation */
handle_t *h_rsv_handle;
/* Number of remaining buffers we are allowed to dirty: */
int h_buffer_credits;
......@@ -426,6 +433,7 @@ struct jbd2_journal_handle
/* Flags [no locking] */
unsigned int h_sync: 1; /* sync-on-close */
unsigned int h_jdata: 1; /* force data journaling */
unsigned int h_reserved: 1; /* handle with reserved credits */
unsigned int h_aborted: 1; /* fatal error on handle */
unsigned int h_type: 8; /* for handle statistics */
unsigned int h_line_no: 16; /* for handle statistics */
......@@ -690,6 +698,7 @@ jbd2_time_diff(unsigned long start, unsigned long end)
* @j_wait_done_commit: Wait queue for waiting for commit to complete
* @j_wait_commit: Wait queue to trigger commit
* @j_wait_updates: Wait queue to wait for updates to complete
* @j_wait_reserved: Wait queue to wait for reserved buffer credits to drop
* @j_checkpoint_mutex: Mutex for locking against concurrent checkpoints
* @j_head: Journal head - identifies the first unused block in the journal
* @j_tail: Journal tail - identifies the oldest still-used block in the
......@@ -703,6 +712,7 @@ jbd2_time_diff(unsigned long start, unsigned long end)
* journal
* @j_fs_dev: Device which holds the client fs. For internal journal this will
* be equal to j_dev
* @j_reserved_credits: Number of buffers reserved from the running transaction
* @j_maxlen: Total maximum capacity of the journal region on disk.
* @j_list_lock: Protects the buffer lists and internal buffer state.
* @j_inode: Optional inode where we store the journal. If present, all journal
......@@ -801,6 +811,9 @@ struct journal_s
/* Wait queue to wait for updates to complete */
wait_queue_head_t j_wait_updates;
/* Wait queue to wait for reserved buffer credits to drop */
wait_queue_head_t j_wait_reserved;
/* Semaphore for locking against concurrent checkpoints */
struct mutex j_checkpoint_mutex;
......@@ -855,6 +868,9 @@ struct journal_s
/* Total maximum capacity of the journal region on disk. */
unsigned int j_maxlen;
/* Number of buffers reserved from the running transaction */
atomic_t j_reserved_credits;
/*
* Protects the buffer lists and internal buffer state.
*/
......@@ -1091,10 +1107,14 @@ static inline handle_t *journal_current_handle(void)
*/
extern handle_t *jbd2_journal_start(journal_t *, int nblocks);
extern handle_t *jbd2__journal_start(journal_t *, int nblocks, gfp_t gfp_mask,
unsigned int type, unsigned int line_no);
extern handle_t *jbd2__journal_start(journal_t *, int blocks, int rsv_blocks,
gfp_t gfp_mask, unsigned int type,
unsigned int line_no);
extern int jbd2_journal_restart(handle_t *, int nblocks);
extern int jbd2__journal_restart(handle_t *, int nblocks, gfp_t gfp_mask);
extern int jbd2_journal_start_reserved(handle_t *handle,
unsigned int type, unsigned int line_no);
extern void jbd2_journal_free_reserved(handle_t *handle);
extern int jbd2_journal_extend (handle_t *, int nblocks);
extern int jbd2_journal_get_write_access(handle_t *, struct buffer_head *);
extern int jbd2_journal_get_create_access (handle_t *, struct buffer_head *);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册