Lines matching refs:con (references to the struct connection pointer "con" in fs/dlm/lowcomms.c; the number at the start of each matched line is its line number in that file)
85 void (*shutdown_action)(struct connection *con); /* What to do to shutdown */
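For orientation, every match below dereferences a field of struct connection. A condensed sketch of the struct as the references imply it; only fields that actually appear in the matches are shown, and their order and any omitted members are guesses:

struct connection {
        struct socket *sock;            /* NULL if not connected */
        uint32_t nodeid;                /* peer node; 0 for the listening con */
        struct mutex sock_mutex;
        unsigned long flags;            /* CF_READ_PENDING, CF_WRITE_PENDING, ... */
        struct list_head writequeue;    /* outgoing writequeue_entries */
        spinlock_t writequeue_lock;
        int retries;
        int (*rx_action)(struct connection *);       /* what to do when active */
        void (*connect_action)(struct connection *); /* what to do to connect */
        void (*shutdown_action)(struct connection *con); /* What to do to shutdown */
        unsigned char *rx_buf;          /* receive buffer */
        int rx_buflen;
        int rx_leftover;                /* unparsed bytes carried over */
        struct connection *othercon;    /* second con for a simultaneous connect */
        struct work_struct rwork;       /* receive work item */
        struct work_struct swork;       /* send work item */
        wait_queue_head_t shutdown_wait;
        struct hlist_node list;
        struct rcu_head rcu;
};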
108 struct connection *con;
156 struct connection *con;
161 hlist_for_each_entry_rcu(con, &connection_hash[r], list) {
162 if (con->nodeid == nodeid) {
164 return con;
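The three hits above are the body of __find_con(), the lockless lookup: hash the nodeid to a bucket and walk it under RCU. A condensed reconstruction; nodeid_hash() stands for the file's own bucket helper, and callers must be inside an RCU read-side section:

static struct connection *__find_con(int nodeid)
{
        int r = nodeid_hash(nodeid);    /* bucket index */
        struct connection *con;

        hlist_for_each_entry_rcu(con, &connection_hash[r], list) {
                if (con->nodeid == nodeid)
                        return con;
        }
        return NULL;
}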
178 struct connection *con, *tmp;
181 con = __find_con(nodeid);
182 if (con || !alloc)
183 return con;
185 con = kzalloc(sizeof(*con), alloc);
186 if (!con)
189 con->rx_buflen = dlm_config.ci_buffer_size;
190 con->rx_buf = kmalloc(con->rx_buflen, GFP_NOFS);
191 if (!con->rx_buf) {
192 kfree(con);
196 con->nodeid = nodeid;
197 mutex_init(&con->sock_mutex);
198 INIT_LIST_HEAD(&con->writequeue);
199 spin_lock_init(&con->writequeue_lock);
200 INIT_WORK(&con->swork, process_send_sockets);
201 INIT_WORK(&con->rwork, process_recv_sockets);
202 init_waitqueue_head(&con->shutdown_wait);
205 if (con->nodeid) {
208 con->connect_action = zerocon->connect_action;
209 if (!con->rx_action)
210 con->rx_action = zerocon->rx_action;
225 kfree(con->rx_buf);
226 kfree(con);
230 hlist_add_head_rcu(&con->list, &connection_hash[r]);
233 return con;
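nodeid2con() (the hits from line 178 on) allocates and initializes a candidate connection outside any lock, then re-checks the hash under connections_lock before publishing it; losing that race means freeing the candidate and returning the existing entry, which is why the kfree pair at 225-226 sits just before the hlist_add_head_rcu() at 230. Roughly:

        spin_lock(&connections_lock);
        tmp = __find_con(nodeid);
        if (tmp) {
                /* another CPU added this nodeid first; drop our candidate */
                spin_unlock(&connections_lock);
                kfree(con->rx_buf);
                kfree(con);
                return tmp;
        }
        hlist_add_head_rcu(&con->list, &connection_hash[r]);
        spin_unlock(&connections_lock);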
240 struct connection *con;
244 hlist_for_each_entry_rcu(con, &connection_hash[i], list)
245 conn_func(con);
404 struct connection *con;
407 con = sock2con(sk);
408 if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
409 queue_work(recv_workqueue, &con->rwork);
415 struct connection *con;
418 con = sock2con(sk);
419 if (!con)
422 clear_bit(SOCK_NOSPACE, &con->sock->flags);
424 if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
425 con->sock->sk->sk_write_pending--;
426 clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
429 queue_work(send_workqueue, &con->swork);
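lowcomms_data_ready() (404-409) and lowcomms_write_space() (415-429) share one shape: invoked from the socket's softirq callbacks, they map sk back to its connection via sk_user_data (the file's sock2con() accessor) and punt the real work to a workqueue, using an atomic test_and_set_bit so each wakeup queues at most one work item. A reconstruction of the read side; the sk_callback_lock lines do not reference con, hence the gaps in the matches:

static void lowcomms_data_ready(struct sock *sk)
{
        struct connection *con;

        read_lock_bh(&sk->sk_callback_lock);
        con = sock2con(sk);
        if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
                queue_work(recv_workqueue, &con->rwork);
        read_unlock_bh(&sk->sk_callback_lock);
}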
434 static inline void lowcomms_connect_sock(struct connection *con)
436 if (test_bit(CF_CLOSE, &con->flags))
438 queue_work(send_workqueue, &con->swork);
459 struct connection *con;
464 con = nodeid2con(nodeid, GFP_NOFS);
465 if (!con)
467 lowcomms_connect_sock(con);
473 struct connection *con;
478 con = sock2con(sk);
479 if (con == NULL)
490 con->nodeid, &inet->inet_daddr,
499 con->nodeid, &sk->sk_v6_daddr,
542 static void add_sock(struct socket *sock, struct connection *con)
547 con->sock = sock;
549 sk->sk_user_data = con;
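add_sock() (542-549) is where those callbacks get installed: under sk_callback_lock it ties the socket to the connection and swaps in the lowcomms_* handlers. A reconstruction of the elided middle, which may differ in detail from the exact source:

static void add_sock(struct socket *sock, struct connection *con)
{
        struct sock *sk = sock->sk;

        write_lock_bh(&sk->sk_callback_lock);
        con->sock = sock;
        sk->sk_user_data = con;
        sk->sk_data_ready = lowcomms_data_ready;
        sk->sk_write_space = lowcomms_write_space;
        sk->sk_state_change = lowcomms_state_change;
        sk->sk_allocation = GFP_NOFS;
        sk->sk_error_report = lowcomms_error_report;
        write_unlock_bh(&sk->sk_callback_lock);
}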
579 static void close_connection(struct connection *con, bool and_other,
582 bool closing = test_and_set_bit(CF_CLOSING, &con->flags);
584 if (tx && !closing && cancel_work_sync(&con->swork)) {
585 log_print("canceled swork for node %d", con->nodeid);
586 clear_bit(CF_WRITE_PENDING, &con->flags);
588 if (rx && !closing && cancel_work_sync(&con->rwork)) {
589 log_print("canceled rwork for node %d", con->nodeid);
590 clear_bit(CF_READ_PENDING, &con->flags);
593 mutex_lock(&con->sock_mutex);
594 if (con->sock) {
595 restore_callbacks(con->sock);
596 sock_release(con->sock);
597 con->sock = NULL;
599 if (con->othercon && and_other) {
601 close_connection(con->othercon, false, tx, rx);
604 con->rx_leftover = 0;
605 con->retries = 0;
606 mutex_unlock(&con->sock_mutex);
607 clear_bit(CF_CLOSING, &con->flags);
610 static void shutdown_connection(struct connection *con)
614 flush_work(&con->swork);
616 mutex_lock(&con->sock_mutex);
618 if (!con->sock) {
619 mutex_unlock(&con->sock_mutex);
623 set_bit(CF_SHUTDOWN, &con->flags);
624 ret = kernel_sock_shutdown(con->sock, SHUT_WR);
625 mutex_unlock(&con->sock_mutex);
628 con, ret);
631 ret = wait_event_timeout(con->shutdown_wait,
632 !test_bit(CF_SHUTDOWN, &con->flags),
636 con);
644 clear_bit(CF_SHUTDOWN, &con->flags);
645 close_connection(con, false, true, true);
648 static void dlm_tcp_shutdown(struct connection *con)
650 if (con->othercon)
651 shutdown_connection(con->othercon);
652 shutdown_connection(con);
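dlm_tcp_shutdown() drives a half-close handshake: shutdown_connection() flushes pending sends, half-closes with SHUT_WR, and waits for the receive path to observe the peer's EOF; the receive_from_sock() hits around line 756 below are the other end, clearing CF_SHUTDOWN and waking shutdown_wait. The core of shutdown_connection(), condensed, with DLM_SHUTDOWN_WAIT_TIMEOUT being the file's own constant:

        set_bit(CF_SHUTDOWN, &con->flags);
        ret = kernel_sock_shutdown(con->sock, SHUT_WR);
        mutex_unlock(&con->sock_mutex);
        if (ret) {
                log_print("Connection %p failed to shutdown: %d will force close",
                          con, ret);
                goto force_close;
        }

        ret = wait_event_timeout(con->shutdown_wait,
                                 !test_bit(CF_SHUTDOWN, &con->flags),
                                 DLM_SHUTDOWN_WAIT_TIMEOUT);
        if (ret == 0) {
                log_print("Connection %p shutdown timed out, needs force close",
                          con);
                goto force_close;
        }
        return;

force_close:
        clear_bit(CF_SHUTDOWN, &con->flags);
        close_connection(con, false, true, true);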
655 static int con_realloc_receive_buf(struct connection *con, int newlen)
664 if (con->rx_leftover)
665 memmove(newbuf, con->rx_buf, con->rx_leftover);
668 kfree(con->rx_buf);
669 con->rx_buflen = newlen;
670 con->rx_buf = newbuf;
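The hits at 655-670 are nearly the whole of con_realloc_receive_buf(): resize the receive buffer while preserving the unparsed tail left over from the last read. Filled in from the matches:

static int con_realloc_receive_buf(struct connection *con, int newlen)
{
        unsigned char *newbuf;

        newbuf = kmalloc(newlen, GFP_NOFS);
        if (!newbuf)
                return -ENOMEM;

        /* copy any leftover from the previous receive */
        if (con->rx_leftover)
                memmove(newbuf, con->rx_buf, con->rx_leftover);

        /* swap to the new buffer */
        kfree(con->rx_buf);
        con->rx_buflen = newlen;
        con->rx_buf = newbuf;

        return 0;
}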
676 static int receive_from_sock(struct connection *con)
683 mutex_lock(&con->sock_mutex);
685 if (con->sock == NULL) {
690 if (con->nodeid == 0) {
697 if (con->rx_buflen != buflen && con->rx_leftover <= buflen) {
698 ret = con_realloc_receive_buf(con, buflen);
706 iov.iov_base = con->rx_buf + con->rx_leftover;
707 iov.iov_len = con->rx_buflen - con->rx_leftover;
711 ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
719 buflen = ret + con->rx_leftover;
720 ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen);
728 con->rx_leftover = buflen - ret;
729 if (con->rx_leftover) {
730 memmove(con->rx_buf, con->rx_buf + ret,
731 con->rx_leftover);
738 mutex_unlock(&con->sock_mutex);
742 if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
743 queue_work(recv_workqueue, &con->rwork);
744 mutex_unlock(&con->sock_mutex);
748 mutex_unlock(&con->sock_mutex);
751 close_connection(con, false, true, false);
754 con, con->nodeid);
756 clear_bit(CF_SHUTDOWN, &con->flags);
757 wake_up(&con->shutdown_wait);
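receive_from_sock() reads into rx_buf just past the carried-over bytes, asks dlm_process_incoming_buffer() to consume whole messages, and slides any partial message back to the front for the next pass. The shape of the happy path, with error handling stripped:

        struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
        struct kvec iov;

        /* append new data after the leftover from last time */
        iov.iov_base = con->rx_buf + con->rx_leftover;
        iov.iov_len = con->rx_buflen - con->rx_leftover;

        ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
                             msg.msg_flags);
        if (ret <= 0)
                goto out_close;         /* 0 is the peer's EOF; see the shutdown path */

        /* parse complete dlm messages; ret becomes bytes consumed */
        buflen = ret + con->rx_leftover;
        ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen);
        if (ret < 0)
                goto out_close;

        /* keep the partial tail for the next receive */
        con->rx_leftover = buflen - ret;
        if (con->rx_leftover)
                memmove(con->rx_buf, con->rx_buf + ret, con->rx_leftover);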
766 static int accept_from_sock(struct connection *con)
781 mutex_lock_nested(&con->sock_mutex, 0);
783 if (!con->sock) {
784 mutex_unlock(&con->sock_mutex);
788 result = kernel_accept(con->sock, &newsock, O_NONBLOCK);
808 mutex_unlock(&con->sock_mutex);
860 /* close other sock con if we have something new */
888 mutex_unlock(&con->sock_mutex);
893 mutex_unlock(&con->sock_mutex);
929 static int sctp_bind_addrs(struct connection *con, uint16_t port)
940 result = kernel_bind(con->sock, addr, addr_len);
942 result = sock_bind_add(con->sock->sk, addr, addr_len);
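sctp_bind_addrs() is the SCTP multihoming step: the first local address is bound with kernel_bind(), and every further address joins the same socket via sock_bind_add(). A reconstruction around the two hits; make_sockaddr() and dlm_local_addr[]/dlm_local_count are the file's own helpers:

static int sctp_bind_addrs(struct connection *con, uint16_t port)
{
        struct sockaddr_storage localaddr;
        struct sockaddr *addr = (struct sockaddr *)&localaddr;
        int i, addr_len, result = 0;

        for (i = 0; i < dlm_local_count; i++) {
                memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
                make_sockaddr(&localaddr, port, &addr_len);

                if (!i)
                        result = kernel_bind(con->sock, addr, addr_len);
                else
                        result = sock_bind_add(con->sock->sk, addr, addr_len);

                if (result < 0) {
                        log_print("Can't bind to %d addr number %d, %d.",
                                  port, i + 1, result);
                        break;
                }
        }
        return result;
}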
958 static void sctp_connect_to_sock(struct connection *con)
966 if (con->nodeid == 0) {
971 dlm_comm_mark(con->nodeid, &mark);
973 mutex_lock(&con->sock_mutex);
976 if (con->retries++ > MAX_CONNECT_RETRIES)
979 if (con->sock) {
980 log_print("node %d already connected.", con->nodeid);
985 result = nodeid_to_addr(con->nodeid, &daddr, NULL, true);
987 log_print("no address for nodeid %d", con->nodeid);
999 con->rx_action = receive_from_sock;
1000 con->connect_action = sctp_connect_to_sock;
1001 add_sock(sock, con);
1004 if (sctp_bind_addrs(con, 0))
1009 log_print("connecting to %d", con->nodeid);
1030 con->sock = NULL;
1043 log_print("connect %d try %d error %d", con->nodeid,
1044 con->retries, result);
1045 mutex_unlock(&con->sock_mutex);
1047 lowcomms_connect_sock(con);
1052 mutex_unlock(&con->sock_mutex);
1056 static void tcp_connect_to_sock(struct connection *con)
1064 if (con->nodeid == 0) {
1069 dlm_comm_mark(con->nodeid, &mark);
1071 mutex_lock(&con->sock_mutex);
1072 if (con->retries++ > MAX_CONNECT_RETRIES)
1076 if (con->sock)
1088 result = nodeid_to_addr(con->nodeid, &saddr, NULL, false);
1090 log_print("no address for nodeid %d", con->nodeid);
1094 con->rx_action = receive_from_sock;
1095 con->connect_action = tcp_connect_to_sock;
1096 con->shutdown_action = dlm_tcp_shutdown;
1097 add_sock(sock, con);
1112 log_print("connecting to %d", con->nodeid);
1125 if (con->sock) {
1126 sock_release(con->sock);
1127 con->sock = NULL;
1140 log_print("connect %d try %d error %d", con->nodeid,
1141 con->retries, result);
1142 mutex_unlock(&con->sock_mutex);
1144 lowcomms_connect_sock(con);
1148 mutex_unlock(&con->sock_mutex);
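Both connect paths end with the same retry policy (the hits at 1043-1047 and 1140-1144): transient network errors sleep a second and re-queue the connect through lowcomms_connect_sock(), while con->retries caps the attempts at MAX_CONNECT_RETRIES further up. The error tail, condensed:

        if (result != -EHOSTUNREACH &&
            result != -ENETUNREACH &&
            result != -ENETDOWN &&
            result != -EINVAL &&
            result != -EPROTONOSUPPORT) {
                log_print("connect %d try %d error %d", con->nodeid,
                          con->retries, result);
                mutex_unlock(&con->sock_mutex);
                msleep(1000);
                lowcomms_connect_sock(con);     /* re-queues swork for a retry */
                return;
        }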
1152 static struct socket *tcp_create_listen_sock(struct connection *con,
1180 sock->sk->sk_user_data = con;
1182 con->rx_action = accept_from_sock;
1183 con->connect_action = tcp_connect_to_sock;
1193 con->sock = NULL;
1241 struct connection *con = nodeid2con(0, GFP_NOFS);
1243 if (!con)
1260 /* Init con struct */
1261 sock->sk->sk_user_data = con;
1263 con->sock = sock;
1264 con->sock->sk->sk_data_ready = lowcomms_data_ready;
1265 con->rx_action = accept_from_sock;
1266 con->connect_action = sctp_connect_to_sock;
1271 if (sctp_bind_addrs(con, dlm_config.ci_tcp_port))
1284 con->sock = NULL;
1292 struct connection *con = nodeid2con(0, GFP_NOFS);
1295 if (!con)
1307 sock = tcp_create_listen_sock(con, dlm_local_addr[0]);
1309 add_sock(sock, con);
1321 static struct writequeue_entry *new_writequeue_entry(struct connection *con,
1340 entry->con = con;
1347 struct connection *con;
1351 con = nodeid2con(nodeid, allocation);
1352 if (!con)
1355 spin_lock(&con->writequeue_lock);
1356 e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
1357 if ((&e->list == &con->writequeue) ||
1365 spin_unlock(&con->writequeue_lock);
1373 e = new_writequeue_entry(con, allocation);
1375 spin_lock(&con->writequeue_lock);
1379 list_add_tail(&e->list, &con->writequeue);
1380 spin_unlock(&con->writequeue_lock);
1389 struct connection *con = e->con;
1392 spin_lock(&con->writequeue_lock);
1397 spin_unlock(&con->writequeue_lock);
1399 queue_work(send_workqueue, &con->swork);
1403 spin_unlock(&con->writequeue_lock);
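Sending is a two-phase API: dlm_lowcomms_get_buffer() (the hits from 1347) reserves space in the tail page of con->writequeue for the caller to fill in place, and dlm_lowcomms_commit_buffer() (the hits around 1389-1399) publishes it, kicking swork only when the last concurrent user of the entry finishes. A rough sketch of the commit side; the users/end bookkeeping is simplified:

void dlm_lowcomms_commit_buffer(void *mh)
{
        struct writequeue_entry *e = mh;
        struct connection *con = e->con;
        int users;

        spin_lock(&con->writequeue_lock);
        users = --e->users;
        if (users)
                goto out;               /* another caller still filling this entry */
        e->len = e->end - e->offset;    /* entry is now complete */
        spin_unlock(&con->writequeue_lock);

        queue_work(send_workqueue, &con->swork);
        return;

out:
        spin_unlock(&con->writequeue_lock);
}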
1408 static void send_to_sock(struct connection *con)
1416 mutex_lock(&con->sock_mutex);
1417 if (con->sock == NULL)
1420 spin_lock(&con->writequeue_lock);
1422 e = list_entry(con->writequeue.next, struct writequeue_entry,
1424 if ((struct list_head *) e == &con->writequeue)
1430 spin_unlock(&con->writequeue_lock);
1434 ret = kernel_sendpage(con->sock, e->page, offset, len,
1438 test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
1439 !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
1443 set_bit(SOCK_NOSPACE, &con->sock->flags);
1444 con->sock->sk->sk_write_pending++;
1458 spin_lock(&con->writequeue_lock);
1461 spin_unlock(&con->writequeue_lock);
1463 mutex_unlock(&con->sock_mutex);
1467 mutex_unlock(&con->sock_mutex);
1468 close_connection(con, false, false, true);
1471 queue_work(send_workqueue, &con->swork);
1475 mutex_unlock(&con->sock_mutex);
1476 queue_work(send_workqueue, &con->swork);
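send_to_sock() pops entries off the writequeue and pushes them with kernel_sendpage(); the interesting hits (1438-1444) are the flow-control branch: on a would-block send it arms the write-space machinery rather than busy-retrying, and lowcomms_write_space() above undoes exactly these bits once room returns. Condensed:

        ret = kernel_sendpage(con->sock, e->page, offset, len,
                              MSG_DONTWAIT | MSG_NOSIGNAL);
        if (ret == -EAGAIN &&
            test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
            !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
                /* Notify TCP that we're limited by the application
                 * window size.
                 */
                set_bit(SOCK_NOSPACE, &con->sock->flags);
                con->sock->sk->sk_write_pending++;
                /* wait for a write_space() event to requeue swork */
        }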
1480 static void clean_one_writequeue(struct connection *con)
1484 spin_lock(&con->writequeue_lock);
1485 list_for_each_entry_safe(e, safe, &con->writequeue, list) {
1489 spin_unlock(&con->writequeue_lock);
1496 struct connection *con;
1500 con = nodeid2con(nodeid, 0);
1501 if (con) {
1502 set_bit(CF_CLOSE, &con->flags);
1503 close_connection(con, true, true, true);
1504 clean_one_writequeue(con);
1523 struct connection *con = container_of(work, struct connection, rwork);
1526 clear_bit(CF_READ_PENDING, &con->flags);
1528 err = con->rx_action(con);
1535 struct connection *con = container_of(work, struct connection, swork);
1537 clear_bit(CF_WRITE_PENDING, &con->flags);
1538 if (con->sock == NULL) /* not mutex protected so check it inside too */
1539 con->connect_action(con);
1540 if (!list_empty(&con->writequeue))
1541 send_to_sock(con);
1572 static void _stop_conn(struct connection *con, bool and_other)
1574 mutex_lock(&con->sock_mutex);
1575 set_bit(CF_CLOSE, &con->flags);
1576 set_bit(CF_READ_PENDING, &con->flags);
1577 set_bit(CF_WRITE_PENDING, &con->flags);
1578 if (con->sock && con->sock->sk) {
1579 write_lock_bh(&con->sock->sk->sk_callback_lock);
1580 con->sock->sk->sk_user_data = NULL;
1581 write_unlock_bh(&con->sock->sk->sk_callback_lock);
1583 if (con->othercon && and_other)
1584 _stop_conn(con->othercon, false);
1585 mutex_unlock(&con->sock_mutex);
1588 static void stop_conn(struct connection *con)
1590 _stop_conn(con, true);
1593 static void shutdown_conn(struct connection *con)
1595 if (con->shutdown_action)
1596 con->shutdown_action(con);
1601 struct connection *con = container_of(rcu, struct connection, rcu);
1603 kfree(con->rx_buf);
1604 kfree(con);
1607 static void free_conn(struct connection *con)
1609 close_connection(con, true, true, true);
1611 hlist_del_rcu(&con->list);
1613 if (con->othercon) {
1614 clean_one_writequeue(con->othercon);
1615 call_rcu(&con->othercon->rcu, connection_release);
1617 clean_one_writequeue(con);
1618 call_rcu(&con->rcu, connection_release);
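Teardown mirrors the lockless lookup: free_conn() unlinks the node with hlist_del_rcu() and defers the actual kfree through call_rcu(), so a concurrent __find_con() walker can never touch freed memory. The deferred half (1601-1604) filled in:

static void connection_release(struct rcu_head *rcu)
{
        struct connection *con = container_of(rcu, struct connection, rcu);

        kfree(con->rx_buf);
        kfree(con);
}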
1625 struct connection *con;
1636 hlist_for_each_entry_rcu(con, &connection_hash[i],
1638 ok &= test_bit(CF_READ_PENDING, &con->flags);
1639 ok &= test_bit(CF_WRITE_PENDING, &con->flags);
1640 if (con->othercon) {
1642 &con->othercon->flags);
1644 &con->othercon->flags);
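The hits at 1636-1644 are the quiescence test inside the stop path: _stop_conn() sets both pending bits on every connection, and the flush loop repeats until no work handler has cleared one of them, i.e. nothing re-queued itself. A condensed reconstruction of that loop; the locking around the hash walk is elided:

        do {
                ok = 1;
                foreach_conn(stop_conn);
                if (recv_workqueue)
                        flush_workqueue(recv_workqueue);
                if (send_workqueue)
                        flush_workqueue(send_workqueue);
                for (i = 0; ok && i < CONN_HASH_SIZE; i++) {
                        hlist_for_each_entry_rcu(con, &connection_hash[i],
                                                 list) {
                                ok &= test_bit(CF_READ_PENDING, &con->flags);
                                ok &= test_bit(CF_WRITE_PENDING, &con->flags);
                                if (con->othercon) {
                                        ok &= test_bit(CF_READ_PENDING,
                                                       &con->othercon->flags);
                                        ok &= test_bit(CF_WRITE_PENDING,
                                                       &con->othercon->flags);
                                }
                        }
                }
        } while (!ok);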
1674 struct connection *con;
1705 con = nodeid2con(0,0);
1706 if (con)
1707 free_conn(con);