messenger.h 9.1 KB
Newer Older
S
Sage Weil 已提交
1 2 3
#ifndef __FS_CEPH_MESSENGER_H
#define __FS_CEPH_MESSENGER_H

S
Sage Weil 已提交
4
#include <linux/kref.h>
S
Sage Weil 已提交
5 6 7 8 9 10
#include <linux/mutex.h>
#include <linux/net.h>
#include <linux/radix-tree.h>
#include <linux/uio.h>
#include <linux/workqueue.h>

11 12
#include <linux/ceph/types.h>
#include <linux/ceph/buffer.h>
S
Sage Weil 已提交
13 14 15 16 17 18 19 20 21 22 23 24 25 26

struct ceph_msg;
struct ceph_connection;

/*
 * Ceph defines these callbacks for handling connection events.
 *
 * A user of the messenger supplies one of these tables per connection
 * (see the ops pointer in struct ceph_connection).
 */
struct ceph_connection_operations {
	/*
	 * Reference-counting hooks.
	 * NOTE(review): presumably take/drop a reference on whatever object
	 * embeds the connection; confirm against the implementations.
	 */
	struct ceph_connection *(*get)(struct ceph_connection *);
	void (*put)(struct ceph_connection *);

	/* handle an incoming message. */
	void (*dispatch)(struct ceph_connection *con, struct ceph_msg *m);

	/* authorize an outgoing connection */
	struct ceph_auth_handshake *(*get_authorizer)(
					struct ceph_connection *con,
					int *proto, int force_new);
	int (*verify_authorizer_reply)(struct ceph_connection *con, int len);
	int (*invalidate_authorizer)(struct ceph_connection *con);

	/* there was some error on the socket (disconnect, whatever) */
	void (*fault)(struct ceph_connection *con);

	/*
	 * a remote host has terminated a message exchange session, and
	 * messages we sent (or they tried to send us) may be lost.
	 */
	void (*peer_reset)(struct ceph_connection *con);

	/*
	 * Provide the message that an incoming message described by @hdr
	 * should be read into.
	 * NOTE(review): @skip is an out-parameter; presumably set non-zero
	 * to ask the messenger to discard the message -- confirm in
	 * messenger.c.
	 */
	struct ceph_msg *(*alloc_msg)(struct ceph_connection *con,
				      struct ceph_msg_header *hdr,
				      int *skip);
};

/*
 * Expands to TWO printk-style arguments: the entity type name (a string)
 * followed by the entity number.  Use it with a "%s%llu" format string:
 * the number is produced by le64_to_cpu() and is 64 bits wide, so a plain
 * "%d" does not match it.
 *
 * Note that the argument is evaluated twice.
 */
#define ENTITY_NAME(n) \
	ceph_entity_type_name((n).type), le64_to_cpu((n).num)
S
Sage Weil 已提交
48 49 50

struct ceph_messenger {
	struct ceph_entity_inst inst;    /* my name+address */
51
	struct ceph_entity_addr my_enc_addr;
S
Sage Weil 已提交
52

53
	atomic_t stopping;
S
Sage Weil 已提交
54 55 56 57 58 59 60 61
	bool nocrc;

	/*
	 * the global_seq counts connections i (attempt to) initiate
	 * in order to disambiguate certain connect race conditions.
	 */
	u32 global_seq;
	spinlock_t global_seq_lock;
62 63 64

	u32 supported_features;
	u32 required_features;
S
Sage Weil 已提交
65 66
};

A
Alex Elder 已提交
67 68
/*
 * Predicates on a message pointer @m: true when the corresponding data
 * item of the message (p, l or b) has been given that type.  Each one
 * only inspects the item's type field.
 */
#define ceph_msg_has_pages(m)		((m)->p.type == CEPH_MSG_DATA_PAGES)
#define ceph_msg_has_pagelist(m)	((m)->l.type == CEPH_MSG_DATA_PAGELIST)
#ifdef CONFIG_BLOCK
/* the bio item only exists when block-layer support is configured */
#define ceph_msg_has_bio(m)		((m)->b.type == CEPH_MSG_DATA_BIO)
#endif /* CONFIG_BLOCK */
A
Alex Elder 已提交
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96

/*
 * What kind of object backs a message data item.  The bio variant is
 * only available when CONFIG_BLOCK is defined.
 */
enum ceph_msg_data_type {
	CEPH_MSG_DATA_NONE,	/* message contains no data payload */
	CEPH_MSG_DATA_PAGES,	/* data source/destination is a page array */
	CEPH_MSG_DATA_PAGELIST,	/* data source/destination is a pagelist */
#ifdef CONFIG_BLOCK
	CEPH_MSG_DATA_BIO,	/* data source/destination is a bio list */
#endif /* CONFIG_BLOCK */
};

/*
 * Return true if @type is one of the enumerators of
 * enum ceph_msg_data_type that exists in this configuration, false for
 * any other value.
 */
static inline bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
{
	switch (type) {
	case CEPH_MSG_DATA_NONE:
	case CEPH_MSG_DATA_PAGES:
	case CEPH_MSG_DATA_PAGELIST:
#ifdef CONFIG_BLOCK
	case CEPH_MSG_DATA_BIO:
#endif /* CONFIG_BLOCK */
		return true;
	default:
		return false;
	}
}

97
/*
 * Position within one message data item.  resid and last_piece are
 * common to all data types; the anonymous union holds the per-type
 * position, so only the variant matching the item's type is meaningful
 * at any time.
 */
struct ceph_msg_data_cursor {
	size_t		resid;		/* bytes not yet consumed */
	bool		last_piece;	/* now at last piece of data item */
	union {
#ifdef CONFIG_BLOCK
		struct {				/* bio */
			struct bio	*bio;		/* bio from list */
			unsigned int	vector_index;	/* vector from bio */
			unsigned int	vector_offset;	/* bytes from vector */
		};
#endif /* CONFIG_BLOCK */
		struct {				/* pages */
			unsigned int	page_offset;	/* offset in page */
			unsigned short	page_index;	/* index in array */
			unsigned short	page_count;	/* pages in array */
		};
		struct {				/* pagelist */
			struct page	*page;		/* page from list */
			size_t		offset;		/* bytes from list */
		};
	};
};

A
Alex Elder 已提交
120 121 122 123
struct ceph_msg_data {
	enum ceph_msg_data_type		type;
	union {
#ifdef CONFIG_BLOCK
124
		struct bio		*bio;
A
Alex Elder 已提交
125 126 127 128 129 130 131 132
#endif /* CONFIG_BLOCK */
		struct {
			struct page	**pages;	/* NOT OWNER. */
			size_t		length;		/* total # bytes */
			unsigned int	alignment;	/* first page */
		};
		struct ceph_pagelist	*pagelist;
	};
133
	struct ceph_msg_data_cursor	cursor;		/* pagelist only */
A
Alex Elder 已提交
134
};
135

S
Sage Weil 已提交
136 137 138 139 140 141 142 143 144 145
/*
 * a single message.  it contains a header (src, dest, message type, etc.),
 * footer (crc values, mainly), a "front" message body, and possibly a
 * data payload (stored in some number of pages).
 */
struct ceph_msg {
	struct ceph_msg_header hdr;	/* header */
	struct ceph_msg_footer footer;	/* footer */
	struct kvec front;              /* unaligned blobs of message */
	struct ceph_buffer *middle;
146

147
	/* data payload */
A
Alex Elder 已提交
148 149
	struct ceph_msg_data	p;	/* pages */
	struct ceph_msg_data	l;	/* pagelist */
150
#ifdef CONFIG_BLOCK
A
Alex Elder 已提交
151
	struct ceph_msg_data	b;	/* bio */
152
#endif /* CONFIG_BLOCK */
153 154 155 156 157

	struct ceph_connection *con;
	struct list_head list_head;	/* links for connection lists */

	struct kref kref;
S
Sage Weil 已提交
158 159
	bool front_is_vmalloc;
	bool more_to_follow;
160
	bool needs_out_seq;
S
Sage Weil 已提交
161
	int front_max;
162
	unsigned long ack_stamp;        /* tx: when we were acked */
S
Sage Weil 已提交
163 164 165 166 167 168 169

	struct ceph_msgpool *pool;
};

/*
 * Position within a message's data payload while it is being sent or
 * received (struct ceph_connection keeps one for each direction).
 */
struct ceph_msg_pos {
	int page;            /* which page */
	int page_pos;        /* offset in page */
	int data_pos;        /* offset in data payload */
	bool did_page_crc;   /* true if we've calculated crc for current page */
};

/*
 * Defaults for the connection fault retry delay (exponential backoff),
 * expressed in jiffies: the starting interval and the upper bound.
 */
#define BASE_DELAY_INTERVAL	(HZ / 2)	/* half a second */
#define MAX_DELAY_INTERVAL	(5 * 60 * HZ)	/* five minutes */

/*
 * A single connection with another host.
 *
 * We maintain a queue of outgoing messages, and some session state to
 * ensure that we can preserve the lossless, ordered delivery of
 * messages in the case of a TCP disconnect.
 */
struct ceph_connection {
	void *private;

	const struct ceph_connection_operations *ops;

	struct ceph_messenger *msgr;
190 191

	atomic_t sock_state;
S
Sage Weil 已提交
192
	struct socket *sock;
193 194 195
	struct ceph_entity_addr peer_addr; /* peer address */
	struct ceph_entity_addr peer_addr_for_me;

196 197
	unsigned long flags;
	unsigned long state;
S
Sage Weil 已提交
198 199 200
	const char *error_msg;  /* error message, if any */

