diff --git a/include/linux/rhashtable.h b/include/linux/rhashtable.h index de1459c74c4d79d4a4d8a5a294c0aaddf4e7e46f..326acd8c2e9f91da4f866c00d2450272339467a8 100644 --- a/include/linux/rhashtable.h +++ b/include/linux/rhashtable.h @@ -113,7 +113,7 @@ struct rhashtable { struct bucket_table __rcu *tbl; struct bucket_table __rcu *future_tbl; atomic_t nelems; - size_t shift; + atomic_t shift; struct rhashtable_params p; struct delayed_work run_work; struct mutex mutex; @@ -168,6 +168,7 @@ int rhashtable_shrink(struct rhashtable *ht); void *rhashtable_lookup(struct rhashtable *ht, const void *key); void *rhashtable_lookup_compare(struct rhashtable *ht, const void *key, bool (*compare)(void *, void *), void *arg); +bool rhashtable_lookup_insert(struct rhashtable *ht, struct rhash_head *obj); void rhashtable_destroy(struct rhashtable *ht); diff --git a/lib/rhashtable.c b/lib/rhashtable.c index cbad192d3b3d4088a448fcedbfb3a8861bc84239..8023b554905c0e76b35c7361038e6a6349acb87b 100644 --- a/lib/rhashtable.c +++ b/lib/rhashtable.c @@ -199,7 +199,8 @@ static struct bucket_table *bucket_table_alloc(struct rhashtable *ht, bool rht_grow_above_75(const struct rhashtable *ht, size_t new_size) { /* Expand table when exceeding 75% load */ - return atomic_read(&ht->nelems) > (new_size / 4 * 3); + return atomic_read(&ht->nelems) > (new_size / 4 * 3) && + (ht->p.max_shift && atomic_read(&ht->shift) < ht->p.max_shift); } EXPORT_SYMBOL_GPL(rht_grow_above_75); @@ -211,7 +212,8 @@ EXPORT_SYMBOL_GPL(rht_grow_above_75); bool rht_shrink_below_30(const struct rhashtable *ht, size_t new_size) { /* Shrink table beneath 30% load */ - return atomic_read(&ht->nelems) < (new_size * 3 / 10); + return atomic_read(&ht->nelems) < (new_size * 3 / 10) && + (atomic_read(&ht->shift) > ht->p.min_shift); } EXPORT_SYMBOL_GPL(rht_shrink_below_30); @@ -318,14 +320,11 @@ int rhashtable_expand(struct rhashtable *ht) ASSERT_RHT_MUTEX(ht); - if (ht->p.max_shift && ht->shift >= ht->p.max_shift) - return 0; - new_tbl = bucket_table_alloc(ht, old_tbl->size * 2); if (new_tbl == NULL) return -ENOMEM; - ht->shift++; + atomic_inc(&ht->shift); /* Make insertions go into the new, empty table right away. Deletions * and lookups will be attempted in both tables until we synchronize. @@ -421,9 +420,6 @@ int rhashtable_shrink(struct rhashtable *ht) ASSERT_RHT_MUTEX(ht); - if (ht->shift <= ht->p.min_shift) - return 0; - new_tbl = bucket_table_alloc(ht, tbl->size / 2); if (new_tbl == NULL) return -ENOMEM; @@ -462,7 +458,7 @@ int rhashtable_shrink(struct rhashtable *ht) /* Publish the new, valid hash table */ rcu_assign_pointer(ht->tbl, new_tbl); - ht->shift--; + atomic_dec(&ht->shift); /* Wait for readers. No new readers will have references to the * old hash table. @@ -492,8 +488,39 @@ static void rht_deferred_worker(struct work_struct *work) mutex_unlock(&ht->mutex); } +static void rhashtable_wakeup_worker(struct rhashtable *ht) +{ + struct bucket_table *tbl = rht_dereference_rcu(ht->tbl, ht); + struct bucket_table *new_tbl = rht_dereference_rcu(ht->future_tbl, ht); + size_t size = tbl->size; + + /* Only adjust the table if no resizing is currently in progress. */ + if (tbl == new_tbl && + ((ht->p.grow_decision && ht->p.grow_decision(ht, size)) || + (ht->p.shrink_decision && ht->p.shrink_decision(ht, size)))) + schedule_delayed_work(&ht->run_work, 0); +} + +static void __rhashtable_insert(struct rhashtable *ht, struct rhash_head *obj, + struct bucket_table *tbl, u32 hash) +{ + struct rhash_head *head = rht_dereference_bucket(tbl->buckets[hash], + tbl, hash); + + if (rht_is_a_nulls(head)) + INIT_RHT_NULLS_HEAD(obj->next, ht, hash); + else + RCU_INIT_POINTER(obj->next, head); + + rcu_assign_pointer(tbl->buckets[hash], obj); + + atomic_inc(&ht->nelems); + + rhashtable_wakeup_worker(ht); +} + /** - * rhashtable_insert - insert object into hash hash table + * rhashtable_insert - insert object into hash table * @ht: hash table * @obj: pointer to hash head inside object * @@ -510,7 +537,6 @@ static void rht_deferred_worker(struct work_struct *work) void rhashtable_insert(struct rhashtable *ht, struct rhash_head *obj) { struct bucket_table *tbl; - struct rhash_head *head; spinlock_t *lock; unsigned hash; @@ -521,22 +547,9 @@ void rhashtable_insert(struct rhashtable *ht, struct rhash_head *obj) lock = bucket_lock(tbl, hash); spin_lock_bh(lock); - head = rht_dereference_bucket(tbl->buckets[hash], tbl, hash); - if (rht_is_a_nulls(head)) - INIT_RHT_NULLS_HEAD(obj->next, ht, hash); - else - RCU_INIT_POINTER(obj->next, head); - - rcu_assign_pointer(tbl->buckets[hash], obj); + __rhashtable_insert(ht, obj, tbl, hash); spin_unlock_bh(lock); - atomic_inc(&ht->nelems); - - /* Only grow the table if no resizing is currently in progress. */ - if (ht->tbl != ht->future_tbl && - ht->p.grow_decision && ht->p.grow_decision(ht, tbl->size)) - schedule_delayed_work(&ht->run_work, 0); - rcu_read_unlock(); } EXPORT_SYMBOL_GPL(rhashtable_insert); @@ -550,7 +563,7 @@ EXPORT_SYMBOL_GPL(rhashtable_insert); * walk the bucket chain upon removal. The removal operation is thus * considerable slow if the hash table is not correctly sized. * - * Will automatically shrink the table via rhashtable_expand() if the the + * Will automatically shrink the table via rhashtable_expand() if the * shrink_decision function specified at rhashtable_init() returns true. * * The caller must ensure that no concurrent table mutations occur. It is @@ -584,20 +597,17 @@ bool rhashtable_remove(struct rhashtable *ht, struct rhash_head *obj) spin_unlock_bh(lock); - if (ht->tbl != ht->future_tbl && - ht->p.shrink_decision && - ht->p.shrink_decision(ht, tbl->size)) - schedule_delayed_work(&ht->run_work, 0); + rhashtable_wakeup_worker(ht); rcu_read_unlock(); return true; } - if (tbl != rht_dereference_rcu(ht->tbl, ht)) { + if (tbl != rht_dereference_rcu(ht->future_tbl, ht)) { spin_unlock_bh(lock); - tbl = rht_dereference_rcu(ht->tbl, ht); + tbl = rht_dereference_rcu(ht->future_tbl, ht); hash = head_hashfn(ht, tbl, obj); lock = bucket_lock(tbl, hash); @@ -612,6 +622,19 @@ bool rhashtable_remove(struct rhashtable *ht, struct rhash_head *obj) } EXPORT_SYMBOL_GPL(rhashtable_remove); +struct rhashtable_compare_arg { + struct rhashtable *ht; + const void *key; +}; + +static bool rhashtable_compare(void *ptr, void *arg) +{ + struct rhashtable_compare_arg *x = arg; + struct rhashtable *ht = x->ht; + + return !memcmp(ptr + ht->p.key_offset, x->key, ht->p.key_len); +} + /** * rhashtable_lookup - lookup key in hash table * @ht: hash table @@ -621,38 +644,20 @@ EXPORT_SYMBOL_GPL(rhashtable_remove); * for a entry with an identical key. The first matching entry is returned. * * This lookup function may only be used for fixed key hash table (key_len - * paramter set). It will BUG() if used inappropriately. + * parameter set). It will BUG() if used inappropriately. * * Lookups may occur in parallel with hashtable mutations and resizing. */ void *rhashtable_lookup(struct rhashtable *ht, const void *key) { - const struct bucket_table *tbl, *old_tbl; - struct rhash_head *he; - u32 hash; + struct rhashtable_compare_arg arg = { + .ht = ht, + .key = key, + }; BUG_ON(!ht->p.key_len); - rcu_read_lock(); - old_tbl = rht_dereference_rcu(ht->tbl, ht); - tbl = rht_dereference_rcu(ht->future_tbl, ht); - hash = key_hashfn(ht, key, ht->p.key_len); -restart: - rht_for_each_rcu(he, tbl, rht_bucket_index(tbl, hash)) { - if (memcmp(rht_obj(ht, he) + ht->p.key_offset, key, - ht->p.key_len)) - continue; - rcu_read_unlock(); - return rht_obj(ht, he); - } - - if (unlikely(tbl != old_tbl)) { - tbl = old_tbl; - goto restart; - } - - rcu_read_unlock(); - return NULL; + return rhashtable_lookup_compare(ht, key, &rhashtable_compare, &arg); } EXPORT_SYMBOL_GPL(rhashtable_lookup); @@ -700,6 +705,66 @@ void *rhashtable_lookup_compare(struct rhashtable *ht, const void *key, } EXPORT_SYMBOL_GPL(rhashtable_lookup_compare); +/** + * rhashtable_lookup_insert - lookup and insert object into hash table + * @ht: hash table + * @obj: pointer to hash head inside object + * + * Locks down the bucket chain in both the old and new table if a resize + * is in progress to ensure that writers can't remove from the old table + * and can't insert to the new table during the atomic operation of search + * and insertion. Searches for duplicates in both the old and new table if + * a resize is in progress. + * + * This lookup function may only be used for fixed key hash table (key_len + * parameter set). It will BUG() if used inappropriately. + * + * It is safe to call this function from atomic context. + * + * Will trigger an automatic deferred table resizing if the size grows + * beyond the watermark indicated by grow_decision() which can be passed + * to rhashtable_init(). + */ +bool rhashtable_lookup_insert(struct rhashtable *ht, struct rhash_head *obj) +{ + struct bucket_table *new_tbl, *old_tbl; + spinlock_t *new_bucket_lock, *old_bucket_lock; + u32 new_hash, old_hash; + bool success = true; + + BUG_ON(!ht->p.key_len); + + rcu_read_lock(); + + old_tbl = rht_dereference_rcu(ht->tbl, ht); + old_hash = head_hashfn(ht, old_tbl, obj); + old_bucket_lock = bucket_lock(old_tbl, old_hash); + spin_lock_bh(old_bucket_lock); + + new_tbl = rht_dereference_rcu(ht->future_tbl, ht); + new_hash = head_hashfn(ht, new_tbl, obj); + new_bucket_lock = bucket_lock(new_tbl, new_hash); + if (unlikely(old_tbl != new_tbl)) + spin_lock_bh_nested(new_bucket_lock, RHT_LOCK_NESTED); + + if (rhashtable_lookup(ht, rht_obj(ht, obj) + ht->p.key_offset)) { + success = false; + goto exit; + } + + __rhashtable_insert(ht, obj, new_tbl, new_hash); + +exit: + if (unlikely(old_tbl != new_tbl)) + spin_unlock_bh(new_bucket_lock); + spin_unlock_bh(old_bucket_lock); + + rcu_read_unlock(); + + return success; +} +EXPORT_SYMBOL_GPL(rhashtable_lookup_insert); + static size_t rounded_hashtable_size(struct rhashtable_params *params) { return max(roundup_pow_of_two(params->nelem_hint * 4 / 3), @@ -782,7 +847,8 @@ int rhashtable_init(struct rhashtable *ht, struct rhashtable_params *params) if (tbl == NULL) return -ENOMEM; - ht->shift = ilog2(tbl->size); + atomic_set(&ht->nelems, 0); + atomic_set(&ht->shift, ilog2(tbl->size)); RCU_INIT_POINTER(ht->tbl, tbl); RCU_INIT_POINTER(ht->future_tbl, tbl); diff --git a/net/tipc/Kconfig b/net/tipc/Kconfig index c890848f9d560fc4cda305653c08f53c712dfbe2..91c8a8e031db718a067fa2ed4ef9f2924ddb7c25 100644 --- a/net/tipc/Kconfig +++ b/net/tipc/Kconfig @@ -20,18 +20,6 @@ menuconfig TIPC If in doubt, say N. -config TIPC_PORTS - int "Maximum number of ports in a node" - depends on TIPC - range 127 65535 - default "8191" - help - Specifies how many ports can be supported by a node. - Can range from 127 to 65535 ports; default is 8191. - - Setting this to a smaller value saves some memory, - setting it to higher allows for more ports. - config TIPC_MEDIA_IB bool "InfiniBand media type support" depends on TIPC && INFINIBAND_IPOIB diff --git a/net/tipc/config.c b/net/tipc/config.c index 876f4c6a2631b35eba2b9927430b30c34d125c87..0b3a90ecab6d934dbbceb152bcd352f13c671cf6 100644 --- a/net/tipc/config.c +++ b/net/tipc/config.c @@ -183,22 +183,6 @@ static struct sk_buff *cfg_set_own_addr(void) return tipc_cfg_reply_error_string("cannot change to network mode"); } -static struct sk_buff *cfg_set_max_ports(void) -{ - u32 value; - - if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_UNSIGNED)) - return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR); - value = ntohl(*(__be32 *)TLV_DATA(req_tlv_area)); - if (value == tipc_max_ports) - return tipc_cfg_reply_none(); - if (value < 127 || value > 65535) - return tipc_cfg_reply_error_string(TIPC_CFG_INVALID_VALUE - " (max ports must be 127-65535)"); - return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED - " (cannot change max ports while TIPC is active)"); -} - static struct sk_buff *cfg_set_netid(void) { u32 value; @@ -285,15 +269,9 @@ struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd, const void *request_area case TIPC_CMD_SET_NODE_ADDR: rep_tlv_buf = cfg_set_own_addr(); break; - case TIPC_CMD_SET_MAX_PORTS: - rep_tlv_buf = cfg_set_max_ports(); - break; case TIPC_CMD_SET_NETID: rep_tlv_buf = cfg_set_netid(); break; - case TIPC_CMD_GET_MAX_PORTS: - rep_tlv_buf = tipc_cfg_reply_unsigned(tipc_max_ports); - break; case TIPC_CMD_GET_NETID: rep_tlv_buf = tipc_cfg_reply_unsigned(tipc_net_id); break; @@ -317,6 +295,8 @@ struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd, const void *request_area case TIPC_CMD_SET_REMOTE_MNG: case TIPC_CMD_GET_REMOTE_MNG: case TIPC_CMD_DUMP_LOG: + case TIPC_CMD_SET_MAX_PORTS: + case TIPC_CMD_GET_MAX_PORTS: rep_tlv_buf = tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED " (obsolete command)"); break; diff --git a/net/tipc/core.c b/net/tipc/core.c index a5737b8407ddbf63f9ecfb024bde3b079f979098..71b2ada0f5ab824a8354c434cbf182796d739666 100644 --- a/net/tipc/core.c +++ b/net/tipc/core.c @@ -34,6 +34,8 @@ * POSSIBILITY OF SUCH DAMAGE. */ +#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt + #include "core.h" #include "name_table.h" #include "subscr.h" @@ -47,7 +49,6 @@ int tipc_random __read_mostly; /* configurable TIPC parameters */ u32 tipc_own_addr __read_mostly; -int tipc_max_ports __read_mostly; int tipc_net_id __read_mostly; int sysctl_tipc_rmem[3] __read_mostly; /* min/default/max */ @@ -84,9 +85,9 @@ static void tipc_core_stop(void) tipc_netlink_stop(); tipc_subscr_stop(); tipc_nametbl_stop(); - tipc_sk_ref_table_stop(); tipc_socket_stop(); tipc_unregister_sysctl(); + tipc_sk_rht_destroy(); } /** @@ -98,7 +99,7 @@ static int tipc_core_start(void) get_random_bytes(&tipc_random, sizeof(tipc_random)); - err = tipc_sk_ref_table_init(tipc_max_ports, tipc_random); + err = tipc_sk_rht_init(); if (err) goto out_reftbl; @@ -138,7 +139,7 @@ static int tipc_core_start(void) out_netlink: tipc_nametbl_stop(); out_nametbl: - tipc_sk_ref_table_stop(); + tipc_sk_rht_destroy(); out_reftbl: return err; } @@ -150,7 +151,6 @@ static int __init tipc_init(void) pr_info("Activated (version " TIPC_MOD_VER ")\n"); tipc_own_addr = 0; - tipc_max_ports = CONFIG_TIPC_PORTS; tipc_net_id = 4711; sysctl_tipc_rmem[0] = TIPC_CONN_OVERLOAD_LIMIT >> 4 << diff --git a/net/tipc/core.h b/net/tipc/core.h index 84602137ce20f1fe051d7c3ebd2a8fafa10c2b43..56fe4229fc5e4de044d19a41eb128297da87a670 100644 --- a/net/tipc/core.h +++ b/net/tipc/core.h @@ -37,8 +37,6 @@ #ifndef _TIPC_CORE_H #define _TIPC_CORE_H -#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt - #include #include #include @@ -79,7 +77,6 @@ int tipc_snprintf(char *buf, int len, const char *fmt, ...); * Global configuration variables */ extern u32 tipc_own_addr __read_mostly; -extern int tipc_max_ports __read_mostly; extern int tipc_net_id __read_mostly; extern int sysctl_tipc_rmem[3] __read_mostly; extern int sysctl_tipc_named_timeout __read_mostly; diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 4731cad99d1cc8896cc0d6734f6eb48af3135207..701f31bbbbfb727202e2cc72e9a5a940c7014bc5 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -34,22 +34,25 @@ * POSSIBILITY OF SUCH DAMAGE. */ +#include +#include #include "core.h" #include "name_table.h" #include "node.h" #include "link.h" -#include #include "config.h" #include "socket.h" -#define SS_LISTENING -1 /* socket is listening */ -#define SS_READY -2 /* socket is connectionless */ +#define SS_LISTENING -1 /* socket is listening */ +#define SS_READY -2 /* socket is connectionless */ -#define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */ -#define CONN_PROBING_INTERVAL 3600000 /* [ms] => 1 h */ -#define TIPC_FWD_MSG 1 -#define TIPC_CONN_OK 0 -#define TIPC_CONN_PROBING 1 +#define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */ +#define CONN_PROBING_INTERVAL 3600000 /* [ms] => 1 h */ +#define TIPC_FWD_MSG 1 +#define TIPC_CONN_OK 0 +#define TIPC_CONN_PROBING 1 +#define TIPC_MAX_PORT 0xffffffff +#define TIPC_MIN_PORT 1 /** * struct tipc_sock - TIPC socket structure @@ -59,7 +62,7 @@ * @conn_instance: TIPC instance used when connection was established * @published: non-zero if port has one or more associated names * @max_pkt: maximum packet size "hint" used when building messages sent by port - * @ref: unique reference to port in TIPC object registry + * @portid: unique port identity in TIPC socket hash table * @phdr: preformatted message header used when sending messages * @port_list: adjacent ports in TIPC's global list of ports * @publications: list of publications for port @@ -74,6 +77,8 @@ * @link_cong: non-zero if owner must sleep because of link congestion * @sent_unacked: # messages sent by socket, and not yet acked by peer * @rcv_unacked: # messages read by user, but not yet acked back to peer + * @node: hash table node + * @rcu: rcu struct for tipc_sock */ struct tipc_sock { struct sock sk; @@ -82,7 +87,7 @@ struct tipc_sock { u32 conn_instance; int published; u32 max_pkt; - u32 ref; + u32 portid; struct tipc_msg phdr; struct list_head sock_list; struct list_head publications; @@ -95,6 +100,8 @@ struct tipc_sock { bool link_cong; uint sent_unacked; uint rcv_unacked; + struct rhash_head node; + struct rcu_head rcu; }; static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb); @@ -103,16 +110,14 @@ static void tipc_write_space(struct sock *sk); static int tipc_release(struct socket *sock); static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags); static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p); -static void tipc_sk_timeout(unsigned long ref); +static void tipc_sk_timeout(unsigned long portid); static int tipc_sk_publish(struct tipc_sock *tsk, uint scope, struct tipc_name_seq const *seq); static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope, struct tipc_name_seq const *seq); -static u32 tipc_sk_ref_acquire(struct tipc_sock *tsk); -static void tipc_sk_ref_discard(u32 ref); -static struct tipc_sock *tipc_sk_get(u32 ref); -static struct tipc_sock *tipc_sk_get_next(u32 *ref); -static void tipc_sk_put(struct tipc_sock *tsk); +static struct tipc_sock *tipc_sk_lookup(u32 portid); +static int tipc_sk_insert(struct tipc_sock *tsk); +static void tipc_sk_remove(struct tipc_sock *tsk); static const struct proto_ops packet_ops; static const struct proto_ops stream_ops; @@ -174,6 +179,9 @@ static const struct nla_policy tipc_nl_sock_policy[TIPC_NLA_SOCK_MAX + 1] = { * - port reference */ +/* Protects tipc socket hash table mutations */ +static struct rhashtable tipc_sk_rht; + static u32 tsk_peer_node(struct tipc_sock *tsk) { return msg_destnode(&tsk->phdr); @@ -305,7 +313,6 @@ static int tipc_sk_create(struct net *net, struct socket *sock, struct sock *sk; struct tipc_sock *tsk; struct tipc_msg *msg; - u32 ref; /* Validate arguments */ if (unlikely(protocol != 0)) @@ -339,24 +346,22 @@ static int tipc_sk_create(struct net *net, struct socket *sock, return -ENOMEM; tsk = tipc_sk(sk); - ref = tipc_sk_ref_acquire(tsk); - if (!ref) { - pr_warn("Socket create failed; reference table exhausted\n"); - return -ENOMEM; - } tsk->max_pkt = MAX_PKT_DEFAULT; - tsk->ref = ref; INIT_LIST_HEAD(&tsk->publications); msg = &tsk->phdr; tipc_msg_init(msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG, NAMED_H_SIZE, 0); - msg_set_origport(msg, ref); /* Finish initializing socket data structures */ sock->ops = ops; sock->state = state; sock_init_data(sock, sk); - k_init_timer(&tsk->timer, (Handler)tipc_sk_timeout, ref); + if (tipc_sk_insert(tsk)) { + pr_warn("Socket create failed; port numbrer exhausted\n"); + return -EINVAL; + } + msg_set_origport(msg, tsk->portid); + k_init_timer(&tsk->timer, (Handler)tipc_sk_timeout, tsk->portid); sk->sk_backlog_rcv = tipc_backlog_rcv; sk->sk_rcvbuf = sysctl_tipc_rmem[1]; sk->sk_data_ready = tipc_data_ready; @@ -442,6 +447,13 @@ int tipc_sock_accept_local(struct socket *sock, struct socket **newsock, return ret; } +static void tipc_sk_callback(struct rcu_head *head) +{ + struct tipc_sock *tsk = container_of(head, struct tipc_sock, rcu); + + sock_put(&tsk->sk); +} + /** * tipc_release - destroy a TIPC socket * @sock: socket to destroy @@ -491,7 +503,7 @@ static int tipc_release(struct socket *sock) (sock->state == SS_CONNECTED)) { sock->state = SS_DISCONNECTING; tsk->connected = 0; - tipc_node_remove_conn(dnode, tsk->ref); + tipc_node_remove_conn(dnode, tsk->portid); } if (tipc_msg_reverse(skb, &dnode, TIPC_ERR_NO_PORT)) tipc_link_xmit_skb(skb, dnode, 0); @@ -499,16 +511,16 @@ static int tipc_release(struct socket *sock) } tipc_sk_withdraw(tsk, 0, NULL); - tipc_sk_ref_discard(tsk->ref); k_cancel_timer(&tsk->timer); + tipc_sk_remove(tsk); if (tsk->connected) { skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG, SHORT_H_SIZE, 0, dnode, tipc_own_addr, tsk_peer_port(tsk), - tsk->ref, TIPC_ERR_NO_PORT); + tsk->portid, TIPC_ERR_NO_PORT); if (skb) - tipc_link_xmit_skb(skb, dnode, tsk->ref); - tipc_node_remove_conn(dnode, tsk->ref); + tipc_link_xmit_skb(skb, dnode, tsk->portid); + tipc_node_remove_conn(dnode, tsk->portid); } k_term_timer(&tsk->timer); @@ -518,7 +530,8 @@ static int tipc_release(struct socket *sock) /* Reject any messages that accumulated in backlog queue */ sock->state = SS_DISCONNECTING; release_sock(sk); - sock_put(sk); + + call_rcu(&tsk->rcu, tipc_sk_callback); sock->sk = NULL; return 0; @@ -611,7 +624,7 @@ static int tipc_getname(struct socket *sock, struct sockaddr *uaddr, addr->addr.id.ref = tsk_peer_port(tsk); addr->addr.id.node = tsk_peer_node(tsk); } else { - addr->addr.id.ref = tsk->ref; + addr->addr.id.ref = tsk->portid; addr->addr.id.node = tipc_own_addr; } @@ -946,7 +959,7 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock, } new_mtu: - mtu = tipc_node_get_mtu(dnode, tsk->ref); + mtu = tipc_node_get_mtu(dnode, tsk->portid); __skb_queue_head_init(&head); rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &head); if (rc < 0) @@ -955,7 +968,7 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock, do { skb = skb_peek(&head); TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong; - rc = tipc_link_xmit(&head, dnode, tsk->ref); + rc = tipc_link_xmit(&head, dnode, tsk->portid); if (likely(rc >= 0)) { if (sock->state != SS_READY) sock->state = SS_CONNECTING; @@ -1028,7 +1041,7 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock, struct tipc_msg *mhdr = &tsk->phdr; struct sk_buff_head head; DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); - u32 ref = tsk->ref; + u32 portid = tsk->portid; int rc = -EINVAL; long timeo; u32 dnode; @@ -1067,7 +1080,7 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock, goto exit; do { if (likely(!tsk_conn_cong(tsk))) { - rc = tipc_link_xmit(&head, dnode, ref); + rc = tipc_link_xmit(&head, dnode, portid); if (likely(!rc)) { tsk->sent_unacked++; sent += send; @@ -1076,7 +1089,7 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock, goto next; } if (rc == -EMSGSIZE) { - tsk->max_pkt = tipc_node_get_mtu(dnode, ref); + tsk->max_pkt = tipc_node_get_mtu(dnode, portid); goto next; } if (rc != -ELINKCONG) @@ -1130,8 +1143,8 @@ static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port, tsk->probing_state = TIPC_CONN_OK; tsk->connected = 1; k_start_timer(&tsk->timer, tsk->probing_interval); - tipc_node_add_conn(peer_node, tsk->ref, peer_port); - tsk->max_pkt = tipc_node_get_mtu(peer_node, tsk->ref); + tipc_node_add_conn(peer_node, tsk->portid, peer_port); + tsk->max_pkt = tipc_node_get_mtu(peer_node, tsk->portid); } /** @@ -1238,7 +1251,7 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack) if (!tsk->connected) return; skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0, dnode, - tipc_own_addr, peer_port, tsk->ref, TIPC_OK); + tipc_own_addr, peer_port, tsk->portid, TIPC_OK); if (!skb) return; msg = buf_msg(skb); @@ -1552,7 +1565,7 @@ static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf) tsk->connected = 0; /* let timer expire on it's own */ tipc_node_remove_conn(tsk_peer_node(tsk), - tsk->ref); + tsk->portid); } retval = TIPC_OK; } @@ -1743,7 +1756,7 @@ int tipc_sk_rcv(struct sk_buff *skb) u32 dnode; /* Validate destination and message */ - tsk = tipc_sk_get(dport); + tsk = tipc_sk_lookup(dport); if (unlikely(!tsk)) { rc = tipc_msg_eval(skb, &dnode); goto exit; @@ -1763,7 +1776,7 @@ int tipc_sk_rcv(struct sk_buff *skb) rc = -TIPC_ERR_OVERLOAD; } spin_unlock_bh(&sk->sk_lock.slock); - tipc_sk_put(tsk); + sock_put(sk); if (likely(!rc)) return 0; exit: @@ -2050,20 +2063,20 @@ static int tipc_shutdown(struct socket *sock, int how) goto restart; } if (tipc_msg_reverse(skb, &dnode, TIPC_CONN_SHUTDOWN)) - tipc_link_xmit_skb(skb, dnode, tsk->ref); - tipc_node_remove_conn(dnode, tsk->ref); + tipc_link_xmit_skb(skb, dnode, tsk->portid); + tipc_node_remove_conn(dnode, tsk->portid); } else { dnode = tsk_peer_node(tsk); skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG, SHORT_H_SIZE, 0, dnode, tipc_own_addr, tsk_peer_port(tsk), - tsk->ref, TIPC_CONN_SHUTDOWN); - tipc_link_xmit_skb(skb, dnode, tsk->ref); + tsk->portid, TIPC_CONN_SHUTDOWN); + tipc_link_xmit_skb(skb, dnode, tsk->portid); } tsk->connected = 0; sock->state = SS_DISCONNECTING; - tipc_node_remove_conn(dnode, tsk->ref); + tipc_node_remove_conn(dnode, tsk->portid); /* fall through */ case SS_DISCONNECTING: @@ -2084,14 +2097,14 @@ static int tipc_shutdown(struct socket *sock, int how) return res; } -static void tipc_sk_timeout(unsigned long ref) +static void tipc_sk_timeout(unsigned long portid) { struct tipc_sock *tsk; struct sock *sk; struct sk_buff *skb = NULL; u32 peer_port, peer_node; - tsk = tipc_sk_get(ref); + tsk = tipc_sk_lookup(portid); if (!tsk) return; @@ -2108,20 +2121,20 @@ static void tipc_sk_timeout(unsigned long ref) /* Previous probe not answered -> self abort */ skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG, SHORT_H_SIZE, 0, tipc_own_addr, - peer_node, ref, peer_port, + peer_node, portid, peer_port, TIPC_ERR_NO_PORT); } else { skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE, INT_H_SIZE, 0, peer_node, tipc_own_addr, - peer_port, ref, TIPC_OK); + peer_port, portid, TIPC_OK); tsk->probing_state = TIPC_CONN_PROBING; k_start_timer(&tsk->timer, tsk->probing_interval); } bh_unlock_sock(sk); if (skb) - tipc_link_xmit_skb(skb, peer_node, ref); + tipc_link_xmit_skb(skb, peer_node, portid); exit: - tipc_sk_put(tsk); + sock_put(sk); } static int tipc_sk_publish(struct tipc_sock *tsk, uint scope, @@ -2132,12 +2145,12 @@ static int tipc_sk_publish(struct tipc_sock *tsk, uint scope, if (tsk->connected) return -EINVAL; - key = tsk->ref + tsk->pub_count + 1; - if (key == tsk->ref) + key = tsk->portid + tsk->pub_count + 1; + if (key == tsk->portid) return -EADDRINUSE; publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper, - scope, tsk->ref, key); + scope, tsk->portid, key); if (unlikely(!publ)) return -EINVAL; @@ -2188,9 +2201,9 @@ static int tipc_sk_show(struct tipc_sock *tsk, char *buf, ret = tipc_snprintf(buf, len, "<%u.%u.%u:%u>:", tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr), - tipc_node(tipc_own_addr), tsk->ref); + tipc_node(tipc_own_addr), tsk->portid); else - ret = tipc_snprintf(buf, len, "%-10u:", tsk->ref); + ret = tipc_snprintf(buf, len, "%-10u:", tsk->portid); if (tsk->connected) { u32 dport = tsk_peer_port(tsk); @@ -2224,13 +2237,15 @@ static int tipc_sk_show(struct tipc_sock *tsk, char *buf, struct sk_buff *tipc_sk_socks_show(void) { + const struct bucket_table *tbl; + struct rhash_head *pos; struct sk_buff *buf; struct tlv_desc *rep_tlv; char *pb; int pb_len; struct tipc_sock *tsk; int str_len = 0; - u32 ref = 0; + int i; buf = tipc_cfg_reply_alloc(TLV_SPACE(ULTRA_STRING_MAX_LEN)); if (!buf) @@ -2239,14 +2254,18 @@ struct sk_buff *tipc_sk_socks_show(void) pb = TLV_DATA(rep_tlv); pb_len = ULTRA_STRING_MAX_LEN; - tsk = tipc_sk_get_next(&ref); - for (; tsk; tsk = tipc_sk_get_next(&ref)) { - lock_sock(&tsk->sk); - str_len += tipc_sk_show(tsk, pb + str_len, - pb_len - str_len, 0); - release_sock(&tsk->sk); - tipc_sk_put(tsk); + rcu_read_lock(); + tbl = rht_dereference_rcu((&tipc_sk_rht)->tbl, &tipc_sk_rht); + for (i = 0; i < tbl->size; i++) { + rht_for_each_entry_rcu(tsk, pos, tbl, i, node) { + spin_lock_bh(&tsk->sk.sk_lock.slock); + str_len += tipc_sk_show(tsk, pb + str_len, + pb_len - str_len, 0); + spin_unlock_bh(&tsk->sk.sk_lock.slock); + } } + rcu_read_unlock(); + str_len += 1; /* for "\0" */ skb_put(buf, TLV_SPACE(str_len)); TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len); @@ -2259,255 +2278,91 @@ struct sk_buff *tipc_sk_socks_show(void) */ void tipc_sk_reinit(void) { + const struct bucket_table *tbl; + struct rhash_head *pos; + struct tipc_sock *tsk; struct tipc_msg *msg; - u32 ref = 0; - struct tipc_sock *tsk = tipc_sk_get_next(&ref); + int i; - for (; tsk; tsk = tipc_sk_get_next(&ref)) { - lock_sock(&tsk->sk); - msg = &tsk->phdr; - msg_set_prevnode(msg, tipc_own_addr); - msg_set_orignode(msg, tipc_own_addr); - release_sock(&tsk->sk); - tipc_sk_put(tsk); + rcu_read_lock(); + tbl = rht_dereference_rcu((&tipc_sk_rht)->tbl, &tipc_sk_rht); + for (i = 0; i < tbl->size; i++) { + rht_for_each_entry_rcu(tsk, pos, tbl, i, node) { + spin_lock_bh(&tsk->sk.sk_lock.slock); + msg = &tsk->phdr; + msg_set_prevnode(msg, tipc_own_addr); + msg_set_orignode(msg, tipc_own_addr); + spin_unlock_bh(&tsk->sk.sk_lock.slock); + } } + rcu_read_unlock(); } -/** - * struct reference - TIPC socket reference entry - * @tsk: pointer to socket associated with reference entry - * @ref: reference value for socket (combines instance & array index info) - */ -struct reference { - struct tipc_sock *tsk; - u32 ref; -}; - -/** - * struct tipc_ref_table - table of TIPC socket reference entries - * @entries: pointer to array of reference entries - * @capacity: array index of first unusable entry - * @init_point: array index of first uninitialized entry - * @first_free: array index of first unused socket reference entry - * @last_free: array index of last unused socket reference entry - * @index_mask: bitmask for array index portion of reference values - * @start_mask: initial value for instance value portion of reference values - */ -struct ref_table { - struct reference *entries; - u32 capacity; - u32 init_point; - u32 first_free; - u32 last_free; - u32 index_mask; - u32 start_mask; -}; - -/* Socket reference table consists of 2**N entries. - * - * State Socket ptr Reference - * ----- ---------- --------- - * In use non-NULL XXXX|own index - * (XXXX changes each time entry is acquired) - * Free NULL YYYY|next free index - * (YYYY is one more than last used XXXX) - * Uninitialized NULL 0 - * - * Entry 0 is not used; this allows index 0 to denote the end of the free list. - * - * Note that a reference value of 0 does not necessarily indicate that an - * entry is uninitialized, since the last entry in the free list could also - * have a reference value of 0 (although this is unlikely). - */ - -static struct ref_table tipc_ref_table; - -static DEFINE_RWLOCK(ref_table_lock); - -/** - * tipc_ref_table_init - create reference table for sockets - */ -int tipc_sk_ref_table_init(u32 req_sz, u32 start) +static struct tipc_sock *tipc_sk_lookup(u32 portid) { - struct reference *table; - u32 actual_sz; - - /* account for unused entry, then round up size to a power of 2 */ - - req_sz++; - for (actual_sz = 16; actual_sz < req_sz; actual_sz <<= 1) { - /* do nothing */ - }; - - /* allocate table & mark all entries as uninitialized */ - table = vzalloc(actual_sz * sizeof(struct reference)); - if (table == NULL) - return -ENOMEM; - - tipc_ref_table.entries = table; - tipc_ref_table.capacity = req_sz; - tipc_ref_table.init_point = 1; - tipc_ref_table.first_free = 0; - tipc_ref_table.last_free = 0; - tipc_ref_table.index_mask = actual_sz - 1; - tipc_ref_table.start_mask = start & ~tipc_ref_table.index_mask; + struct tipc_sock *tsk; - return 0; -} + rcu_read_lock(); + tsk = rhashtable_lookup(&tipc_sk_rht, &portid); + if (tsk) + sock_hold(&tsk->sk); + rcu_read_unlock(); -/** - * tipc_ref_table_stop - destroy reference table for sockets - */ -void tipc_sk_ref_table_stop(void) -{ - if (!tipc_ref_table.entries) - return; - vfree(tipc_ref_table.entries); - tipc_ref_table.entries = NULL; + return tsk; } -/* tipc_ref_acquire - create reference to a socket - * - * Register an socket pointer in the reference table. - * Returns a unique reference value that is used from then on to retrieve the - * socket pointer, or to determine if the socket has been deregistered. - */ -u32 tipc_sk_ref_acquire(struct tipc_sock *tsk) +static int tipc_sk_insert(struct tipc_sock *tsk) { - u32 index; - u32 index_mask; - u32 next_plus_upper; - u32 ref = 0; - struct reference *entry; - - if (unlikely(!tsk)) { - pr_err("Attempt to acquire ref. to non-existent obj\n"); - return 0; - } - if (unlikely(!tipc_ref_table.entries)) { - pr_err("Ref. table not found in acquisition attempt\n"); - return 0; - } - - /* Take a free entry, if available; otherwise initialize a new one */ - write_lock_bh(&ref_table_lock); - index = tipc_ref_table.first_free; - entry = &tipc_ref_table.entries[index]; + u32 remaining = (TIPC_MAX_PORT - TIPC_MIN_PORT) + 1; + u32 portid = prandom_u32() % remaining + TIPC_MIN_PORT; - if (likely(index)) { - index = tipc_ref_table.first_free; - entry = &tipc_ref_table.entries[index]; - index_mask = tipc_ref_table.index_mask; - next_plus_upper = entry->ref; - tipc_ref_table.first_free = next_plus_upper & index_mask; - ref = (next_plus_upper & ~index_mask) + index; - entry->tsk = tsk; - } else if (tipc_ref_table.init_point < tipc_ref_table.capacity) { - index = tipc_ref_table.init_point++; - entry = &tipc_ref_table.entries[index]; - ref = tipc_ref_table.start_mask + index; + while (remaining--) { + portid++; + if ((portid < TIPC_MIN_PORT) || (portid > TIPC_MAX_PORT)) + portid = TIPC_MIN_PORT; + tsk->portid = portid; + sock_hold(&tsk->sk); + if (rhashtable_lookup_insert(&tipc_sk_rht, &tsk->node)) + return 0; + sock_put(&tsk->sk); } - if (ref) { - entry->ref = ref; - entry->tsk = tsk; - } - write_unlock_bh(&ref_table_lock); - return ref; + return -1; } -/* tipc_sk_ref_discard - invalidate reference to an socket - * - * Disallow future references to an socket and free up the entry for re-use. - */ -void tipc_sk_ref_discard(u32 ref) +static void tipc_sk_remove(struct tipc_sock *tsk) { - struct reference *entry; - u32 index; - u32 index_mask; - - if (unlikely(!tipc_ref_table.entries)) { - pr_err("Ref. table not found during discard attempt\n"); - return; - } - - index_mask = tipc_ref_table.index_mask; - index = ref & index_mask; - entry = &tipc_ref_table.entries[index]; - - write_lock_bh(&ref_table_lock); + struct sock *sk = &tsk->sk; - if (unlikely(!entry->tsk)) { - pr_err("Attempt to discard ref. to non-existent socket\n"); - goto exit; - } - if (unlikely(entry->ref != ref)) { - pr_err("Attempt to discard non-existent reference\n"); - goto exit; + if (rhashtable_remove(&tipc_sk_rht, &tsk->node)) { + WARN_ON(atomic_read(&sk->sk_refcnt) == 1); + __sock_put(sk); } - - /* Mark entry as unused; increment instance part of entry's - * reference to invalidate any subsequent references - */ - - entry->tsk = NULL; - entry->ref = (ref & ~index_mask) + (index_mask + 1); - - /* Append entry to free entry list */ - if (unlikely(tipc_ref_table.first_free == 0)) - tipc_ref_table.first_free = index; - else - tipc_ref_table.entries[tipc_ref_table.last_free].ref |= index; - tipc_ref_table.last_free = index; -exit: - write_unlock_bh(&ref_table_lock); } -/* tipc_sk_get - find referenced socket and return pointer to it - */ -struct tipc_sock *tipc_sk_get(u32 ref) +int tipc_sk_rht_init(void) { - struct reference *entry; - struct tipc_sock *tsk; + struct rhashtable_params rht_params = { + .nelem_hint = 192, + .head_offset = offsetof(struct tipc_sock, node), + .key_offset = offsetof(struct tipc_sock, portid), + .key_len = sizeof(u32), /* portid */ + .hashfn = jhash, + .max_shift = 20, /* 1M */ + .min_shift = 8, /* 256 */ + .grow_decision = rht_grow_above_75, + .shrink_decision = rht_shrink_below_30, + }; - if (unlikely(!tipc_ref_table.entries)) - return NULL; - read_lock_bh(&ref_table_lock); - entry = &tipc_ref_table.entries[ref & tipc_ref_table.index_mask]; - tsk = entry->tsk; - if (likely(tsk && (entry->ref == ref))) - sock_hold(&tsk->sk); - else - tsk = NULL; - read_unlock_bh(&ref_table_lock); - return tsk; + return rhashtable_init(&tipc_sk_rht, &rht_params); } -/* tipc_sk_get_next - lock & return next socket after referenced one -*/ -struct tipc_sock *tipc_sk_get_next(u32 *ref) +void tipc_sk_rht_destroy(void) { - struct reference *entry; - struct tipc_sock *tsk = NULL; - uint index = *ref & tipc_ref_table.index_mask; + /* Wait for socket readers to complete */ + synchronize_net(); - read_lock_bh(&ref_table_lock); - while (++index < tipc_ref_table.capacity) { - entry = &tipc_ref_table.entries[index]; - if (!entry->tsk) - continue; - tsk = entry->tsk; - sock_hold(&tsk->sk); - *ref = entry->ref; - break; - } - read_unlock_bh(&ref_table_lock); - return tsk; -} - -static void tipc_sk_put(struct tipc_sock *tsk) -{ - sock_put(&tsk->sk); + rhashtable_destroy(&tipc_sk_rht); } /** @@ -2829,7 +2684,7 @@ static int __tipc_nl_add_sk(struct sk_buff *skb, struct netlink_callback *cb, attrs = nla_nest_start(skb, TIPC_NLA_SOCK); if (!attrs) goto genlmsg_cancel; - if (nla_put_u32(skb, TIPC_NLA_SOCK_REF, tsk->ref)) + if (nla_put_u32(skb, TIPC_NLA_SOCK_REF, tsk->portid)) goto attr_msg_cancel; if (nla_put_u32(skb, TIPC_NLA_SOCK_ADDR, tipc_own_addr)) goto attr_msg_cancel; @@ -2859,22 +2714,29 @@ int tipc_nl_sk_dump(struct sk_buff *skb, struct netlink_callback *cb) { int err; struct tipc_sock *tsk; - u32 prev_ref = cb->args[0]; - u32 ref = prev_ref; - - tsk = tipc_sk_get_next(&ref); - for (; tsk; tsk = tipc_sk_get_next(&ref)) { - lock_sock(&tsk->sk); - err = __tipc_nl_add_sk(skb, cb, tsk); - release_sock(&tsk->sk); - tipc_sk_put(tsk); - if (err) - break; + const struct bucket_table *tbl; + struct rhash_head *pos; + u32 prev_portid = cb->args[0]; + u32 portid = prev_portid; + int i; - prev_ref = ref; + rcu_read_lock(); + tbl = rht_dereference_rcu((&tipc_sk_rht)->tbl, &tipc_sk_rht); + for (i = 0; i < tbl->size; i++) { + rht_for_each_entry_rcu(tsk, pos, tbl, i, node) { + spin_lock_bh(&tsk->sk.sk_lock.slock); + portid = tsk->portid; + err = __tipc_nl_add_sk(skb, cb, tsk); + spin_unlock_bh(&tsk->sk.sk_lock.slock); + if (err) + break; + + prev_portid = portid; + } } + rcu_read_unlock(); - cb->args[0] = prev_ref; + cb->args[0] = prev_portid; return skb->len; } @@ -2962,12 +2824,12 @@ static int __tipc_nl_list_sk_publ(struct sk_buff *skb, int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb) { int err; - u32 tsk_ref = cb->args[0]; + u32 tsk_portid = cb->args[0]; u32 last_publ = cb->args[1]; u32 done = cb->args[2]; struct tipc_sock *tsk; - if (!tsk_ref) { + if (!tsk_portid) { struct nlattr **attrs; struct nlattr *sock[TIPC_NLA_SOCK_MAX + 1]; @@ -2984,13 +2846,13 @@ int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb) if (!sock[TIPC_NLA_SOCK_REF]) return -EINVAL; - tsk_ref = nla_get_u32(sock[TIPC_NLA_SOCK_REF]); + tsk_portid = nla_get_u32(sock[TIPC_NLA_SOCK_REF]); } if (done) return 0; - tsk = tipc_sk_get(tsk_ref); + tsk = tipc_sk_lookup(tsk_portid); if (!tsk) return -EINVAL; @@ -2999,9 +2861,9 @@ int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb) if (!err) done = 1; release_sock(&tsk->sk); - tipc_sk_put(tsk); + sock_put(&tsk->sk); - cb->args[0] = tsk_ref; + cb->args[0] = tsk_portid; cb->args[1] = last_publ; cb->args[2] = done; diff --git a/net/tipc/socket.h b/net/tipc/socket.h index d3408938700677e427b7ccbfe80a2dcf009f095f..c7d46d069d89bb47f259daeb72aae01f951be52a 100644 --- a/net/tipc/socket.h +++ b/net/tipc/socket.h @@ -46,8 +46,8 @@ int tipc_sk_rcv(struct sk_buff *buf); struct sk_buff *tipc_sk_socks_show(void); void tipc_sk_mcast_rcv(struct sk_buff *buf); void tipc_sk_reinit(void); -int tipc_sk_ref_table_init(u32 requested_size, u32 start); -void tipc_sk_ref_table_stop(void); +int tipc_sk_rht_init(void); +void tipc_sk_rht_destroy(void); int tipc_nl_sk_dump(struct sk_buff *skb, struct netlink_callback *cb); int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb);