diff --git a/net/rds/connection.c b/net/rds/connection.c index a4b07c899d898aa6efeec711f1cc8fb61bf399fc..19a4fee5f4ddc150d560952bba9a48a84add8771 100644 --- a/net/rds/connection.c +++ b/net/rds/connection.c @@ -253,9 +253,12 @@ static struct rds_connection *__rds_conn_create(struct net *net, for (i = 0; i < RDS_MPATH_WORKERS; i++) { cp = &conn->c_path[i]; - trans->conn_free(cp->cp_transport_data); - if (!trans->t_mp_capable) - break; + /* The ->conn_alloc invocation may have + * allocated resource for all paths, so all + * of them may have to be freed here. + */ + if (cp->cp_transport_data) + trans->conn_free(cp->cp_transport_data); } kmem_cache_free(rds_conn_slab, conn); conn = found; @@ -326,10 +329,7 @@ void rds_conn_shutdown(struct rds_conn_path *cp) wait_event(cp->cp_waitq, !test_bit(RDS_RECV_REFILL, &cp->cp_flags)); - if (!conn->c_trans->t_mp_capable) - conn->c_trans->conn_shutdown(conn); - else - conn->c_trans->conn_path_shutdown(cp); + conn->c_trans->conn_path_shutdown(cp); rds_conn_path_reset(cp); if (!rds_conn_path_transition(cp, RDS_CONN_DISCONNECTING, @@ -355,9 +355,7 @@ void rds_conn_shutdown(struct rds_conn_path *cp) rcu_read_lock(); if (!hlist_unhashed(&conn->c_hash_node)) { rcu_read_unlock(); - if (conn->c_trans->t_type != RDS_TRANS_TCP || - cp->cp_outgoing == 1) - rds_queue_reconnect(cp); + rds_queue_reconnect(cp); } else { rcu_read_unlock(); } @@ -370,6 +368,9 @@ static void rds_conn_path_destroy(struct rds_conn_path *cp) { struct rds_message *rm, *rtmp; + if (!cp->cp_transport_data) + return; + rds_conn_path_drop(cp); flush_work(&cp->cp_down_w); @@ -401,6 +402,8 @@ static void rds_conn_path_destroy(struct rds_conn_path *cp) void rds_conn_destroy(struct rds_connection *conn) { unsigned long flags; + int i; + struct rds_conn_path *cp; rdsdebug("freeing conn %p for %pI4 -> " "%pI4\n", conn, &conn->c_laddr, @@ -413,18 +416,10 @@ void rds_conn_destroy(struct rds_connection *conn) synchronize_rcu(); /* shut the connection down */ - if (!conn->c_trans->t_mp_capable) { - rds_conn_path_destroy(&conn->c_path[0]); - BUG_ON(!list_empty(&conn->c_path[0].cp_retrans)); - } else { - int i; - struct rds_conn_path *cp; - - for (i = 0; i < RDS_MPATH_WORKERS; i++) { - cp = &conn->c_path[i]; - rds_conn_path_destroy(cp); - BUG_ON(!list_empty(&cp->cp_retrans)); - } + for (i = 0; i < RDS_MPATH_WORKERS; i++) { + cp = &conn->c_path[i]; + rds_conn_path_destroy(cp); + BUG_ON(!list_empty(&cp->cp_retrans)); } /* diff --git a/net/rds/ib.c b/net/rds/ib.c index 44946a681a8c66ef491ba304a46489763e821c64..7eaf887e46f8ef12ffd7d72aeadb7d0e15459851 100644 --- a/net/rds/ib.c +++ b/net/rds/ib.c @@ -381,15 +381,15 @@ void rds_ib_exit(void) struct rds_transport rds_ib_transport = { .laddr_check = rds_ib_laddr_check, - .xmit_complete = rds_ib_xmit_complete, + .xmit_path_complete = rds_ib_xmit_path_complete, .xmit = rds_ib_xmit, .xmit_rdma = rds_ib_xmit_rdma, .xmit_atomic = rds_ib_xmit_atomic, - .recv = rds_ib_recv, + .recv_path = rds_ib_recv_path, .conn_alloc = rds_ib_conn_alloc, .conn_free = rds_ib_conn_free, - .conn_connect = rds_ib_conn_connect, - .conn_shutdown = rds_ib_conn_shutdown, + .conn_path_connect = rds_ib_conn_path_connect, + .conn_path_shutdown = rds_ib_conn_path_shutdown, .inc_copy_to_user = rds_ib_inc_copy_to_user, .inc_free = rds_ib_inc_free, .cm_initiate_connect = rds_ib_cm_initiate_connect, diff --git a/net/rds/ib.h b/net/rds/ib.h index 627fb79aee65b24baa7820d93b63cc3a02f2c6f5..046f7508c06b481bab849d73aa89be47f2e9a411 100644 --- a/net/rds/ib.h +++ b/net/rds/ib.h @@ -328,8 +328,8 @@ extern struct list_head ib_nodev_conns; /* ib_cm.c */ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp); void rds_ib_conn_free(void *arg); -int rds_ib_conn_connect(struct rds_connection *conn); -void rds_ib_conn_shutdown(struct rds_connection *conn); +int rds_ib_conn_path_connect(struct rds_conn_path *cp); +void rds_ib_conn_path_shutdown(struct rds_conn_path *cp); void rds_ib_state_change(struct sock *sk); int rds_ib_listen_init(void); void rds_ib_listen_stop(void); @@ -354,7 +354,7 @@ void rds_ib_mr_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc); /* ib_recv.c */ int rds_ib_recv_init(void); void rds_ib_recv_exit(void); -int rds_ib_recv(struct rds_connection *conn); +int rds_ib_recv_path(struct rds_conn_path *conn); int rds_ib_recv_alloc_caches(struct rds_ib_connection *ic); void rds_ib_recv_free_caches(struct rds_ib_connection *ic); void rds_ib_recv_refill(struct rds_connection *conn, int prefill, gfp_t gfp); @@ -384,7 +384,7 @@ u32 rds_ib_ring_completed(struct rds_ib_work_ring *ring, u32 wr_id, u32 oldest); extern wait_queue_head_t rds_ib_ring_empty_wait; /* ib_send.c */ -void rds_ib_xmit_complete(struct rds_connection *conn); +void rds_ib_xmit_path_complete(struct rds_conn_path *cp); int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm, unsigned int hdr_off, unsigned int sg, unsigned int off); void rds_ib_send_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc); diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c index e48bb1ba3dfc3413a2bbc16971be58274812e34e..5b2ab95afa072f4970e25bbc856e916fe28187aa 100644 --- a/net/rds/ib_cm.c +++ b/net/rds/ib_cm.c @@ -685,8 +685,9 @@ int rds_ib_cm_initiate_connect(struct rdma_cm_id *cm_id) return ret; } -int rds_ib_conn_connect(struct rds_connection *conn) +int rds_ib_conn_path_connect(struct rds_conn_path *cp) { + struct rds_connection *conn = cp->cp_conn; struct rds_ib_connection *ic = conn->c_transport_data; struct sockaddr_in src, dest; int ret; @@ -731,8 +732,9 @@ int rds_ib_conn_connect(struct rds_connection *conn) * so that it can be called at any point during startup. In fact it * can be called multiple times for a given connection. */ -void rds_ib_conn_shutdown(struct rds_connection *conn) +void rds_ib_conn_path_shutdown(struct rds_conn_path *cp) { + struct rds_connection *conn = cp->cp_conn; struct rds_ib_connection *ic = conn->c_transport_data; int err = 0; diff --git a/net/rds/ib_recv.c b/net/rds/ib_recv.c index 4ea8cb17cc7a86dc00b4e12d50fad7132cb9b4c4..606a11f681d28b18879629e758b89f8a9ae8b33f 100644 --- a/net/rds/ib_recv.c +++ b/net/rds/ib_recv.c @@ -1009,8 +1009,9 @@ void rds_ib_recv_cqe_handler(struct rds_ib_connection *ic, rds_ib_recv_refill(conn, 0, GFP_NOWAIT); } -int rds_ib_recv(struct rds_connection *conn) +int rds_ib_recv_path(struct rds_conn_path *cp) { + struct rds_connection *conn = cp->cp_conn; struct rds_ib_connection *ic = conn->c_transport_data; int ret = 0; diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c index 6e4110aa513593cdc972468c87aaac7f843ba393..84d90c97332f9178552f34f2e13973a351f6213a 100644 --- a/net/rds/ib_send.c +++ b/net/rds/ib_send.c @@ -980,8 +980,9 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op) return ret; } -void rds_ib_xmit_complete(struct rds_connection *conn) +void rds_ib_xmit_path_complete(struct rds_conn_path *cp) { + struct rds_connection *conn = cp->cp_conn; struct rds_ib_connection *ic = conn->c_transport_data; /* We may have a pending ACK or window update we were unable diff --git a/net/rds/loop.c b/net/rds/loop.c index 15f83db78f0c2180a2c09492abf881c9c424aed6..f2bf78de5688a3ee44862fe158bffee4ca94d91a 100644 --- a/net/rds/loop.c +++ b/net/rds/loop.c @@ -102,7 +102,7 @@ static void rds_loop_inc_free(struct rds_incoming *inc) } /* we need to at least give the thread something to succeed */ -static int rds_loop_recv(struct rds_connection *conn) +static int rds_loop_recv_path(struct rds_conn_path *cp) { return 0; } @@ -150,13 +150,13 @@ static void rds_loop_conn_free(void *arg) kfree(lc); } -static int rds_loop_conn_connect(struct rds_connection *conn) +static int rds_loop_conn_path_connect(struct rds_conn_path *cp) { - rds_connect_complete(conn); + rds_connect_complete(cp->cp_conn); return 0; } -static void rds_loop_conn_shutdown(struct rds_connection *conn) +static void rds_loop_conn_path_shutdown(struct rds_conn_path *cp) { } @@ -185,11 +185,11 @@ void rds_loop_exit(void) */ struct rds_transport rds_loop_transport = { .xmit = rds_loop_xmit, - .recv = rds_loop_recv, + .recv_path = rds_loop_recv_path, .conn_alloc = rds_loop_conn_alloc, .conn_free = rds_loop_conn_free, - .conn_connect = rds_loop_conn_connect, - .conn_shutdown = rds_loop_conn_shutdown, + .conn_path_connect = rds_loop_conn_path_connect, + .conn_path_shutdown = rds_loop_conn_path_shutdown, .inc_copy_to_user = rds_message_inc_copy_to_user, .inc_free = rds_loop_inc_free, .t_name = "loopback", diff --git a/net/rds/rds.h b/net/rds/rds.h index 2e35b738176f35bdad33a32196e298bc9ee9824c..6ef07bd272275de9b31fd19d2782bd0bbc142d52 100644 --- a/net/rds/rds.h +++ b/net/rds/rds.h @@ -454,18 +454,15 @@ struct rds_transport { int (*laddr_check)(struct net *net, __be32 addr); int (*conn_alloc)(struct rds_connection *conn, gfp_t gfp); void (*conn_free)(void *data); - int (*conn_connect)(struct rds_connection *conn); - void (*conn_shutdown)(struct rds_connection *conn); + int (*conn_path_connect)(struct rds_conn_path *cp); void (*conn_path_shutdown)(struct rds_conn_path *conn); - void (*xmit_prepare)(struct rds_connection *conn); void (*xmit_path_prepare)(struct rds_conn_path *cp); - void (*xmit_complete)(struct rds_connection *conn); void (*xmit_path_complete)(struct rds_conn_path *cp); int (*xmit)(struct rds_connection *conn, struct rds_message *rm, unsigned int hdr_off, unsigned int sg, unsigned int off); int (*xmit_rdma)(struct rds_connection *conn, struct rm_rdma_op *op); int (*xmit_atomic)(struct rds_connection *conn, struct rm_atomic_op *op); - int (*recv)(struct rds_connection *conn); + int (*recv_path)(struct rds_conn_path *cp); int (*inc_copy_to_user)(struct rds_incoming *inc, struct iov_iter *to); void (*inc_free)(struct rds_incoming *inc); diff --git a/net/rds/recv.c b/net/rds/recv.c index b58f50571782b1921036696d262bba145456dac1..fed53a6c28901c05aa4e4a10d12d2fb7793342d1 100644 --- a/net/rds/recv.c +++ b/net/rds/recv.c @@ -226,6 +226,10 @@ void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr, cp->cp_next_rx_seq = be64_to_cpu(inc->i_hdr.h_sequence) + 1; if (rds_sysctl_ping_enable && inc->i_hdr.h_dport == 0) { + if (inc->i_hdr.h_sport == 0) { + rdsdebug("ignore ping with 0 sport from 0x%x\n", saddr); + goto out; + } rds_stats_inc(s_recv_ping); rds_send_pong(cp, inc->i_hdr.h_sport); goto out; diff --git a/net/rds/send.c b/net/rds/send.c index ee43d6b2ea8f75a3d73c1f26b792a359edcab05f..5a9caf1da89630181615c418206fb33eef7ab4be 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -183,12 +183,8 @@ int rds_send_xmit(struct rds_conn_path *cp) goto out; } - if (conn->c_trans->t_mp_capable) { - if (conn->c_trans->xmit_path_prepare) - conn->c_trans->xmit_path_prepare(cp); - } else if (conn->c_trans->xmit_prepare) { - conn->c_trans->xmit_prepare(conn); - } + if (conn->c_trans->xmit_path_prepare) + conn->c_trans->xmit_path_prepare(cp); /* * spin trying to push headers and data down the connection until @@ -403,12 +399,8 @@ int rds_send_xmit(struct rds_conn_path *cp) } over_batch: - if (conn->c_trans->t_mp_capable) { - if (conn->c_trans->xmit_path_complete) - conn->c_trans->xmit_path_complete(cp); - } else if (conn->c_trans->xmit_complete) { - conn->c_trans->xmit_complete(conn); - } + if (conn->c_trans->xmit_path_complete) + conn->c_trans->xmit_path_complete(cp); release_in_xmit(cp); /* Nuke any messages we decided not to retransmit. */ diff --git a/net/rds/tcp.c b/net/rds/tcp.c index 5217d49ce6d69356f11db383a95fab25f8f6d903..d278432f080b1e332645e5ed623ce1b700355982 100644 --- a/net/rds/tcp.c +++ b/net/rds/tcp.c @@ -136,9 +136,9 @@ void rds_tcp_restore_callbacks(struct socket *sock, * from being called while it isn't set. */ void rds_tcp_reset_callbacks(struct socket *sock, - struct rds_connection *conn) + struct rds_conn_path *cp) { - struct rds_tcp_connection *tc = conn->c_transport_data; + struct rds_tcp_connection *tc = cp->cp_transport_data; struct socket *osock = tc->t_sock; if (!osock) @@ -148,8 +148,8 @@ void rds_tcp_reset_callbacks(struct socket *sock, * We have an outstanding SYN to this peer, which may * potentially have transitioned to the RDS_CONN_UP state, * so we must quiesce any send threads before resetting - * c_transport_data. We quiesce these threads by setting - * c_state to something other than RDS_CONN_UP, and then + * cp_transport_data. We quiesce these threads by setting + * cp_state to something other than RDS_CONN_UP, and then * waiting for any existing threads in rds_send_xmit to * complete release_in_xmit(). (Subsequent threads entering * rds_send_xmit() will bail on !rds_conn_up(). @@ -164,8 +164,8 @@ void rds_tcp_reset_callbacks(struct socket *sock, * RDS_CONN_RESETTTING, to ensure that rds_tcp_state_change * cannot mark rds_conn_path_up() in the window before lock_sock() */ - atomic_set(&conn->c_state, RDS_CONN_RESETTING); - wait_event(conn->c_waitq, !test_bit(RDS_IN_XMIT, &conn->c_flags)); + atomic_set(&cp->cp_state, RDS_CONN_RESETTING); + wait_event(cp->cp_waitq, !test_bit(RDS_IN_XMIT, &cp->cp_flags)); lock_sock(osock->sk); /* reset receive side state for rds_tcp_data_recv() for osock */ if (tc->t_tinc) { @@ -186,11 +186,12 @@ void rds_tcp_reset_callbacks(struct socket *sock, release_sock(osock->sk); sock_release(osock); newsock: - rds_send_path_reset(&conn->c_path[0]); + rds_send_path_reset(cp); lock_sock(sock->sk); write_lock_bh(&sock->sk->sk_callback_lock); tc->t_sock = sock; - sock->sk->sk_user_data = conn; + tc->t_cpath = cp; + sock->sk->sk_user_data = cp; sock->sk->sk_data_ready = rds_tcp_data_ready; sock->sk->sk_write_space = rds_tcp_write_space; sock->sk->sk_state_change = rds_tcp_state_change; @@ -203,9 +204,9 @@ void rds_tcp_reset_callbacks(struct socket *sock, * above rds_tcp_reset_callbacks for notes about synchronization * with data path */ -void rds_tcp_set_callbacks(struct socket *sock, struct rds_connection *conn) +void rds_tcp_set_callbacks(struct socket *sock, struct rds_conn_path *cp) { - struct rds_tcp_connection *tc = conn->c_transport_data; + struct rds_tcp_connection *tc = cp->cp_transport_data; rdsdebug("setting sock %p callbacks to tc %p\n", sock, tc); write_lock_bh(&sock->sk->sk_callback_lock); @@ -221,12 +222,12 @@ void rds_tcp_set_callbacks(struct socket *sock, struct rds_connection *conn) sock->sk->sk_data_ready = sock->sk->sk_user_data; tc->t_sock = sock; - tc->conn = conn; + tc->t_cpath = cp; tc->t_orig_data_ready = sock->sk->sk_data_ready; tc->t_orig_write_space = sock->sk->sk_write_space; tc->t_orig_state_change = sock->sk->sk_state_change; - sock->sk->sk_user_data = conn; + sock->sk->sk_user_data = cp; sock->sk->sk_data_ready = rds_tcp_data_ready; sock->sk->sk_write_space = rds_tcp_write_space; sock->sk->sk_state_change = rds_tcp_state_change; @@ -284,24 +285,29 @@ static int rds_tcp_laddr_check(struct net *net, __be32 addr) static int rds_tcp_conn_alloc(struct rds_connection *conn, gfp_t gfp) { struct rds_tcp_connection *tc; + int i; - tc = kmem_cache_alloc(rds_tcp_conn_slab, gfp); - if (!tc) - return -ENOMEM; + for (i = 0; i < RDS_MPATH_WORKERS; i++) { + tc = kmem_cache_alloc(rds_tcp_conn_slab, gfp); + if (!tc) + return -ENOMEM; - mutex_init(&tc->t_conn_lock); - tc->t_sock = NULL; - tc->t_tinc = NULL; - tc->t_tinc_hdr_rem = sizeof(struct rds_header); - tc->t_tinc_data_rem = 0; + mutex_init(&tc->t_conn_path_lock); + tc->t_sock = NULL; + tc->t_tinc = NULL; + tc->t_tinc_hdr_rem = sizeof(struct rds_header); + tc->t_tinc_data_rem = 0; - conn->c_transport_data = tc; + conn->c_path[i].cp_transport_data = tc; + tc->t_cpath = &conn->c_path[i]; - spin_lock_irq(&rds_tcp_conn_lock); - list_add_tail(&tc->t_tcp_node, &rds_tcp_conn_list); - spin_unlock_irq(&rds_tcp_conn_lock); + spin_lock_irq(&rds_tcp_conn_lock); + list_add_tail(&tc->t_tcp_node, &rds_tcp_conn_list); + spin_unlock_irq(&rds_tcp_conn_lock); + rdsdebug("rds_conn_path [%d] tc %p\n", i, + conn->c_path[i].cp_transport_data); + } - rdsdebug("alloced tc %p\n", conn->c_transport_data); return 0; } @@ -318,6 +324,17 @@ static void rds_tcp_conn_free(void *arg) kmem_cache_free(rds_tcp_conn_slab, tc); } +static bool list_has_conn(struct list_head *list, struct rds_connection *conn) +{ + struct rds_tcp_connection *tc, *_tc; + + list_for_each_entry_safe(tc, _tc, list, t_tcp_node) { + if (tc->t_cpath->cp_conn == conn) + return true; + } + return false; +} + static void rds_tcp_destroy_conns(void) { struct rds_tcp_connection *tc, *_tc; @@ -325,29 +342,28 @@ static void rds_tcp_destroy_conns(void) /* avoid calling conn_destroy with irqs off */ spin_lock_irq(&rds_tcp_conn_lock); - list_splice(&rds_tcp_conn_list, &tmp_list); - INIT_LIST_HEAD(&rds_tcp_conn_list); + list_for_each_entry_safe(tc, _tc, &rds_tcp_conn_list, t_tcp_node) { + if (!list_has_conn(&tmp_list, tc->t_cpath->cp_conn)) + list_move_tail(&tc->t_tcp_node, &tmp_list); + } spin_unlock_irq(&rds_tcp_conn_lock); - list_for_each_entry_safe(tc, _tc, &tmp_list, t_tcp_node) { - if (tc->conn->c_passive) - rds_conn_destroy(tc->conn->c_passive); - rds_conn_destroy(tc->conn); - } + list_for_each_entry_safe(tc, _tc, &tmp_list, t_tcp_node) + rds_conn_destroy(tc->t_cpath->cp_conn); } static void rds_tcp_exit(void); struct rds_transport rds_tcp_transport = { .laddr_check = rds_tcp_laddr_check, - .xmit_prepare = rds_tcp_xmit_prepare, - .xmit_complete = rds_tcp_xmit_complete, + .xmit_path_prepare = rds_tcp_xmit_path_prepare, + .xmit_path_complete = rds_tcp_xmit_path_complete, .xmit = rds_tcp_xmit, - .recv = rds_tcp_recv, + .recv_path = rds_tcp_recv_path, .conn_alloc = rds_tcp_conn_alloc, .conn_free = rds_tcp_conn_free, - .conn_connect = rds_tcp_conn_connect, - .conn_shutdown = rds_tcp_conn_shutdown, + .conn_path_connect = rds_tcp_conn_path_connect, + .conn_path_shutdown = rds_tcp_conn_path_shutdown, .inc_copy_to_user = rds_tcp_inc_copy_to_user, .inc_free = rds_tcp_inc_free, .stats_info_copy = rds_tcp_stats_info_copy, @@ -489,10 +505,30 @@ static struct pernet_operations rds_tcp_net_ops = { .size = sizeof(struct rds_tcp_net), }; +/* explicitly send a RST on each socket, thereby releasing any socket refcnts + * that may otherwise hold up netns deletion. + */ +static void rds_tcp_conn_paths_destroy(struct rds_connection *conn) +{ + struct rds_conn_path *cp; + struct rds_tcp_connection *tc; + int i; + struct sock *sk; + + for (i = 0; i < RDS_MPATH_WORKERS; i++) { + cp = &conn->c_path[i]; + tc = cp->cp_transport_data; + if (!tc->t_sock) + continue; + sk = tc->t_sock->sk; + sk->sk_prot->disconnect(sk, 0); + tcp_done(sk); + } +} + static void rds_tcp_kill_sock(struct net *net) { struct rds_tcp_connection *tc, *_tc; - struct sock *sk; LIST_HEAD(tmp_list); struct rds_tcp_net *rtn = net_generic(net, rds_tcp_netid); @@ -501,20 +537,17 @@ static void rds_tcp_kill_sock(struct net *net) flush_work(&rtn->rds_tcp_accept_w); spin_lock_irq(&rds_tcp_conn_lock); list_for_each_entry_safe(tc, _tc, &rds_tcp_conn_list, t_tcp_node) { - struct net *c_net = read_pnet(&tc->conn->c_net); + struct net *c_net = read_pnet(&tc->t_cpath->cp_conn->c_net); if (net != c_net || !tc->t_sock) continue; - list_move_tail(&tc->t_tcp_node, &tmp_list); + if (!list_has_conn(&tmp_list, tc->t_cpath->cp_conn)) + list_move_tail(&tc->t_tcp_node, &tmp_list); } spin_unlock_irq(&rds_tcp_conn_lock); list_for_each_entry_safe(tc, _tc, &tmp_list, t_tcp_node) { - sk = tc->t_sock->sk; - sk->sk_prot->disconnect(sk, 0); - tcp_done(sk); - if (tc->conn->c_passive) - rds_conn_destroy(tc->conn->c_passive); - rds_conn_destroy(tc->conn); + rds_tcp_conn_paths_destroy(tc->t_cpath->cp_conn); + rds_conn_destroy(tc->t_cpath->cp_conn); } } @@ -552,12 +585,13 @@ static void rds_tcp_sysctl_reset(struct net *net) spin_lock_irq(&rds_tcp_conn_lock); list_for_each_entry_safe(tc, _tc, &rds_tcp_conn_list, t_tcp_node) { - struct net *c_net = read_pnet(&tc->conn->c_net); + struct net *c_net = read_pnet(&tc->t_cpath->cp_conn->c_net); if (net != c_net || !tc->t_sock) continue; - rds_conn_drop(tc->conn); /* reconnect with new parameters */ + /* reconnect with new parameters */ + rds_conn_path_drop(tc->t_cpath); } spin_unlock_irq(&rds_tcp_conn_lock); } diff --git a/net/rds/tcp.h b/net/rds/tcp.h index 7940babf6c718617c729247ac348b2b6ed68a53a..1c3160faa96307b321e460b78a87bc693081c6fc 100644 --- a/net/rds/tcp.h +++ b/net/rds/tcp.h @@ -11,11 +11,11 @@ struct rds_tcp_incoming { struct rds_tcp_connection { struct list_head t_tcp_node; - struct rds_connection *conn; - /* t_conn_lock synchronizes the connection establishment between - * rds_tcp_accept_one and rds_tcp_conn_connect + struct rds_conn_path *t_cpath; + /* t_conn_path_lock synchronizes the connection establishment between + * rds_tcp_accept_one and rds_tcp_conn_path_connect */ - struct mutex t_conn_lock; + struct mutex t_conn_path_lock; struct socket *t_sock; void *t_orig_write_space; void *t_orig_data_ready; @@ -49,8 +49,8 @@ struct rds_tcp_statistics { /* tcp.c */ void rds_tcp_tune(struct socket *sock); void rds_tcp_nonagle(struct socket *sock); -void rds_tcp_set_callbacks(struct socket *sock, struct rds_connection *conn); -void rds_tcp_reset_callbacks(struct socket *sock, struct rds_connection *conn); +void rds_tcp_set_callbacks(struct socket *sock, struct rds_conn_path *cp); +void rds_tcp_reset_callbacks(struct socket *sock, struct rds_conn_path *cp); void rds_tcp_restore_callbacks(struct socket *sock, struct rds_tcp_connection *tc); u32 rds_tcp_snd_nxt(struct rds_tcp_connection *tc); @@ -60,8 +60,8 @@ extern struct rds_transport rds_tcp_transport; void rds_tcp_accept_work(struct sock *sk); /* tcp_connect.c */ -int rds_tcp_conn_connect(struct rds_connection *conn); -void rds_tcp_conn_shutdown(struct rds_connection *conn); +int rds_tcp_conn_path_connect(struct rds_conn_path *cp); +void rds_tcp_conn_path_shutdown(struct rds_conn_path *conn); void rds_tcp_state_change(struct sock *sk); /* tcp_listen.c */ @@ -75,13 +75,13 @@ int rds_tcp_keepalive(struct socket *sock); int rds_tcp_recv_init(void); void rds_tcp_recv_exit(void); void rds_tcp_data_ready(struct sock *sk); -int rds_tcp_recv(struct rds_connection *conn); +int rds_tcp_recv_path(struct rds_conn_path *cp); void rds_tcp_inc_free(struct rds_incoming *inc); int rds_tcp_inc_copy_to_user(struct rds_incoming *inc, struct iov_iter *to); /* tcp_send.c */ -void rds_tcp_xmit_prepare(struct rds_connection *conn); -void rds_tcp_xmit_complete(struct rds_connection *conn); +void rds_tcp_xmit_path_prepare(struct rds_conn_path *cp); +void rds_tcp_xmit_path_complete(struct rds_conn_path *cp); int rds_tcp_xmit(struct rds_connection *conn, struct rds_message *rm, unsigned int hdr_off, unsigned int sg, unsigned int off); void rds_tcp_write_space(struct sock *sk); diff --git a/net/rds/tcp_connect.c b/net/rds/tcp_connect.c index 96c2c4d17909488aa0c8c2a23d6166536eba3d8b..c916715fbe6132ce1e3b9c94db62281c1a72a6ff 100644 --- a/net/rds/tcp_connect.c +++ b/net/rds/tcp_connect.c @@ -41,16 +41,16 @@ void rds_tcp_state_change(struct sock *sk) { void (*state_change)(struct sock *sk); - struct rds_connection *conn; + struct rds_conn_path *cp; struct rds_tcp_connection *tc; read_lock_bh(&sk->sk_callback_lock); - conn = sk->sk_user_data; - if (!conn) { + cp = sk->sk_user_data; + if (!cp) { state_change = sk->sk_state_change; goto out; } - tc = conn->c_transport_data; + tc = cp->cp_transport_data; state_change = tc->t_orig_state_change; rdsdebug("sock %p state_change to %d\n", tc->t_sock, sk->sk_state); @@ -61,12 +61,11 @@ void rds_tcp_state_change(struct sock *sk) case TCP_SYN_RECV: break; case TCP_ESTABLISHED: - rds_connect_path_complete(&conn->c_path[0], - RDS_CONN_CONNECTING); + rds_connect_path_complete(cp, RDS_CONN_CONNECTING); break; case TCP_CLOSE_WAIT: case TCP_CLOSE: - rds_conn_drop(conn); + rds_conn_path_drop(cp); default: break; } @@ -75,17 +74,18 @@ void rds_tcp_state_change(struct sock *sk) state_change(sk); } -int rds_tcp_conn_connect(struct rds_connection *conn) +int rds_tcp_conn_path_connect(struct rds_conn_path *cp) { struct socket *sock = NULL; struct sockaddr_in src, dest; int ret; - struct rds_tcp_connection *tc = conn->c_transport_data; + struct rds_connection *conn = cp->cp_conn; + struct rds_tcp_connection *tc = cp->cp_transport_data; - mutex_lock(&tc->t_conn_lock); + mutex_lock(&tc->t_conn_path_lock); - if (rds_conn_up(conn)) { - mutex_unlock(&tc->t_conn_lock); + if (rds_conn_path_up(cp)) { + mutex_unlock(&tc->t_conn_path_lock); return 0; } ret = sock_create_kern(rds_conn_net(conn), PF_INET, @@ -114,10 +114,11 @@ int rds_tcp_conn_connect(struct rds_connection *conn) * once we call connect() we can start getting callbacks and they * own the socket */ - rds_tcp_set_callbacks(sock, conn); + rds_tcp_set_callbacks(sock, cp); ret = sock->ops->connect(sock, (struct sockaddr *)&dest, sizeof(dest), O_NONBLOCK); + cp->cp_outgoing = 1; rdsdebug("connect to address %pI4 returned %d\n", &conn->c_faddr, ret); if (ret == -EINPROGRESS) ret = 0; @@ -125,11 +126,11 @@ int rds_tcp_conn_connect(struct rds_connection *conn) rds_tcp_keepalive(sock); sock = NULL; } else { - rds_tcp_restore_callbacks(sock, conn->c_transport_data); + rds_tcp_restore_callbacks(sock, cp->cp_transport_data); } out: - mutex_unlock(&tc->t_conn_lock); + mutex_unlock(&tc->t_conn_path_lock); if (sock) sock_release(sock); return ret; @@ -144,12 +145,13 @@ int rds_tcp_conn_connect(struct rds_connection *conn) * callbacks to those set by TCP. Our callbacks won't execute again once we * hold the sock lock. */ -void rds_tcp_conn_shutdown(struct rds_connection *conn) +void rds_tcp_conn_path_shutdown(struct rds_conn_path *cp) { - struct rds_tcp_connection *tc = conn->c_transport_data; + struct rds_tcp_connection *tc = cp->cp_transport_data; struct socket *sock = tc->t_sock; - rdsdebug("shutting down conn %p tc %p sock %p\n", conn, tc, sock); + rdsdebug("shutting down conn %p tc %p sock %p\n", + cp->cp_conn, tc, sock); if (sock) { sock->ops->shutdown(sock, RCV_SHUTDOWN | SEND_SHUTDOWN); diff --git a/net/rds/tcp_listen.c b/net/rds/tcp_listen.c index f9cc945a77b3c7bab156c5ce0cf71f441fe6cc68..ca975a217a49875272bb2df7ba590b1821f2aeb4 100644 --- a/net/rds/tcp_listen.c +++ b/net/rds/tcp_listen.c @@ -79,6 +79,7 @@ int rds_tcp_accept_one(struct socket *sock) struct inet_sock *inet; struct rds_tcp_connection *rs_tcp = NULL; int conn_state; + struct rds_conn_path *cp; if (!sock) /* module unload or netns delete in progress */ return -ENETUNREACH; @@ -120,8 +121,9 @@ int rds_tcp_accept_one(struct socket *sock) * rds_tcp_state_change() will do that cleanup */ rs_tcp = (struct rds_tcp_connection *)conn->c_transport_data; + cp = &conn->c_path[0]; rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_CONNECTING); - mutex_lock(&rs_tcp->t_conn_lock); + mutex_lock(&rs_tcp->t_conn_path_lock); conn_state = rds_conn_state(conn); if (conn_state != RDS_CONN_CONNECTING && conn_state != RDS_CONN_UP) goto rst_nsk; @@ -136,16 +138,14 @@ int rds_tcp_accept_one(struct socket *sock) !conn->c_path[0].cp_outgoing) { goto rst_nsk; } else { - rds_tcp_reset_callbacks(new_sock, conn); + rds_tcp_reset_callbacks(new_sock, cp); conn->c_path[0].cp_outgoing = 0; /* rds_connect_path_complete() marks RDS_CONN_UP */ - rds_connect_path_complete(&conn->c_path[0], - RDS_CONN_RESETTING); + rds_connect_path_complete(cp, RDS_CONN_RESETTING); } } else { - rds_tcp_set_callbacks(new_sock, conn); - rds_connect_path_complete(&conn->c_path[0], - RDS_CONN_CONNECTING); + rds_tcp_set_callbacks(new_sock, cp); + rds_connect_path_complete(cp, RDS_CONN_CONNECTING); } new_sock = NULL; ret = 0; @@ -156,7 +156,7 @@ int rds_tcp_accept_one(struct socket *sock) ret = 0; out: if (rs_tcp) - mutex_unlock(&rs_tcp->t_conn_lock); + mutex_unlock(&rs_tcp->t_conn_path_lock); if (new_sock) sock_release(new_sock); return ret; diff --git a/net/rds/tcp_recv.c b/net/rds/tcp_recv.c index 4a87d9ef3084552a5ced68e2ad05a5cbbc4954b7..ad4892e97f91b3fdd4928072ea6f9a9aeaf11352 100644 --- a/net/rds/tcp_recv.c +++ b/net/rds/tcp_recv.c @@ -34,7 +34,6 @@ #include <linux/slab.h> #include <net/tcp.h> -#include "rds_single_path.h" #include "rds.h" #include "tcp.h" @@ -148,7 +147,7 @@ static void rds_tcp_cong_recv(struct rds_connection *conn, } struct rds_tcp_desc_arg { - struct rds_connection *conn; + struct rds_conn_path *conn_path; gfp_t gfp; }; @@ -156,8 +155,8 @@ static int rds_tcp_data_recv(read_descriptor_t *desc, struct sk_buff *skb, unsigned int offset, size_t len) { struct rds_tcp_desc_arg *arg = desc->arg.data; - struct rds_connection *conn = arg->conn; - struct rds_tcp_connection *tc = conn->c_transport_data; + struct rds_conn_path *cp = arg->conn_path; + struct rds_tcp_connection *tc = cp->cp_transport_data; struct rds_tcp_incoming *tinc = tc->t_tinc; struct sk_buff *clone; size_t left = len, to_copy; @@ -179,7 +178,8 @@ static int rds_tcp_data_recv(read_descriptor_t *desc, struct sk_buff *skb, } tc->t_tinc = tinc; rdsdebug("alloced tinc %p\n", tinc); - rds_inc_init(&tinc->ti_inc, conn, conn->c_faddr); + rds_inc_path_init(&tinc->ti_inc, cp, + cp->cp_conn->c_faddr); /* * XXX * we might be able to use the __ variants when * we've already serialized at a higher level. @@ -229,6 +229,8 @@ static int rds_tcp_data_recv(read_descriptor_t *desc, struct sk_buff *skb, } if (tc->t_tinc_hdr_rem == 0 && tc->t_tinc_data_rem == 0) { + struct rds_connection *conn = cp->cp_conn; + if (tinc->ti_inc.i_hdr.h_flags == RDS_FLAG_CONG_BITMAP) rds_tcp_cong_recv(conn, tinc); else @@ -251,15 +253,15 @@ static int rds_tcp_data_recv(read_descriptor_t *desc, struct sk_buff *skb, } /* the caller has to hold the sock lock */ -static int rds_tcp_read_sock(struct rds_connection *conn, gfp_t gfp) +static int rds_tcp_read_sock(struct rds_conn_path *cp, gfp_t gfp) { - struct rds_tcp_connection *tc = conn->c_transport_data; + struct rds_tcp_connection *tc = cp->cp_transport_data; struct socket *sock = tc->t_sock; read_descriptor_t desc; struct rds_tcp_desc_arg arg; /* It's like glib in the kernel! */ - arg.conn = conn; + arg.conn_path = cp; arg.gfp = gfp; desc.arg.data = &arg; desc.error = 0; @@ -279,16 +281,17 @@ static int rds_tcp_read_sock(struct rds_connection *conn, gfp_t gfp) * if we fail to allocate we're in trouble.. blindly wait some time before * trying again to see if the VM can free up something for us. */ -int rds_tcp_recv(struct rds_connection *conn) +int rds_tcp_recv_path(struct rds_conn_path *cp) { - struct rds_tcp_connection *tc = conn->c_transport_data; + struct rds_tcp_connection *tc = cp->cp_transport_data; struct socket *sock = tc->t_sock; int ret = 0; - rdsdebug("recv worker conn %p tc %p sock %p\n", conn, tc, sock); + rdsdebug("recv worker path [%d] tc %p sock %p\n", + cp->cp_index, tc, sock); lock_sock(sock->sk); - ret = rds_tcp_read_sock(conn, GFP_KERNEL); + ret = rds_tcp_read_sock(cp, GFP_KERNEL); release_sock(sock->sk); return ret; @@ -297,24 +300,24 @@ int rds_tcp_recv(struct rds_connection *conn) void rds_tcp_data_ready(struct sock *sk) { void (*ready)(struct sock *sk); - struct rds_connection *conn; + struct rds_conn_path *cp; struct rds_tcp_connection *tc; rdsdebug("data ready sk %p\n", sk); read_lock_bh(&sk->sk_callback_lock); - conn = sk->sk_user_data; - if (!conn) { /* check for teardown race */ + cp = sk->sk_user_data; + if (!cp) { /* check for teardown race */ ready = sk->sk_data_ready; goto out; } - tc = conn->c_transport_data; + tc = cp->cp_transport_data; ready = tc->t_orig_data_ready; rds_tcp_stats_inc(s_tcp_data_ready_calls); - if (rds_tcp_read_sock(conn, GFP_ATOMIC) == -ENOMEM) - queue_delayed_work(rds_wq, &conn->c_recv_w, 0); + if (rds_tcp_read_sock(cp, GFP_ATOMIC) == -ENOMEM) + queue_delayed_work(rds_wq, &cp->cp_recv_w, 0); out: read_unlock_bh(&sk->sk_callback_lock); ready(sk); diff --git a/net/rds/tcp_send.c b/net/rds/tcp_send.c index 710f1aae97ad336fc04a17e17022cfab73b83fad..57e0f5826406d8f1ad120a8156ad2cda47afc0d4 100644 --- a/net/rds/tcp_send.c +++ b/net/rds/tcp_send.c @@ -49,16 +49,16 @@ static void rds_tcp_cork(struct socket *sock, int val) set_fs(oldfs); } -void rds_tcp_xmit_prepare(struct rds_connection *conn) +void rds_tcp_xmit_path_prepare(struct rds_conn_path *cp) { - struct rds_tcp_connection *tc = conn->c_transport_data; + struct rds_tcp_connection *tc = cp->cp_transport_data; rds_tcp_cork(tc->t_sock, 1); } -void rds_tcp_xmit_complete(struct rds_connection *conn) +void rds_tcp_xmit_path_complete(struct rds_conn_path *cp) { - struct rds_tcp_connection *tc = conn->c_transport_data; + struct rds_tcp_connection *tc = cp->cp_transport_data; rds_tcp_cork(tc->t_sock, 0); } @@ -178,27 +178,27 @@ static int rds_tcp_is_acked(struct rds_message *rm, uint64_t ack) void rds_tcp_write_space(struct sock *sk) { void (*write_space)(struct sock *sk); - struct rds_connection *conn; + struct rds_conn_path *cp; struct rds_tcp_connection *tc; read_lock_bh(&sk->sk_callback_lock); - conn = sk->sk_user_data; - if (!conn) { + cp = sk->sk_user_data; + if (!cp) { write_space = sk->sk_write_space; goto out; } - tc = conn->c_transport_data; + tc = cp->cp_transport_data; rdsdebug("write_space for tc %p\n", tc); write_space = tc->t_orig_write_space; rds_tcp_stats_inc(s_tcp_write_space_calls); rdsdebug("tcp una %u\n", rds_tcp_snd_una(tc)); tc->t_last_seen_una = rds_tcp_snd_una(tc); - rds_send_drop_acked(conn, rds_tcp_snd_una(tc), rds_tcp_is_acked); + rds_send_path_drop_acked(cp, rds_tcp_snd_una(tc), rds_tcp_is_acked); if ((atomic_read(&sk->sk_wmem_alloc) << 1) <= sk->sk_sndbuf) - queue_delayed_work(rds_wq, &conn->c_send_w, 0); + queue_delayed_work(rds_wq, &cp->cp_send_w, 0); out: read_unlock_bh(&sk->sk_callback_lock); diff --git a/net/rds/threads.c b/net/rds/threads.c index 9fbe95bb14a911a3fe712e947496c9f1f890dcf4..bc97d67f29cc24b328efa6d921199dd112127672 100644 --- a/net/rds/threads.c +++ b/net/rds/threads.c @@ -125,6 +125,11 @@ void rds_queue_reconnect(struct rds_conn_path *cp) conn, &conn->c_laddr, &conn->c_faddr, cp->cp_reconnect_jiffies); + /* let peer with smaller addr initiate reconnect, to avoid duels */ + if (conn->c_trans->t_type == RDS_TRANS_TCP && + conn->c_laddr > conn->c_faddr) + return; + set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags); if (cp->cp_reconnect_jiffies == 0) { cp->cp_reconnect_jiffies = rds_sysctl_reconnect_min_jiffies; @@ -152,8 +157,9 @@ void rds_connect_worker(struct work_struct *work) int ret; clear_bit(RDS_RECONNECT_PENDING, &cp->cp_flags); - if (rds_conn_path_transition(cp, RDS_CONN_DOWN, RDS_CONN_CONNECTING)) { - ret = conn->c_trans->conn_connect(conn); + ret = rds_conn_path_transition(cp, RDS_CONN_DOWN, RDS_CONN_CONNECTING); + if (ret) { + ret = conn->c_trans->conn_path_connect(cp); rdsdebug("conn %p for %pI4 to %pI4 dispatched, ret %d\n", conn, &conn->c_laddr, &conn->c_faddr, ret); @@ -203,7 +209,7 @@ void rds_recv_worker(struct work_struct *work) int ret; if (rds_conn_path_state(cp) == RDS_CONN_UP) { - ret = cp->cp_conn->c_trans->recv(cp->cp_conn); + ret = cp->cp_conn->c_trans->recv_path(cp); rdsdebug("conn %p ret %d\n", cp->cp_conn, ret); switch (ret) { case -EAGAIN: