/* messenger.h - Ceph messenger interface */
#ifndef __FS_CEPH_MESSENGER_H
#define __FS_CEPH_MESSENGER_H

#include <linux/kref.h>
#include <linux/mutex.h>
#include <linux/net.h>
#include <linux/radix-tree.h>
#include <linux/uio.h>
#include <linux/workqueue.h>

#include "types.h"
#include "buffer.h"

struct ceph_msg;
struct ceph_connection;

/*
 * Ceph defines these callbacks for handling connection events.
 */
struct ceph_connection_operations {
	/*
	 * get/put hooks for the connection (kernel reference convention).
	 * NOTE(review): confirm what object these pin for the owner.
	 */
	struct ceph_connection *(*get)(struct ceph_connection *);
	void (*put)(struct ceph_connection *);

	/* handle an incoming message. */
	void (*dispatch) (struct ceph_connection *con, struct ceph_msg *m);

	/* authorize an outgoing connection */
	int (*get_authorizer) (struct ceph_connection *con,
			       void **buf, int *len, int *proto,
			       void **reply_buf, int *reply_len, int force_new);
	int (*verify_authorizer_reply) (struct ceph_connection *con, int len);
	int (*invalidate_authorizer)(struct ceph_connection *con);

	/* protocol version mismatch */
	void (*bad_proto) (struct ceph_connection *con);

	/* there was some error on the socket (disconnect, whatever) */
	void (*fault) (struct ceph_connection *con);

	/* a remote host has terminated a message exchange session, and messages
	 * we sent (or they tried to send us) may be lost. */
	void (*peer_reset) (struct ceph_connection *con);

	/*
	 * Return a ceph_msg for the incoming message described by @hdr.
	 * NOTE(review): presumably the callback sets *skip to ask the
	 * messenger to discard the message instead -- confirm in messenger.c.
	 */
	struct ceph_msg * (*alloc_msg) (struct ceph_connection *con,
					struct ceph_msg_header *hdr,
					int *skip);
};

/*
 * Expands to two printk arguments for an entity name with .type and .num
 * fields (presumably struct ceph_entity_name): the type string and the
 * entity number.  Use with format string "%s%lld" -- le64_to_cpu() yields
 * a 64-bit value, so "%d" would be a format/argument mismatch.
 */
#define ENTITY_NAME(n) ceph_entity_type_name((n).type), le64_to_cpu((n).num)


struct ceph_messenger {
	struct ceph_entity_inst inst;    /* my name+address */
54
	struct ceph_entity_addr my_enc_addr;
S
Sage Weil 已提交
55 56 57 58 59 60 61 62 63

	bool nocrc;

	/*
	 * the global_seq counts connections i (attempt to) initiate
	 * in order to disambiguate certain connect race conditions.
	 */
	u32 global_seq;
	spinlock_t global_seq_lock;
64 65 66

	u32 supported_features;
	u32 required_features;
S
Sage Weil 已提交
67 68 69 70 71 72 73 74 75 76 77 78 79 80
};

/*
 * a single message.  it contains a header (src, dest, message type, etc.),
 * footer (crc values, mainly), a "front" message body, and possibly a
 * data payload (stored in some number of pages).
 */
struct ceph_msg {
	struct ceph_msg_header hdr;	/* header */
	struct ceph_msg_footer footer;	/* footer */
	struct kvec front;              /* unaligned blobs of message */
	struct ceph_buffer *middle;
	struct page **pages;            /* data payload.  NOT OWNER. */
	unsigned nr_pages;              /* size of page array */
81
	unsigned page_alignment;        /* io offset in first page */
82
	struct ceph_pagelist *pagelist; /* instead of pages */
S
Sage Weil 已提交
83
	struct list_head list_head;
S
Sage Weil 已提交
84
	struct kref kref;
85 86 87 88
	struct bio  *bio;		/* instead of pages/pagelist */
	struct bio  *bio_iter;		/* bio iterator */
	int bio_seg;			/* current bio segment */
	struct ceph_pagelist *trail;	/* the trailing part of the data */
S
Sage Weil 已提交
89 90
	bool front_is_vmalloc;
	bool more_to_follow;
91
	bool needs_out_seq;
S
Sage Weil 已提交
92
	int front_max;
93
	unsigned long ack_stamp;        /* tx: when we were acked */
S
Sage Weil 已提交
94 95 96 97 98 99 100

	struct ceph_msgpool *pool;
};

/* Cursor tracking progress through a message's data payload. */
struct ceph_msg_pos {
	int page, page_pos;  /* which page; offset in page */
	int data_pos;        /* offset in data payload */
	bool did_page_crc;   /* true if we've calculated crc for current page */
};

/* ceph connection fault delay defaults, for exponential backoff */
#define BASE_DELAY_INTERVAL	(HZ/2)
#define MAX_DELAY_INTERVAL	(5 * 60 * HZ)

/*
 * ceph_connection state bit flags.  Each value is a bit number within
 * ceph_connection::state, not a mask.
 */
#define LOSSYTX         0  /* we can close channel or drop messages on errors */
#define CONNECTING	1
#define NEGOTIATING	2
#define KEEPALIVE_PENDING      3
#define WRITE_PENDING	4  /* we have data ready to send */
#define STANDBY		8  /* no outgoing messages, socket closed.  we keep
			    * the ceph_connection around to maintain shared
			    * state with the peer. */
#define CLOSED		10 /* we've closed the connection */
#define SOCK_CLOSED	11 /* socket state changed to closed */
#define OPENING         13 /* open connection w/ (possibly new) peer */
#define DEAD            14 /* dead, about to kfree */
#define BACKOFF         15 /* NOTE(review): presumably a delayed retry is pending */

/*
 * A single connection with another host.
 *
 * We maintain a queue of outgoing messages, and some session state to
 * ensure that we can preserve the lossless, ordered delivery of
 * messages in the case of a TCP disconnect.
 */
struct ceph_connection {
	void *private;
	atomic_t nref;

	const struct ceph_connection_operations *ops;

	struct ceph_messenger *msgr;
	struct socket *sock;
	unsigned long state;	/* connection state (see flags above) */
	const char *error_msg;  /* error message, if any */

	struct ceph_entity_addr peer_addr; /* peer address */
	struct ceph_entity_name peer_name; /* peer name */
	struct ceph_entity_addr peer_addr_for_me;
146
	unsigned peer_features;
S
Sage Weil 已提交
147 148 149 150
	u32 connect_seq;      /* identify the most recent connection
				 attempt for this connection, client */
	u32 peer_global_seq;  /* peer's global seq for this connection */

151 152 153 154
	int auth_retry;       /* true if we need a newer authorizer */
	void *auth_reply_buf;   /* where to put the authorizer reply */
	int auth_reply_buf_len;

155 156
	struct mutex mutex;

S
Sage Weil 已提交
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180
	/* out queue */
	struct list_head out_queue;
	struct list_head out_sent;   /* sending or sent but unacked */
	u64 out_seq;		     /* last message queued for send */

	u64 in_seq, in_seq_acked;  /* last message received, acked */

	/* connection negotiation temps */
	char in_banner[CEPH_BANNER_MAX_LEN];
	union {
		struct {  /* outgoing connection */
			struct ceph_msg_connect out_connect;
			struct ceph_msg_connect_reply in_reply;
		};
		struct {  /* incoming */
			struct ceph_msg_connect in_connect;
			struct ceph_msg_connect_reply out_reply;
		};
	};
	struct ceph_entity_addr actual_peer_addr;

	/* message out temps */
	struct ceph_msg *out_msg;        /* sending message (== tail of
					    out_sent) */
181
	bool out_msg_done;
S
Sage Weil 已提交
182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207
	struct ceph_msg_pos out_msg_pos;

	struct kvec out_kvec[8],         /* sending header/footer data */
		*out_kvec_cur;
	int out_kvec_left;   /* kvec's left in out_kvec */
	int out_skip;        /* skip this many bytes */
	int out_kvec_bytes;  /* total bytes left */
	bool out_kvec_is_msg; /* kvec refers to out_msg */
	int out_more;        /* there is more data after the kvecs */
	__le64 out_temp_ack; /* for writing an ack */

	/* message in temps */
	struct ceph_msg_header in_hdr;
	struct ceph_msg *in_msg;
	struct ceph_msg_pos in_msg_pos;
	u32 in_front_crc, in_middle_crc, in_data_crc;  /* calculated crc */

	char in_tag;         /* protocol control byte */
	int in_base_pos;     /* bytes read */
	__le64 in_temp_ack;  /* for reading an ack */

	struct delayed_work work;	    /* send|recv work */
	unsigned long       delay;          /* current delay interval */
};


208
extern const char *ceph_pr_addr(const struct sockaddr_storage *ss);
S
Sage Weil 已提交
209 210 211 212 213 214 215
extern int ceph_parse_ips(const char *c, const char *end,
			  struct ceph_entity_addr *addr,
			  int max_count, int *count);


extern int ceph_msgr_init(void);
extern void ceph_msgr_exit(void);
216
extern void ceph_msgr_flush(void);
S
Sage Weil 已提交
217 218

extern struct ceph_messenger *ceph_messenger_create(
219 220
	struct ceph_entity_addr *myaddr,
	u32 features, u32 required);
S
Sage Weil 已提交
221 222 223 224 225 226
extern void ceph_messenger_destroy(struct ceph_messenger *);

extern void ceph_con_init(struct ceph_messenger *msgr,
			  struct ceph_connection *con);
extern void ceph_con_open(struct ceph_connection *con,
			  struct ceph_entity_addr *addr);
227
extern bool ceph_con_opened(struct ceph_connection *con);
S
Sage Weil 已提交
228 229 230
extern void ceph_con_close(struct ceph_connection *con);
extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg);
extern void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg);
231 232
extern void ceph_con_revoke_message(struct ceph_connection *con,
				  struct ceph_msg *msg);
S
Sage Weil 已提交
233 234 235 236
extern void ceph_con_keepalive(struct ceph_connection *con);
extern struct ceph_connection *ceph_con_get(struct ceph_connection *con);
extern void ceph_con_put(struct ceph_connection *con);

237 238
extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
				     bool can_fail);
S
Sage Weil 已提交
239 240 241 242 243
extern void ceph_msg_kfree(struct ceph_msg *m);


static inline struct ceph_msg *ceph_msg_get(struct ceph_msg *msg)
{
S
Sage Weil 已提交
244
	kref_get(&msg->kref);
S
Sage Weil 已提交
245 246
	return msg;
}
S
Sage Weil 已提交
247 248 249 250 251
/* kref release callback passed to kref_put() by ceph_msg_put() below. */
extern void ceph_msg_last_put(struct kref *kref);

/*
 * Drop a reference on @msg; ceph_msg_last_put() is invoked when the
 * last reference goes away.
 */
static inline void ceph_msg_put(struct ceph_msg *msg)
{
	kref_put(&msg->kref, ceph_msg_last_put);
}

/* NOTE(review): presumably a debug dump of @msg -- confirm in messenger.c */
extern void ceph_msg_dump(struct ceph_msg *msg);

#endif