	struct ceph_entity_name peer_name; /* peer name */
201

202
	unsigned peer_features;
S
Sage Weil 已提交
203 204 205 206
	u32 connect_seq;      /* identify the most recent connection
				 attempt for this connection, client */
	u32 peer_global_seq;  /* peer's global seq for this connection */

207 208 209 210
	int auth_retry;       /* true if we need a newer authorizer */
	void *auth_reply_buf;   /* where to put the authorizer reply */
	int auth_reply_buf_len;

211 212
	struct mutex mutex;

S
Sage Weil 已提交
213 214 215 216 217 218 219 220 221
	/* out queue */
	struct list_head out_queue;
	struct list_head out_sent;   /* sending or sent but unacked */
	u64 out_seq;		     /* last message queued for send */

	u64 in_seq, in_seq_acked;  /* last message received, acked */

	/* connection negotiation temps */
	char in_banner[CEPH_BANNER_MAX_LEN];
S
Sage Weil 已提交
222 223
	struct ceph_msg_connect out_connect;
	struct ceph_msg_connect_reply in_reply;
S
Sage Weil 已提交
224 225 226 227 228
	struct ceph_entity_addr actual_peer_addr;

	/* message out temps */
	struct ceph_msg *out_msg;        /* sending message (== tail of
					    out_sent) */
229
	bool out_msg_done;
S
Sage Weil 已提交
230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255
	struct ceph_msg_pos out_msg_pos;

	struct kvec out_kvec[8],         /* sending header/footer data */
		*out_kvec_cur;
	int out_kvec_left;   /* kvec's left in out_kvec */
	int out_skip;        /* skip this many bytes */
	int out_kvec_bytes;  /* total bytes left */
	bool out_kvec_is_msg; /* kvec refers to out_msg */
	int out_more;        /* there is more data after the kvecs */
	__le64 out_temp_ack; /* for writing an ack */

	/* message in temps */
	struct ceph_msg_header in_hdr;
	struct ceph_msg *in_msg;
	struct ceph_msg_pos in_msg_pos;
	u32 in_front_crc, in_middle_crc, in_data_crc;  /* calculated crc */

	char in_tag;         /* protocol control byte */
	int in_base_pos;     /* bytes read */
	__le64 in_temp_ack;  /* for reading an ack */

	struct delayed_work work;	    /* send|recv work */
	unsigned long       delay;          /* current delay interval */
};


256
extern const char *ceph_pr_addr(const struct sockaddr_storage *ss);
S
Sage Weil 已提交
257 258 259 260 261 262 263
extern int ceph_parse_ips(const char *c, const char *end,
			  struct ceph_entity_addr *addr,
			  int max_count, int *count);


extern int ceph_msgr_init(void);
extern void ceph_msgr_exit(void);
264
extern void ceph_msgr_flush(void);
S
Sage Weil 已提交
265

266 267 268 269 270
extern void ceph_messenger_init(struct ceph_messenger *msgr,
			struct ceph_entity_addr *myaddr,
			u32 supported_features,
			u32 required_features,
			bool nocrc);
S
Sage Weil 已提交
271

272 273
extern void ceph_con_init(struct ceph_connection *con, void *private,
			const struct ceph_connection_operations *ops,
274
			struct ceph_messenger *msgr);
S
Sage Weil 已提交
275
extern void ceph_con_open(struct ceph_connection *con,
276
			  __u8 entity_type, __u64 entity_num,
S
Sage Weil 已提交
277
			  struct ceph_entity_addr *addr);
278
extern bool ceph_con_opened(struct ceph_connection *con);
S
Sage Weil 已提交
279 280
extern void ceph_con_close(struct ceph_connection *con);
extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg);
281 282

extern void ceph_msg_revoke(struct ceph_msg *msg);
283 284
extern void ceph_msg_revoke_incoming(struct ceph_msg *msg);

S
Sage Weil 已提交
285 286
extern void ceph_con_keepalive(struct ceph_connection *con);

287
extern void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
288
				size_t length, size_t alignment);
289 290 291
extern void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
				struct ceph_pagelist *pagelist);
extern void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio);
292

293 294
extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
				     bool can_fail);
S
Sage Weil 已提交
295 296 297 298 299
extern void ceph_msg_kfree(struct ceph_msg *m);


static inline struct ceph_msg *ceph_msg_get(struct ceph_msg *msg)
{
S
Sage Weil 已提交
300
	kref_get(&msg->kref);
S
Sage Weil 已提交
301 302
	return msg;
}
S
Sage Weil 已提交
303 304 305 306 307
/* kref release callback handed to kref_put() by ceph_msg_put() */
extern void ceph_msg_last_put(struct kref *kref);

/*
 * Drop one reference on @msg through its embedded kref, naming
 * ceph_msg_last_put() as the release function.
 */
static inline void ceph_msg_put(struct ceph_msg *msg)
{
	struct kref *ref = &msg->kref;

	kref_put(ref, ceph_msg_last_put);
}
S
Sage Weil 已提交
308

309 310
/*
 * NOTE(review): presumably prints the contents of @msg for debugging;
 * only the declaration is visible in this header.
 */
extern void ceph_msg_dump(struct ceph_msg *msg);

S
Sage Weil 已提交
311
#endif