Lines matching refs: con (fs/dlm/lowcomms.c, DLM lowcomms connection handling)

133 struct connection *con;
163 int (*connect)(struct connection *con, struct socket *sock,
212 static void lowcomms_queue_swork(struct connection *con)
214 assert_spin_locked(&con->writequeue_lock);
216 if (!test_bit(CF_IO_STOP, &con->flags) &&
217 !test_bit(CF_APP_LIMITED, &con->flags) &&
218 !test_and_set_bit(CF_SEND_PENDING, &con->flags))
219 queue_work(io_workqueue, &con->swork);
222 static void lowcomms_queue_rwork(struct connection *con)
225 WARN_ON_ONCE(!lockdep_sock_is_held(con->sock->sk));
228 if (!test_bit(CF_IO_STOP, &con->flags) &&
229 !test_and_set_bit(CF_RECV_PENDING, &con->flags))
230 queue_work(io_workqueue, &con->rwork);
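
Both queue helpers share one idiom: bail out while connection I/O is being stopped (CF_IO_STOP), then let test_and_set_bit() check and claim the pending bit in a single atomic step, so each connection keeps at most one send and one receive work item in flight (the workers clear the bits again at lines 932 and 1371). Note the differing lock assertions: the send variant must run under writequeue_lock, the receive variant under the socket lock. A minimal sketch of the pattern, where my_conn, my_wq, MY_STOP and MY_PENDING are made-up stand-ins:

    /* Sketch only: queue work at most once per pending event;
     * the worker clears MY_PENDING before it starts processing. */
    static void my_queue_work(struct my_conn *c)
    {
            if (!test_bit(MY_STOP, &c->flags) &&
                !test_and_set_bit(MY_PENDING, &c->flags))
                    queue_work(my_wq, &c->work);
    }
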
252 static struct writequeue_entry *con_next_wq(struct connection *con)
256 e = list_first_entry_or_null(&con->writequeue, struct writequeue_entry,
269 struct connection *con;
271 hlist_for_each_entry_rcu(con, &connection_hash[r], list) {
272 if (con->nodeid == nodeid)
273 return con;
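
__find_con() is a plain RCU hash-bucket walk: it can run concurrently with hlist_add_head_rcu() insertions (line 325) as long as the reader holds the SRCU read lock on connections_srcu. A hedged sketch of the caller side, assuming a nodeid_hash() bucket helper and the usual srcu_read_lock()/srcu_read_unlock() pairing; use() is a placeholder:

    /* Sketch: RCU-safe lookup; the SRCU read lock pins the entry. */
    int idx = srcu_read_lock(&connections_srcu);
    struct connection *con = __find_con(nodeid, nodeid_hash(nodeid));

    if (con)
            use(con);       /* safe only while the read lock is held */
    srcu_read_unlock(&connections_srcu, idx);
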
279 static void dlm_con_init(struct connection *con, int nodeid)
281 con->nodeid = nodeid;
282 init_rwsem(&con->sock_lock);
283 INIT_LIST_HEAD(&con->writequeue);
284 spin_lock_init(&con->writequeue_lock);
285 INIT_WORK(&con->swork, process_send_sockets);
286 INIT_WORK(&con->rwork, process_recv_sockets);
287 spin_lock_init(&con->addrs_lock);
288 init_waitqueue_head(&con->shutdown_wait);
297 struct connection *con, *tmp;
301 con = __find_con(nodeid, r);
302 if (con || !alloc)
303 return con;
305 con = kzalloc(sizeof(*con), alloc);
306 if (!con)
309 dlm_con_init(con, nodeid);
321 kfree(con);
325 hlist_add_head_rcu(&con->list, &connection_hash[r]);
328 return con;
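
nodeid2con() is a find-or-create: return an existing connection, or, when a GFP flag was passed, allocate, initialize and publish a new one. The 'con, *tmp' declaration at line 297 hints at the race handling: no lock is held across kzalloc(), so the lookup must be repeated before insertion. A condensed sketch, assuming the connections_lock spinlock used elsewhere in this file, with error paths trimmed:

    /* Sketch: find-or-create with a re-check after the unlocked alloc. */
    con = __find_con(nodeid, r);
    if (con || !alloc)
            return con;

    con = kzalloc(sizeof(*con), alloc);     /* may sleep; no lock held */
    if (!con)
            return NULL;

    dlm_con_init(con, nodeid);
    spin_lock(&connections_lock);
    tmp = __find_con(nodeid, r);            /* did someone beat us? */
    if (tmp) {
            spin_unlock(&connections_lock);
            kfree(con);                     /* line 321: drop our copy */
            return tmp;
    }
    hlist_add_head_rcu(&con->list, &connection_hash[r]);
    spin_unlock(&connections_lock);
    return con;
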
364 struct connection *con;
371 con = nodeid2con(nodeid, 0);
372 if (!con) {
377 spin_lock(&con->addrs_lock);
378 if (!con->addr_count) {
379 spin_unlock(&con->addrs_lock);
384 memcpy(&sas, &con->addr[con->curr_addr_index],
388 con->curr_addr_index++;
389 if (con->curr_addr_index == con->addr_count)
390 con->curr_addr_index = 0;
393 *mark = con->mark;
394 spin_unlock(&con->addrs_lock);
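
The selection above is a round-robin over the node's configured addresses: copy the entry at curr_addr_index, then advance and wrap. The wrap is just a division-free modulo:

    /* Equivalent to: curr_addr_index = (curr_addr_index + 1) % addr_count */
    con->curr_addr_index++;
    if (con->curr_addr_index == con->addr_count)
            con->curr_addr_index = 0;

so repeated connect attempts (see dlm_connect(), line 1548) cycle through every known peer address, and the mark is sampled under the same addrs_lock so address and mark stay consistent with each other.
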
421 struct connection *con;
426 hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
427 WARN_ON_ONCE(!con->addr_count);
429 spin_lock(&con->addrs_lock);
430 for (addr_i = 0; addr_i < con->addr_count; addr_i++) {
431 if (addr_compare(&con->addr[addr_i], addr)) {
432 *nodeid = con->nodeid;
433 *mark = con->mark;
434 spin_unlock(&con->addrs_lock);
439 spin_unlock(&con->addrs_lock);
447 static bool dlm_lowcomms_con_has_addr(const struct connection *con,
452 for (i = 0; i < con->addr_count; i++) {
453 if (addr_compare(&con->addr[i], addr))
462 struct connection *con;
466 con = nodeid2con(nodeid, GFP_NOFS);
467 if (!con) {
472 spin_lock(&con->addrs_lock);
473 if (!con->addr_count) {
474 memcpy(&con->addr[0], addr, sizeof(*addr));
475 con->addr_count = 1;
476 con->mark = dlm_config.ci_mark;
477 spin_unlock(&con->addrs_lock);
482 ret = dlm_lowcomms_con_has_addr(con, addr);
484 spin_unlock(&con->addrs_lock);
489 if (con->addr_count >= DLM_MAX_ADDR_COUNT) {
490 spin_unlock(&con->addrs_lock);
495 memcpy(&con->addr[con->addr_count++], addr, sizeof(*addr));
497 spin_unlock(&con->addrs_lock);
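
dlm_lowcomms_addr() grows the per-node address list: the first address initializes addr_count and the default mark from dlm_config.ci_mark; later calls reject duplicates via dlm_lowcomms_con_has_addr() (line 447) and enforce the DLM_MAX_ADDR_COUNT cap before appending. The tail of that logic, sketched with the specific error codes as assumptions:

    /* Sketch: append a further peer address unless duplicate or full. */
    if (dlm_lowcomms_con_has_addr(con, addr)) {
            spin_unlock(&con->addrs_lock);
            return -EEXIST;                 /* assumed return value */
    }
    if (con->addr_count >= DLM_MAX_ADDR_COUNT) {
            spin_unlock(&con->addrs_lock);
            return -ENOSPC;                 /* assumed return value */
    }
    memcpy(&con->addr[con->addr_count++], addr, sizeof(*addr));
    spin_unlock(&con->addrs_lock);
    return 0;
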
504 struct connection *con = sock2con(sk);
508 set_bit(CF_RECV_INTR, &con->flags);
509 lowcomms_queue_rwork(con);
514 struct connection *con = sock2con(sk);
516 clear_bit(SOCK_NOSPACE, &con->sock->flags);
518 spin_lock_bh(&con->writequeue_lock);
519 if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
520 con->sock->sk->sk_write_pending--;
521 clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
524 lowcomms_queue_swork(con);
525 spin_unlock_bh(&con->writequeue_lock);
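
This write-space callback is the consumer half of a flow-control handshake: when the send path fills the socket buffer it sets CF_APP_LIMITED and bumps sk_write_pending (lines 1388-1394, further down), and once the kernel reports space again the callback reverses that bookkeeping exactly once, guarded by test_and_clear_bit(), before re-queueing send work:

    /* Sketch: undo the NOSPACE accounting once, then resume sending. */
    if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
            con->sock->sk->sk_write_pending--;
            clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
    }
    lowcomms_queue_swork(con);
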
546 struct connection *con;
550 con = nodeid2con(nodeid, 0);
551 if (WARN_ON_ONCE(!con)) {
556 down_read(&con->sock_lock);
557 if (!con->sock) {
558 spin_lock_bh(&con->writequeue_lock);
559 lowcomms_queue_swork(con);
560 spin_unlock_bh(&con->writequeue_lock);
562 up_read(&con->sock_lock);
571 struct connection *con;
575 con = nodeid2con(nodeid, 0);
576 if (!con) {
581 spin_lock(&con->addrs_lock);
582 con->mark = mark;
583 spin_unlock(&con->addrs_lock);
590 struct connection *con = sock2con(sk);
599 con->nodeid, &inet->inet_daddr,
608 con->nodeid, &sk->sk_v6_daddr,
622 dlm_midcomms_unack_msg_resend(con->nodeid);
641 static void add_sock(struct socket *sock, struct connection *con)
646 con->sock = sock;
648 sk->sk_user_data = con;
723 static void allow_connection_io(struct connection *con)
725 if (con->othercon)
726 clear_bit(CF_IO_STOP, &con->othercon->flags);
727 clear_bit(CF_IO_STOP, &con->flags);
730 static void stop_connection_io(struct connection *con)
732 if (con->othercon)
733 stop_connection_io(con->othercon);
735 spin_lock_bh(&con->writequeue_lock);
736 set_bit(CF_IO_STOP, &con->flags);
737 spin_unlock_bh(&con->writequeue_lock);
739 down_write(&con->sock_lock);
740 if (con->sock) {
741 lock_sock(con->sock->sk);
742 restore_callbacks(con->sock->sk);
743 release_sock(con->sock->sk);
745 up_write(&con->sock_lock);
747 cancel_work_sync(&con->swork);
748 cancel_work_sync(&con->rwork);
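
stop_connection_io() has to close three doors, in order, before the work items can be cancelled safely:

    /* 1) set CF_IO_STOP under writequeue_lock: the queue helpers at
     *    lines 212/222 now refuse to queue new work.
     * 2) restore_callbacks() under lock_sock(): socket events can no
     *    longer re-queue work either.
     * 3) cancel_work_sync(): wait out anything already queued.
     */

Cancelling first and flagging second would let a late socket callback re-queue work after the cancel had already returned.
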
752 static void close_connection(struct connection *con, bool and_other)
756 if (con->othercon && and_other)
757 close_connection(con->othercon, false);
759 down_write(&con->sock_lock);
760 if (!con->sock) {
761 up_write(&con->sock_lock);
765 dlm_close_sock(&con->sock);
778 spin_lock_bh(&con->writequeue_lock);
779 if (!list_empty(&con->writequeue)) {
780 e = list_first_entry(&con->writequeue, struct writequeue_entry,
785 spin_unlock_bh(&con->writequeue_lock);
787 con->rx_leftover = 0;
788 con->retries = 0;
789 clear_bit(CF_APP_LIMITED, &con->flags);
790 clear_bit(CF_RECV_PENDING, &con->flags);
791 clear_bit(CF_SEND_PENDING, &con->flags);
792 up_write(&con->sock_lock);
795 static void shutdown_connection(struct connection *con, bool and_other)
799 if (con->othercon && and_other)
800 shutdown_connection(con->othercon, false);
803 down_read(&con->sock_lock);
805 if (!con->sock) {
806 up_read(&con->sock_lock);
810 ret = kernel_sock_shutdown(con->sock, SHUT_WR);
811 up_read(&con->sock_lock);
814 con, ret);
817 ret = wait_event_timeout(con->shutdown_wait, !con->sock,
821 con);
829 close_connection(con, false);
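
shutdown_connection() attempts a graceful half-close: SHUT_WR tells the peer we are done sending, then the caller parks on shutdown_wait until the receive worker sees the peer's clean close and wakes it (wake_up(&con->shutdown_wait), line 1503); on timeout it degrades to a hard close. A sketch, with the timeout constant's name assumed:

    /* Sketch: half-close, then wait for the recv side to drain. */
    kernel_sock_shutdown(con->sock, SHUT_WR);
    ret = wait_event_timeout(con->shutdown_wait, !con->sock,
                             DLM_SHUTDOWN_WAIT_TIMEOUT); /* assumed name */
    if (ret == 0)
            close_connection(con, false);   /* line 829: force it */
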
899 static int receive_from_sock(struct connection *con, int buflen)
906 pentry = new_processqueue_entry(con->nodeid, buflen);
910 memcpy(pentry->buf, con->rx_leftover_buf, con->rx_leftover);
915 iov.iov_base = pentry->buf + con->rx_leftover;
916 iov.iov_len = buflen - con->rx_leftover;
920 clear_bit(CF_RECV_INTR, &con->flags);
922 ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
924 trace_dlm_recv(con->nodeid, ret);
926 lock_sock(con->sock->sk);
927 if (test_and_clear_bit(CF_RECV_INTR, &con->flags)) {
928 release_sock(con->sock->sk);
932 clear_bit(CF_RECV_PENDING, &con->flags);
933 release_sock(con->sock->sk);
946 buflen_real = ret + con->rx_leftover;
947 ret = dlm_validate_incoming_buffer(con->nodeid, pentry->buf,
960 con->rx_leftover = buflen_real - ret;
961 memmove(con->rx_leftover_buf, pentry->buf + ret,
962 con->rx_leftover);
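
receive_from_sock() stitches stream reads back into whole DLM messages: bytes left over from the previous read are copied to the front of the new buffer, the socket read appends after them, and after validation the trailing partial message is saved for the next pass. The data flow, condensed from the fragments above:

    /* Sketch: carry a partial trailing message across reads. */
    memcpy(buf, con->rx_leftover_buf, con->rx_leftover);    /* old tail */
    /* kernel_recvmsg() then appends ret new bytes after the tail */
    buflen_real = ret + con->rx_leftover;
    ret = dlm_validate_incoming_buffer(con->nodeid, buf, buflen_real);
    /* ret now counts only the bytes forming complete messages */
    con->rx_leftover = buflen_real - ret;
    memmove(con->rx_leftover_buf, buf + ret, con->rx_leftover);
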
1063 /* close other sock con if we have something new */
1159 static struct writequeue_entry *new_writequeue_entry(struct connection *con)
1177 entry->con = con;
1183 static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
1189 spin_lock_bh(&con->writequeue_lock);
1190 if (!list_empty(&con->writequeue)) {
1191 e = list_last_entry(&con->writequeue, struct writequeue_entry, list);
1205 e = new_writequeue_entry(con);
1215 list_add_tail(&e->list, &con->writequeue);
1218 spin_unlock_bh(&con->writequeue_lock);
1222 static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
1236 e = new_wq_entry(con, len, ppc, cb, data);
1259 struct connection *con;
1272 con = nodeid2con(nodeid, 0);
1273 if (WARN_ON_ONCE(!con)) {
1278 msg = dlm_lowcomms_new_msg_con(con, len, allocation, ppc, cb, data);
1295 struct connection *con = e->con;
1298 spin_lock_bh(&con->writequeue_lock);
1308 lowcomms_queue_swork(con);
1311 spin_unlock_bh(&con->writequeue_lock);
1342 msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len,
1359 static int send_to_sock(struct connection *con)
1368 spin_lock_bh(&con->writequeue_lock);
1369 e = con_next_wq(con);
1371 clear_bit(CF_SEND_PENDING, &con->flags);
1372 spin_unlock_bh(&con->writequeue_lock);
1379 spin_unlock_bh(&con->writequeue_lock);
1383 ret = sock_sendmsg(con->sock, &msg);
1384 trace_dlm_send(con->nodeid, ret);
1386 lock_sock(con->sock->sk);
1387 spin_lock_bh(&con->writequeue_lock);
1388 if (test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
1389 !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
1393 set_bit(SOCK_NOSPACE, &con->sock->sk->sk_socket->flags);
1394 con->sock->sk->sk_write_pending++;
1396 clear_bit(CF_SEND_PENDING, &con->flags);
1397 spin_unlock_bh(&con->writequeue_lock);
1398 release_sock(con->sock->sk);
1403 spin_unlock_bh(&con->writequeue_lock);
1404 release_sock(con->sock->sk);
1411 spin_lock_bh(&con->writequeue_lock);
1413 spin_unlock_bh(&con->writequeue_lock);
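
send_to_sock() is the producer half of the handshake seen at lines 514-525: it drains the writequeue with non-blocking sends, and when a send cannot make progress because the socket buffer is full, it records that state and arranges to be woken from sk_write_space():

    /* Sketch: on a full socket buffer, park until sk_write_space(). */
    if (test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
        !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
            set_bit(SOCK_NOSPACE, &con->sock->sk->sk_socket->flags);
            con->sock->sk->sk_write_pending++;  /* wake us on space */
    }
    clear_bit(CF_SEND_PENDING, &con->flags);    /* callback re-queues us */
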
1418 static void clean_one_writequeue(struct connection *con)
1422 spin_lock_bh(&con->writequeue_lock);
1423 list_for_each_entry_safe(e, safe, &con->writequeue, list) {
1426 spin_unlock_bh(&con->writequeue_lock);
1431 struct connection *con = container_of(rcu, struct connection, rcu);
1433 WARN_ON_ONCE(!list_empty(&con->writequeue));
1434 WARN_ON_ONCE(con->sock);
1435 kfree(con);
1442 struct connection *con;
1448 con = nodeid2con(nodeid, 0);
1449 if (WARN_ON_ONCE(!con)) {
1454 stop_connection_io(con);
1456 close_connection(con, true);
1459 hlist_del_rcu(&con->list);
1462 clean_one_writequeue(con);
1463 call_srcu(&connections_srcu, &con->rcu, connection_release);
1464 if (con->othercon) {
1465 clean_one_writequeue(con->othercon);
1466 call_srcu(&connections_srcu, &con->othercon->rcu, connection_release);
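
Single-node teardown in dlm_lowcomms_close() follows the standard RCU removal sequence: unpublish with hlist_del_rcu() so new lookups miss, then defer the actual kfree() until every SRCU reader that might still hold the pointer has drained:

    /* Sketch: free only after all connections_srcu readers finish. */
    hlist_del_rcu(&con->list);      /* no new lookups can find it */
    call_srcu(&connections_srcu, &con->rcu, connection_release);

connection_release() (line 1431) runs from that callback, sanity-checks that the writequeue is empty and the socket is gone, and frees the structure.
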
1482 struct connection *con = container_of(work, struct connection, rwork);
1485 down_read(&con->sock_lock);
1486 if (!con->sock) {
1487 up_read(&con->sock_lock);
1493 ret = receive_from_sock(con, buflen);
1495 up_read(&con->sock_lock);
1502 close_connection(con, false);
1503 wake_up(&con->shutdown_wait);
1508 queue_work(io_workqueue, &con->rwork);
1513 if (test_bit(CF_IS_OTHERCON, &con->flags)) {
1514 close_connection(con, false);
1516 spin_lock_bh(&con->writequeue_lock);
1517 lowcomms_queue_swork(con);
1518 spin_unlock_bh(&con->writequeue_lock);
1548 static int dlm_connect(struct connection *con)
1556 result = nodeid_to_addr(con->nodeid, &addr, NULL,
1559 log_print("no address for nodeid %d", con->nodeid);
1578 add_sock(sock, con);
1580 log_print_ratelimited("connecting to %d", con->nodeid);
1582 result = dlm_proto_ops->connect(con, sock, (struct sockaddr *)&addr,
1592 dlm_close_sock(&con->sock);
1603 struct connection *con = container_of(work, struct connection, swork);
1606 WARN_ON_ONCE(test_bit(CF_IS_OTHERCON, &con->flags));
1608 down_read(&con->sock_lock);
1609 if (!con->sock) {
1610 up_read(&con->sock_lock);
1611 down_write(&con->sock_lock);
1612 if (!con->sock) {
1613 ret = dlm_connect(con);
1626 up_write(&con->sock_lock);
1628 con->nodeid, con->retries++, ret);
1635 queue_work(io_workqueue, &con->swork);
1639 downgrade_write(&con->sock_lock);
1643 ret = send_to_sock(con);
1645 up_read(&con->sock_lock);
1654 queue_work(io_workqueue, &con->swork);
1658 close_connection(con, false);
1661 spin_lock_bh(&con->writequeue_lock);
1662 lowcomms_queue_swork(con);
1663 spin_unlock_bh(&con->writequeue_lock);
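
process_send_sockets() wants sock_lock only for read while transmitting but needs it for write to install a new socket, and an rwsem cannot be upgraded in place: it must drop the read lock, take the write lock, and then re-check con->sock, because another task may have connected in the gap. A sketch with the error paths trimmed:

    /* Sketch: read -> write upgrade with a mandatory re-check. */
    down_read(&con->sock_lock);
    if (!con->sock) {
            up_read(&con->sock_lock);
            down_write(&con->sock_lock);
            if (!con->sock)                 /* re-check after the gap */
                    ret = dlm_connect(con);
            downgrade_write(&con->sock_lock); /* keep it for the send */
    }
    ret = send_to_sock(con);
    up_read(&con->sock_lock);

downgrade_write() converts the held write lock to a read lock atomically, so the freshly installed socket cannot be torn down before the first send.
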
1711 struct connection *con;
1724 hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
1725 shutdown_connection(con, true);
1726 stop_connection_io(con);
1728 close_connection(con, true);
1730 clean_one_writequeue(con);
1731 if (con->othercon)
1732 clean_one_writequeue(con->othercon);
1733 allow_connection_io(con);
1818 static int dlm_tcp_connect(struct connection *con, struct socket *sock,
1873 static int dlm_sctp_connect(struct connection *con, struct socket *sock,
1980 struct connection *con;
1985 hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
1987 hlist_del_rcu(&con->list);
1990 if (con->othercon)
1991 call_srcu(&connections_srcu, &con->othercon->rcu,
1993 call_srcu(&connections_srcu, &con->rcu, connection_release);