Commit c844b2f5 authored by Mathieu Desnoyers, committed by Greg Kroah-Hartman

lttng lib: ring buffer

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: Greg Kroah-Hartman <gregkh@suse.de>
Parent 1b4d28b6
#ifndef _LINUX_RING_BUFFER_API_H
#define _LINUX_RING_BUFFER_API_H
/*
* linux/ringbuffer/api.h
*
* Copyright (C) 2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
*
* Ring Buffer API.
*
* Dual LGPL v2.1/GPL v2 license.
*/
#include "../../wrapper/ringbuffer/backend.h"
#include "../../wrapper/ringbuffer/frontend.h"
#include "../../wrapper/ringbuffer/vfs.h"
/*
* ring_buffer_frontend_api.h contains static inline functions that depend on
* client static inlines. Hence the inclusion of this "api" header only
* within the client.
*/
#include "../../wrapper/ringbuffer/frontend_api.h"
#endif /* _LINUX_RING_BUFFER_API_H */
#ifndef _LINUX_RING_BUFFER_BACKEND_H
#define _LINUX_RING_BUFFER_BACKEND_H
/*
* linux/ringbuffer/backend.h
*
* Copyright (C) 2008-2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
*
* Ring buffer backend (API).
*
* Dual LGPL v2.1/GPL v2 license.
*
* Credits to Steven Rostedt for proposing to use an extra-subbuffer owned by
* the reader in flight recorder mode.
*/
#include <linux/types.h>
#include <linux/sched.h>
#include <linux/timer.h>
#include <linux/wait.h>
#include <linux/poll.h>
#include <linux/list.h>
#include <linux/fs.h>
#include <linux/mm.h>
/* Internal helpers */
#include "../../wrapper/ringbuffer/backend_internal.h"
#include "../../wrapper/ringbuffer/frontend_internal.h"
/* Ring buffer backend API */
/* Ring buffer backend access (read/write) */
extern size_t lib_ring_buffer_read(struct lib_ring_buffer_backend *bufb,
size_t offset, void *dest, size_t len);
extern int __lib_ring_buffer_copy_to_user(struct lib_ring_buffer_backend *bufb,
size_t offset, void __user *dest,
size_t len);
extern int lib_ring_buffer_read_cstr(struct lib_ring_buffer_backend *bufb,
size_t offset, void *dest, size_t len);
extern struct page **
lib_ring_buffer_read_get_page(struct lib_ring_buffer_backend *bufb, size_t offset,
void ***virt);
/*
* Return the address where a given offset is located.
* Should be used to get the current subbuffer header pointer. Given we know
* it's never on a page boundary, it's safe to write directly to this address,
* as long as the write is never bigger than a page size.
*/
extern void *
lib_ring_buffer_offset_address(struct lib_ring_buffer_backend *bufb,
size_t offset);
extern void *
lib_ring_buffer_read_offset_address(struct lib_ring_buffer_backend *bufb,
size_t offset);
/**
* lib_ring_buffer_write - write data to a buffer backend
* @config : ring buffer instance configuration
* @ctx: ring buffer context. (input arguments only)
* @src : source pointer to copy from
* @len : length of data to copy
*
* This function copies "len" bytes of data from a source pointer to a buffer
* backend, at the current context offset. This is more or less a buffer
* backend-specific memcpy() operation. Calls the slow path
* (_lib_ring_buffer_write) if the copy crosses a page boundary.
*/
static inline
void lib_ring_buffer_write(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer_ctx *ctx,
const void *src, size_t len)
{
struct lib_ring_buffer_backend *bufb = &ctx->buf->backend;
struct channel_backend *chanb = &ctx->chan->backend;
size_t sbidx, index;
size_t offset = ctx->buf_offset;
ssize_t pagecpy;
struct lib_ring_buffer_backend_pages *rpages;
unsigned long sb_bindex, id;
offset &= chanb->buf_size - 1;
sbidx = offset >> chanb->subbuf_size_order;
index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
pagecpy = min_t(size_t, len, (-offset) & ~PAGE_MASK);
id = bufb->buf_wsb[sbidx].id;
sb_bindex = subbuffer_id_get_index(config, id);
rpages = bufb->array[sb_bindex];
CHAN_WARN_ON(ctx->chan,
config->mode == RING_BUFFER_OVERWRITE
&& subbuffer_id_is_noref(config, id));
if (likely(pagecpy == len))
lib_ring_buffer_do_copy(config,
rpages->p[index].virt
+ (offset & ~PAGE_MASK),
src, len);
else
_lib_ring_buffer_write(bufb, offset, src, len, 0);
ctx->buf_offset += len;
}
/**
* lib_ring_buffer_memset - write len bytes of c to a buffer backend
* @config : ring buffer instance configuration
* @bufb : ring buffer backend
* @offset : offset within the buffer
* @c : the byte to copy
* @len : number of bytes to copy
*
* This function writes "len" bytes of "c" to a buffer backend, at a specific
* offset. This is more or less a buffer backend-specific memset() operation.
* Calls the slow path (_lib_ring_buffer_memset) if the write crosses a page
* boundary.
*/
static inline
void lib_ring_buffer_memset(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer_ctx *ctx, int c, size_t len)
{
struct lib_ring_buffer_backend *bufb = &ctx->buf->backend;
struct channel_backend *chanb = &ctx->chan->backend;
size_t sbidx, index;
size_t offset = ctx->buf_offset;
ssize_t pagecpy;
struct lib_ring_buffer_backend_pages *rpages;
unsigned long sb_bindex, id;
offset &= chanb->buf_size - 1;
sbidx = offset >> chanb->subbuf_size_order;
index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
pagecpy = min_t(size_t, len, (-offset) & ~PAGE_MASK);
id = bufb->buf_wsb[sbidx].id;
sb_bindex = subbuffer_id_get_index(config, id);
rpages = bufb->array[sb_bindex];
CHAN_WARN_ON(ctx->chan,
config->mode == RING_BUFFER_OVERWRITE
&& subbuffer_id_is_noref(config, id));
if (likely(pagecpy == len))
lib_ring_buffer_do_memset(rpages->p[index].virt
+ (offset & ~PAGE_MASK),
c, len);
else
_lib_ring_buffer_memset(bufb, offset, c, len, 0);
ctx->buf_offset += len;
}
/**
* lib_ring_buffer_copy_from_user - write userspace data to a buffer backend
* @config : ring buffer instance configuration
* @ctx: ring buffer context. (input arguments only)
* @src : userspace source pointer to copy from
* @len : length of data to copy
*
* This function copies "len" bytes of data from a userspace pointer to a
* buffer backend, at the current context offset. This is more or less a buffer
* backend-specific memcpy() operation. Calls the slow path
* (_lib_ring_buffer_copy_from_user) if the copy crosses a page boundary.
*/
static inline
void lib_ring_buffer_copy_from_user(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer_ctx *ctx,
const void __user *src, size_t len)
{
struct lib_ring_buffer_backend *bufb = &ctx->buf->backend;
struct channel_backend *chanb = &ctx->chan->backend;
size_t sbidx, index;
size_t offset = ctx->buf_offset;
ssize_t pagecpy;
struct lib_ring_buffer_backend_pages *rpages;
unsigned long sb_bindex, id;
unsigned long ret;
offset &= chanb->buf_size - 1;
sbidx = offset >> chanb->subbuf_size_order;
index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
pagecpy = min_t(size_t, len, (-offset) & ~PAGE_MASK);
id = bufb->buf_wsb[sbidx].id;
sb_bindex = subbuffer_id_get_index(config, id);
rpages = bufb->array[sb_bindex];
CHAN_WARN_ON(ctx->chan,
config->mode == RING_BUFFER_OVERWRITE
&& subbuffer_id_is_noref(config, id));
if (unlikely(!access_ok(VERIFY_READ, src, len)))
goto fill_buffer;
if (likely(pagecpy == len)) {
ret = lib_ring_buffer_do_copy_from_user(
rpages->p[index].virt + (offset & ~PAGE_MASK),
src, len);
if (unlikely(ret > 0)) {
len -= (pagecpy - ret);
offset += (pagecpy - ret);
goto fill_buffer;
}
} else {
_lib_ring_buffer_copy_from_user(bufb, offset, src, len, 0);
}
ctx->buf_offset += len;
return;
fill_buffer:
/*
* In the error path we call the slow path version to avoid
* the pollution of static inline code.
*/
_lib_ring_buffer_memset(bufb, offset, 0, len, 0);
}
/*
* This accessor counts the number of unread records in a buffer.
* It only provides a consistent value if neither reads nor writes are performed
* concurrently.
*/
static inline
unsigned long lib_ring_buffer_get_records_unread(
const struct lib_ring_buffer_config *config,
struct lib_ring_buffer *buf)
{
struct lib_ring_buffer_backend *bufb = &buf->backend;
struct lib_ring_buffer_backend_pages *pages;
unsigned long records_unread = 0, sb_bindex, id;
unsigned int i;
for (i = 0; i < bufb->chan->backend.num_subbuf; i++) {
id = bufb->buf_wsb[i].id;
sb_bindex = subbuffer_id_get_index(config, id);
pages = bufb->array[sb_bindex];
records_unread += v_read(config, &pages->records_unread);
}
if (config->mode == RING_BUFFER_OVERWRITE) {
id = bufb->buf_rsb.id;
sb_bindex = subbuffer_id_get_index(config, id);
pages = bufb->array[sb_bindex];
records_unread += v_read(config, &pages->records_unread);
}
return records_unread;
}
ssize_t lib_ring_buffer_file_splice_read(struct file *in, loff_t *ppos,
struct pipe_inode_info *pipe,
size_t len, unsigned int flags);
loff_t lib_ring_buffer_no_llseek(struct file *file, loff_t offset, int origin);
#endif /* _LINUX_RING_BUFFER_BACKEND_H */
#ifndef _LINUX_RING_BUFFER_BACKEND_INTERNAL_H
#define _LINUX_RING_BUFFER_BACKEND_INTERNAL_H
/*
* linux/ringbuffer/backend_internal.h
*
* Copyright (C) 2008-2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
*
* Ring buffer backend (internal helpers).
*
* Dual LGPL v2.1/GPL v2 license.
*/
#include "../../wrapper/ringbuffer/config.h"
#include "../../wrapper/ringbuffer/backend_types.h"
#include "../../wrapper/ringbuffer/frontend_types.h"
#include <linux/string.h>
#include <linux/uaccess.h>
/* Ring buffer backend API presented to the frontend */
/* Ring buffer and channel backend create/free */
int lib_ring_buffer_backend_create(struct lib_ring_buffer_backend *bufb,
struct channel_backend *chan, int cpu);
void channel_backend_unregister_notifiers(struct channel_backend *chanb);
void lib_ring_buffer_backend_free(struct lib_ring_buffer_backend *bufb);
int channel_backend_init(struct channel_backend *chanb,
const char *name,
const struct lib_ring_buffer_config *config,
void *priv, size_t subbuf_size,
size_t num_subbuf);
void channel_backend_free(struct channel_backend *chanb);
void lib_ring_buffer_backend_reset(struct lib_ring_buffer_backend *bufb);
void channel_backend_reset(struct channel_backend *chanb);
int lib_ring_buffer_backend_init(void);
void lib_ring_buffer_backend_exit(void);
extern void _lib_ring_buffer_write(struct lib_ring_buffer_backend *bufb,
size_t offset, const void *src, size_t len,
ssize_t pagecpy);
extern void _lib_ring_buffer_memset(struct lib_ring_buffer_backend *bufb,
size_t offset, int c, size_t len,
ssize_t pagecpy);
extern void _lib_ring_buffer_copy_from_user(struct lib_ring_buffer_backend *bufb,
size_t offset, const void *src,
size_t len, ssize_t pagecpy);
/*
* Subbuffer ID bits for overwrite mode. Need to fit within a single word to be
* exchanged atomically.
*
* Top half word, except lowest bit, belongs to "offset", which is used to keep
* count of the produced buffers. For overwrite mode, this provides the
* consumer with the capacity to read subbuffers in order, handling the
* situation where producers would write up to 2^15 buffers (or 2^31 for 64-bit
* systems) concurrently with a single execution of get_subbuf (between offset
* sampling and subbuffer ID exchange).
*/
#define HALF_ULONG_BITS (BITS_PER_LONG >> 1)
#define SB_ID_OFFSET_SHIFT (HALF_ULONG_BITS + 1)
#define SB_ID_OFFSET_COUNT (1UL << SB_ID_OFFSET_SHIFT)
#define SB_ID_OFFSET_MASK (~(SB_ID_OFFSET_COUNT - 1))
/*
* Lowest bit of top word half belongs to noref. Used only for overwrite mode.
*/
#define SB_ID_NOREF_SHIFT (SB_ID_OFFSET_SHIFT - 1)
#define SB_ID_NOREF_COUNT (1UL << SB_ID_NOREF_SHIFT)
#define SB_ID_NOREF_MASK SB_ID_NOREF_COUNT
/*
* In overwrite mode: lowest half of word is used for index.
* Limit of 2^16 subbuffers per buffer on 32-bit, 2^32 on 64-bit.
* In producer-consumer mode: whole word used for index.
*/
#define SB_ID_INDEX_SHIFT 0
#define SB_ID_INDEX_COUNT (1UL << SB_ID_INDEX_SHIFT)
#define SB_ID_INDEX_MASK (SB_ID_NOREF_COUNT - 1)
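/*
 * For illustration, on a 32-bit system (HALF_ULONG_BITS == 16) the subbuffer
 * id word is laid out as follows:
 *
 *	bits 31..17	offset count (15 bits, hence the 2^15 limit above)
 *	bit  16		noref flag
 *	bits 15..0	index (up to 2^16 subbuffers per buffer)
 */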
/*
* Construct the subbuffer id from offset, index and noref. Use only the index
* for producer-consumer mode (offset and noref are only used in overwrite
* mode).
*/
static inline
unsigned long subbuffer_id(const struct lib_ring_buffer_config *config,
unsigned long offset, unsigned long noref,
unsigned long index)
{
if (config->mode == RING_BUFFER_OVERWRITE)
return (offset << SB_ID_OFFSET_SHIFT)
| (noref << SB_ID_NOREF_SHIFT)
| index;
else
return index;
}
/*
* Compare offset with the offset contained within id. Return 1 if the offset
* bits are identical, else 0.
*/
static inline
int subbuffer_id_compare_offset(const struct lib_ring_buffer_config *config,
unsigned long id, unsigned long offset)
{
return (id & SB_ID_OFFSET_MASK) == (offset << SB_ID_OFFSET_SHIFT);
}
static inline
unsigned long subbuffer_id_get_index(const struct lib_ring_buffer_config *config,
unsigned long id)
{
if (config->mode == RING_BUFFER_OVERWRITE)
return id & SB_ID_INDEX_MASK;
else
return id;
}
static inline
unsigned long subbuffer_id_is_noref(const struct lib_ring_buffer_config *config,
unsigned long id)
{
if (config->mode == RING_BUFFER_OVERWRITE)
return !!(id & SB_ID_NOREF_MASK);
else
return 1;
}
/*
* Only used by reader on subbuffer ID it has exclusive access to. No volatile
* needed.
*/
static inline
void subbuffer_id_set_noref(const struct lib_ring_buffer_config *config,
unsigned long *id)
{
if (config->mode == RING_BUFFER_OVERWRITE)
*id |= SB_ID_NOREF_MASK;
}
static inline
void subbuffer_id_set_noref_offset(const struct lib_ring_buffer_config *config,
unsigned long *id, unsigned long offset)
{
unsigned long tmp;
if (config->mode == RING_BUFFER_OVERWRITE) {
tmp = *id;
tmp &= ~SB_ID_OFFSET_MASK;
tmp |= offset << SB_ID_OFFSET_SHIFT;
tmp |= SB_ID_NOREF_MASK;
/* Volatile store, read concurrently by readers. */
ACCESS_ONCE(*id) = tmp;
}
}
/* No volatile access, since already used locally */
static inline
void subbuffer_id_clear_noref(const struct lib_ring_buffer_config *config,
unsigned long *id)
{
if (config->mode == RING_BUFFER_OVERWRITE)
*id &= ~SB_ID_NOREF_MASK;
}
/*
* For overwrite mode, cap the number of subbuffers per buffer to:
* 2^16 on 32-bit architectures
* 2^32 on 64-bit architectures
* This is required to fit in the index part of the ID. Return 0 on success,
* -EPERM on failure.
*/
static inline
int subbuffer_id_check_index(const struct lib_ring_buffer_config *config,
unsigned long num_subbuf)
{
if (config->mode == RING_BUFFER_OVERWRITE)
return (num_subbuf > (1UL << HALF_ULONG_BITS)) ? -EPERM : 0;
else
return 0;
}
static inline
void subbuffer_count_record(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer_backend *bufb,
unsigned long idx)
{
unsigned long sb_bindex;
sb_bindex = subbuffer_id_get_index(config, bufb->buf_wsb[idx].id);
v_inc(config, &bufb->array[sb_bindex]->records_commit);
}
/*
* Reader has exclusive subbuffer access for record consumption. No need to
* perform the decrement atomically.
*/
static inline
void subbuffer_consume_record(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer_backend *bufb)
{
unsigned long sb_bindex;
sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id);
CHAN_WARN_ON(bufb->chan,
!v_read(config, &bufb->array[sb_bindex]->records_unread));
/* Non-atomic decrement protected by exclusive subbuffer access */
_v_dec(config, &bufb->array[sb_bindex]->records_unread);
v_inc(config, &bufb->records_read);
}
static inline
unsigned long subbuffer_get_records_count(
const struct lib_ring_buffer_config *config,
struct lib_ring_buffer_backend *bufb,
unsigned long idx)
{
unsigned long sb_bindex;
sb_bindex = subbuffer_id_get_index(config, bufb->buf_wsb[idx].id);
return v_read(config, &bufb->array[sb_bindex]->records_commit);
}
/*
* Must be executed at subbuffer delivery when the writer has _exclusive_
* subbuffer access. See ring_buffer_check_deliver() for details.
* ring_buffer_get_records_count() must be called to get the records count
* before this function, because it resets the records_commit count.
*/
static inline
unsigned long subbuffer_count_records_overrun(
const struct lib_ring_buffer_config *config,
struct lib_ring_buffer_backend *bufb,
unsigned long idx)
{
struct lib_ring_buffer_backend_pages *pages;
unsigned long overruns, sb_bindex;
sb_bindex = subbuffer_id_get_index(config, bufb->buf_wsb[idx].id);
pages = bufb->array[sb_bindex];
overruns = v_read(config, &pages->records_unread);
v_set(config, &pages->records_unread,
v_read(config, &pages->records_commit));
v_set(config, &pages->records_commit, 0);
return overruns;
}
static inline
void subbuffer_set_data_size(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer_backend *bufb,
unsigned long idx,
unsigned long data_size)
{
struct lib_ring_buffer_backend_pages *pages;
unsigned long sb_bindex;
sb_bindex = subbuffer_id_get_index(config, bufb->buf_wsb[idx].id);
pages = bufb->array[sb_bindex];
pages->data_size = data_size;
}
static inline
unsigned long subbuffer_get_read_data_size(
const struct lib_ring_buffer_config *config,
struct lib_ring_buffer_backend *bufb)
{
struct lib_ring_buffer_backend_pages *pages;
unsigned long sb_bindex;
sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id);
pages = bufb->array[sb_bindex];
return pages->data_size;
}
static inline
unsigned long subbuffer_get_data_size(
const struct lib_ring_buffer_config *config,
struct lib_ring_buffer_backend *bufb,
unsigned long idx)
{
struct lib_ring_buffer_backend_pages *pages;
unsigned long sb_bindex;
sb_bindex = subbuffer_id_get_index(config, bufb->buf_wsb[idx].id);
pages = bufb->array[sb_bindex];
return pages->data_size;
}
/**
* lib_ring_buffer_clear_noref - Clear the noref subbuffer flag, called by
* writer.
*/
static inline
void lib_ring_buffer_clear_noref(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer_backend *bufb,
unsigned long idx)
{
unsigned long id, new_id;
if (config->mode != RING_BUFFER_OVERWRITE)
return;
/*
* Performing a volatile access to read the sb_pages, because we want to
* read a coherent version of the pointer and the associated noref flag.
*/
id = ACCESS_ONCE(bufb->buf_wsb[idx].id);
for (;;) {
/* This check is called on the fast path for each record. */
if (likely(!subbuffer_id_is_noref(config, id))) {
/*
* The store-after-load dependency ordering the writes to
* the subbuffer after the load and test of the noref flag
* matches the memory barrier implied by the cmpxchg()
* in update_read_sb_index().
*/
return; /* Already writing to this buffer */
}
new_id = id;
subbuffer_id_clear_noref(config, &new_id);
new_id = cmpxchg(&bufb->buf_wsb[idx].id, id, new_id);
if (likely(new_id == id))
break;
id = new_id;
}
}
/**
* lib_ring_buffer_set_noref_offset - Set the noref subbuffer flag and offset,
* called by writer.
*/
static inline
void lib_ring_buffer_set_noref_offset(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer_backend *bufb,
unsigned long idx, unsigned long offset)
{
if (config->mode != RING_BUFFER_OVERWRITE)
return;
/*
* Because ring_buffer_set_noref() is only called by a single thread
* (the one which updated the cc_sb value), there are no concurrent
* updates to take care of: other writers have not updated cc_sb, so
* they cannot set the noref flag, and concurrent readers cannot modify
* the pointer because the noref flag is not set yet.
* The smp_wmb() in ring_buffer_commit() takes care of ordering writes
* to the subbuffer before this set noref operation.
* subbuffer_set_noref() uses a volatile store to deal with concurrent
* readers of the noref flag.
*/
CHAN_WARN_ON(bufb->chan,
subbuffer_id_is_noref(config, bufb->buf_wsb[idx].id));
/*
* Memory barrier that ensures counter stores are ordered before set
* noref and offset.
*/
smp_mb();
subbuffer_id_set_noref_offset(config, &bufb->buf_wsb[idx].id, offset);
}
/**
* update_read_sb_index - Read-side subbuffer index update.
*/
static inline
int update_read_sb_index(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer_backend *bufb,
struct channel_backend *chanb,
unsigned long consumed_idx,
unsigned long consumed_count)
{
unsigned long old_id, new_id;
if (config->mode == RING_BUFFER_OVERWRITE) {
/*
* Exchange the target writer subbuffer with our own unused
* subbuffer. No need to use ACCESS_ONCE() here to read the
* old_wpage, because the value read will be confirmed by the
* following cmpxchg().
*/
old_id = bufb->buf_wsb[consumed_idx].id;
if (unlikely(!subbuffer_id_is_noref(config, old_id)))
return -EAGAIN;
/*
* Make sure the offset count we are expecting matches the one
* indicated by the writer.
*/
if (unlikely(!subbuffer_id_compare_offset(config, old_id,
consumed_count)))
return -EAGAIN;
CHAN_WARN_ON(bufb->chan,
!subbuffer_id_is_noref(config, bufb->buf_rsb.id));
subbuffer_id_set_noref_offset(config, &bufb->buf_rsb.id,
consumed_count);
new_id = cmpxchg(&bufb->buf_wsb[consumed_idx].id, old_id,
bufb->buf_rsb.id);
if (unlikely(old_id != new_id))
return -EAGAIN;
bufb->buf_rsb.id = new_id;
} else {
/* No page exchange, use the writer page directly */
bufb->buf_rsb.id = bufb->buf_wsb[consumed_idx].id;
}
return 0;
}
/*
* Use the architecture-specific memcpy implementation for constant-sized
* inputs, but rely on an inline memcpy when the length is not statically
* known.
* The function call to memcpy is just way too expensive for a fast path.
*/
#define lib_ring_buffer_do_copy(config, dest, src, len) \
do { \
size_t __len = (len); \
if (__builtin_constant_p(len)) \
memcpy(dest, src, __len); \
else \
inline_memcpy(dest, src, __len); \
} while (0)
/*
* We use __copy_from_user to copy userspace data since we already
* did the access_ok for the whole range.
*/
static inline
unsigned long lib_ring_buffer_do_copy_from_user(void *dest,
const void __user *src,
unsigned long len)
{
return __copy_from_user(dest, src, len);
}
/*
* write len bytes to dest with c
*/
static inline
void lib_ring_buffer_do_memset(char *dest, int c,
unsigned long len)
{
unsigned long i;
for (i = 0; i < len; i++)
dest[i] = c;
}
#endif /* _LINUX_RING_BUFFER_BACKEND_INTERNAL_H */
#ifndef _LINUX_RING_BUFFER_BACKEND_TYPES_H
#define _LINUX_RING_BUFFER_BACKEND_TYPES_H
/*
* linux/ringbuffer/backend_types.h
*
* Copyright (C) 2008-2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
*
* Ring buffer backend (types).
*
* Dual LGPL v2.1/GPL v2 license.
*/
#include <linux/cpumask.h>
#include <linux/types.h>
struct lib_ring_buffer_backend_page {
void *virt; /* page virtual address (cached) */
struct page *page; /* pointer to page structure */
};
struct lib_ring_buffer_backend_pages {
unsigned long mmap_offset; /* offset of the subbuffer in mmap */
union v_atomic records_commit; /* current records committed count */
union v_atomic records_unread; /* records to read */
unsigned long data_size; /* Amount of data to read from subbuf */
struct lib_ring_buffer_backend_page p[];
};
struct lib_ring_buffer_backend_subbuffer {
/* Identifier for subbuf backend pages. Exchanged atomically. */
unsigned long id; /* backend subbuffer identifier */
};
/*
* Forward declaration of frontend-specific channel and ring_buffer.
*/
struct channel;
struct lib_ring_buffer;
struct lib_ring_buffer_backend {
/* Array of ring_buffer_backend_subbuffer for writer */
struct lib_ring_buffer_backend_subbuffer *buf_wsb;
/* ring_buffer_backend_subbuffer for reader */
struct lib_ring_buffer_backend_subbuffer buf_rsb;
/*
* Pointer array of backend pages, for whole buffer.
* Indexed by ring_buffer_backend_subbuffer identifier (id) index.
*/
struct lib_ring_buffer_backend_pages **array;
unsigned int num_pages_per_subbuf;
struct channel *chan; /* Associated channel */
int cpu; /* This buffer's cpu. -1 if global. */
union v_atomic records_read; /* Number of records read */
unsigned int allocated:1; /* Bool: is buffer allocated ? */
};
struct channel_backend {
unsigned long buf_size; /* Size of the buffer */
unsigned long subbuf_size; /* Sub-buffer size */
unsigned int subbuf_size_order; /* Order of sub-buffer size */
unsigned int num_subbuf_order; /*
* Order of number of sub-buffers/buffer
* for writer.
*/
unsigned int buf_size_order; /* Order of buffer size */
int extra_reader_sb:1; /* Bool: has extra reader subbuffer */
struct lib_ring_buffer *buf; /* Channel per-cpu buffers */
unsigned long num_subbuf; /* Number of sub-buffers for writer */
u64 start_tsc; /* Channel creation TSC value */
void *priv; /* Client-specific information */
struct notifier_block cpu_hp_notifier; /* CPU hotplug notifier */
const struct lib_ring_buffer_config *config; /* Ring buffer configuration */
cpumask_var_t cpumask; /* Allocated per-cpu buffers cpumask */
char name[NAME_MAX]; /* Channel name */
};
#endif /* _LINUX_RING_BUFFER_BACKEND_TYPES_H */
#ifndef _LINUX_RING_BUFFER_CONFIG_H
#define _LINUX_RING_BUFFER_CONFIG_H
/*
* linux/ringbuffer/config.h
*
* Copyright (C) 2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
*
* Ring buffer configuration header. Note: after declaring the standard inline
* functions, clients should also include linux/ringbuffer/api.h.
*
* Dual LGPL v2.1/GPL v2 license.
*/
#include <linux/types.h>
#include <linux/percpu.h>
#include "../align.h"
struct lib_ring_buffer;
struct channel;
struct lib_ring_buffer_config;
struct lib_ring_buffer_ctx;
/*
* Ring buffer client callbacks. Only used by slow path, never on fast path.
* For the fast path, record_header_size() and ring_buffer_clock_read() should
* also be provided as inline functions. These may simply return 0 if not used
* by the client.
*/
struct lib_ring_buffer_client_cb {
/* Mandatory callbacks */
/* A static inline version is also required for fast path */
u64 (*ring_buffer_clock_read) (struct channel *chan);
size_t (*record_header_size) (const struct lib_ring_buffer_config *config,
struct channel *chan, size_t offset,
size_t *pre_header_padding,
struct lib_ring_buffer_ctx *ctx);
/* Slow path only, at subbuffer switch */
size_t (*subbuffer_header_size) (void);
void (*buffer_begin) (struct lib_ring_buffer *buf, u64 tsc,
unsigned int subbuf_idx);
void (*buffer_end) (struct lib_ring_buffer *buf, u64 tsc,
unsigned int subbuf_idx, unsigned long data_size);
/* Optional callbacks (can be set to NULL) */
/* Called at buffer creation/finalize */
int (*buffer_create) (struct lib_ring_buffer *buf, void *priv,
int cpu, const char *name);
/*
* Clients should guarantee that no new reader handle can be opened
* after finalize.
*/
void (*buffer_finalize) (struct lib_ring_buffer *buf, void *priv, int cpu);
/*
* Extract header length, payload length and timestamp from event
* record. Used by buffer iterators. Timestamp is only used by channel
* iterator.
*/
void (*record_get) (const struct lib_ring_buffer_config *config,
struct channel *chan, struct lib_ring_buffer *buf,
size_t offset, size_t *header_len,
size_t *payload_len, u64 *timestamp);
};
/*
* Ring buffer instance configuration.
*
* Declare as "static const" within the client object to ensure the inline fast
* paths can be optimized.
*
* alloc/sync pairs:
*
* RING_BUFFER_ALLOC_PER_CPU and RING_BUFFER_SYNC_PER_CPU :
* Per-cpu buffers with per-cpu synchronization. Tracing must be performed
* with preemption disabled (lib_ring_buffer_get_cpu() and
* lib_ring_buffer_put_cpu()).
*
* RING_BUFFER_ALLOC_PER_CPU and RING_BUFFER_SYNC_GLOBAL :
* Per-cpu buffer with global synchronization. Tracing can be performed with
* preemption enabled; records statistically stay on the local buffers.
*
* RING_BUFFER_ALLOC_GLOBAL and RING_BUFFER_SYNC_PER_CPU :
* Should only be used for buffers belonging to a single thread or protected
* by mutual exclusion by the client. Note that periodical sub-buffer switch
* should be disabled in this kind of configuration.
*
* RING_BUFFER_ALLOC_GLOBAL and RING_BUFFER_SYNC_GLOBAL :
* Global shared buffer with global synchronization.
*
* wakeup:
*
* RING_BUFFER_WAKEUP_BY_TIMER uses per-cpu deferrable timers to poll the
* buffers and wake up readers if data is ready. Mainly useful for tracers which
* don't want to call into the wakeup code on the tracing path. Use in
* combination with "read_timer_interval" channel_create() argument.
*
* RING_BUFFER_WAKEUP_BY_WRITER directly wakes up readers when a subbuffer is
* ready to read. Lower latencies before the reader is woken up. Mainly suitable
* for drivers.
*
* RING_BUFFER_WAKEUP_NONE does not perform any wakeup whatsoever. The client
* has the responsibility to perform wakeups.
*/
struct lib_ring_buffer_config {
enum {
RING_BUFFER_ALLOC_PER_CPU,
RING_BUFFER_ALLOC_GLOBAL,
} alloc;
enum {
RING_BUFFER_SYNC_PER_CPU, /* Wait-free */
RING_BUFFER_SYNC_GLOBAL, /* Lock-free */
} sync;
enum {
RING_BUFFER_OVERWRITE, /* Overwrite when buffer full */
RING_BUFFER_DISCARD, /* Discard when buffer full */
} mode;
enum {
RING_BUFFER_SPLICE,
RING_BUFFER_MMAP,
RING_BUFFER_READ, /* TODO */
RING_BUFFER_ITERATOR,
RING_BUFFER_NONE,
} output;
enum {
RING_BUFFER_PAGE,
RING_BUFFER_VMAP, /* TODO */
RING_BUFFER_STATIC, /* TODO */
} backend;
enum {
RING_BUFFER_NO_OOPS_CONSISTENCY,
RING_BUFFER_OOPS_CONSISTENCY,
} oops;
enum {
RING_BUFFER_IPI_BARRIER,
RING_BUFFER_NO_IPI_BARRIER,
} ipi;
enum {
RING_BUFFER_WAKEUP_BY_TIMER, /* wake up performed by timer */
RING_BUFFER_WAKEUP_BY_WRITER, /*
* writer wakes up reader,
* not lock-free
* (takes spinlock).
*/
} wakeup;
/*
* tsc_bits: timestamp bits saved at each record.
* 0 and 64 disable the timestamp compression scheme.
*/
unsigned int tsc_bits;
struct lib_ring_buffer_client_cb cb;
};
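/*
 * A minimal sketch of a client configuration, assuming hypothetical client
 * callbacks (client_clock_read(), client_record_header_size(), ...); the
 * field values below are illustrative only:
 *
 *	static const struct lib_ring_buffer_config client_config = {
 *		.cb.ring_buffer_clock_read = client_clock_read,
 *		.cb.record_header_size = client_record_header_size,
 *		.cb.subbuffer_header_size = client_subbuffer_header_size,
 *		.cb.buffer_begin = client_buffer_begin,
 *		.cb.buffer_end = client_buffer_end,
 *		.tsc_bits = 27,
 *		.alloc = RING_BUFFER_ALLOC_PER_CPU,
 *		.sync = RING_BUFFER_SYNC_PER_CPU,
 *		.mode = RING_BUFFER_OVERWRITE,
 *		.backend = RING_BUFFER_PAGE,
 *		.output = RING_BUFFER_SPLICE,
 *		.oops = RING_BUFFER_OOPS_CONSISTENCY,
 *		.ipi = RING_BUFFER_IPI_BARRIER,
 *		.wakeup = RING_BUFFER_WAKEUP_BY_TIMER,
 *	};
 */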
/*
* ring buffer context
*
* Context passed to lib_ring_buffer_reserve(), lib_ring_buffer_commit(),
* lib_ring_buffer_try_discard_reserve(), lib_ring_buffer_align_ctx() and
* lib_ring_buffer_write().
*/
struct lib_ring_buffer_ctx {
/* input received by lib_ring_buffer_reserve(), saved here. */
struct channel *chan; /* channel */
void *priv; /* client private data */
size_t data_size; /* size of payload */
int largest_align; /*
* alignment of the largest element
* in the payload
*/
int cpu; /* processor id */
/* output from lib_ring_buffer_reserve() */
struct lib_ring_buffer *buf; /*
* buffer corresponding to processor id
* for this channel
*/
size_t slot_size; /* size of the reserved slot */
unsigned long buf_offset; /* offset following the record header */
unsigned long pre_offset; /*
* Initial offset position _before_
* the record is written. Positioned
* prior to record header alignment
* padding.
*/
u64 tsc; /* time-stamp counter value */
unsigned int rflags; /* reservation flags */
};
/**
* lib_ring_buffer_ctx_init - initialize ring buffer context
* @ctx: ring buffer context to initialize
* @chan: channel
* @priv: client private data
* @data_size: size of record data payload
* @largest_align: largest alignment within data payload types
* @cpu: processor id
*/
static inline
void lib_ring_buffer_ctx_init(struct lib_ring_buffer_ctx *ctx,
struct channel *chan, void *priv,
size_t data_size, int largest_align,
int cpu)
{
ctx->chan = chan;
ctx->priv = priv;
ctx->data_size = data_size;
ctx->largest_align = largest_align;
ctx->cpu = cpu;
ctx->rflags = 0;
}
/*
* Reservation flags.
*
* RING_BUFFER_RFLAG_FULL_TSC
*
* This flag is passed to record_header_size() and to the primitive used to
* write the record header. It indicates that the full 64-bit time value is
* needed in the record header. If this flag is not set, the record header needs
* only to contain "tsc_bits" bits of time value.
*
* Reservation flags can be added by the client, starting from
* "(RING_BUFFER_FLAGS_END << 0)". It can be used to pass information from
* record_header_size() to lib_ring_buffer_write_record_header().
*/
#define RING_BUFFER_RFLAG_FULL_TSC (1U << 0)
#define RING_BUFFER_RFLAG_END (1U << 1)
/*
* We need to define RING_BUFFER_ALIGN_ATTR so it is known early at
* compile-time. We have to duplicate the "config->align" information and the
* definition here because config->align is used both in the slow and fast
* paths, but RING_BUFFER_ALIGN_ATTR is only available for the client code.
*/
#ifdef RING_BUFFER_ALIGN
# define RING_BUFFER_ALIGN_ATTR /* Default arch alignment */
/*
* Calculate the offset needed to align the type.
* size_of_type must be non-zero.
*/
static inline
unsigned int lib_ring_buffer_align(size_t align_drift, size_t size_of_type)
{
return offset_align(align_drift, size_of_type);
}
#else
# define RING_BUFFER_ALIGN_ATTR __attribute__((packed))
/*
* Calculate the offset needed to align the type.
* size_of_type must be non-zero.
*/
static inline
unsigned int lib_ring_buffer_align(size_t align_drift, size_t size_of_type)
{
return 0;
}
#endif
/**
* lib_ring_buffer_align_ctx - Align context offset on "alignment"
* @ctx: ring buffer context.
*/
static inline
void lib_ring_buffer_align_ctx(struct lib_ring_buffer_ctx *ctx,
size_t alignment)
{
ctx->buf_offset += lib_ring_buffer_align(ctx->buf_offset,
alignment);
}
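/*
 * For illustration, a client writing a u64 field into a reserved slot would
 * typically align the context on the field's natural alignment first
 * (assuming a previously reserved "ctx" and the client's "config"):
 *
 *	lib_ring_buffer_align_ctx(&ctx, __alignof__(value));
 *	lib_ring_buffer_write(config, &ctx, &value, sizeof(value));
 */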
/*
* lib_ring_buffer_check_config() returns 0 on success.
* Used internally to check for valid configurations at channel creation.
*/
static inline
int lib_ring_buffer_check_config(const struct lib_ring_buffer_config *config,
unsigned int switch_timer_interval,
unsigned int read_timer_interval)
{
if (config->alloc == RING_BUFFER_ALLOC_GLOBAL
&& config->sync == RING_BUFFER_SYNC_PER_CPU
&& switch_timer_interval)
return -EINVAL;
return 0;
}
#include "../../wrapper/ringbuffer/vatomic.h"
#endif /* _LINUX_RING_BUFFER_CONFIG_H */
#ifndef _LINUX_RING_BUFFER_FRONTEND_H
#define _LINUX_RING_BUFFER_FRONTEND_H
/*
* linux/ringbuffer/frontend.h
*
* (C) Copyright 2005-2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
*
* Ring Buffer Library Synchronization Header (API).
*
* Author:
* Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
*
* See ring_buffer_frontend.c for more information on wait-free algorithms.
*
* Dual LGPL v2.1/GPL v2 license.
*/
#include <linux/pipe_fs_i.h>
#include <linux/rcupdate.h>
#include <linux/cpumask.h>
#include <linux/module.h>
#include <linux/bitops.h>
#include <linux/splice.h>
#include <linux/string.h>
#include <linux/timer.h>
#include <linux/sched.h>
#include <linux/cache.h>
#include <linux/time.h>
#include <linux/slab.h>
#include <linux/init.h>
#include <linux/stat.h>
#include <linux/cpu.h>
#include <linux/fs.h>
#include <asm/atomic.h>
#include <asm/local.h>
/* Internal helpers */
#include "../../wrapper/ringbuffer/frontend_internal.h"
/* Buffer creation/removal and setup operations */
/*
* switch_timer_interval is the time interval (in us) to fill sub-buffers with
* padding to let readers get those sub-buffers. Used for live streaming.
*
* read_timer_interval is the time interval (in us) to wake up pending readers.
*
* buf_addr is a pointer to the beginning of the preallocated buffer contiguous
* address mapping. It is used only by the RING_BUFFER_STATIC configuration. It
* can be set to NULL for other backends.
*/
extern
struct channel *channel_create(const struct lib_ring_buffer_config *config,
const char *name, void *priv,
void *buf_addr,
size_t subbuf_size, size_t num_subbuf,
unsigned int switch_timer_interval,
unsigned int read_timer_interval);
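/*
 * A minimal creation sketch with illustrative parameters (4 kB sub-buffers,
 * 16 sub-buffers per buffer, timer intervals in microseconds); "client_config"
 * and "priv" stand in for client-specific objects, and a page-backed channel
 * passes NULL for buf_addr:
 *
 *	chan = channel_create(&client_config, "my_channel", priv, NULL,
 *			      4096, 16, 1000000, 200000);
 */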
/*
* channel_destroy returns the private data pointer. It finalizes all of the
* channel's buffers, waits for readers to release all references, and destroys
* the channel.
*/
extern
void *channel_destroy(struct channel *chan);
/* Buffer read operations */
/*
* Iteration on channel cpumask needs to issue a read barrier to match the write
* barrier in cpu hotplug. It orders the cpumask read before read of per-cpu
* buffer data. The per-cpu buffer is never removed by cpu hotplug; teardown is
* only performed at channel destruction.
*/
#define for_each_channel_cpu(cpu, chan) \
for ((cpu) = -1; \
({ (cpu) = cpumask_next(cpu, (chan)->backend.cpumask); \
smp_read_barrier_depends(); (cpu) < nr_cpu_ids; });)
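/*
 * Typical per-cpu iteration, as a sketch (with a hypothetical "client_config"):
 *
 *	for_each_channel_cpu(cpu, chan) {
 *		struct lib_ring_buffer *buf =
 *			channel_get_ring_buffer(&client_config, chan, cpu);
 *		...
 *	}
 */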
extern struct lib_ring_buffer *channel_get_ring_buffer(
const struct lib_ring_buffer_config *config,
struct channel *chan, int cpu);
extern int lib_ring_buffer_open_read(struct lib_ring_buffer *buf);
extern void lib_ring_buffer_release_read(struct lib_ring_buffer *buf);
/*
* Read sequence: snapshot, many get_subbuf/put_subbuf, move_consumer.
*/
extern int lib_ring_buffer_snapshot(struct lib_ring_buffer *buf,
unsigned long *consumed,
unsigned long *produced);
extern void lib_ring_buffer_move_consumer(struct lib_ring_buffer *buf,
unsigned long consumed_new);
extern int lib_ring_buffer_get_subbuf(struct lib_ring_buffer *buf,
unsigned long consumed);
extern void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf);
/*
* lib_ring_buffer_get_next_subbuf/lib_ring_buffer_put_next_subbuf are helpers
* to read sub-buffers sequentially.
*/
static inline int lib_ring_buffer_get_next_subbuf(struct lib_ring_buffer *buf)
{
int ret;
ret = lib_ring_buffer_snapshot(buf, &buf->cons_snapshot,
&buf->prod_snapshot);
if (ret)
return ret;
ret = lib_ring_buffer_get_subbuf(buf, buf->cons_snapshot);
return ret;
}
static inline void lib_ring_buffer_put_next_subbuf(struct lib_ring_buffer *buf)
{
lib_ring_buffer_put_subbuf(buf);
lib_ring_buffer_move_consumer(buf, subbuf_align(buf->cons_snapshot,
buf->backend.chan));
}
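/*
 * A minimal sketch of sequential sub-buffer consumption built on the helpers
 * above, assuming lib_ring_buffer_open_read() already succeeded and that
 * "client_config" and process_subbuf() stand in for client-specific code:
 *
 *	while (!lib_ring_buffer_get_next_subbuf(buf)) {
 *		unsigned long len;
 *
 *		len = lib_ring_buffer_get_read_data_size(&client_config, buf);
 *		process_subbuf(buf, len);	// e.g. via lib_ring_buffer_read()
 *		lib_ring_buffer_put_next_subbuf(buf);
 *	}
 */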
extern void channel_reset(struct channel *chan);
extern void lib_ring_buffer_reset(struct lib_ring_buffer *buf);
static inline
unsigned long lib_ring_buffer_get_offset(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer *buf)
{
return v_read(config, &buf->offset);
}
static inline
unsigned long lib_ring_buffer_get_consumed(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer *buf)
{
return atomic_long_read(&buf->consumed);
}
/*
* Must call lib_ring_buffer_is_finalized before reading counters (memory
* ordering enforced with respect to trace teardown).
*/
static inline
int lib_ring_buffer_is_finalized(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer *buf)
{
int finalized = ACCESS_ONCE(buf->finalized);
/*
* Read finalized before counters.
*/
smp_rmb();
return finalized;
}
static inline
int lib_ring_buffer_channel_is_finalized(const struct channel *chan)
{
return chan->finalized;
}
static inline
int lib_ring_buffer_channel_is_disabled(const struct channel *chan)
{
return atomic_read(&chan->record_disabled);
}
static inline
unsigned long lib_ring_buffer_get_read_data_size(
const struct lib_ring_buffer_config *config,
struct lib_ring_buffer *buf)
{
return subbuffer_get_read_data_size(config, &buf->backend);
}
static inline
unsigned long lib_ring_buffer_get_records_count(
const struct lib_ring_buffer_config *config,
struct lib_ring_buffer *buf)
{
return v_read(config, &buf->records_count);
}
static inline
unsigned long lib_ring_buffer_get_records_overrun(
const struct lib_ring_buffer_config *config,
struct lib_ring_buffer *buf)
{
return v_read(config, &buf->records_overrun);
}
static inline
unsigned long lib_ring_buffer_get_records_lost_full(
const struct lib_ring_buffer_config *config,
struct lib_ring_buffer *buf)
{
return v_read(config, &buf->records_lost_full);
}
static inline
unsigned long lib_ring_buffer_get_records_lost_wrap(
const struct lib_ring_buffer_config *config,
struct lib_ring_buffer *buf)
{
return v_read(config, &buf->records_lost_wrap);
}
static inline
unsigned long lib_ring_buffer_get_records_lost_big(
const struct lib_ring_buffer_config *config,
struct lib_ring_buffer *buf)
{
return v_read(config, &buf->records_lost_big);
}
static inline
unsigned long lib_ring_buffer_get_records_read(
const struct lib_ring_buffer_config *config,
struct lib_ring_buffer *buf)
{
return v_read(config, &buf->backend.records_read);
}
#endif /* _LINUX_RING_BUFFER_FRONTEND_H */
#ifndef _LINUX_RING_BUFFER_FRONTEND_API_H
#define _LINUX_RING_BUFFER_FRONTEND_API_H
/*
* linux/ringbuffer/frontend_api.h
*
* (C) Copyright 2005-2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
*
* Ring Buffer Library Synchronization Header (buffer write API).
*
* Author:
* Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
*
* See ring_buffer_frontend.c for more information on wait-free algorithms.
* See linux/ringbuffer/frontend.h for channel allocation and read-side API.
*
* Dual LGPL v2.1/GPL v2 license.
*/
#include "../../wrapper/ringbuffer/frontend.h"
#include <linux/errno.h>
/**
* lib_ring_buffer_get_cpu - Precedes ring buffer reserve/commit.
*
* Disables preemption (acts as an RCU read-side critical section) and keeps a
* ring buffer nesting count as a supplementary safety net to ensure tracer
* client code will never trigger an endless recursion. Returns the processor ID
* on success, -EPERM on failure (nesting count too high).
*
* asm volatile and "memory" clobber prevent the compiler from moving
* instructions out of the ring buffer nesting count. This is required to ensure
* that probe side-effects which can cause recursion (e.g. unforeseen traps,
* divisions by 0, ...) are triggered within the incremented nesting count
* section.
*/
static inline
int lib_ring_buffer_get_cpu(const struct lib_ring_buffer_config *config)
{
int cpu, nesting;
rcu_read_lock_sched_notrace();
cpu = smp_processor_id();
nesting = ++per_cpu(lib_ring_buffer_nesting, cpu);
barrier();
if (unlikely(nesting > 4)) {
WARN_ON_ONCE(1);
per_cpu(lib_ring_buffer_nesting, cpu)--;
rcu_read_unlock_sched_notrace();
return -EPERM;
} else
return cpu;
}
/**
* lib_ring_buffer_put_cpu - Follows ring buffer reserve/commit.
*/
static inline
void lib_ring_buffer_put_cpu(const struct lib_ring_buffer_config *config)
{
barrier();
__get_cpu_var(lib_ring_buffer_nesting)--;
rcu_read_unlock_sched_notrace();
}
/*
* lib_ring_buffer_try_reserve is called by lib_ring_buffer_reserve(). It is not
* part of the API per se.
*
* returns 0 if reserve ok, or 1 if the slow path must be taken.
*/
static inline
int lib_ring_buffer_try_reserve(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer_ctx *ctx,
unsigned long *o_begin, unsigned long *o_end,
unsigned long *o_old, size_t *before_hdr_pad)
{
struct channel *chan = ctx->chan;
struct lib_ring_buffer *buf = ctx->buf;
*o_begin = v_read(config, &buf->offset);
*o_old = *o_begin;
ctx->tsc = lib_ring_buffer_clock_read(chan);
if ((int64_t) ctx->tsc == -EIO)
return 1;
/*
* Prefetch the cacheline for read because we have to read the previous
* commit counter to increment it and the commit seq value to compare it
* to the commit counter.
*/
prefetch(&buf->commit_hot[subbuf_index(*o_begin, chan)]);
if (last_tsc_overflow(config, buf, ctx->tsc))
ctx->rflags |= RING_BUFFER_RFLAG_FULL_TSC;
if (unlikely(subbuf_offset(*o_begin, chan) == 0))
return 1;
ctx->slot_size = record_header_size(config, chan, *o_begin,
before_hdr_pad, ctx);
ctx->slot_size +=
lib_ring_buffer_align(*o_begin + ctx->slot_size,
ctx->largest_align) + ctx->data_size;
if (unlikely((subbuf_offset(*o_begin, chan) + ctx->slot_size)
> chan->backend.subbuf_size))
return 1;
/*
* Record fits in the current buffer and we are not on a switch
* boundary. It's safe to write.
*/
*o_end = *o_begin + ctx->slot_size;
if (unlikely((subbuf_offset(*o_end, chan)) == 0))
/*
* The offset_end will fall at the very beginning of the next
* subbuffer.
*/
return 1;
return 0;
}
/**
* lib_ring_buffer_reserve - Reserve space in a ring buffer.
* @config: ring buffer instance configuration.
* @ctx: ring buffer context. (input and output) Must be already initialized.
*
* Atomic wait-free slot reservation. The reserved space starts at the context
* "pre_offset". Its length is "slot_size". The associated time-stamp is "tsc".
*
* Return :
* 0 on success.
* -EAGAIN if channel is disabled.
* -ENOSPC if event size is too large for packet.
* -ENOBUFS if there is currently not enough space in buffer for the event.
* -EIO if data cannot be written into the buffer for any other reason.
*/
static inline
int lib_ring_buffer_reserve(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer_ctx *ctx)
{
struct channel *chan = ctx->chan;
struct lib_ring_buffer *buf;
unsigned long o_begin, o_end, o_old;
size_t before_hdr_pad = 0;
if (atomic_read(&chan->record_disabled))
return -EAGAIN;
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
buf = per_cpu_ptr(chan->backend.buf, ctx->cpu);
else
buf = chan->backend.buf;
if (atomic_read(&buf->record_disabled))
return -EAGAIN;
ctx->buf = buf;
/*
* Perform retryable operations.
*/
if (unlikely(lib_ring_buffer_try_reserve(config, ctx, &o_begin,
&o_end, &o_old, &before_hdr_pad)))
goto slow_path;
if (unlikely(v_cmpxchg(config, &ctx->buf->offset, o_old, o_end)
!= o_old))
goto slow_path;
/*
* Atomically update last_tsc. This update races against concurrent
* atomic updates, but the race will always cause supplementary full TSC
* record headers, never the opposite (missing a full TSC record header
* when it would be needed).
*/
save_last_tsc(config, ctx->buf, ctx->tsc);
/*
* Push the reader if necessary
*/
lib_ring_buffer_reserve_push_reader(ctx->buf, chan, o_end - 1);
/*
* Clear noref flag for this subbuffer.
*/
lib_ring_buffer_clear_noref(config, &ctx->buf->backend,
subbuf_index(o_end - 1, chan));
ctx->pre_offset = o_begin;
ctx->buf_offset = o_begin + before_hdr_pad;
return 0;
slow_path:
return lib_ring_buffer_reserve_slow(ctx);
}
/**
* lib_ring_buffer_switch - Perform a sub-buffer switch for a per-cpu buffer.
* @config: ring buffer instance configuration.
* @buf: buffer
* @mode: buffer switch mode (SWITCH_ACTIVE or SWITCH_FLUSH)
*
* This operation is completely reentrant: it can be called while tracing is
* active with absolutely no lock held.
*
* Note, however, that because a v_cmpxchg is used for some atomic operations
* and must be executed locally for per-CPU buffers, this function must be
* called from the CPU which owns the buffer for an ACTIVE flush, with
* preemption disabled, for the RING_BUFFER_SYNC_PER_CPU configuration.
*/
static inline
void lib_ring_buffer_switch(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer *buf, enum switch_mode mode)
{
lib_ring_buffer_switch_slow(buf, mode);
}
/* See ring_buffer_frontend_api.h for lib_ring_buffer_reserve(). */
/**
* lib_ring_buffer_commit - Commit a record.
* @config: ring buffer instance configuration.
* @ctx: ring buffer context. (input arguments only)
*
* Atomic unordered slot commit. Increments the commit count in the
* specified sub-buffer, and delivers it if necessary.
*/
static inline
void lib_ring_buffer_commit(const struct lib_ring_buffer_config *config,
const struct lib_ring_buffer_ctx *ctx)
{
struct channel *chan = ctx->chan;
struct lib_ring_buffer *buf = ctx->buf;
unsigned long offset_end = ctx->buf_offset;
unsigned long endidx = subbuf_index(offset_end - 1, chan);
unsigned long commit_count;
/*
* Must count record before incrementing the commit count.
*/
subbuffer_count_record(config, &buf->backend, endidx);
/*
* Order all writes to buffer before the commit count update that will
* determine that the subbuffer is full.
*/
if (config->ipi == RING_BUFFER_IPI_BARRIER) {
/*
* Must write slot data before incrementing commit count. This
* compiler barrier is upgraded into a smp_mb() by the IPI sent
* by get_subbuf().
*/
barrier();
} else
smp_wmb();
v_add(config, ctx->slot_size, &buf->commit_hot[endidx].cc);
/*
* commit count read can race with concurrent OOO commit count updates.
* This is only needed for lib_ring_buffer_check_deliver (for
* non-polling delivery only) and for
* lib_ring_buffer_write_commit_counter. The race can only cause the
* counter to be read with the same value more than once, which could
* cause :
* - Multiple delivery for the same sub-buffer (which is handled
* gracefully by the reader code) if the value is for a full
* sub-buffer. It's important that we can never miss a sub-buffer
* delivery. Re-reading the value after the v_add ensures this.
* - Reading a commit_count with a higher value than what was actually
* added to it for the lib_ring_buffer_write_commit_counter call
* (again caused by a concurrent committer). It does not matter,
* because this function is interested in the fact that the commit
* count reaches back the reserve offset for a specific sub-buffer,
* which is completely independent of the order.
*/
commit_count = v_read(config, &buf->commit_hot[endidx].cc);
lib_ring_buffer_check_deliver(config, buf, chan, offset_end - 1,
commit_count, endidx);
/*
* Update used size at each commit. It's needed only for extracting
* ring_buffer buffers from vmcore, after crash.
*/
lib_ring_buffer_write_commit_counter(config, buf, chan, endidx,
ctx->buf_offset, commit_count,
ctx->slot_size);
}
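/*
 * Putting the reserve/commit pieces together, a client fast path would look
 * roughly like the sketch below; "client_config", "chan" and the u32 payload
 * are assumptions, and error handling is reduced to the essentials:
 *
 *	struct lib_ring_buffer_ctx ctx;
 *	int ret, cpu;
 *
 *	cpu = lib_ring_buffer_get_cpu(&client_config);
 *	if (cpu < 0)
 *		return cpu;
 *	lib_ring_buffer_ctx_init(&ctx, chan, NULL, sizeof(payload),
 *				 __alignof__(payload), cpu);
 *	ret = lib_ring_buffer_reserve(&client_config, &ctx);
 *	if (ret)
 *		goto put;
 *	lib_ring_buffer_align_ctx(&ctx, __alignof__(payload));
 *	lib_ring_buffer_write(&client_config, &ctx, &payload, sizeof(payload));
 *	lib_ring_buffer_commit(&client_config, &ctx);
 * put:
 *	lib_ring_buffer_put_cpu(&client_config);
 *	return ret;
 */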
/**
* lib_ring_buffer_try_discard_reserve - Try discarding a record.
* @config: ring buffer instance configuration.
* @ctx: ring buffer context. (input arguments only)
*
* Only succeeds if no other record has been written after the record to
* discard. If discard fails, the record must be committed to the buffer.
*
* Returns 0 upon success, -EPERM if the record cannot be discarded.
*/
static inline
int lib_ring_buffer_try_discard_reserve(const struct lib_ring_buffer_config *config,
const struct lib_ring_buffer_ctx *ctx)
{
struct lib_ring_buffer *buf = ctx->buf;
unsigned long end_offset = ctx->pre_offset + ctx->slot_size;
/*
* We need to ensure that if the cmpxchg succeeds and discards the
* record, the next record will record a full TSC, because it cannot
* rely on the last_tsc associated with the discarded record to detect
* overflows. The only way to ensure this is to set the last_tsc to 0
* (assuming no 64-bit TSC overflow), which forces to write a 64-bit
* timestamp in the next record.
*
* Note: if discard fails, we must leave the TSC in the record header.
* It is needed to keep track of TSC overflows for the following
* records.
*/
save_last_tsc(config, buf, 0ULL);
if (likely(v_cmpxchg(config, &buf->offset, end_offset, ctx->pre_offset)
!= end_offset))
return -EPERM;
else
return 0;
}
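/*
 * As a sketch, a client that decides to drop a record it has reserved (e.g.
 * after a late filter rejected the event) follows the rule stated above:
 *
 *	if (lib_ring_buffer_try_discard_reserve(config, &ctx))
 *		lib_ring_buffer_commit(config, &ctx);	// discard failed: must commit
 */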
static inline
void channel_record_disable(const struct lib_ring_buffer_config *config,
struct channel *chan)
{
atomic_inc(&chan->record_disabled);
}
static inline
void channel_record_enable(const struct lib_ring_buffer_config *config,
struct channel *chan)
{
atomic_dec(&chan->record_disabled);
}
static inline
void lib_ring_buffer_record_disable(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer *buf)
{
atomic_inc(&buf->record_disabled);
}
static inline
void lib_ring_buffer_record_enable(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer *buf)
{
atomic_dec(&buf->record_disabled);
}
#endif /* _LINUX_RING_BUFFER_FRONTEND_API_H */
#ifndef _LINUX_RING_BUFFER_FRONTEND_INTERNAL_H
#define _LINUX_RING_BUFFER_FRONTEND_INTERNAL_H
/*
* linux/ringbuffer/frontend_internal.h
*
* (C) Copyright 2005-2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
*
* Ring Buffer Library Synchronization Header (internal helpers).
*
* Author:
* Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
*
* See ring_buffer_frontend.c for more information on wait-free algorithms.
*
* Dual LGPL v2.1/GPL v2 license.
*/
#include "../../wrapper/ringbuffer/config.h"
#include "../../wrapper/ringbuffer/backend_types.h"
#include "../../wrapper/ringbuffer/frontend_types.h"
#include "../../lib/prio_heap/lttng_prio_heap.h" /* For per-CPU read-side iterator */
/* Buffer offset macros */
/* buf_trunc mask selects only the buffer number. */
static inline
unsigned long buf_trunc(unsigned long offset, struct channel *chan)
{
return offset & ~(chan->backend.buf_size - 1);
}
/* Select the buffer number value (counter). */
static inline
unsigned long buf_trunc_val(unsigned long offset, struct channel *chan)
{
return buf_trunc(offset, chan) >> chan->backend.buf_size_order;
}
/* buf_offset mask selects only the offset within the current buffer. */
static inline
unsigned long buf_offset(unsigned long offset, struct channel *chan)
{
return offset & (chan->backend.buf_size - 1);
}
/* subbuf_offset mask selects the offset within the current subbuffer. */
static inline
unsigned long subbuf_offset(unsigned long offset, struct channel *chan)
{
return offset & (chan->backend.subbuf_size - 1);
}
/* subbuf_trunc mask selects the subbuffer number. */
static inline
unsigned long subbuf_trunc(unsigned long offset, struct channel *chan)
{
return offset & ~(chan->backend.subbuf_size - 1);
}
/* subbuf_align aligns the offset to the next subbuffer. */
static inline
unsigned long subbuf_align(unsigned long offset, struct channel *chan)
{
return (offset + chan->backend.subbuf_size)
& ~(chan->backend.subbuf_size - 1);
}
/* subbuf_index returns the index of the current subbuffer within the buffer. */
static inline
unsigned long subbuf_index(unsigned long offset, struct channel *chan)
{
return buf_offset(offset, chan) >> chan->backend.subbuf_size_order;
}
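/*
 * Worked example: with subbuf_size = 4096 (subbuf_size_order = 12), 4
 * sub-buffers per buffer (buf_size = 16384) and offset = 9300:
 *
 *	buf_offset()	= 9300	(offset within the 16 kB buffer)
 *	subbuf_index()	= 2	(third sub-buffer)
 *	subbuf_offset()	= 1108	(offset within that sub-buffer)
 *	subbuf_trunc()	= 8192	(start of that sub-buffer)
 *	subbuf_align()	= 12288	(start of the next sub-buffer)
 */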
/*
* Last TSC comparison functions. Check if the current TSC overflows tsc_bits
* bits from the last TSC read. When overflows are detected, the full 64-bit
* timestamp counter should be written in the record header. Reads and writes
* last_tsc atomically.
*/
#if (BITS_PER_LONG == 32)
static inline
void save_last_tsc(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer *buf, u64 tsc)
{
if (config->tsc_bits == 0 || config->tsc_bits == 64)
return;
/*
* Ensure the compiler performs this update in a single instruction.
*/
v_set(config, &buf->last_tsc, (unsigned long)(tsc >> config->tsc_bits));
}
static inline
int last_tsc_overflow(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer *buf, u64 tsc)
{
unsigned long tsc_shifted;
if (config->tsc_bits == 0 || config->tsc_bits == 64)
return 0;
tsc_shifted = (unsigned long)(tsc >> config->tsc_bits);
if (unlikely(tsc_shifted
- (unsigned long)v_read(config, &buf->last_tsc)))
return 1;
else
return 0;
}
#else
static inline
void save_last_tsc(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer *buf, u64 tsc)
{
if (config->tsc_bits == 0 || config->tsc_bits == 64)
return;
v_set(config, &buf->last_tsc, (unsigned long)tsc);
}
static inline
int last_tsc_overflow(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer *buf, u64 tsc)
{
if (config->tsc_bits == 0 || config->tsc_bits == 64)
return 0;
if (unlikely((tsc - v_read(config, &buf->last_tsc))
>> config->tsc_bits))
return 1;
else
return 0;
}
#endif
extern
int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx);
extern
void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf,
enum switch_mode mode);
/* Buffer write helpers */
static inline
void lib_ring_buffer_reserve_push_reader(struct lib_ring_buffer *buf,
struct channel *chan,
unsigned long offset)
{
unsigned long consumed_old, consumed_new;
do {
consumed_old = atomic_long_read(&buf->consumed);
/*
* If buffer is in overwrite mode, push the reader consumed
* count if the write position has reached it and we are not
* at the first iteration (don't push the reader farther than
* the writer). This operation can be done concurrently by many
* writers in the same buffer, the writer being at the farthest
* write position sub-buffer index in the buffer being the one
* which will win this loop.
*/
if (unlikely(subbuf_trunc(offset, chan)
- subbuf_trunc(consumed_old, chan)
>= chan->backend.buf_size))
consumed_new = subbuf_align(consumed_old, chan);
else
return;
} while (unlikely(atomic_long_cmpxchg(&buf->consumed, consumed_old,
consumed_new) != consumed_old));
}
static inline
void lib_ring_buffer_vmcore_check_deliver(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer *buf,
unsigned long commit_count,
unsigned long idx)
{
if (config->oops == RING_BUFFER_OOPS_CONSISTENCY)
v_set(config, &buf->commit_hot[idx].seq, commit_count);
}
static inline
int lib_ring_buffer_poll_deliver(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer *buf,
struct channel *chan)
{
unsigned long consumed_old, consumed_idx, commit_count, write_offset;
consumed_old = atomic_long_read(&buf->consumed);
consumed_idx = subbuf_index(consumed_old, chan);
commit_count = v_read(config, &buf->commit_cold[consumed_idx].cc_sb);
/*
* No memory barrier here, since we are only interested
* in a statistically correct polling result. The next poll will
* get the data if we are racing. The mb() that ensures correct
* memory order is in get_subbuf.
*/
write_offset = v_read(config, &buf->offset);
/*
* Check that the subbuffer we are trying to consume has been
* already fully committed.
*/
if (((commit_count - chan->backend.subbuf_size)
& chan->commit_count_mask)
- (buf_trunc(consumed_old, chan)
>> chan->backend.num_subbuf_order)
!= 0)
return 0;
/*
* Check that we are not about to read the same subbuffer in
* which the writer head is.
*/
if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed_old, chan)
== 0)
return 0;
return 1;
}
static inline
int lib_ring_buffer_pending_data(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer *buf,
struct channel *chan)
{
return !!subbuf_offset(v_read(config, &buf->offset), chan);
}
static inline
unsigned long lib_ring_buffer_get_data_size(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer *buf,
unsigned long idx)
{
return subbuffer_get_data_size(config, &buf->backend, idx);
}
/*
* Check if all space reservations in a buffer have been committed. This helps
* knowing if an execution context is nested (for per-cpu buffers only).
* This is a very specific ftrace use-case, so we keep this as "internal" API.
*/
static inline
int lib_ring_buffer_reserve_committed(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer *buf,
struct channel *chan)
{
unsigned long offset, idx, commit_count;
CHAN_WARN_ON(chan, config->alloc != RING_BUFFER_ALLOC_PER_CPU);
CHAN_WARN_ON(chan, config->sync != RING_BUFFER_SYNC_PER_CPU);
/*
* Read offset and commit count in a loop so they are both read
* atomically wrt interrupts. We deal with interrupt concurrency by
* restarting both reads if the offset has been pushed. Note that given
* we only have to deal with interrupt concurrency here, an interrupt
* modifying the commit count will also modify "offset", so it is safe
* to only check for offset modifications.
*/
do {
offset = v_read(config, &buf->offset);
idx = subbuf_index(offset, chan);
commit_count = v_read(config, &buf->commit_hot[idx].cc);
} while (offset != v_read(config, &buf->offset));
return ((buf_trunc(offset, chan) >> chan->backend.num_subbuf_order)
- (commit_count & chan->commit_count_mask) == 0);
}
static inline
void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer *buf,
struct channel *chan,
unsigned long offset,
unsigned long commit_count,
unsigned long idx)
{
unsigned long old_commit_count = commit_count
- chan->backend.subbuf_size;
u64 tsc;
/* Check if all commits have been done */
if (unlikely((buf_trunc(offset, chan) >> chan->backend.num_subbuf_order)
- (old_commit_count & chan->commit_count_mask) == 0)) {
/*
* If we succeeded at updating cc_sb below, we are the subbuffer
* writer delivering the subbuffer. This deals with concurrent
* updates of the "cc" value without adding an add_return atomic
* operation to the fast path.
*
* We are doing the delivery in two steps:
* - First, we cmpxchg() cc_sb to the new value
* old_commit_count + 1. This ensures that we are the only
* subbuffer user successfully filling the subbuffer, but we
* do _not_ set the cc_sb value to "commit_count" yet.
* Therefore, other writers that would wrap around the ring
* buffer and try to start writing to our subbuffer would
* have to drop records, because it would appear as
* non-filled.
* We therefore have exclusive access to the subbuffer control
* structures. This mutual exclusion with other writers is
* crucially important for counting record overruns locklessly in
* flight recorder mode.
* - When we are ready to release the subbuffer (either for
* reading or for overrun by other writers), we simply set the
* cc_sb value to "commit_count" and perform delivery.
*
* The subbuffer size is at least 2 bytes (minimum size: 1 page).
* This guarantees that old_commit_count + 1 != commit_count.
*/
if (likely(v_cmpxchg(config, &buf->commit_cold[idx].cc_sb,
old_commit_count, old_commit_count + 1)
== old_commit_count)) {
/*
* Start of exclusive subbuffer access. We are
* guaranteed to be the last writer in this subbuffer
* and any other writer trying to access this subbuffer
* in this state is required to drop records.
*/
tsc = config->cb.ring_buffer_clock_read(chan);
v_add(config,
subbuffer_get_records_count(config,
&buf->backend, idx),
&buf->records_count);
v_add(config,
subbuffer_count_records_overrun(config,
&buf->backend,
idx),
&buf->records_overrun);
config->cb.buffer_end(buf, tsc, idx,
lib_ring_buffer_get_data_size(config,
buf,
idx));
/*
* Set noref flag and offset for this subbuffer id.
* Contains a memory barrier that ensures counter stores
* are ordered before set noref and offset.
*/
lib_ring_buffer_set_noref_offset(config, &buf->backend, idx,
buf_trunc_val(offset, chan));
/*
* Order set_noref and record counter updates before the
* end of subbuffer exclusive access. Orders with
* respect to writers coming into the subbuffer after
* wrap around, and also order wrt concurrent readers.
*/
smp_mb();
/* End of exclusive subbuffer access */
v_set(config, &buf->commit_cold[idx].cc_sb,
commit_count);
lib_ring_buffer_vmcore_check_deliver(config, buf,
commit_count, idx);
/*
* RING_BUFFER_WAKEUP_BY_WRITER wakeup is not lock-free.
*/
if (config->wakeup == RING_BUFFER_WAKEUP_BY_WRITER
&& atomic_long_read(&buf->active_readers)
&& lib_ring_buffer_poll_deliver(config, buf, chan)) {
wake_up_interruptible(&buf->read_wait);
wake_up_interruptible(&chan->read_wait);
}
}
}
}
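/*
 * Minimal sketch of the two-step delivery pattern used above, expressed
 * with a plain atomic_long_t. Illustrative only; every name below is
 * hypothetical, and the real code operates on the cc_sb field through the
 * v_atomic wrappers.
 */
static inline
void example_two_step_deliver(atomic_long_t *cc_sb,
                              unsigned long old_commit_count,
                              unsigned long commit_count)
{
        /* Step 1: claim exclusivity; only one writer can win this cmpxchg. */
        if (atomic_long_cmpxchg(cc_sb, old_commit_count,
                                old_commit_count + 1) != old_commit_count)
                return;         /* Another writer is delivering. */
        /* ... exclusive sub-buffer bookkeeping would go here ... */
        smp_mb();               /* Order bookkeeping before publication. */
        /* Step 2: publish the final commit count, releasing the sub-buffer. */
        atomic_long_set(cc_sb, commit_count);
}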
/*
* lib_ring_buffer_write_commit_counter
*
* For flight recording. Must be called after commit.
* This function increments the subbuffer's commit_seq counter each time the
* commit count catches up with the reserve offset (modulo subbuffer size). It
* is useful for crash dumps.
*/
static inline
void lib_ring_buffer_write_commit_counter(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer *buf,
struct channel *chan,
unsigned long idx,
unsigned long buf_offset,
unsigned long commit_count,
size_t slot_size)
{
unsigned long offset, commit_seq_old;
if (config->oops != RING_BUFFER_OOPS_CONSISTENCY)
return;
offset = buf_offset + slot_size;
/*
* subbuf_offset includes commit_count_mask. We can simply
* compare the offsets within the subbuffer without caring about
* buffer full/empty mismatch because offset is never zero here
* (subbuffer header and record headers have non-zero length).
*/
if (unlikely(subbuf_offset(offset - commit_count, chan)))
return;
commit_seq_old = v_read(config, &buf->commit_hot[idx].seq);
while ((long) (commit_seq_old - commit_count) < 0)
commit_seq_old = v_cmpxchg(config, &buf->commit_hot[idx].seq,
commit_seq_old, commit_count);
}
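/*
 * Illustrative sketch, not part of the original header: the loop above is a
 * lock-free "advance, but never move backwards" update. A generic form with
 * a plain atomic_long_t (all names hypothetical) would be:
 */
static inline
void example_advance_seq(atomic_long_t *seq, unsigned long new_val)
{
        unsigned long old = atomic_long_read(seq);

        /* Only move seq forward; a concurrent updater may get there first. */
        while ((long) (old - new_val) < 0)
                old = atomic_long_cmpxchg(seq, old, new_val);
}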
extern int lib_ring_buffer_create(struct lib_ring_buffer *buf,
struct channel_backend *chanb, int cpu);
extern void lib_ring_buffer_free(struct lib_ring_buffer *buf);
/* Keep track of trap nesting inside ring buffer code */
DECLARE_PER_CPU(unsigned int, lib_ring_buffer_nesting);
#endif /* _LINUX_RING_BUFFER_FRONTEND_INTERNAL_H */
#ifndef _LINUX_RING_BUFFER_FRONTEND_TYPES_H
#define _LINUX_RING_BUFFER_FRONTEND_TYPES_H
/*
* linux/ringbuffer/frontend_types.h
*
* (C) Copyright 2005-2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
*
* Ring Buffer Library Synchronization Header (types).
*
* Author:
* Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
*
* See ring_buffer_frontend.c for more information on wait-free algorithms.
*
* Dual LGPL v2.1/GPL v2 license.
*/
#include <linux/kref.h>
#include "../../wrapper/ringbuffer/config.h"
#include "../../wrapper/ringbuffer/backend_types.h"
#include "../../wrapper/spinlock.h"
#include "../../lib/prio_heap/lttng_prio_heap.h" /* For per-CPU read-side iterator */
/*
* A switch is done during tracing or as a final flush after tracing (so it
* won't write in the new sub-buffer).
*/
enum switch_mode { SWITCH_ACTIVE, SWITCH_FLUSH };
/* channel-level read-side iterator */
struct channel_iter {
/* Prio heap of buffers. Lowest timestamps at the top. */
struct lttng_ptr_heap heap; /* Heap of struct lib_ring_buffer ptrs */
struct list_head empty_head; /* Empty buffers linked-list head */
int read_open; /* Opened for reading ? */
u64 last_qs; /* Last quiescent state timestamp */
u64 last_timestamp; /* Last timestamp (for WARN_ON) */
int last_cpu; /* Last timestamp cpu */
/*
* read() file operation state.
*/
unsigned long len_left;
};
/* channel: collection of per-cpu ring buffers. */
struct channel {
atomic_t record_disabled;
unsigned long commit_count_mask; /*
* Commit count mask, removing
* the MSBs corresponding to
* bits used to represent the
* subbuffer index.
*/
struct channel_backend backend; /* Associated backend */
unsigned long switch_timer_interval; /* Buffer flush (jiffies) */
unsigned long read_timer_interval; /* Reader wakeup (jiffies) */
struct notifier_block cpu_hp_notifier; /* CPU hotplug notifier */
struct notifier_block tick_nohz_notifier; /* CPU nohz notifier */
struct notifier_block hp_iter_notifier; /* hotplug iterator notifier */
int cpu_hp_enable:1; /* Enable CPU hotplug notif. */
int hp_iter_enable:1; /* Enable hp iter notif. */
wait_queue_head_t read_wait; /* reader wait queue */
wait_queue_head_t hp_wait; /* CPU hotplug wait queue */
int finalized; /* Has channel been finalized */
struct channel_iter iter; /* Channel read-side iterator */
struct kref ref; /* Reference count */
};
/* Per-subbuffer commit counters used on the hot path */
struct commit_counters_hot {
union v_atomic cc; /* Commit counter */
union v_atomic seq; /* Consecutive commits */
};
/* Per-subbuffer commit counters used only on cold paths */
struct commit_counters_cold {
union v_atomic cc_sb; /* Incremented _once_ at sb switch */
};
/* Per-buffer read iterator */
struct lib_ring_buffer_iter {
u64 timestamp; /* Current record timestamp */
size_t header_len; /* Current record header length */
size_t payload_len; /* Current record payload length */
struct list_head empty_node; /* Linked list of empty buffers */
unsigned long consumed, read_offset, data_size;
enum {
ITER_GET_SUBBUF = 0,
ITER_TEST_RECORD,
ITER_NEXT_RECORD,
ITER_PUT_SUBBUF,
} state;
int allocated:1;
int read_open:1; /* Opened for reading ? */
};
/* ring buffer state */
struct lib_ring_buffer {
/* First 32 bytes cache-hot cacheline */
union v_atomic offset; /* Current offset in the buffer */
struct commit_counters_hot *commit_hot;
/* Commit count per sub-buffer */
atomic_long_t consumed; /*
* Current offset in the buffer
* standard atomic access (shared)
*/
atomic_t record_disabled;
/* End of first 32 bytes cacheline */
union v_atomic last_tsc; /*
* Last timestamp written in the buffer.
*/
struct lib_ring_buffer_backend backend; /* Associated backend */
struct commit_counters_cold *commit_cold;
/* Commit count per sub-buffer */
atomic_long_t active_readers; /*
* Active readers count
* standard atomic access (shared)
*/
/* Dropped records */
union v_atomic records_lost_full; /* Buffer full */
union v_atomic records_lost_wrap; /* Nested wrap-around */
union v_atomic records_lost_big; /* Events too big */
union v_atomic records_count; /* Number of records written */
union v_atomic records_overrun; /* Number of overwritten records */
wait_queue_head_t read_wait; /* reader buffer-level wait queue */
wait_queue_head_t write_wait; /* writer buffer-level wait queue (for metadata only) */
int finalized; /* buffer has been finalized */
struct timer_list switch_timer; /* timer for periodical switch */
struct timer_list read_timer; /* timer for read poll */
raw_spinlock_t raw_tick_nohz_spinlock; /* nohz entry lock/trylock */
struct lib_ring_buffer_iter iter; /* read-side iterator */
unsigned long get_subbuf_consumed; /* Read-side consumed */
unsigned long prod_snapshot; /* Producer count snapshot */
unsigned long cons_snapshot; /* Consumer count snapshot */
int get_subbuf:1; /* Sub-buffer being held by reader */
int switch_timer_enabled:1; /* Protected by ring_buffer_nohz_lock */
int read_timer_enabled:1; /* Protected by ring_buffer_nohz_lock */
};
static inline
void *channel_get_private(struct channel *chan)
{
return chan->backend.priv;
}
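/*
 * Illustrative note: this returns the client-private pointer supplied when
 * the channel was created, e.g. (with a hypothetical client type):
 *
 *      struct my_client_state *state = channel_get_private(chan);
 */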
/*
* Issue warnings and disable channels upon internal error.
* Can receive struct lib_ring_buffer or struct lib_ring_buffer_backend
* parameters.
*/
#define CHAN_WARN_ON(c, cond) \
({ \
struct channel *__chan; \
int _____ret = unlikely(cond); \
if (_____ret) { \
if (__same_type(*(c), struct channel_backend)) \
__chan = container_of((void *) (c), \
struct channel, \
backend); \
else if (__same_type(*(c), struct channel)) \
__chan = (void *) (c); \
else \
BUG_ON(1); \
atomic_inc(&__chan->record_disabled); \
WARN_ON(1); \
} \
_____ret; \
})
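/*
 * Example use (illustrative): the condition may involve either the channel
 * or its backend, e.g.
 *
 *      CHAN_WARN_ON(chan, config->alloc != RING_BUFFER_ALLOC_PER_CPU);
 *
 * which emits a warning, disables recording on the channel, and evaluates
 * to the (non-zero) condition result.
 */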
#endif /* _LINUX_RING_BUFFER_FRONTEND_TYPES_H */
#ifndef _LINUX_RING_BUFFER_ITERATOR_H
#define _LINUX_RING_BUFFER_ITERATOR_H
/*
* linux/ringbuffer/iterator.h
*
* (C) Copyright 2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
*
* Ring buffer and channel iterators.
*
* Author:
* Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
*
* Dual LGPL v2.1/GPL v2 license.
*/
#include "../../wrapper/ringbuffer/backend.h"
#include "../../wrapper/ringbuffer/frontend.h"
/*
* lib_ring_buffer_get_next_record advances the buffer read position to the next
* record. It returns either the size of the next record, -EAGAIN if there is
* currently no data available, or -ENODATA if no data is available and the
* buffer is finalized.
*/
extern ssize_t lib_ring_buffer_get_next_record(struct channel *chan,
struct lib_ring_buffer *buf);
/*
* channel_get_next_record advances the buffer read position to the next record.
* It returns either the size of the next record, -EAGAIN if there is currently
* no data available, or -ENODATA if no data is available and the buffer is
* finalized.
* Returns the current buffer in ret_buf.
*/
extern ssize_t channel_get_next_record(struct channel *chan,
struct lib_ring_buffer **ret_buf);
/**
* read_current_record - copy the buffer current record into dest.
* @buf: ring buffer
* @dest: destination where the record should be copied
*
* dest should be large enough to contain the record. Returns the number of
* bytes copied.
*/
static inline size_t read_current_record(struct lib_ring_buffer *buf, void *dest)
{
return lib_ring_buffer_read(&buf->backend, buf->iter.read_offset,
dest, buf->iter.payload_len);
}
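/*
 * Illustrative sketch, not part of the original header: a simple per-buffer
 * drain loop combining lib_ring_buffer_get_next_record() and
 * read_current_record(). The function name and the fixed-size destination
 * policy are hypothetical; error constants come from <linux/errno.h>.
 */
static inline
ssize_t example_drain_buffer(struct channel *chan, struct lib_ring_buffer *buf,
                             void *dest, size_t dest_len)
{
        ssize_t len;

        for (;;) {
                len = lib_ring_buffer_get_next_record(chan, buf);
                if (len < 0)
                        return len;     /* -EAGAIN, -ENODATA or other error. */
                if ((size_t) len > dest_len)
                        return -ENOSPC; /* Record does not fit in dest. */
                read_current_record(buf, dest);
                /* ... hand the record in dest to the consumer here ... */
        }
}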
extern int lib_ring_buffer_iterator_open(struct lib_ring_buffer *buf);
extern void lib_ring_buffer_iterator_release(struct lib_ring_buffer *buf);
extern int channel_iterator_open(struct channel *chan);
extern void channel_iterator_release(struct channel *chan);
extern const struct file_operations channel_payload_file_operations;
extern const struct file_operations lib_ring_buffer_payload_file_operations;
/*
* Used internally.
*/
int channel_iterator_init(struct channel *chan);
void channel_iterator_unregister_notifiers(struct channel *chan);
void channel_iterator_free(struct channel *chan);
void channel_iterator_reset(struct channel *chan);
void lib_ring_buffer_iterator_reset(struct lib_ring_buffer *buf);
#endif /* _LINUX_RING_BUFFER_ITERATOR_H */
#ifndef _LINUX_RING_BUFFER_NOHZ_H
#define _LINUX_RING_BUFFER_NOHZ_H
/*
* ringbuffer/nohz.h
*
* Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
*
* Dual LGPL v2.1/GPL v2 license.
*/
#ifdef CONFIG_LIB_RING_BUFFER
void lib_ring_buffer_tick_nohz_flush(void);
void lib_ring_buffer_tick_nohz_stop(void);
void lib_ring_buffer_tick_nohz_restart(void);
#else
static inline void lib_ring_buffer_tick_nohz_flush(void)
{
}
static inline void lib_ring_buffer_tick_nohz_stop(void)
{
}
static inline void lib_ring_buffer_tick_nohz_restart(void)
{
}
#endif
#endif /* _LINUX_RING_BUFFER_NOHZ_H */